Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: Parquet Translator 4 : * Purpose: Implements OGRParquetDriver. 5 : * Author: Even Rouault, <even.rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2022, Planet Labs 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED 14 : #define VSIARROWFILESYSTEM_HPP_INCLUDED 15 : 16 : #include "cpl_multiproc.h" 17 : 18 : #include "arrow/util/config.h" 19 : 20 : #include "ograrrowrandomaccessfile.h" 21 : 22 : #include <atomic> 23 : #include <memory> 24 : #include <mutex> 25 : #include <set> 26 : #include <vector> 27 : #include <utility> 28 : 29 : #if defined(__clang__) 30 : #pragma clang diagnostic push 31 : #pragma clang diagnostic ignored "-Wweak-vtables" 32 : #endif 33 : 34 : /************************************************************************/ 35 : /* VSIArrowFileSystem */ 36 : /************************************************************************/ 37 : 38 : class VSIArrowFileSystem final : public arrow::fs::FileSystem 39 : { 40 : const std::string m_osEnvVarPrefix; 41 : const std::string m_osQueryParameters; 42 : 43 : std::atomic<bool> m_bAskedToClosed = false; 44 : std::mutex m_oMutex{}; 45 : std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 46 : m_oSetFiles{}; 47 : std::set<std::string> m_oSetFilesAskedToOpen{}; 48 : 49 : public: 50 367 : VSIArrowFileSystem(const std::string &osEnvVarPrefix, 51 : const std::string &osQueryParameters) 52 367 : : m_osEnvVarPrefix(osEnvVarPrefix), 53 367 : m_osQueryParameters(osQueryParameters) 54 : { 55 367 : } 56 : 57 : // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale 58 : // for this method 59 352 : void AskToClose() 60 : { 61 352 : m_bAskedToClosed = true; 62 : std::vector< 63 : std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 64 704 : oSetFiles; 65 : { 66 704 : std::lock_guard oLock(m_oMutex); 67 352 : oSetFiles = m_oSetFiles; 68 : } 69 1508 : for (auto &[osName, poFile] : oSetFiles) 70 : { 71 1156 : bool bWarned = false; 72 1186 : while (!poFile.expired()) 73 : { 74 30 : if (!bWarned) 75 : { 76 30 : bWarned = true; 77 60 : auto poFileLocked = poFile.lock(); 78 30 : if (poFileLocked) 79 : { 80 30 : CPLDebug("PARQUET", 81 : "Still on-going reads on %s. Waiting for it " 82 : "to be closed.", 83 : osName.c_str()); 84 30 : poFileLocked->AskToClose(); 85 : } 86 : } 87 30 : CPLSleep(0.01); 88 : } 89 : } 90 352 : } 91 : 92 0 : std::string type_name() const override 93 : { 94 0 : return "vsi" + m_osEnvVarPrefix; 95 : } 96 : 97 : using arrow::fs::FileSystem::Equals; 98 : 99 0 : bool Equals(const arrow::fs::FileSystem &other) const override 100 : { 101 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other); 102 0 : return poOther != nullptr && 103 0 : poOther->m_osEnvVarPrefix == m_osEnvVarPrefix && 104 0 : poOther->m_osQueryParameters == m_osQueryParameters; 105 : } 106 : 107 : using arrow::fs::FileSystem::GetFileInfo; 108 : 109 : arrow::Result<arrow::fs::FileInfo> 110 338 : GetFileInfo(const std::string &path) override 111 : { 112 338 : auto fileType = arrow::fs::FileType::Unknown; 113 : VSIStatBufL sStat; 114 338 : if (VSIStatL(path.c_str(), &sStat) == 0) 115 : { 116 338 : if (VSI_ISREG(sStat.st_mode)) 117 336 : fileType = arrow::fs::FileType::File; 118 2 : else if (VSI_ISDIR(sStat.st_mode)) 119 2 : fileType = arrow::fs::FileType::Directory; 120 : } 121 : else 122 : { 123 0 : fileType = arrow::fs::FileType::NotFound; 124 : } 125 676 : arrow::fs::FileInfo info(path, fileType); 126 338 : if (fileType == arrow::fs::FileType::File) 127 336 : info.set_size(sStat.st_size); 128 676 : return info; 129 : } 130 : 131 : arrow::Result<arrow::fs::FileInfoVector> 132 2 : GetFileInfo(const arrow::fs::FileSelector &select) override 133 : { 134 4 : arrow::fs::FileInfoVector res; 135 2 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(), 136 2 : select.recursive ? -1 : 0, nullptr); 137 2 : if (psDir == nullptr) 138 0 : return res; 139 : 140 2 : bool bParquetFound = false; 141 2 : const int nMaxNonParquetFiles = atoi( 142 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100")); 143 : const int nMaxListedFiles = 144 2 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000")); 145 6 : while (const auto psEntry = VSIGetNextDirEntry(psDir)) 146 : { 147 4 : if (!bParquetFound) 148 2 : bParquetFound = EQUAL( 149 : CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet"); 150 : 151 4 : std::string osFilename = select.base_dir + '/' + psEntry->pszName; 152 4 : int nMode = psEntry->nMode; 153 4 : if (!psEntry->bModeKnown) 154 : { 155 : VSIStatBufL sStat; 156 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0) 157 0 : nMode = sStat.st_mode; 158 : } 159 : 160 4 : auto fileType = arrow::fs::FileType::Unknown; 161 4 : if (VSI_ISREG(nMode)) 162 4 : fileType = arrow::fs::FileType::File; 163 0 : else if (VSI_ISDIR(nMode)) 164 0 : fileType = arrow::fs::FileType::Directory; 165 : 166 4 : arrow::fs::FileInfo info(std::move(osFilename), fileType); 167 4 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown) 168 : { 169 4 : info.set_size(psEntry->nSize); 170 : } 171 4 : res.push_back(std::move(info)); 172 : 173 4 : if (m_osEnvVarPrefix == "PARQUET") 174 : { 175 : // Avoid iterating over too many files if there's no likely parquet 176 : // files. 177 4 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles && 178 0 : !bParquetFound) 179 0 : break; 180 4 : if (static_cast<int>(res.size()) == nMaxListedFiles) 181 0 : break; 182 : } 183 4 : } 184 2 : VSICloseDir(psDir); 185 2 : return res; 186 : } 187 : 188 0 : arrow::Status CreateDir(const std::string & /*path*/, 189 : bool /*recursive*/ = true) override 190 : { 191 0 : return arrow::Status::IOError("CreateDir() unimplemented"); 192 : } 193 : 194 0 : arrow::Status DeleteDir(const std::string & /*path*/) override 195 : { 196 0 : return arrow::Status::IOError("DeleteDir() unimplemented"); 197 : } 198 : 199 0 : arrow::Status DeleteDirContents(const std::string & /*path*/ 200 : #if ARROW_VERSION_MAJOR >= 8 201 : , 202 : bool /*missing_dir_ok*/ = false 203 : #endif 204 : ) override 205 : { 206 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented"); 207 : } 208 : 209 0 : arrow::Status DeleteRootDirContents() override 210 : { 211 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented"); 212 : } 213 : 214 0 : arrow::Status DeleteFile(const std::string & /*path*/) override 215 : { 216 0 : return arrow::Status::IOError("DeleteFile() unimplemented"); 217 : } 218 : 219 0 : arrow::Status Move(const std::string & /*src*/, 220 : const std::string & /*dest*/) override 221 : { 222 0 : return arrow::Status::IOError("Move() unimplemented"); 223 : } 224 : 225 0 : arrow::Status CopyFile(const std::string & /*src*/, 226 : const std::string & /*dest*/) override 227 : { 228 0 : return arrow::Status::IOError("CopyFile() unimplemented"); 229 : } 230 : 231 : using arrow::fs::FileSystem::OpenInputStream; 232 : 233 : arrow::Result<std::shared_ptr<arrow::io::InputStream>> 234 0 : OpenInputStream(const std::string &path) override 235 : { 236 0 : return OpenInputFile(path); 237 : } 238 : 239 : using arrow::fs::FileSystem::OpenInputFile; 240 : 241 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> 242 1190 : OpenInputFile(const std::string &path) override 243 : { 244 1190 : if (m_bAskedToClosed) 245 0 : return arrow::Status::IOError( 246 0 : "OpenInputFile(): file system in shutdown"); 247 : 248 2380 : std::string osPath(path); 249 1190 : osPath += m_osQueryParameters; 250 1190 : m_oSetFilesAskedToOpen.insert(osPath); 251 1190 : CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str()); 252 2380 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb")); 253 1190 : if (fp == nullptr) 254 2 : return arrow::Status::IOError("OpenInputFile() failed for " + 255 1 : osPath); 256 : auto poFile = 257 2378 : std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp)); 258 : { 259 2378 : std::lock_guard oLock(m_oMutex); 260 1189 : m_oSetFiles.emplace_back(path, poFile); 261 : } 262 1189 : return poFile; 263 : } 264 : 265 : using arrow::fs::FileSystem::OpenOutputStream; 266 : 267 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 268 0 : OpenOutputStream(const std::string & /*path*/, 269 : const std::shared_ptr<const arrow::KeyValueMetadata> 270 : & /* metadata */) override 271 : { 272 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented"); 273 : } 274 : 275 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 276 0 : OpenAppendStream(const std::string & /*path*/, 277 : const std::shared_ptr<const arrow::KeyValueMetadata> 278 : & /* metadata */) override 279 : { 280 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented"); 281 : } 282 : 283 : // For debugging 284 9 : const std::set<std::string> &GetSetFilesAskedToOpen() const 285 : { 286 9 : return m_oSetFilesAskedToOpen; 287 : } 288 : 289 : // For debugging 290 9 : void ResetSetFilesAskedToOpen() 291 : { 292 9 : m_oSetFilesAskedToOpen.clear(); 293 9 : } 294 : }; 295 : 296 : #if defined(__clang__) 297 : #pragma clang diagnostic pop 298 : #endif 299 : 300 : #endif // VSIARROWFILESYSTEM_HPP_INCLUDED