LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow_common - vsiarrowfilesystem.hpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 77 115 67.0 %
Date: 2026-01-23 20:24:11 Functions: 7 19 36.8 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
      14             : #define VSIARROWFILESYSTEM_HPP_INCLUDED
      15             : 
      16             : #include "cpl_multiproc.h"
      17             : 
      18             : #include "arrow/util/config.h"
      19             : 
      20             : #include "ograrrowrandomaccessfile.h"
      21             : 
      22             : #include <atomic>
      23             : #include <memory>
      24             : #include <mutex>
      25             : #include <set>
      26             : #include <vector>
      27             : #include <utility>
      28             : 
      29             : #if defined(__clang__)
      30             : #pragma clang diagnostic push
      31             : #pragma clang diagnostic ignored "-Wweak-vtables"
      32             : #endif
      33             : 
      34             : /************************************************************************/
      35             : /*                         VSIArrowFileSystem                           */
      36             : /************************************************************************/
      37             : 
      38             : class VSIArrowFileSystem final : public arrow::fs::FileSystem
      39             : {
      40             :     const std::string m_osEnvVarPrefix;
      41             :     const std::string m_osQueryParameters;
      42             : 
      43             :     std::atomic<bool> m_bAskedToClosed = false;
      44             :     std::mutex m_oMutex{};
      45             :     std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      46             :         m_oSetFiles{};
      47             :     std::set<std::string> m_oSetFilesAskedToOpen{};
      48             : 
      49             :   public:
      50         367 :     VSIArrowFileSystem(const std::string &osEnvVarPrefix,
      51             :                        const std::string &osQueryParameters)
      52         367 :         : m_osEnvVarPrefix(osEnvVarPrefix),
      53         367 :           m_osQueryParameters(osQueryParameters)
      54             :     {
      55         367 :     }
      56             : 
      57             :     // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
      58             :     // for this method
      59         352 :     void AskToClose()
      60             :     {
      61         352 :         m_bAskedToClosed = true;
      62             :         std::vector<
      63             :             std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      64         704 :             oSetFiles;
      65             :         {
      66         704 :             std::lock_guard oLock(m_oMutex);
      67         352 :             oSetFiles = m_oSetFiles;
      68             :         }
      69        1508 :         for (auto &[osName, poFile] : oSetFiles)
      70             :         {
      71        1156 :             bool bWarned = false;
      72        1186 :             while (!poFile.expired())
      73             :             {
      74          30 :                 if (!bWarned)
      75             :                 {
      76          30 :                     bWarned = true;
      77          60 :                     auto poFileLocked = poFile.lock();
      78          30 :                     if (poFileLocked)
      79             :                     {
      80          30 :                         CPLDebug("PARQUET",
      81             :                                  "Still on-going reads on %s. Waiting for it "
      82             :                                  "to be closed.",
      83             :                                  osName.c_str());
      84          30 :                         poFileLocked->AskToClose();
      85             :                     }
      86             :                 }
      87          30 :                 CPLSleep(0.01);
      88             :             }
      89             :         }
      90         352 :     }
      91             : 
      92           0 :     std::string type_name() const override
      93             :     {
      94           0 :         return "vsi" + m_osEnvVarPrefix;
      95             :     }
      96             : 
      97             :     using arrow::fs::FileSystem::Equals;
      98             : 
      99           0 :     bool Equals(const arrow::fs::FileSystem &other) const override
     100             :     {
     101           0 :         const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
     102           0 :         return poOther != nullptr &&
     103           0 :                poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
     104           0 :                poOther->m_osQueryParameters == m_osQueryParameters;
     105             :     }
     106             : 
     107             :     using arrow::fs::FileSystem::GetFileInfo;
     108             : 
     109             :     arrow::Result<arrow::fs::FileInfo>
     110         338 :     GetFileInfo(const std::string &path) override
     111             :     {
     112         338 :         auto fileType = arrow::fs::FileType::Unknown;
     113             :         VSIStatBufL sStat;
     114         338 :         if (VSIStatL(path.c_str(), &sStat) == 0)
     115             :         {
     116         338 :             if (VSI_ISREG(sStat.st_mode))
     117         336 :                 fileType = arrow::fs::FileType::File;
     118           2 :             else if (VSI_ISDIR(sStat.st_mode))
     119           2 :                 fileType = arrow::fs::FileType::Directory;
     120             :         }
     121             :         else
     122             :         {
     123           0 :             fileType = arrow::fs::FileType::NotFound;
     124             :         }
     125         676 :         arrow::fs::FileInfo info(path, fileType);
     126         338 :         if (fileType == arrow::fs::FileType::File)
     127         336 :             info.set_size(sStat.st_size);
     128         676 :         return info;
     129             :     }
     130             : 
     131             :     arrow::Result<arrow::fs::FileInfoVector>
     132           2 :     GetFileInfo(const arrow::fs::FileSelector &select) override
     133             :     {
     134           4 :         arrow::fs::FileInfoVector res;
     135           2 :         VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
     136           2 :                                    select.recursive ? -1 : 0, nullptr);
     137           2 :         if (psDir == nullptr)
     138           0 :             return res;
     139             : 
     140           2 :         bool bParquetFound = false;
     141           2 :         const int nMaxNonParquetFiles = atoi(
     142             :             CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
     143             :         const int nMaxListedFiles =
     144           2 :             atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
     145           6 :         while (const auto psEntry = VSIGetNextDirEntry(psDir))
     146             :         {
     147           4 :             if (!bParquetFound)
     148           2 :                 bParquetFound = EQUAL(
     149             :                     CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet");
     150             : 
     151           4 :             std::string osFilename = select.base_dir + '/' + psEntry->pszName;
     152           4 :             int nMode = psEntry->nMode;
     153           4 :             if (!psEntry->bModeKnown)
     154             :             {
     155             :                 VSIStatBufL sStat;
     156           0 :                 if (VSIStatL(osFilename.c_str(), &sStat) == 0)
     157           0 :                     nMode = sStat.st_mode;
     158             :             }
     159             : 
     160           4 :             auto fileType = arrow::fs::FileType::Unknown;
     161           4 :             if (VSI_ISREG(nMode))
     162           4 :                 fileType = arrow::fs::FileType::File;
     163           0 :             else if (VSI_ISDIR(nMode))
     164           0 :                 fileType = arrow::fs::FileType::Directory;
     165             : 
     166           4 :             arrow::fs::FileInfo info(std::move(osFilename), fileType);
     167           4 :             if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
     168             :             {
     169           4 :                 info.set_size(psEntry->nSize);
     170             :             }
     171           4 :             res.push_back(std::move(info));
     172             : 
     173           4 :             if (m_osEnvVarPrefix == "PARQUET")
     174             :             {
     175             :                 // Avoid iterating over too many files if there's no likely parquet
     176             :                 // files.
     177           4 :                 if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
     178           0 :                     !bParquetFound)
     179           0 :                     break;
     180           4 :                 if (static_cast<int>(res.size()) == nMaxListedFiles)
     181           0 :                     break;
     182             :             }
     183           4 :         }
     184           2 :         VSICloseDir(psDir);
     185           2 :         return res;
     186             :     }
     187             : 
     188           0 :     arrow::Status CreateDir(const std::string & /*path*/,
     189             :                             bool /*recursive*/ = true) override
     190             :     {
     191           0 :         return arrow::Status::IOError("CreateDir() unimplemented");
     192             :     }
     193             : 
     194           0 :     arrow::Status DeleteDir(const std::string & /*path*/) override
     195             :     {
     196           0 :         return arrow::Status::IOError("DeleteDir() unimplemented");
     197             :     }
     198             : 
     199           0 :     arrow::Status DeleteDirContents(const std::string & /*path*/
     200             : #if ARROW_VERSION_MAJOR >= 8
     201             :                                     ,
     202             :                                     bool /*missing_dir_ok*/ = false
     203             : #endif
     204             :                                     ) override
     205             :     {
     206           0 :         return arrow::Status::IOError("DeleteDirContents() unimplemented");
     207             :     }
     208             : 
     209           0 :     arrow::Status DeleteRootDirContents() override
     210             :     {
     211           0 :         return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
     212             :     }
     213             : 
     214           0 :     arrow::Status DeleteFile(const std::string & /*path*/) override
     215             :     {
     216           0 :         return arrow::Status::IOError("DeleteFile() unimplemented");
     217             :     }
     218             : 
     219           0 :     arrow::Status Move(const std::string & /*src*/,
     220             :                        const std::string & /*dest*/) override
     221             :     {
     222           0 :         return arrow::Status::IOError("Move() unimplemented");
     223             :     }
     224             : 
     225           0 :     arrow::Status CopyFile(const std::string & /*src*/,
     226             :                            const std::string & /*dest*/) override
     227             :     {
     228           0 :         return arrow::Status::IOError("CopyFile() unimplemented");
     229             :     }
     230             : 
     231             :     using arrow::fs::FileSystem::OpenInputStream;
     232             : 
     233             :     arrow::Result<std::shared_ptr<arrow::io::InputStream>>
     234           0 :     OpenInputStream(const std::string &path) override
     235             :     {
     236           0 :         return OpenInputFile(path);
     237             :     }
     238             : 
     239             :     using arrow::fs::FileSystem::OpenInputFile;
     240             : 
     241             :     arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
     242        1190 :     OpenInputFile(const std::string &path) override
     243             :     {
     244        1190 :         if (m_bAskedToClosed)
     245           0 :             return arrow::Status::IOError(
     246           0 :                 "OpenInputFile(): file system in shutdown");
     247             : 
     248        2380 :         std::string osPath(path);
     249        1190 :         osPath += m_osQueryParameters;
     250        1190 :         m_oSetFilesAskedToOpen.insert(osPath);
     251        1190 :         CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
     252        2380 :         auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
     253        1190 :         if (fp == nullptr)
     254           2 :             return arrow::Status::IOError("OpenInputFile() failed for " +
     255           1 :                                           osPath);
     256             :         auto poFile =
     257        2378 :             std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
     258             :         {
     259        2378 :             std::lock_guard oLock(m_oMutex);
     260        1189 :             m_oSetFiles.emplace_back(path, poFile);
     261             :         }
     262        1189 :         return poFile;
     263             :     }
     264             : 
     265             :     using arrow::fs::FileSystem::OpenOutputStream;
     266             : 
     267             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     268           0 :     OpenOutputStream(const std::string & /*path*/,
     269             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     270             :                          & /* metadata */) override
     271             :     {
     272           0 :         return arrow::Status::IOError("OpenOutputStream() unimplemented");
     273             :     }
     274             : 
     275             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     276           0 :     OpenAppendStream(const std::string & /*path*/,
     277             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     278             :                          & /* metadata */) override
     279             :     {
     280           0 :         return arrow::Status::IOError("OpenAppendStream() unimplemented");
     281             :     }
     282             : 
     283             :     // For debugging
     284           9 :     const std::set<std::string> &GetSetFilesAskedToOpen() const
     285             :     {
     286           9 :         return m_oSetFilesAskedToOpen;
     287             :     }
     288             : 
     289             :     // For debugging
     290           9 :     void ResetSetFilesAskedToOpen()
     291             :     {
     292           9 :         m_oSetFilesAskedToOpen.clear();
     293           9 :     }
     294             : };
     295             : 
     296             : #if defined(__clang__)
     297             : #pragma clang diagnostic pop
     298             : #endif
     299             : 
     300             : #endif  // VSIARROWFILESYSTEM_HPP_INCLUDED

Generated by: LCOV version 1.14