Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: Parquet Translator 4 : * Purpose: Implements OGRParquetDriver. 5 : * Author: Even Rouault, <even.rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2022, Planet Labs 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef VSIARROWFILESYSTEM_HPP_INCLUDED 14 : #define VSIARROWFILESYSTEM_HPP_INCLUDED 15 : 16 : #include "arrow/util/config.h" 17 : 18 : #include "ograrrowrandomaccessfile.h" 19 : 20 : #include <atomic> 21 : #include <memory> 22 : #include <mutex> 23 : #include <vector> 24 : #include <utility> 25 : 26 : #if defined(__clang__) 27 : #pragma clang diagnostic push 28 : #pragma clang diagnostic ignored "-Wweak-vtables" 29 : #endif 30 : 31 : /************************************************************************/ 32 : /* VSIArrowFileSystem */ 33 : /************************************************************************/ 34 : 35 : class VSIArrowFileSystem final : public arrow::fs::FileSystem 36 : { 37 : const std::string m_osEnvVarPrefix; 38 : const std::string m_osQueryParameters; 39 : 40 : std::atomic<bool> m_bAskedToClosed = false; 41 : std::mutex m_oMutex{}; 42 : std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 43 : m_oSetFiles{}; 44 : 45 : public: 46 266 : VSIArrowFileSystem(const std::string &osEnvVarPrefix, 47 : const std::string &osQueryParameters) 48 266 : : m_osEnvVarPrefix(osEnvVarPrefix), 49 266 : m_osQueryParameters(osQueryParameters) 50 : { 51 266 : } 52 : 53 : // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale 54 : // for this method 55 265 : void AskToClose() 56 : { 57 265 : m_bAskedToClosed = true; 58 : std::vector< 59 : std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>> 60 530 : oSetFiles; 61 : { 62 530 : std::lock_guard oLock(m_oMutex); 63 265 : oSetFiles = m_oSetFiles; 64 : } 65 1174 : for (auto &[osName, poFile] : oSetFiles) 66 : { 67 909 : bool bWarned = false; 68 943 : while (!poFile.expired()) 69 : { 70 34 : if (!bWarned) 71 : { 72 34 : bWarned = true; 73 68 : auto poFileLocked = poFile.lock(); 74 34 : if (poFileLocked) 75 : { 76 34 : CPLDebug("PARQUET", 77 : "Still on-going reads on %s. Waiting for it " 78 : "to be closed.", 79 : osName.c_str()); 80 34 : poFileLocked->AskToClose(); 81 : } 82 : } 83 34 : CPLSleep(0.01); 84 : } 85 : } 86 265 : } 87 : 88 0 : std::string type_name() const override 89 : { 90 0 : return "vsi" + m_osEnvVarPrefix; 91 : } 92 : 93 : using arrow::fs::FileSystem::Equals; 94 : 95 0 : bool Equals(const arrow::fs::FileSystem &other) const override 96 : { 97 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other); 98 0 : return poOther != nullptr && 99 0 : poOther->m_osEnvVarPrefix == m_osEnvVarPrefix && 100 0 : poOther->m_osQueryParameters == m_osQueryParameters; 101 : } 102 : 103 : using arrow::fs::FileSystem::GetFileInfo; 104 : 105 : arrow::Result<arrow::fs::FileInfo> 106 255 : GetFileInfo(const std::string &path) override 107 : { 108 255 : auto fileType = arrow::fs::FileType::Unknown; 109 : VSIStatBufL sStat; 110 255 : if (VSIStatL(path.c_str(), &sStat) == 0) 111 : { 112 255 : if (VSI_ISREG(sStat.st_mode)) 113 252 : fileType = arrow::fs::FileType::File; 114 3 : else if (VSI_ISDIR(sStat.st_mode)) 115 3 : fileType = arrow::fs::FileType::Directory; 116 : } 117 : else 118 : { 119 0 : fileType = arrow::fs::FileType::NotFound; 120 : } 121 510 : arrow::fs::FileInfo info(path, fileType); 122 255 : if (fileType == arrow::fs::FileType::File) 123 252 : info.set_size(sStat.st_size); 124 510 : return info; 125 : } 126 : 127 : arrow::Result<arrow::fs::FileInfoVector> 128 3 : GetFileInfo(const arrow::fs::FileSelector &select) override 129 : { 130 6 : arrow::fs::FileInfoVector res; 131 3 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(), 132 3 : select.recursive ? -1 : 0, nullptr); 133 3 : if (psDir == nullptr) 134 0 : return res; 135 : 136 3 : bool bParquetFound = false; 137 3 : const int nMaxNonParquetFiles = atoi( 138 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100")); 139 : const int nMaxListedFiles = 140 3 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000")); 141 10 : while (const auto psEntry = VSIGetNextDirEntry(psDir)) 142 : { 143 7 : if (!bParquetFound) 144 3 : bParquetFound = EQUAL( 145 : CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet"); 146 : 147 7 : std::string osFilename = select.base_dir + '/' + psEntry->pszName; 148 7 : int nMode = psEntry->nMode; 149 7 : if (!psEntry->bModeKnown) 150 : { 151 : VSIStatBufL sStat; 152 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0) 153 0 : nMode = sStat.st_mode; 154 : } 155 : 156 7 : auto fileType = arrow::fs::FileType::Unknown; 157 7 : if (VSI_ISREG(nMode)) 158 7 : fileType = arrow::fs::FileType::File; 159 0 : else if (VSI_ISDIR(nMode)) 160 0 : fileType = arrow::fs::FileType::Directory; 161 : 162 7 : arrow::fs::FileInfo info(std::move(osFilename), fileType); 163 7 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown) 164 : { 165 7 : info.set_size(psEntry->nSize); 166 : } 167 7 : res.push_back(std::move(info)); 168 : 169 7 : if (m_osEnvVarPrefix == "PARQUET") 170 : { 171 : // Avoid iterating over too many files if there's no likely parquet 172 : // files. 173 7 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles && 174 0 : !bParquetFound) 175 0 : break; 176 7 : if (static_cast<int>(res.size()) == nMaxListedFiles) 177 0 : break; 178 : } 179 7 : } 180 3 : VSICloseDir(psDir); 181 3 : return res; 182 : } 183 : 184 0 : arrow::Status CreateDir(const std::string & /*path*/, 185 : bool /*recursive*/ = true) override 186 : { 187 0 : return arrow::Status::IOError("CreateDir() unimplemented"); 188 : } 189 : 190 0 : arrow::Status DeleteDir(const std::string & /*path*/) override 191 : { 192 0 : return arrow::Status::IOError("DeleteDir() unimplemented"); 193 : } 194 : 195 0 : arrow::Status DeleteDirContents(const std::string & /*path*/ 196 : #if ARROW_VERSION_MAJOR >= 8 197 : , 198 : bool /*missing_dir_ok*/ = false 199 : #endif 200 : ) override 201 : { 202 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented"); 203 : } 204 : 205 0 : arrow::Status DeleteRootDirContents() override 206 : { 207 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented"); 208 : } 209 : 210 0 : arrow::Status DeleteFile(const std::string & /*path*/) override 211 : { 212 0 : return arrow::Status::IOError("DeleteFile() unimplemented"); 213 : } 214 : 215 0 : arrow::Status Move(const std::string & /*src*/, 216 : const std::string & /*dest*/) override 217 : { 218 0 : return arrow::Status::IOError("Move() unimplemented"); 219 : } 220 : 221 0 : arrow::Status CopyFile(const std::string & /*src*/, 222 : const std::string & /*dest*/) override 223 : { 224 0 : return arrow::Status::IOError("CopyFile() unimplemented"); 225 : } 226 : 227 : using arrow::fs::FileSystem::OpenInputStream; 228 : 229 : arrow::Result<std::shared_ptr<arrow::io::InputStream>> 230 0 : OpenInputStream(const std::string &path) override 231 : { 232 0 : return OpenInputFile(path); 233 : } 234 : 235 : using arrow::fs::FileSystem::OpenInputFile; 236 : 237 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> 238 910 : OpenInputFile(const std::string &path) override 239 : { 240 910 : if (m_bAskedToClosed) 241 0 : return arrow::Status::IOError( 242 0 : "OpenInputFile(): file system in shutdown"); 243 : 244 1820 : std::string osPath(path); 245 910 : osPath += m_osQueryParameters; 246 910 : CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str()); 247 1820 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb")); 248 910 : if (fp == nullptr) 249 0 : return arrow::Status::IOError("OpenInputFile() failed for " + 250 0 : osPath); 251 : auto poFile = 252 1820 : std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp)); 253 : { 254 1820 : std::lock_guard oLock(m_oMutex); 255 910 : m_oSetFiles.emplace_back(path, poFile); 256 : } 257 910 : return poFile; 258 : } 259 : 260 : using arrow::fs::FileSystem::OpenOutputStream; 261 : 262 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 263 0 : OpenOutputStream(const std::string & /*path*/, 264 : const std::shared_ptr<const arrow::KeyValueMetadata> 265 : & /* metadata */) override 266 : { 267 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented"); 268 : } 269 : 270 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 271 0 : OpenAppendStream(const std::string & /*path*/, 272 : const std::shared_ptr<const arrow::KeyValueMetadata> 273 : & /* metadata */) override 274 : { 275 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented"); 276 : } 277 : }; 278 : 279 : #if defined(__clang__) 280 : #pragma clang diagnostic pop 281 : #endif 282 : 283 : #endif // VSIARROWFILESYSTEM_HPP_INCLUDED