Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: Parquet Translator 4 : * Purpose: Implements OGRParquetDriver. 5 : * Author: Even Rouault, <even.rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2022, Planet Labs 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED 14 : #define VSIARROWFILESYSTEM_HPP_INCLUDED 15 : 16 : #include "arrow/util/config.h" 17 : 18 : #include "ograrrowrandomaccessfile.h" 19 : 20 : #include <atomic> 21 : #include <memory> 22 : #include <mutex> 23 : #include <vector> 24 : #include <utility> 25 : 26 : /************************************************************************/ 27 : /* VSIArrowFileSystem */ 28 : /************************************************************************/ 29 : 30 : class VSIArrowFileSystem final : public arrow::fs::FileSystem 31 : { 32 : const std::string m_osEnvVarPrefix; 33 : const std::string m_osQueryParameters; 34 : 35 : std::atomic<bool> m_bAskedToClosed = false; 36 : std::mutex m_oMutex{}; 37 : std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 38 : m_oSetFiles{}; 39 : 40 : public: 41 266 : VSIArrowFileSystem(const std::string &osEnvVarPrefix, 42 : const std::string &osQueryParameters) 43 266 : : m_osEnvVarPrefix(osEnvVarPrefix), 44 266 : m_osQueryParameters(osQueryParameters) 45 : { 46 266 : } 47 : 48 : // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale 49 : // for this method 50 265 : void AskToClose() 51 : { 52 265 : m_bAskedToClosed = true; 53 : std::vector< 54 : std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 55 530 : oSetFiles; 56 : { 57 530 : std::lock_guard oLock(m_oMutex); 58 265 : oSetFiles = m_oSetFiles; 59 : } 60 1174 : for (auto &[osName, poFile] : oSetFiles) 61 : { 62 909 : bool bWarned = false; 63 912 : while (!poFile.expired()) 64 : { 65 3 : if (!bWarned) 66 : { 67 3 : bWarned = true; 68 6 : auto poFileLocked = poFile.lock(); 69 3 : if (poFileLocked) 70 : { 71 3 : CPLDebug("PARQUET", 72 : "Still on-going reads on %s. Waiting for it " 73 : "to be closed.", 74 : osName.c_str()); 75 3 : poFileLocked->AskToClose(); 76 : } 77 : } 78 3 : CPLSleep(0.01); 79 : } 80 : } 81 265 : } 82 : 83 0 : std::string type_name() const override 84 : { 85 0 : return "vsi" + m_osEnvVarPrefix; 86 : } 87 : 88 : using arrow::fs::FileSystem::Equals; 89 : 90 0 : bool Equals(const arrow::fs::FileSystem &other) const override 91 : { 92 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other); 93 0 : return poOther != nullptr && 94 0 : poOther->m_osEnvVarPrefix == m_osEnvVarPrefix && 95 0 : poOther->m_osQueryParameters == m_osQueryParameters; 96 : } 97 : 98 : using arrow::fs::FileSystem::GetFileInfo; 99 : 100 : arrow::Result<arrow::fs::FileInfo> 101 255 : GetFileInfo(const std::string &path) override 102 : { 103 255 : auto fileType = arrow::fs::FileType::Unknown; 104 : VSIStatBufL sStat; 105 255 : if (VSIStatL(path.c_str(), &sStat) == 0) 106 : { 107 255 : if (VSI_ISREG(sStat.st_mode)) 108 252 : fileType = arrow::fs::FileType::File; 109 3 : else if (VSI_ISDIR(sStat.st_mode)) 110 3 : fileType = arrow::fs::FileType::Directory; 111 : } 112 : else 113 : { 114 0 : fileType = arrow::fs::FileType::NotFound; 115 : } 116 510 : arrow::fs::FileInfo info(path, fileType); 117 255 : if (fileType == arrow::fs::FileType::File) 118 252 : info.set_size(sStat.st_size); 119 510 : return info; 120 : } 121 : 122 : arrow::Result<arrow::fs::FileInfoVector> 123 3 : GetFileInfo(const arrow::fs::FileSelector &select) override 124 : { 125 6 : arrow::fs::FileInfoVector res; 126 3 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(), 127 3 : select.recursive ? -1 : 0, nullptr); 128 3 : if (psDir == nullptr) 129 0 : return res; 130 : 131 3 : bool bParquetFound = false; 132 3 : const int nMaxNonParquetFiles = atoi( 133 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100")); 134 : const int nMaxListedFiles = 135 3 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000")); 136 10 : while (const auto psEntry = VSIGetNextDirEntry(psDir)) 137 : { 138 7 : if (!bParquetFound) 139 3 : bParquetFound = EQUAL( 140 : CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet"); 141 : 142 7 : std::string osFilename = select.base_dir + '/' + psEntry->pszName; 143 7 : int nMode = psEntry->nMode; 144 7 : if (!psEntry->bModeKnown) 145 : { 146 : VSIStatBufL sStat; 147 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0) 148 0 : nMode = sStat.st_mode; 149 : } 150 : 151 7 : auto fileType = arrow::fs::FileType::Unknown; 152 7 : if (VSI_ISREG(nMode)) 153 7 : fileType = arrow::fs::FileType::File; 154 0 : else if (VSI_ISDIR(nMode)) 155 0 : fileType = arrow::fs::FileType::Directory; 156 : 157 7 : arrow::fs::FileInfo info(std::move(osFilename), fileType); 158 7 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown) 159 : { 160 7 : info.set_size(psEntry->nSize); 161 : } 162 7 : res.push_back(std::move(info)); 163 : 164 7 : if (m_osEnvVarPrefix == "PARQUET") 165 : { 166 : // Avoid iterating over too many files if there's no likely parquet 167 : // files. 168 7 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles && 169 0 : !bParquetFound) 170 0 : break; 171 7 : if (static_cast<int>(res.size()) == nMaxListedFiles) 172 0 : break; 173 : } 174 7 : } 175 3 : VSICloseDir(psDir); 176 3 : return res; 177 : } 178 : 179 0 : arrow::Status CreateDir(const std::string & /*path*/, 180 : bool /*recursive*/ = true) override 181 : { 182 0 : return arrow::Status::IOError("CreateDir() unimplemented"); 183 : } 184 : 185 0 : arrow::Status DeleteDir(const std::string & /*path*/) override 186 : { 187 0 : return arrow::Status::IOError("DeleteDir() unimplemented"); 188 : } 189 : 190 0 : arrow::Status DeleteDirContents(const std::string & /*path*/ 191 : #if ARROW_VERSION_MAJOR >= 8 192 : , 193 : bool /*missing_dir_ok*/ = false 194 : #endif 195 : ) override 196 : { 197 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented"); 198 : } 199 : 200 0 : arrow::Status DeleteRootDirContents() override 201 : { 202 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented"); 203 : } 204 : 205 0 : arrow::Status DeleteFile(const std::string & /*path*/) override 206 : { 207 0 : return arrow::Status::IOError("DeleteFile() unimplemented"); 208 : } 209 : 210 0 : arrow::Status Move(const std::string & /*src*/, 211 : const std::string & /*dest*/) override 212 : { 213 0 : return arrow::Status::IOError("Move() unimplemented"); 214 : } 215 : 216 0 : arrow::Status CopyFile(const std::string & /*src*/, 217 : const std::string & /*dest*/) override 218 : { 219 0 : return arrow::Status::IOError("CopyFile() unimplemented"); 220 : } 221 : 222 : using arrow::fs::FileSystem::OpenInputStream; 223 : 224 : arrow::Result<std::shared_ptr<arrow::io::InputStream>> 225 0 : OpenInputStream(const std::string &path) override 226 : { 227 0 : return OpenInputFile(path); 228 : } 229 : 230 : using arrow::fs::FileSystem::OpenInputFile; 231 : 232 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> 233 910 : OpenInputFile(const std::string &path) override 234 : { 235 910 : if (m_bAskedToClosed) 236 0 : return arrow::Status::IOError( 237 0 : "OpenInputFile(): file system in shutdown"); 238 : 239 1820 : std::string osPath(path); 240 910 : osPath += m_osQueryParameters; 241 910 : CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str()); 242 1820 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb")); 243 910 : if (fp == nullptr) 244 0 : return arrow::Status::IOError("OpenInputFile() failed for " + 245 0 : osPath); 246 : auto poFile = 247 1820 : std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp)); 248 : { 249 1820 : std::lock_guard oLock(m_oMutex); 250 910 : m_oSetFiles.emplace_back(path, poFile); 251 : } 252 910 : return poFile; 253 : } 254 : 255 : using arrow::fs::FileSystem::OpenOutputStream; 256 : 257 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 258 0 : OpenOutputStream(const std::string & /*path*/, 259 : const std::shared_ptr<const arrow::KeyValueMetadata> 260 : & /* metadata */) override 261 : { 262 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented"); 263 : } 264 : 265 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 266 0 : OpenAppendStream(const std::string & /*path*/, 267 : const std::shared_ptr<const arrow::KeyValueMetadata> 268 : & /* metadata */) override 269 : { 270 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented"); 271 : } 272 : }; 273 : 274 : #endif // VSIARROWFILESYSTEM_HPP_INCLUDED