Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: Parquet Translator 4 : * Purpose: Implements OGRParquetDriver. 5 : * Author: Even Rouault, <even.rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2022, Planet Labs 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED 14 : #define VSIARROWFILESYSTEM_HPP_INCLUDED 15 : 16 : #include "arrow/util/config.h" 17 : 18 : #include "ograrrowrandomaccessfile.h" 19 : 20 : #include <atomic> 21 : #include <memory> 22 : #include <mutex> 23 : #include <vector> 24 : #include <utility> 25 : 26 : /************************************************************************/ 27 : /* VSIArrowFileSystem */ 28 : /************************************************************************/ 29 : 30 : class VSIArrowFileSystem final : public arrow::fs::FileSystem 31 : { 32 : const std::string m_osEnvVarPrefix; 33 : const std::string m_osQueryParameters; 34 : 35 : std::atomic<bool> m_bAskedToClosed = false; 36 : std::mutex m_oMutex{}; 37 : std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 38 : m_oSetFiles{}; 39 : 40 : public: 41 265 : VSIArrowFileSystem(const std::string &osEnvVarPrefix, 42 : const std::string &osQueryParameters) 43 265 : : m_osEnvVarPrefix(osEnvVarPrefix), 44 265 : m_osQueryParameters(osQueryParameters) 45 : { 46 265 : } 47 : 48 : // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale 49 : // for this method 50 264 : void AskToClose() 51 : { 52 264 : m_bAskedToClosed = true; 53 : std::vector< 54 : std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 55 528 : oSetFiles; 56 : { 57 528 : std::lock_guard oLock(m_oMutex); 58 264 : oSetFiles = m_oSetFiles; 59 : } 60 1169 : for (auto &[osName, poFile] : oSetFiles) 61 : { 62 905 : bool bWarned = false; 63 907 : while (!poFile.expired()) 64 : { 65 2 : if (!bWarned) 66 : { 67 2 : bWarned = true; 68 4 : auto poFileLocked = poFile.lock(); 69 2 : if (poFileLocked) 70 : { 71 2 : CPLDebug("PARQUET", 72 : "Still on-going reads on %s. Waiting for it " 73 : "to be closed.", 74 : osName.c_str()); 75 2 : poFileLocked->AskToClose(); 76 : } 77 : } 78 2 : CPLSleep(0.01); 79 : } 80 : } 81 264 : } 82 : 83 0 : std::string type_name() const override 84 : { 85 0 : return "vsi" + m_osEnvVarPrefix; 86 : } 87 : 88 : using arrow::fs::FileSystem::Equals; 89 : 90 0 : bool Equals(const arrow::fs::FileSystem &other) const override 91 : { 92 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other); 93 0 : return poOther != nullptr && 94 0 : poOther->m_osEnvVarPrefix == m_osEnvVarPrefix && 95 0 : poOther->m_osQueryParameters == m_osQueryParameters; 96 : } 97 : 98 : using arrow::fs::FileSystem::GetFileInfo; 99 : 100 : arrow::Result<arrow::fs::FileInfo> 101 254 : GetFileInfo(const std::string &path) override 102 : { 103 254 : auto fileType = arrow::fs::FileType::Unknown; 104 : VSIStatBufL sStat; 105 254 : if (VSIStatL(path.c_str(), &sStat) == 0) 106 : { 107 254 : if (VSI_ISREG(sStat.st_mode)) 108 252 : fileType = arrow::fs::FileType::File; 109 2 : else if (VSI_ISDIR(sStat.st_mode)) 110 2 : fileType = arrow::fs::FileType::Directory; 111 : } 112 : else 113 : { 114 0 : fileType = arrow::fs::FileType::NotFound; 115 : } 116 508 : arrow::fs::FileInfo info(path, fileType); 117 254 : if (fileType == arrow::fs::FileType::File) 118 252 : info.set_size(sStat.st_size); 119 508 : return info; 120 : } 121 : 122 : arrow::Result<arrow::fs::FileInfoVector> 123 2 : GetFileInfo(const arrow::fs::FileSelector &select) override 124 : { 125 4 : arrow::fs::FileInfoVector res; 126 2 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(), 127 2 : select.recursive ? -1 : 0, nullptr); 128 2 : if (psDir == nullptr) 129 0 : return res; 130 : 131 2 : bool bParquetFound = false; 132 2 : const int nMaxNonParquetFiles = atoi( 133 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100")); 134 : const int nMaxListedFiles = 135 2 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000")); 136 6 : while (const auto psEntry = VSIGetNextDirEntry(psDir)) 137 : { 138 4 : if (!bParquetFound) 139 2 : bParquetFound = 140 2 : EQUAL(CPLGetExtension(psEntry->pszName), "parquet"); 141 : 142 : const std::string osFilename = 143 4 : select.base_dir + '/' + psEntry->pszName; 144 4 : int nMode = psEntry->nMode; 145 4 : if (!psEntry->bModeKnown) 146 : { 147 : VSIStatBufL sStat; 148 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0) 149 0 : nMode = sStat.st_mode; 150 : } 151 : 152 4 : auto fileType = arrow::fs::FileType::Unknown; 153 4 : if (VSI_ISREG(nMode)) 154 4 : fileType = arrow::fs::FileType::File; 155 0 : else if (VSI_ISDIR(nMode)) 156 0 : fileType = arrow::fs::FileType::Directory; 157 : 158 4 : arrow::fs::FileInfo info(osFilename, fileType); 159 4 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown) 160 : { 161 4 : info.set_size(psEntry->nSize); 162 : } 163 4 : res.push_back(info); 164 : 165 4 : if (m_osEnvVarPrefix == "PARQUET") 166 : { 167 : // Avoid iterating over too many files if there's no likely parquet 168 : // files. 169 4 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles && 170 0 : !bParquetFound) 171 0 : break; 172 4 : if (static_cast<int>(res.size()) == nMaxListedFiles) 173 0 : break; 174 : } 175 4 : } 176 2 : VSICloseDir(psDir); 177 2 : return res; 178 : } 179 : 180 0 : arrow::Status CreateDir(const std::string & /*path*/, 181 : bool /*recursive*/ = true) override 182 : { 183 0 : return arrow::Status::IOError("CreateDir() unimplemented"); 184 : } 185 : 186 0 : arrow::Status DeleteDir(const std::string & /*path*/) override 187 : { 188 0 : return arrow::Status::IOError("DeleteDir() unimplemented"); 189 : } 190 : 191 0 : arrow::Status DeleteDirContents(const std::string & /*path*/ 192 : #if ARROW_VERSION_MAJOR >= 8 193 : , 194 : bool /*missing_dir_ok*/ = false 195 : #endif 196 : ) override 197 : { 198 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented"); 199 : } 200 : 201 0 : arrow::Status DeleteRootDirContents() override 202 : { 203 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented"); 204 : } 205 : 206 0 : arrow::Status DeleteFile(const std::string & /*path*/) override 207 : { 208 0 : return arrow::Status::IOError("DeleteFile() unimplemented"); 209 : } 210 : 211 0 : arrow::Status Move(const std::string & /*src*/, 212 : const std::string & /*dest*/) override 213 : { 214 0 : return arrow::Status::IOError("Move() unimplemented"); 215 : } 216 : 217 0 : arrow::Status CopyFile(const std::string & /*src*/, 218 : const std::string & /*dest*/) override 219 : { 220 0 : return arrow::Status::IOError("CopyFile() unimplemented"); 221 : } 222 : 223 : using arrow::fs::FileSystem::OpenInputStream; 224 : 225 : arrow::Result<std::shared_ptr<arrow::io::InputStream>> 226 0 : OpenInputStream(const std::string &path) override 227 : { 228 0 : return OpenInputFile(path); 229 : } 230 : 231 : using arrow::fs::FileSystem::OpenInputFile; 232 : 233 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> 234 906 : OpenInputFile(const std::string &path) override 235 : { 236 906 : if (m_bAskedToClosed) 237 0 : return arrow::Status::IOError( 238 0 : "OpenInputFile(): file system in shutdown"); 239 : 240 1812 : std::string osPath(path); 241 906 : osPath += m_osQueryParameters; 242 906 : CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str()); 243 1812 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb")); 244 906 : if (fp == nullptr) 245 0 : return arrow::Status::IOError("OpenInputFile() failed for " + 246 0 : osPath); 247 : auto poFile = 248 1812 : std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp)); 249 : { 250 1812 : std::lock_guard oLock(m_oMutex); 251 906 : m_oSetFiles.emplace_back(path, poFile); 252 : } 253 906 : return poFile; 254 : } 255 : 256 : using arrow::fs::FileSystem::OpenOutputStream; 257 : 258 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 259 0 : OpenOutputStream(const std::string & /*path*/, 260 : const std::shared_ptr<const arrow::KeyValueMetadata> 261 : & /* metadata */) override 262 : { 263 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented"); 264 : } 265 : 266 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 267 0 : OpenAppendStream(const std::string & /*path*/, 268 : const std::shared_ptr<const arrow::KeyValueMetadata> 269 : & /* metadata */) override 270 : { 271 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented"); 272 : } 273 : }; 274 : 275 : #endif // VSIARROWFILESYSTEM_HPP_INCLUDED