LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow_common - vsiarrowfilesystem.hpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 70 110 63.6 %
Date: 2024-11-25 23:50:41 Functions: 5 17 29.4 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
      14             : #define VSIARROWFILESYSTEM_HPP_INCLUDED
      15             : 
      16             : #include "arrow/util/config.h"
      17             : 
      18             : #include "ograrrowrandomaccessfile.h"
      19             : 
      20             : #include <atomic>
      21             : #include <memory>
      22             : #include <mutex>
      23             : #include <vector>
      24             : #include <utility>
      25             : 
      26             : /************************************************************************/
      27             : /*                         VSIArrowFileSystem                           */
      28             : /************************************************************************/
      29             : 
      30             : class VSIArrowFileSystem final : public arrow::fs::FileSystem
      31             : {
      32             :     const std::string m_osEnvVarPrefix;
      33             :     const std::string m_osQueryParameters;
      34             : 
      35             :     std::atomic<bool> m_bAskedToClosed = false;
      36             :     std::mutex m_oMutex{};
      37             :     std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      38             :         m_oSetFiles{};
      39             : 
      40             :   public:
      41         265 :     VSIArrowFileSystem(const std::string &osEnvVarPrefix,
      42             :                        const std::string &osQueryParameters)
      43         265 :         : m_osEnvVarPrefix(osEnvVarPrefix),
      44         265 :           m_osQueryParameters(osQueryParameters)
      45             :     {
      46         265 :     }
      47             : 
      48             :     // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
      49             :     // for this method
      50         264 :     void AskToClose()
      51             :     {
      52         264 :         m_bAskedToClosed = true;
      53             :         std::vector<
      54             :             std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
      55         528 :             oSetFiles;
      56             :         {
      57         528 :             std::lock_guard oLock(m_oMutex);
      58         264 :             oSetFiles = m_oSetFiles;
      59             :         }
      60        1169 :         for (auto &[osName, poFile] : oSetFiles)
      61             :         {
      62         905 :             bool bWarned = false;
      63         907 :             while (!poFile.expired())
      64             :             {
      65           2 :                 if (!bWarned)
      66             :                 {
      67           2 :                     bWarned = true;
      68           4 :                     auto poFileLocked = poFile.lock();
      69           2 :                     if (poFileLocked)
      70             :                     {
      71           2 :                         CPLDebug("PARQUET",
      72             :                                  "Still on-going reads on %s. Waiting for it "
      73             :                                  "to be closed.",
      74             :                                  osName.c_str());
      75           2 :                         poFileLocked->AskToClose();
      76             :                     }
      77             :                 }
      78           2 :                 CPLSleep(0.01);
      79             :             }
      80             :         }
      81         264 :     }
      82             : 
      83           0 :     std::string type_name() const override
      84             :     {
      85           0 :         return "vsi" + m_osEnvVarPrefix;
      86             :     }
      87             : 
      88             :     using arrow::fs::FileSystem::Equals;
      89             : 
      90           0 :     bool Equals(const arrow::fs::FileSystem &other) const override
      91             :     {
      92           0 :         const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
      93           0 :         return poOther != nullptr &&
      94           0 :                poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
      95           0 :                poOther->m_osQueryParameters == m_osQueryParameters;
      96             :     }
      97             : 
      98             :     using arrow::fs::FileSystem::GetFileInfo;
      99             : 
     100             :     arrow::Result<arrow::fs::FileInfo>
     101         254 :     GetFileInfo(const std::string &path) override
     102             :     {
     103         254 :         auto fileType = arrow::fs::FileType::Unknown;
     104             :         VSIStatBufL sStat;
     105         254 :         if (VSIStatL(path.c_str(), &sStat) == 0)
     106             :         {
     107         254 :             if (VSI_ISREG(sStat.st_mode))
     108         252 :                 fileType = arrow::fs::FileType::File;
     109           2 :             else if (VSI_ISDIR(sStat.st_mode))
     110           2 :                 fileType = arrow::fs::FileType::Directory;
     111             :         }
     112             :         else
     113             :         {
     114           0 :             fileType = arrow::fs::FileType::NotFound;
     115             :         }
     116         508 :         arrow::fs::FileInfo info(path, fileType);
     117         254 :         if (fileType == arrow::fs::FileType::File)
     118         252 :             info.set_size(sStat.st_size);
     119         508 :         return info;
     120             :     }
     121             : 
     122             :     arrow::Result<arrow::fs::FileInfoVector>
     123           2 :     GetFileInfo(const arrow::fs::FileSelector &select) override
     124             :     {
     125           4 :         arrow::fs::FileInfoVector res;
     126           2 :         VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
     127           2 :                                    select.recursive ? -1 : 0, nullptr);
     128           2 :         if (psDir == nullptr)
     129           0 :             return res;
     130             : 
     131           2 :         bool bParquetFound = false;
     132           2 :         const int nMaxNonParquetFiles = atoi(
     133             :             CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
     134             :         const int nMaxListedFiles =
     135           2 :             atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
     136           6 :         while (const auto psEntry = VSIGetNextDirEntry(psDir))
     137             :         {
     138           4 :             if (!bParquetFound)
     139           2 :                 bParquetFound =
     140           2 :                     EQUAL(CPLGetExtension(psEntry->pszName), "parquet");
     141             : 
     142             :             const std::string osFilename =
     143           4 :                 select.base_dir + '/' + psEntry->pszName;
     144           4 :             int nMode = psEntry->nMode;
     145           4 :             if (!psEntry->bModeKnown)
     146             :             {
     147             :                 VSIStatBufL sStat;
     148           0 :                 if (VSIStatL(osFilename.c_str(), &sStat) == 0)
     149           0 :                     nMode = sStat.st_mode;
     150             :             }
     151             : 
     152           4 :             auto fileType = arrow::fs::FileType::Unknown;
     153           4 :             if (VSI_ISREG(nMode))
     154           4 :                 fileType = arrow::fs::FileType::File;
     155           0 :             else if (VSI_ISDIR(nMode))
     156           0 :                 fileType = arrow::fs::FileType::Directory;
     157             : 
     158           4 :             arrow::fs::FileInfo info(osFilename, fileType);
     159           4 :             if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
     160             :             {
     161           4 :                 info.set_size(psEntry->nSize);
     162             :             }
     163           4 :             res.push_back(info);
     164             : 
     165           4 :             if (m_osEnvVarPrefix == "PARQUET")
     166             :             {
     167             :                 // Avoid iterating over too many files if there's no likely parquet
     168             :                 // files.
     169           4 :                 if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
     170           0 :                     !bParquetFound)
     171           0 :                     break;
     172           4 :                 if (static_cast<int>(res.size()) == nMaxListedFiles)
     173           0 :                     break;
     174             :             }
     175           4 :         }
     176           2 :         VSICloseDir(psDir);
     177           2 :         return res;
     178             :     }
     179             : 
     180           0 :     arrow::Status CreateDir(const std::string & /*path*/,
     181             :                             bool /*recursive*/ = true) override
     182             :     {
     183           0 :         return arrow::Status::IOError("CreateDir() unimplemented");
     184             :     }
     185             : 
     186           0 :     arrow::Status DeleteDir(const std::string & /*path*/) override
     187             :     {
     188           0 :         return arrow::Status::IOError("DeleteDir() unimplemented");
     189             :     }
     190             : 
     191           0 :     arrow::Status DeleteDirContents(const std::string & /*path*/
     192             : #if ARROW_VERSION_MAJOR >= 8
     193             :                                     ,
     194             :                                     bool /*missing_dir_ok*/ = false
     195             : #endif
     196             :                                     ) override
     197             :     {
     198           0 :         return arrow::Status::IOError("DeleteDirContents() unimplemented");
     199             :     }
     200             : 
     201           0 :     arrow::Status DeleteRootDirContents() override
     202             :     {
     203           0 :         return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
     204             :     }
     205             : 
     206           0 :     arrow::Status DeleteFile(const std::string & /*path*/) override
     207             :     {
     208           0 :         return arrow::Status::IOError("DeleteFile() unimplemented");
     209             :     }
     210             : 
     211           0 :     arrow::Status Move(const std::string & /*src*/,
     212             :                        const std::string & /*dest*/) override
     213             :     {
     214           0 :         return arrow::Status::IOError("Move() unimplemented");
     215             :     }
     216             : 
     217           0 :     arrow::Status CopyFile(const std::string & /*src*/,
     218             :                            const std::string & /*dest*/) override
     219             :     {
     220           0 :         return arrow::Status::IOError("CopyFile() unimplemented");
     221             :     }
     222             : 
     223             :     using arrow::fs::FileSystem::OpenInputStream;
     224             : 
     225             :     arrow::Result<std::shared_ptr<arrow::io::InputStream>>
     226           0 :     OpenInputStream(const std::string &path) override
     227             :     {
     228           0 :         return OpenInputFile(path);
     229             :     }
     230             : 
     231             :     using arrow::fs::FileSystem::OpenInputFile;
     232             : 
     233             :     arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
     234         906 :     OpenInputFile(const std::string &path) override
     235             :     {
     236         906 :         if (m_bAskedToClosed)
     237           0 :             return arrow::Status::IOError(
     238           0 :                 "OpenInputFile(): file system in shutdown");
     239             : 
     240        1812 :         std::string osPath(path);
     241         906 :         osPath += m_osQueryParameters;
     242         906 :         CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
     243        1812 :         auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
     244         906 :         if (fp == nullptr)
     245           0 :             return arrow::Status::IOError("OpenInputFile() failed for " +
     246           0 :                                           osPath);
     247             :         auto poFile =
     248        1812 :             std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
     249             :         {
     250        1812 :             std::lock_guard oLock(m_oMutex);
     251         906 :             m_oSetFiles.emplace_back(path, poFile);
     252             :         }
     253         906 :         return poFile;
     254             :     }
     255             : 
     256             :     using arrow::fs::FileSystem::OpenOutputStream;
     257             : 
     258             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     259           0 :     OpenOutputStream(const std::string & /*path*/,
     260             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     261             :                          & /* metadata */) override
     262             :     {
     263           0 :         return arrow::Status::IOError("OpenOutputStream() unimplemented");
     264             :     }
     265             : 
     266             :     arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
     267           0 :     OpenAppendStream(const std::string & /*path*/,
     268             :                      const std::shared_ptr<const arrow::KeyValueMetadata>
     269             :                          & /* metadata */) override
     270             :     {
     271           0 :         return arrow::Status::IOError("OpenAppendStream() unimplemented");
     272             :     }
     273             : };
     274             : 
     275             : #endif  // VSIARROWFILESYSTEM_HPP_INCLUDED

Generated by: LCOV version 1.14