LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow_common - vsiarrowfilesystem.hpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 71 109 65.1 %
Date: 2025-09-10 17:48:50 Functions: 5 17 29.4 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
      14             : #define VSIARROWFILESYSTEM_HPP_INCLUDED
      15             : 
      16             : #include "cpl_multiproc.h"
      17             : 
      18             : #include "arrow/util/config.h"
      19             : 
      20             : #include "ograrrowrandomaccessfile.h"
      21             : 
      22             : #include <atomic>
      23             : #include <memory>
      24             : #include <mutex>
      25             : #include <vector>
      26             : #include <utility>
      27             : 
      28             : #if defined(__clang__)
      29             : #pragma clang diagnostic push
      30             : #pragma clang diagnostic ignored "-Wweak-vtables"
      31             : #endif
      32             : 
      33             : /************************************************************************/
      34             : /*                         VSIArrowFileSystem                           */
      35             : /************************************************************************/
      36             : 
      37             : class VSIArrowFileSystem final : public arrow::fs::FileSystem
      38             : {
      39             :     const std::string m_osEnvVarPrefix;
      40             :     const std::string m_osQueryParameters;
      41             : 
      42             :     std::atomic<bool> m_bAskedToClosed = false;
      43             :     std::mutex m_oMutex{};
      44             :     std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      45             :         m_oSetFiles{};
      46             : 
      47             :   public:
      48         267 :     VSIArrowFileSystem(const std::string &osEnvVarPrefix,
      49             :                        const std::string &osQueryParameters)
      50         267 :         : m_osEnvVarPrefix(osEnvVarPrefix),
      51         267 :           m_osQueryParameters(osQueryParameters)
      52             :     {
      53         267 :     }
      54             : 
      55             :     // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
      56             :     // for this method
      57         266 :     void AskToClose()
      58             :     {
      59         266 :         m_bAskedToClosed = true;
      60             :         std::vector<
      61             :             std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      62         532 :             oSetFiles;
      63             :         {
      64         532 :             std::lock_guard oLock(m_oMutex);
      65         266 :             oSetFiles = m_oSetFiles;
      66             :         }
      67        1182 :         for (auto &[osName, poFile] : oSetFiles)
      68             :         {
      69         916 :             bool bWarned = false;
      70         921 :             while (!poFile.expired())
      71             :             {
      72           5 :                 if (!bWarned)
      73             :                 {
      74           5 :                     bWarned = true;
      75          10 :                     auto poFileLocked = poFile.lock();
      76           5 :                     if (poFileLocked)
      77             :                     {
      78           5 :                         CPLDebug("PARQUET",
      79             :                                  "Still on-going reads on %s. Waiting for it "
      80             :                                  "to be closed.",
      81             :                                  osName.c_str());
      82           5 :                         poFileLocked->AskToClose();
      83             :                     }
      84             :                 }
      85           5 :                 CPLSleep(0.01);
      86             :             }
      87             :         }
      88         266 :     }
      89             : 
      90           0 :     std::string type_name() const override
      91             :     {
      92           0 :         return "vsi" + m_osEnvVarPrefix;
      93             :     }
      94             : 
      95             :     using arrow::fs::FileSystem::Equals;
      96             : 
      97           0 :     bool Equals(const arrow::fs::FileSystem &other) const override
      98             :     {
      99           0 :         const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
     100           0 :         return poOther != nullptr &&
     101           0 :                poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
     102           0 :                poOther->m_osQueryParameters == m_osQueryParameters;
     103             :     }
     104             : 
     105             :     using arrow::fs::FileSystem::GetFileInfo;
     106             : 
     107             :     arrow::Result<arrow::fs::FileInfo>
     108         256 :     GetFileInfo(const std::string &path) override
     109             :     {
     110         256 :         auto fileType = arrow::fs::FileType::Unknown;
     111             :         VSIStatBufL sStat;
     112         256 :         if (VSIStatL(path.c_str(), &sStat) == 0)
     113             :         {
     114         256 :             if (VSI_ISREG(sStat.st_mode))
     115         252 :                 fileType = arrow::fs::FileType::File;
     116           4 :             else if (VSI_ISDIR(sStat.st_mode))
     117           4 :                 fileType = arrow::fs::FileType::Directory;
     118             :         }
     119             :         else
     120             :         {
     121           0 :             fileType = arrow::fs::FileType::NotFound;
     122             :         }
     123         512 :         arrow::fs::FileInfo info(path, fileType);
     124         256 :         if (fileType == arrow::fs::FileType::File)
     125         252 :             info.set_size(sStat.st_size);
     126         512 :         return info;
     127             :     }
     128             : 
     129             :     arrow::Result<arrow::fs::FileInfoVector>
     130           4 :     GetFileInfo(const arrow::fs::FileSelector &select) override
     131             :     {
     132           8 :         arrow::fs::FileInfoVector res;
     133           4 :         VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
     134           4 :                                    select.recursive ? -1 : 0, nullptr);
     135           4 :         if (psDir == nullptr)
     136           0 :             return res;
     137             : 
     138           4 :         bool bParquetFound = false;
     139           4 :         const int nMaxNonParquetFiles = atoi(
     140             :             CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
     141             :         const int nMaxListedFiles =
     142           4 :             atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
     143          15 :         while (const auto psEntry = VSIGetNextDirEntry(psDir))
     144             :         {
     145          11 :             if (!bParquetFound)
     146           5 :                 bParquetFound = EQUAL(
     147             :                     CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet");
     148             : 
     149          11 :             std::string osFilename = select.base_dir + '/' + psEntry->pszName;
     150          11 :             int nMode = psEntry->nMode;
     151          11 :             if (!psEntry->bModeKnown)
     152             :             {
     153             :                 VSIStatBufL sStat;
     154           0 :                 if (VSIStatL(osFilename.c_str(), &sStat) == 0)
     155           0 :                     nMode = sStat.st_mode;
     156             :             }
     157             : 
     158          11 :             auto fileType = arrow::fs::FileType::Unknown;
     159          11 :             if (VSI_ISREG(nMode))
     160           9 :                 fileType = arrow::fs::FileType::File;
     161           2 :             else if (VSI_ISDIR(nMode))
     162           2 :                 fileType = arrow::fs::FileType::Directory;
     163             : 
     164          11 :             arrow::fs::FileInfo info(std::move(osFilename), fileType);
     165          11 :             if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
     166             :             {
     167           9 :                 info.set_size(psEntry->nSize);
     168             :             }
     169          11 :             res.push_back(std::move(info));
     170             : 
     171          11 :             if (m_osEnvVarPrefix == "PARQUET")
     172             :             {
     173             :                 // Avoid iterating over too many files if there's no likely parquet
     174             :                 // files.
     175          11 :                 if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
     176           0 :                     !bParquetFound)
     177           0 :                     break;
     178          11 :                 if (static_cast<int>(res.size()) == nMaxListedFiles)
     179           0 :                     break;
     180             :             }
     181          11 :         }
     182           4 :         VSICloseDir(psDir);
     183           4 :         return res;
     184             :     }
     185             : 
     186           0 :     arrow::Status CreateDir(const std::string & /*path*/,
     187             :                             bool /*recursive*/ = true) override
     188             :     {
     189           0 :         return arrow::Status::IOError("CreateDir() unimplemented");
     190             :     }
     191             : 
     192           0 :     arrow::Status DeleteDir(const std::string & /*path*/) override
     193             :     {
     194           0 :         return arrow::Status::IOError("DeleteDir() unimplemented");
     195             :     }
     196             : 
     197           0 :     arrow::Status DeleteDirContents(const std::string & /*path*/
     198             : #if ARROW_VERSION_MAJOR >= 8
     199             :                                     ,
     200             :                                     bool /*missing_dir_ok*/ = false
     201             : #endif
     202             :                                     ) override
     203             :     {
     204           0 :         return arrow::Status::IOError("DeleteDirContents() unimplemented");
     205             :     }
     206             : 
     207           0 :     arrow::Status DeleteRootDirContents() override
     208             :     {
     209           0 :         return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
     210             :     }
     211             : 
     212           0 :     arrow::Status DeleteFile(const std::string & /*path*/) override
     213             :     {
     214           0 :         return arrow::Status::IOError("DeleteFile() unimplemented");
     215             :     }
     216             : 
     217           0 :     arrow::Status Move(const std::string & /*src*/,
     218             :                        const std::string & /*dest*/) override
     219             :     {
     220           0 :         return arrow::Status::IOError("Move() unimplemented");
     221             :     }
     222             : 
     223           0 :     arrow::Status CopyFile(const std::string & /*src*/,
     224             :                            const std::string & /*dest*/) override
     225             :     {
     226           0 :         return arrow::Status::IOError("CopyFile() unimplemented");
     227             :     }
     228             : 
     229             :     using arrow::fs::FileSystem::OpenInputStream;
     230             : 
     231             :     arrow::Result<std::shared_ptr<arrow::io::InputStream>>
     232           0 :     OpenInputStream(const std::string &path) override
     233             :     {
     234           0 :         return OpenInputFile(path);
     235             :     }
     236             : 
     237             :     using arrow::fs::FileSystem::OpenInputFile;
     238             : 
     239             :     arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
     240         917 :     OpenInputFile(const std::string &path) override
     241             :     {
     242         917 :         if (m_bAskedToClosed)
     243           0 :             return arrow::Status::IOError(
     244           0 :                 "OpenInputFile(): file system in shutdown");
     245             : 
     246        1834 :         std::string osPath(path);
     247         917 :         osPath += m_osQueryParameters;
     248         917 :         CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
     249        1834 :         auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
     250         917 :         if (fp == nullptr)
     251           0 :             return arrow::Status::IOError("OpenInputFile() failed for " +
     252           0 :                                           osPath);
     253             :         auto poFile =
     254        1834 :             std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
     255             :         {
     256        1834 :             std::lock_guard oLock(m_oMutex);
     257         917 :             m_oSetFiles.emplace_back(path, poFile);
     258             :         }
     259         917 :         return poFile;
     260             :     }
     261             : 
     262             :     using arrow::fs::FileSystem::OpenOutputStream;
     263             : 
     264             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     265           0 :     OpenOutputStream(const std::string & /*path*/,
     266             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     267             :                          & /* metadata */) override
     268             :     {
     269           0 :         return arrow::Status::IOError("OpenOutputStream() unimplemented");
     270             :     }
     271             : 
     272             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     273           0 :     OpenAppendStream(const std::string & /*path*/,
     274             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     275             :                          & /* metadata */) override
     276             :     {
     277           0 :         return arrow::Status::IOError("OpenAppendStream() unimplemented");
     278             :     }
     279             : };
     280             : 
     281             : #if defined(__clang__)
     282             : #pragma clang diagnostic pop
     283             : #endif
     284             : 
     285             : #endif  // VSIARROWFILESYSTEM_HPP_INCLUDED

Generated by: LCOV version 1.14