Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: Parquet Translator 4 : * Purpose: Implements OGRParquetDriver. 5 : * Author: Even Rouault, <even.rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2022, Planet Labs 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED 14 : #define VSIARROWFILESYSTEM_HPP_INCLUDED 15 : 16 : #include "cpl_multiproc.h" 17 : 18 : #include "arrow/util/config.h" 19 : 20 : #include "ograrrowrandomaccessfile.h" 21 : 22 : #include <atomic> 23 : #include <memory> 24 : #include <mutex> 25 : #include <vector> 26 : #include <utility> 27 : 28 : #if defined(__clang__) 29 : #pragma clang diagnostic push 30 : #pragma clang diagnostic ignored "-Wweak-vtables" 31 : #endif 32 : 33 : /************************************************************************/ 34 : /* VSIArrowFileSystem */ 35 : /************************************************************************/ 36 : 37 : class VSIArrowFileSystem final : public arrow::fs::FileSystem 38 : { 39 : const std::string m_osEnvVarPrefix; 40 : const std::string m_osQueryParameters; 41 : 42 : std::atomic<bool> m_bAskedToClosed = false; 43 : std::mutex m_oMutex{}; 44 : std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 45 : m_oSetFiles{}; 46 : 47 : public: 48 267 : VSIArrowFileSystem(const std::string &osEnvVarPrefix, 49 : const std::string &osQueryParameters) 50 267 : : m_osEnvVarPrefix(osEnvVarPrefix), 51 267 : m_osQueryParameters(osQueryParameters) 52 : { 53 267 : } 54 : 55 : // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale 56 : // for this method 57 266 : void AskToClose() 58 : { 59 266 : m_bAskedToClosed = true; 60 : std::vector< 61 : std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 62 532 : oSetFiles; 63 : { 64 532 : std::lock_guard oLock(m_oMutex); 65 266 : oSetFiles = m_oSetFiles; 66 : } 67 1182 : for (auto &[osName, poFile] : oSetFiles) 68 : { 69 916 : bool bWarned = false; 70 921 : while (!poFile.expired()) 71 : { 72 5 : if (!bWarned) 73 : { 74 5 : bWarned = true; 75 10 : auto poFileLocked = poFile.lock(); 76 5 : if (poFileLocked) 77 : { 78 5 : CPLDebug("PARQUET", 79 : "Still on-going reads on %s. Waiting for it " 80 : "to be closed.", 81 : osName.c_str()); 82 5 : poFileLocked->AskToClose(); 83 : } 84 : } 85 5 : CPLSleep(0.01); 86 : } 87 : } 88 266 : } 89 : 90 0 : std::string type_name() const override 91 : { 92 0 : return "vsi" + m_osEnvVarPrefix; 93 : } 94 : 95 : using arrow::fs::FileSystem::Equals; 96 : 97 0 : bool Equals(const arrow::fs::FileSystem &other) const override 98 : { 99 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other); 100 0 : return poOther != nullptr && 101 0 : poOther->m_osEnvVarPrefix == m_osEnvVarPrefix && 102 0 : poOther->m_osQueryParameters == m_osQueryParameters; 103 : } 104 : 105 : using arrow::fs::FileSystem::GetFileInfo; 106 : 107 : arrow::Result<arrow::fs::FileInfo> 108 256 : GetFileInfo(const std::string &path) override 109 : { 110 256 : auto fileType = arrow::fs::FileType::Unknown; 111 : VSIStatBufL sStat; 112 256 : if (VSIStatL(path.c_str(), &sStat) == 0) 113 : { 114 256 : if (VSI_ISREG(sStat.st_mode)) 115 252 : fileType = arrow::fs::FileType::File; 116 4 : else if (VSI_ISDIR(sStat.st_mode)) 117 4 : fileType = arrow::fs::FileType::Directory; 118 : } 119 : else 120 : { 121 0 : fileType = arrow::fs::FileType::NotFound; 122 : } 123 512 : arrow::fs::FileInfo info(path, fileType); 124 256 : if (fileType == arrow::fs::FileType::File) 125 252 : info.set_size(sStat.st_size); 126 512 : return info; 127 : } 128 : 129 : arrow::Result<arrow::fs::FileInfoVector> 130 4 : GetFileInfo(const arrow::fs::FileSelector &select) override 131 : { 132 8 : arrow::fs::FileInfoVector res; 133 4 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(), 134 4 : select.recursive ? -1 : 0, nullptr); 135 4 : if (psDir == nullptr) 136 0 : return res; 137 : 138 4 : bool bParquetFound = false; 139 4 : const int nMaxNonParquetFiles = atoi( 140 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100")); 141 : const int nMaxListedFiles = 142 4 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000")); 143 15 : while (const auto psEntry = VSIGetNextDirEntry(psDir)) 144 : { 145 11 : if (!bParquetFound) 146 5 : bParquetFound = EQUAL( 147 : CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet"); 148 : 149 11 : std::string osFilename = select.base_dir + '/' + psEntry->pszName; 150 11 : int nMode = psEntry->nMode; 151 11 : if (!psEntry->bModeKnown) 152 : { 153 : VSIStatBufL sStat; 154 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0) 155 0 : nMode = sStat.st_mode; 156 : } 157 : 158 11 : auto fileType = arrow::fs::FileType::Unknown; 159 11 : if (VSI_ISREG(nMode)) 160 9 : fileType = arrow::fs::FileType::File; 161 2 : else if (VSI_ISDIR(nMode)) 162 2 : fileType = arrow::fs::FileType::Directory; 163 : 164 11 : arrow::fs::FileInfo info(std::move(osFilename), fileType); 165 11 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown) 166 : { 167 9 : info.set_size(psEntry->nSize); 168 : } 169 11 : res.push_back(std::move(info)); 170 : 171 11 : if (m_osEnvVarPrefix == "PARQUET") 172 : { 173 : // Avoid iterating over too many files if there's no likely parquet 174 : // files. 175 11 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles && 176 0 : !bParquetFound) 177 0 : break; 178 11 : if (static_cast<int>(res.size()) == nMaxListedFiles) 179 0 : break; 180 : } 181 11 : } 182 4 : VSICloseDir(psDir); 183 4 : return res; 184 : } 185 : 186 0 : arrow::Status CreateDir(const std::string & /*path*/, 187 : bool /*recursive*/ = true) override 188 : { 189 0 : return arrow::Status::IOError("CreateDir() unimplemented"); 190 : } 191 : 192 0 : arrow::Status DeleteDir(const std::string & /*path*/) override 193 : { 194 0 : return arrow::Status::IOError("DeleteDir() unimplemented"); 195 : } 196 : 197 0 : arrow::Status DeleteDirContents(const std::string & /*path*/ 198 : #if ARROW_VERSION_MAJOR >= 8 199 : , 200 : bool /*missing_dir_ok*/ = false 201 : #endif 202 : ) override 203 : { 204 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented"); 205 : } 206 : 207 0 : arrow::Status DeleteRootDirContents() override 208 : { 209 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented"); 210 : } 211 : 212 0 : arrow::Status DeleteFile(const std::string & /*path*/) override 213 : { 214 0 : return arrow::Status::IOError("DeleteFile() unimplemented"); 215 : } 216 : 217 0 : arrow::Status Move(const std::string & /*src*/, 218 : const std::string & /*dest*/) override 219 : { 220 0 : return arrow::Status::IOError("Move() unimplemented"); 221 : } 222 : 223 0 : arrow::Status CopyFile(const std::string & /*src*/, 224 : const std::string & /*dest*/) override 225 : { 226 0 : return arrow::Status::IOError("CopyFile() unimplemented"); 227 : } 228 : 229 : using arrow::fs::FileSystem::OpenInputStream; 230 : 231 : arrow::Result<std::shared_ptr<arrow::io::InputStream>> 232 0 : OpenInputStream(const std::string &path) override 233 : { 234 0 : return OpenInputFile(path); 235 : } 236 : 237 : using arrow::fs::FileSystem::OpenInputFile; 238 : 239 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> 240 917 : OpenInputFile(const std::string &path) override 241 : { 242 917 : if (m_bAskedToClosed) 243 0 : return arrow::Status::IOError( 244 0 : "OpenInputFile(): file system in shutdown"); 245 : 246 1834 : std::string osPath(path); 247 917 : osPath += m_osQueryParameters; 248 917 : CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str()); 249 1834 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb")); 250 917 : if (fp == nullptr) 251 0 : return arrow::Status::IOError("OpenInputFile() failed for " + 252 0 : osPath); 253 : auto poFile = 254 1834 : std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp)); 255 : { 256 1834 : std::lock_guard oLock(m_oMutex); 257 917 : m_oSetFiles.emplace_back(path, poFile); 258 : } 259 917 : return poFile; 260 : } 261 : 262 : using arrow::fs::FileSystem::OpenOutputStream; 263 : 264 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 265 0 : OpenOutputStream(const std::string & /*path*/, 266 : const std::shared_ptr<const arrow::KeyValueMetadata> 267 : & /* metadata */) override 268 : { 269 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented"); 270 : } 271 : 272 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 273 0 : OpenAppendStream(const std::string & /*path*/, 274 : const std::shared_ptr<const arrow::KeyValueMetadata> 275 : & /* metadata */) override 276 : { 277 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented"); 278 : } 279 : }; 280 : 281 : #if defined(__clang__) 282 : #pragma clang diagnostic pop 283 : #endif 284 : 285 : #endif // VSIARROWFILESYSTEM_HPP_INCLUDED