LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow_common - vsiarrowfilesystem.hpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 69 109 63.3 %
Date: 2025-07-09 17:50:03 Functions: 5 17 29.4 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
      14             : #define VSIARROWFILESYSTEM_HPP_INCLUDED
      15             : 
      16             : #include "arrow/util/config.h"
      17             : 
      18             : #include "ograrrowrandomaccessfile.h"
      19             : 
      20             : #include <atomic>
      21             : #include <memory>
      22             : #include <mutex>
      23             : #include <vector>
      24             : #include <utility>
      25             : 
      26             : #if defined(__clang__)
      27             : #pragma clang diagnostic push
      28             : #pragma clang diagnostic ignored "-Wweak-vtables"
      29             : #endif
      30             : 
      31             : /************************************************************************/
      32             : /*                         VSIArrowFileSystem                           */
      33             : /************************************************************************/
      34             : 
      35             : class VSIArrowFileSystem final : public arrow::fs::FileSystem
      36             : {
      37             :     const std::string m_osEnvVarPrefix;
      38             :     const std::string m_osQueryParameters;
      39             : 
      40             :     std::atomic<bool> m_bAskedToClosed = false;
      41             :     std::mutex m_oMutex{};
      42             :     std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      43             :         m_oSetFiles{};
      44             : 
      45             :   public:
      46         266 :     VSIArrowFileSystem(const std::string &osEnvVarPrefix,
      47             :                        const std::string &osQueryParameters)
      48         266 :         : m_osEnvVarPrefix(osEnvVarPrefix),
      49         266 :           m_osQueryParameters(osQueryParameters)
      50             :     {
      51         266 :     }
      52             : 
      53             :     // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
      54             :     // for this method
      55         265 :     void AskToClose()
      56             :     {
      57         265 :         m_bAskedToClosed = true;
      58             :         std::vector<
      59             :             std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      60         530 :             oSetFiles;
      61             :         {
      62         530 :             std::lock_guard oLock(m_oMutex);
      63         265 :             oSetFiles = m_oSetFiles;
      64             :         }
      65        1174 :         for (auto &[osName, poFile] : oSetFiles)
      66             :         {
      67         909 :             bool bWarned = false;
      68         943 :             while (!poFile.expired())
      69             :             {
      70          34 :                 if (!bWarned)
      71             :                 {
      72          34 :                     bWarned = true;
      73          68 :                     auto poFileLocked = poFile.lock();
      74          34 :                     if (poFileLocked)
      75             :                     {
      76          34 :                         CPLDebug("PARQUET",
      77             :                                  "Still on-going reads on %s. Waiting for it "
      78             :                                  "to be closed.",
      79             :                                  osName.c_str());
      80          34 :                         poFileLocked->AskToClose();
      81             :                     }
      82             :                 }
      83          34 :                 CPLSleep(0.01);
      84             :             }
      85             :         }
      86         265 :     }
      87             : 
      88           0 :     std::string type_name() const override
      89             :     {
      90           0 :         return "vsi" + m_osEnvVarPrefix;
      91             :     }
      92             : 
      93             :     using arrow::fs::FileSystem::Equals;
      94             : 
      95           0 :     bool Equals(const arrow::fs::FileSystem &other) const override
      96             :     {
      97           0 :         const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
      98           0 :         return poOther != nullptr &&
      99           0 :                poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
     100           0 :                poOther->m_osQueryParameters == m_osQueryParameters;
     101             :     }
     102             : 
     103             :     using arrow::fs::FileSystem::GetFileInfo;
     104             : 
     105             :     arrow::Result<arrow::fs::FileInfo>
     106         255 :     GetFileInfo(const std::string &path) override
     107             :     {
     108         255 :         auto fileType = arrow::fs::FileType::Unknown;
     109             :         VSIStatBufL sStat;
     110         255 :         if (VSIStatL(path.c_str(), &sStat) == 0)
     111             :         {
     112         255 :             if (VSI_ISREG(sStat.st_mode))
     113         252 :                 fileType = arrow::fs::FileType::File;
     114           3 :             else if (VSI_ISDIR(sStat.st_mode))
     115           3 :                 fileType = arrow::fs::FileType::Directory;
     116             :         }
     117             :         else
     118             :         {
     119           0 :             fileType = arrow::fs::FileType::NotFound;
     120             :         }
     121         510 :         arrow::fs::FileInfo info(path, fileType);
     122         255 :         if (fileType == arrow::fs::FileType::File)
     123         252 :             info.set_size(sStat.st_size);
     124         510 :         return info;
     125             :     }
     126             : 
     127             :     arrow::Result<arrow::fs::FileInfoVector>
     128           3 :     GetFileInfo(const arrow::fs::FileSelector &select) override
     129             :     {
     130           6 :         arrow::fs::FileInfoVector res;
     131           3 :         VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
     132           3 :                                    select.recursive ? -1 : 0, nullptr);
     133           3 :         if (psDir == nullptr)
     134           0 :             return res;
     135             : 
     136           3 :         bool bParquetFound = false;
     137           3 :         const int nMaxNonParquetFiles = atoi(
     138             :             CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
     139             :         const int nMaxListedFiles =
     140           3 :             atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
     141          10 :         while (const auto psEntry = VSIGetNextDirEntry(psDir))
     142             :         {
     143           7 :             if (!bParquetFound)
     144           3 :                 bParquetFound = EQUAL(
     145             :                     CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet");
     146             : 
     147           7 :             std::string osFilename = select.base_dir + '/' + psEntry->pszName;
     148           7 :             int nMode = psEntry->nMode;
     149           7 :             if (!psEntry->bModeKnown)
     150             :             {
     151             :                 VSIStatBufL sStat;
     152           0 :                 if (VSIStatL(osFilename.c_str(), &sStat) == 0)
     153           0 :                     nMode = sStat.st_mode;
     154             :             }
     155             : 
     156           7 :             auto fileType = arrow::fs::FileType::Unknown;
     157           7 :             if (VSI_ISREG(nMode))
     158           7 :                 fileType = arrow::fs::FileType::File;
     159           0 :             else if (VSI_ISDIR(nMode))
     160           0 :                 fileType = arrow::fs::FileType::Directory;
     161             : 
     162           7 :             arrow::fs::FileInfo info(std::move(osFilename), fileType);
     163           7 :             if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
     164             :             {
     165           7 :                 info.set_size(psEntry->nSize);
     166             :             }
     167           7 :             res.push_back(std::move(info));
     168             : 
     169           7 :             if (m_osEnvVarPrefix == "PARQUET")
     170             :             {
     171             :                 // Avoid iterating over too many files if there's no likely parquet
     172             :                 // files.
     173           7 :                 if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
     174           0 :                     !bParquetFound)
     175           0 :                     break;
     176           7 :                 if (static_cast<int>(res.size()) == nMaxListedFiles)
     177           0 :                     break;
     178             :             }
     179           7 :         }
     180           3 :         VSICloseDir(psDir);
     181           3 :         return res;
     182             :     }
     183             : 
     184           0 :     arrow::Status CreateDir(const std::string & /*path*/,
     185             :                             bool /*recursive*/ = true) override
     186             :     {
     187           0 :         return arrow::Status::IOError("CreateDir() unimplemented");
     188             :     }
     189             : 
     190           0 :     arrow::Status DeleteDir(const std::string & /*path*/) override
     191             :     {
     192           0 :         return arrow::Status::IOError("DeleteDir() unimplemented");
     193             :     }
     194             : 
     195           0 :     arrow::Status DeleteDirContents(const std::string & /*path*/
     196             : #if ARROW_VERSION_MAJOR >= 8
     197             :                                     ,
     198             :                                     bool /*missing_dir_ok*/ = false
     199             : #endif
     200             :                                     ) override
     201             :     {
     202           0 :         return arrow::Status::IOError("DeleteDirContents() unimplemented");
     203             :     }
     204             : 
     205           0 :     arrow::Status DeleteRootDirContents() override
     206             :     {
     207           0 :         return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
     208             :     }
     209             : 
     210           0 :     arrow::Status DeleteFile(const std::string & /*path*/) override
     211             :     {
     212           0 :         return arrow::Status::IOError("DeleteFile() unimplemented");
     213             :     }
     214             : 
     215           0 :     arrow::Status Move(const std::string & /*src*/,
     216             :                        const std::string & /*dest*/) override
     217             :     {
     218           0 :         return arrow::Status::IOError("Move() unimplemented");
     219             :     }
     220             : 
     221           0 :     arrow::Status CopyFile(const std::string & /*src*/,
     222             :                            const std::string & /*dest*/) override
     223             :     {
     224           0 :         return arrow::Status::IOError("CopyFile() unimplemented");
     225             :     }
     226             : 
     227             :     using arrow::fs::FileSystem::OpenInputStream;
     228             : 
     229             :     arrow::Result<std::shared_ptr<arrow::io::InputStream>>
     230           0 :     OpenInputStream(const std::string &path) override
     231             :     {
     232           0 :         return OpenInputFile(path);
     233             :     }
     234             : 
     235             :     using arrow::fs::FileSystem::OpenInputFile;
     236             : 
     237             :     arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
     238         910 :     OpenInputFile(const std::string &path) override
     239             :     {
     240         910 :         if (m_bAskedToClosed)
     241           0 :             return arrow::Status::IOError(
     242           0 :                 "OpenInputFile(): file system in shutdown");
     243             : 
     244        1820 :         std::string osPath(path);
     245         910 :         osPath += m_osQueryParameters;
     246         910 :         CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
     247        1820 :         auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
     248         910 :         if (fp == nullptr)
     249           0 :             return arrow::Status::IOError("OpenInputFile() failed for " +
     250           0 :                                           osPath);
     251             :         auto poFile =
     252        1820 :             std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
     253             :         {
     254        1820 :             std::lock_guard oLock(m_oMutex);
     255         910 :             m_oSetFiles.emplace_back(path, poFile);
     256             :         }
     257         910 :         return poFile;
     258             :     }
     259             : 
     260             :     using arrow::fs::FileSystem::OpenOutputStream;
     261             : 
     262             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     263           0 :     OpenOutputStream(const std::string & /*path*/,
     264             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     265             :                          & /* metadata */) override
     266             :     {
     267           0 :         return arrow::Status::IOError("OpenOutputStream() unimplemented");
     268             :     }
     269             : 
     270             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     271           0 :     OpenAppendStream(const std::string & /*path*/,
     272             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     273             :                          & /* metadata */) override
     274             :     {
     275           0 :         return arrow::Status::IOError("OpenAppendStream() unimplemented");
     276             :     }
     277             : };
     278             : 
     279             : #if defined(__clang__)
     280             : #pragma clang diagnostic pop
     281             : #endif
     282             : 
     283             : #endif  // VSIARROWFILESYSTEM_HPP_INCLUDED

Generated by: LCOV version 1.14