Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "gdal_pam.h"
30 : #include "ogrsf_frmts.h"
31 :
32 : #include <algorithm>
33 : #include <map>
34 :
35 : #include "ogr_parquet.h"
36 : #include "ogrparquetdrivercore.h"
37 :
38 : #include "../arrow_common/ograrrowrandomaccessfile.h"
39 : #include "../arrow_common/ograrrowwritablefile.h"
40 : #include "../arrow_common/ograrrowdataset.hpp"
41 : #include "../arrow_common/ograrrowlayer.hpp" // for the destructor
42 :
43 : #ifdef GDAL_USE_ARROWDATASET
44 :
45 : /************************************************************************/
46 : /* VSIArrowFileSystem */
47 : /************************************************************************/
48 :
49 : class VSIArrowFileSystem final : public arrow::fs::FileSystem
50 : {
51 : const std::string m_osQueryParameters;
52 :
53 : public:
54 13 : explicit VSIArrowFileSystem(const std::string &osQueryParameters)
55 13 : : m_osQueryParameters(osQueryParameters)
56 : {
57 13 : }
58 :
59 0 : std::string type_name() const override
60 : {
61 0 : return "vsi";
62 : }
63 :
64 : using arrow::fs::FileSystem::Equals;
65 :
66 0 : bool Equals(const arrow::fs::FileSystem &other) const override
67 : {
68 0 : const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
69 0 : return poOther != nullptr &&
70 0 : poOther->m_osQueryParameters == m_osQueryParameters;
71 : }
72 :
73 : using arrow::fs::FileSystem::GetFileInfo;
74 :
75 : arrow::Result<arrow::fs::FileInfo>
76 0 : GetFileInfo(const std::string &path) override
77 : {
78 0 : auto fileType = arrow::fs::FileType::Unknown;
79 : VSIStatBufL sStat;
80 0 : if (VSIStatL(path.c_str(), &sStat) == 0)
81 : {
82 0 : if (VSI_ISREG(sStat.st_mode))
83 0 : fileType = arrow::fs::FileType::File;
84 0 : else if (VSI_ISDIR(sStat.st_mode))
85 0 : fileType = arrow::fs::FileType::Directory;
86 : }
87 : else
88 : {
89 0 : fileType = arrow::fs::FileType::NotFound;
90 : }
91 0 : arrow::fs::FileInfo info(path, fileType);
92 0 : if (fileType == arrow::fs::FileType::File)
93 0 : info.set_size(sStat.st_size);
94 0 : return info;
95 : }
96 :
97 : arrow::Result<arrow::fs::FileInfoVector>
98 2 : GetFileInfo(const arrow::fs::FileSelector &select) override
99 : {
100 4 : arrow::fs::FileInfoVector res;
101 2 : VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
102 2 : select.recursive ? -1 : 0, nullptr);
103 2 : if (psDir == nullptr)
104 0 : return res;
105 :
106 2 : bool bParquetFound = false;
107 2 : const int nMaxNonParquetFiles = atoi(
108 : CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
109 : const int nMaxListedFiles =
110 2 : atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
111 6 : while (const auto psEntry = VSIGetNextDirEntry(psDir))
112 : {
113 4 : if (!bParquetFound)
114 2 : bParquetFound =
115 2 : EQUAL(CPLGetExtension(psEntry->pszName), "parquet");
116 :
117 : const std::string osFilename =
118 4 : select.base_dir + '/' + psEntry->pszName;
119 4 : int nMode = psEntry->nMode;
120 4 : if (!psEntry->bModeKnown)
121 : {
122 : VSIStatBufL sStat;
123 0 : if (VSIStatL(osFilename.c_str(), &sStat) == 0)
124 0 : nMode = sStat.st_mode;
125 : }
126 :
127 4 : auto fileType = arrow::fs::FileType::Unknown;
128 4 : if (VSI_ISREG(nMode))
129 4 : fileType = arrow::fs::FileType::File;
130 0 : else if (VSI_ISDIR(nMode))
131 0 : fileType = arrow::fs::FileType::Directory;
132 :
133 4 : arrow::fs::FileInfo info(osFilename, fileType);
134 4 : if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
135 : {
136 4 : info.set_size(psEntry->nSize);
137 : }
138 4 : res.push_back(info);
139 :
140 : // Avoid iterating over too many files if there's no likely parquet
141 : // files.
142 4 : if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
143 0 : !bParquetFound)
144 0 : break;
145 4 : if (static_cast<int>(res.size()) == nMaxListedFiles)
146 0 : break;
147 4 : }
148 2 : VSICloseDir(psDir);
149 2 : return res;
150 : }
151 :
152 0 : arrow::Status CreateDir(const std::string & /*path*/,
153 : bool /*recursive*/ = true) override
154 : {
155 0 : return arrow::Status::IOError("CreateDir() unimplemented");
156 : }
157 :
158 0 : arrow::Status DeleteDir(const std::string & /*path*/) override
159 : {
160 0 : return arrow::Status::IOError("DeleteDir() unimplemented");
161 : }
162 :
163 0 : arrow::Status DeleteDirContents(const std::string & /*path*/
164 : #if ARROW_VERSION_MAJOR >= 8
165 : ,
166 : bool /*missing_dir_ok*/ = false
167 : #endif
168 : ) override
169 : {
170 0 : return arrow::Status::IOError("DeleteDirContents() unimplemented");
171 : }
172 :
173 0 : arrow::Status DeleteRootDirContents() override
174 : {
175 0 : return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
176 : }
177 :
178 0 : arrow::Status DeleteFile(const std::string & /*path*/) override
179 : {
180 0 : return arrow::Status::IOError("DeleteFile() unimplemented");
181 : }
182 :
183 0 : arrow::Status Move(const std::string & /*src*/,
184 : const std::string & /*dest*/) override
185 : {
186 0 : return arrow::Status::IOError("Move() unimplemented");
187 : }
188 :
189 0 : arrow::Status CopyFile(const std::string & /*src*/,
190 : const std::string & /*dest*/) override
191 : {
192 0 : return arrow::Status::IOError("CopyFile() unimplemented");
193 : }
194 :
195 : using arrow::fs::FileSystem::OpenInputStream;
196 :
197 : arrow::Result<std::shared_ptr<arrow::io::InputStream>>
198 0 : OpenInputStream(const std::string &path) override
199 : {
200 0 : return OpenInputFile(path);
201 : }
202 :
203 : using arrow::fs::FileSystem::OpenInputFile;
204 :
205 : arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
206 117 : OpenInputFile(const std::string &path) override
207 : {
208 234 : std::string osPath(path);
209 117 : osPath += m_osQueryParameters;
210 117 : CPLDebugOnly("PARQUET", "Opening %s", osPath.c_str());
211 234 : auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
212 117 : if (fp == nullptr)
213 0 : return arrow::Status::IOError("OpenInputFile() failed for " +
214 0 : osPath);
215 117 : return std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
216 : }
217 :
218 : using arrow::fs::FileSystem::OpenOutputStream;
219 :
220 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
221 0 : OpenOutputStream(const std::string & /*path*/,
222 : const std::shared_ptr<const arrow::KeyValueMetadata>
223 : & /* metadata */) override
224 : {
225 0 : return arrow::Status::IOError("OpenOutputStream() unimplemented");
226 : }
227 :
228 : arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
229 0 : OpenAppendStream(const std::string & /*path*/,
230 : const std::shared_ptr<const arrow::KeyValueMetadata>
231 : & /* metadata */) override
232 : {
233 0 : return arrow::Status::IOError("OpenAppendStream() unimplemented");
234 : }
235 : };
236 :
237 : /************************************************************************/
238 : /* OpenFromDatasetFactory() */
239 : /************************************************************************/
240 :
241 21 : static GDALDataset *OpenFromDatasetFactory(
242 : const std::string &osBasePath,
243 : const std::shared_ptr<arrow::dataset::DatasetFactory> &factory,
244 : CSLConstList papszOpenOptions)
245 : {
246 21 : std::shared_ptr<arrow::dataset::Dataset> dataset;
247 42 : PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish());
248 :
249 21 : std::shared_ptr<arrow::dataset::ScannerBuilder> scannerBuilder;
250 42 : PARQUET_ASSIGN_OR_THROW(scannerBuilder, dataset->NewScan());
251 :
252 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
253 42 : arrow::MemoryPool::CreateDefault().release());
254 :
255 : // We cannot use the above shared memory pool. Otherwise we get random
256 : // crashes in multi-threaded arrow code (apparently some cleanup code),
257 : // that may used the memory pool after it has been destroyed.
258 : // PARQUET_THROW_NOT_OK(scannerBuilder->Pool(poMemoryPool.get()));
259 :
260 21 : const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi");
261 21 : if (bIsVSI)
262 : {
263 : const int nFragmentReadAhead =
264 1 : atoi(CPLGetConfigOption("OGR_PARQUET_FRAGMENT_READ_AHEAD", "2"));
265 2 : PARQUET_THROW_NOT_OK(
266 : scannerBuilder->FragmentReadahead(nFragmentReadAhead));
267 :
268 : const char *pszBatchSize =
269 1 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
270 1 : if (pszBatchSize)
271 : {
272 0 : PARQUET_THROW_NOT_OK(
273 : scannerBuilder->BatchSize(CPLAtoGIntBig(pszBatchSize)));
274 : }
275 :
276 : const char *pszUseThreads =
277 1 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
278 1 : if (pszUseThreads)
279 : {
280 0 : PARQUET_THROW_NOT_OK(
281 : scannerBuilder->UseThreads(CPLTestBool(pszUseThreads)));
282 : }
283 :
284 : const char *pszNumThreads =
285 1 : CPLGetConfigOption("GDAL_NUM_THREADS", nullptr);
286 1 : int nNumThreads = 0;
287 1 : if (pszNumThreads == nullptr)
288 1 : nNumThreads = std::min(4, CPLGetNumCPUs());
289 : else
290 0 : nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS")
291 0 : ? CPLGetNumCPUs()
292 0 : : atoi(pszNumThreads);
293 1 : if (nNumThreads > 1)
294 : {
295 1 : CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads));
296 : }
297 :
298 : #if PARQUET_VERSION_MAJOR >= 10
299 : const char *pszBatchReadAhead =
300 1 : CPLGetConfigOption("OGR_PARQUET_BATCH_READ_AHEAD", nullptr);
301 1 : if (pszBatchReadAhead)
302 : {
303 0 : PARQUET_THROW_NOT_OK(
304 : scannerBuilder->BatchReadahead(atoi(pszBatchReadAhead)));
305 : }
306 : #endif
307 : }
308 :
309 21 : std::shared_ptr<arrow::dataset::Scanner> scanner;
310 42 : PARQUET_ASSIGN_OR_THROW(scanner, scannerBuilder->Finish());
311 :
312 42 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
313 : auto poLayer = std::make_unique<OGRParquetDatasetLayer>(
314 21 : poDS.get(), CPLGetBasename(osBasePath.c_str()), scanner,
315 63 : scannerBuilder->schema(), papszOpenOptions);
316 21 : poDS->SetLayer(std::move(poLayer));
317 42 : return poDS.release();
318 : }
319 :
320 : /************************************************************************/
321 : /* GetFileSystem() */
322 : /************************************************************************/
323 :
324 : static std::shared_ptr<arrow::fs::FileSystem>
325 21 : GetFileSystem(std::string &osBasePathInOut,
326 : const std::string &osQueryParameters)
327 : {
328 : // Instantiate file system:
329 : // - VSIArrowFileSystem implementation for /vsi files
330 : // - base implementation for local files (if OGR_PARQUET_USE_VSI set to NO)
331 21 : std::shared_ptr<arrow::fs::FileSystem> fs;
332 21 : const bool bIsVSI = STARTS_WITH(osBasePathInOut.c_str(), "/vsi");
333 21 : if (bIsVSI || CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
334 : {
335 13 : fs = std::make_shared<VSIArrowFileSystem>(osQueryParameters);
336 : }
337 : else
338 : {
339 : // FileSystemFromUriOrPath() doesn't like relative paths
340 : // so transform them to absolute.
341 8 : std::string osPath(osBasePathInOut);
342 8 : if (CPLIsFilenameRelative(osPath.c_str()))
343 : {
344 8 : char *pszCurDir = CPLGetCurrentDir();
345 8 : if (pszCurDir == nullptr)
346 0 : return nullptr;
347 8 : osPath = CPLFormFilename(pszCurDir, osPath.c_str(), nullptr);
348 8 : CPLFree(pszCurDir);
349 : }
350 8 : PARQUET_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(osPath));
351 : }
352 21 : return fs;
353 : }
354 :
355 : /************************************************************************/
356 : /* OpenParquetDatasetWithMetadata() */
357 : /************************************************************************/
358 :
359 18 : static GDALDataset *OpenParquetDatasetWithMetadata(
360 : const std::string &osBasePathIn, const char *pszMetadataFile,
361 : const std::string &osQueryParameters, CSLConstList papszOpenOptions)
362 : {
363 36 : std::string osBasePath(osBasePathIn);
364 36 : auto fs = GetFileSystem(osBasePath, osQueryParameters);
365 :
366 36 : arrow::dataset::ParquetFactoryOptions options;
367 36 : auto partitioningFactory = arrow::dataset::HivePartitioning::MakeFactory();
368 : options.partitioning =
369 18 : arrow::dataset::PartitioningOrFactory(std::move(partitioningFactory));
370 :
371 18 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
372 54 : PARQUET_ASSIGN_OR_THROW(
373 : factory, arrow::dataset::ParquetDatasetFactory::Make(
374 : osBasePath + '/' + pszMetadataFile, std::move(fs),
375 : std::make_shared<arrow::dataset::ParquetFileFormat>(),
376 : std::move(options)));
377 :
378 36 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions);
379 : }
380 :
381 : /************************************************************************/
382 : /* OpenParquetDatasetWithoutMetadata() */
383 : /************************************************************************/
384 :
385 : static GDALDataset *
386 3 : OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn,
387 : const std::string &osQueryParameters,
388 : CSLConstList papszOpenOptions)
389 : {
390 6 : std::string osBasePath(osBasePathIn);
391 6 : auto fs = GetFileSystem(osBasePath, osQueryParameters);
392 :
393 6 : arrow::dataset::FileSystemFactoryOptions options;
394 3 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
395 : VSIStatBufL sStat;
396 3 : if (VSIStatL(osBasePath.c_str(), &sStat) == 0 && VSI_ISREG(sStat.st_mode))
397 : {
398 4 : PARQUET_ASSIGN_OR_THROW(
399 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
400 : std::move(fs), {osBasePath},
401 : std::make_shared<arrow::dataset::ParquetFileFormat>(),
402 : std::move(options)));
403 : }
404 : else
405 : {
406 : auto partitioningFactory =
407 4 : arrow::dataset::HivePartitioning::MakeFactory();
408 4 : options.partitioning = arrow::dataset::PartitioningOrFactory(
409 4 : std::move(partitioningFactory));
410 :
411 4 : arrow::fs::FileSelector selector;
412 2 : selector.base_dir = osBasePath;
413 2 : selector.recursive = true;
414 :
415 4 : PARQUET_ASSIGN_OR_THROW(
416 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
417 : std::move(fs), std::move(selector),
418 : std::make_shared<arrow::dataset::ParquetFileFormat>(),
419 : std::move(options)));
420 : }
421 :
422 6 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions);
423 : }
424 :
425 : #endif
426 :
427 : /************************************************************************/
428 : /* BuildMemDatasetWithRowGroupExtents() */
429 : /************************************************************************/
430 :
431 : /** Builds a Memory dataset that contains, for each row-group of the input file,
432 : * the feature count and spatial extent of the features of this row group,
433 : * using Parquet statistics. This assumes that the Parquet file declares
434 : * a "covering":{"bbox":{ ... }} metadata item.
435 : *
436 : * Only for debug purposes.
437 : */
438 1 : static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer)
439 : {
440 1 : int iParquetXMin = -1;
441 1 : int iParquetYMin = -1;
442 1 : int iParquetXMax = -1;
443 1 : int iParquetYMax = -1;
444 1 : if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin,
445 : iParquetXMax, iParquetYMax))
446 : {
447 1 : auto poMemDrv = GetGDALDriverManager()->GetDriverByName("Memory");
448 1 : if (!poMemDrv)
449 0 : return nullptr;
450 : auto poMemDS = std::unique_ptr<GDALDataset>(
451 2 : poMemDrv->Create("", 0, 0, 0, GDT_Unknown, nullptr));
452 1 : if (!poMemDS)
453 0 : return nullptr;
454 1 : OGRSpatialReference *poTmpSRS = nullptr;
455 1 : const auto poSrcSRS = poLayer->GetSpatialRef();
456 1 : if (poSrcSRS)
457 0 : poTmpSRS = poSrcSRS->Clone();
458 : auto poMemLayer =
459 1 : poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr);
460 1 : if (poTmpSRS)
461 0 : poTmpSRS->Release();
462 1 : if (!poMemLayer)
463 0 : return nullptr;
464 1 : poMemLayer->CreateField(
465 1 : std::make_unique<OGRFieldDefn>("feature_count", OFTInteger64)
466 1 : .get());
467 :
468 : const auto metadata =
469 2 : poLayer->GetReader()->parquet_reader()->metadata();
470 1 : const int numRowGroups = metadata->num_row_groups();
471 15 : for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup)
472 : {
473 28 : std::string osMinTmp, osMaxTmp;
474 : OGRField unusedF;
475 : bool unusedB;
476 : OGRFieldSubType unusedSubType;
477 :
478 : OGRField sXMin;
479 14 : OGR_RawField_SetNull(&sXMin);
480 14 : bool bFoundXMin = false;
481 14 : OGRFieldType eXMinType = OFTMaxType;
482 :
483 : OGRField sYMin;
484 14 : OGR_RawField_SetNull(&sYMin);
485 14 : bool bFoundYMin = false;
486 14 : OGRFieldType eYMinType = OFTMaxType;
487 :
488 : OGRField sXMax;
489 14 : OGR_RawField_SetNull(&sXMax);
490 14 : bool bFoundXMax = false;
491 14 : OGRFieldType eXMaxType = OFTMaxType;
492 :
493 : OGRField sYMax;
494 14 : OGR_RawField_SetNull(&sYMax);
495 14 : bool bFoundYMax = false;
496 14 : OGRFieldType eYMaxType = OFTMaxType;
497 :
498 14 : if (poLayer->GetMinMaxForParquetCol(
499 : iRowGroup, iParquetXMin, nullptr,
500 : /* bComputeMin = */ true, sXMin, bFoundXMin,
501 : /* bComputeMax = */ false, unusedF, unusedB, eXMinType,
502 8 : unusedSubType, osMinTmp, osMaxTmp) &&
503 8 : bFoundXMin && eXMinType == OFTReal &&
504 22 : poLayer->GetMinMaxForParquetCol(
505 : iRowGroup, iParquetYMin, nullptr,
506 : /* bComputeMin = */ true, sYMin, bFoundYMin,
507 : /* bComputeMax = */ false, unusedF, unusedB, eYMinType,
508 8 : unusedSubType, osMinTmp, osMaxTmp) &&
509 8 : bFoundYMin && eYMinType == OFTReal &&
510 22 : poLayer->GetMinMaxForParquetCol(
511 : iRowGroup, iParquetXMax, nullptr,
512 : /* bComputeMin = */ false, unusedF, unusedB,
513 : /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType,
514 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
515 8 : bFoundXMax && eXMaxType == OFTReal &&
516 22 : poLayer->GetMinMaxForParquetCol(
517 : iRowGroup, iParquetYMax, nullptr,
518 : /* bComputeMin = */ false, unusedF, unusedB,
519 : /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType,
520 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
521 22 : bFoundYMax && eYMaxType == OFTReal)
522 : {
523 16 : OGRFeature oFeat(poMemLayer->GetLayerDefn());
524 8 : oFeat.SetField(0,
525 : static_cast<GIntBig>(
526 8 : metadata->RowGroup(iRowGroup)->num_rows()));
527 16 : auto poPoly = std::make_unique<OGRPolygon>();
528 8 : auto poLR = std::make_unique<OGRLinearRing>();
529 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
530 8 : poLR->addPoint(sXMin.Real, sYMax.Real);
531 8 : poLR->addPoint(sXMax.Real, sYMax.Real);
532 8 : poLR->addPoint(sXMax.Real, sYMin.Real);
533 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
534 8 : poPoly->addRingDirectly(poLR.release());
535 8 : oFeat.SetGeometryDirectly(poPoly.release());
536 8 : CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat));
537 : }
538 : }
539 :
540 1 : return poMemDS.release();
541 : }
542 0 : return nullptr;
543 : }
544 :
545 : /************************************************************************/
546 : /* Open() */
547 : /************************************************************************/
548 :
549 1114 : static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo)
550 : {
551 1114 : if (poOpenInfo->eAccess == GA_Update)
552 59 : return nullptr;
553 :
554 : #ifdef GDAL_USE_ARROWDATASET
555 2110 : std::string osBasePath(poOpenInfo->pszFilename);
556 2110 : std::string osQueryParameters;
557 : const bool bStartedWithParquetPrefix =
558 1055 : STARTS_WITH(osBasePath.c_str(), "PARQUET:");
559 :
560 1055 : if (bStartedWithParquetPrefix)
561 : {
562 9 : osBasePath = osBasePath.substr(strlen("PARQUET:"));
563 : }
564 :
565 : // Little trick to allow using syntax of
566 : // https://github.com/opengeospatial/geoparquet/discussions/101
567 : // ogrinfo
568 : // "/vsicurl/https://ai4edataeuwest.blob.core.windows.net/us-census/2020/cb_2020_us_vtd_500k.parquet?${SAS_TOKEN}"
569 1055 : if (STARTS_WITH(osBasePath.c_str(), "/vsicurl/"))
570 : {
571 0 : const auto nPos = osBasePath.find(".parquet?st=");
572 0 : if (nPos != std::string::npos)
573 : {
574 0 : osQueryParameters = osBasePath.substr(nPos + strlen(".parquet"));
575 0 : osBasePath.resize(nPos + strlen(".parquet"));
576 : }
577 : }
578 :
579 1716 : if (bStartedWithParquetPrefix || poOpenInfo->bIsDirectory ||
580 661 : !osQueryParameters.empty())
581 : {
582 : VSIStatBufL sStat;
583 394 : if (!osBasePath.empty() && osBasePath.back() == '/')
584 0 : osBasePath.resize(osBasePath.size() - 1);
585 : std::string osMetadataPath =
586 394 : CPLFormFilename(osBasePath.c_str(), "_metadata", nullptr);
587 394 : if (CPLTestBool(
588 1182 : CPLGetConfigOption("OGR_PARQUET_USE_METADATA_FILE", "YES")) &&
589 788 : VSIStatL((osMetadataPath + osQueryParameters).c_str(), &sStat) == 0)
590 : {
591 : // If there's a _metadata file, then use it to avoid listing files
592 : try
593 : {
594 36 : return OpenParquetDatasetWithMetadata(
595 : osBasePath, "_metadata", osQueryParameters,
596 18 : poOpenInfo->papszOpenOptions);
597 : }
598 0 : catch (const std::exception &e)
599 : {
600 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
601 0 : e.what());
602 : }
603 0 : return nullptr;
604 : }
605 : else
606 : {
607 376 : bool bLikelyParquetDataset = false;
608 376 : if (poOpenInfo->bIsDirectory)
609 : {
610 : // Detect if the directory contains .parquet files, or
611 : // subdirectories with a name of the form "key=value", typical
612 : // of HIVE partitioning.
613 750 : const CPLStringList aosFiles(VSIReadDir(osBasePath.c_str()));
614 21273 : for (const char *pszFilename : cpl::Iterate(aosFiles))
615 : {
616 20900 : if (EQUAL(CPLGetExtension(pszFilename), "parquet"))
617 : {
618 2 : bLikelyParquetDataset = true;
619 2 : break;
620 : }
621 20898 : else if (strchr(pszFilename, '='))
622 : {
623 : // HIVE partitioning
624 0 : if (VSIStatL(CPLFormFilename(osBasePath.c_str(),
625 : pszFilename, nullptr),
626 0 : &sStat) == 0 &&
627 0 : VSI_ISDIR(sStat.st_mode))
628 : {
629 0 : bLikelyParquetDataset = true;
630 0 : break;
631 : }
632 : }
633 : }
634 : }
635 :
636 376 : if (bStartedWithParquetPrefix || bLikelyParquetDataset)
637 : {
638 : try
639 : {
640 6 : return OpenParquetDatasetWithoutMetadata(
641 : osBasePath, osQueryParameters,
642 3 : poOpenInfo->papszOpenOptions);
643 : }
644 0 : catch (const std::exception &e)
645 : {
646 : // If we aren't quite sure that the passed file name is
647 : // a directory, then silently continue
648 0 : if (poOpenInfo->bIsDirectory)
649 : {
650 0 : CPLError(CE_Failure, CPLE_AppDefined,
651 0 : "Parquet exception: %s", e.what());
652 0 : return nullptr;
653 : }
654 : }
655 : }
656 : }
657 : }
658 : #endif
659 :
660 1034 : if (!OGRParquetDriverIdentify(poOpenInfo))
661 : {
662 0 : return nullptr;
663 : }
664 :
665 1034 : if (poOpenInfo->bIsDirectory)
666 373 : return nullptr;
667 :
668 1322 : std::string osFilename(poOpenInfo->pszFilename);
669 661 : if (STARTS_WITH(poOpenInfo->pszFilename, "PARQUET:"))
670 : {
671 0 : osFilename = poOpenInfo->pszFilename + strlen("PARQUET:");
672 : }
673 :
674 : try
675 : {
676 661 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
677 971 : if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
678 310 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
679 : {
680 351 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
681 351 : poOpenInfo->fpL = nullptr;
682 351 : if (fp == nullptr)
683 : {
684 0 : fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
685 0 : if (fp == nullptr)
686 0 : return nullptr;
687 : }
688 351 : infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
689 : }
690 : else
691 : {
692 310 : PARQUET_ASSIGN_OR_THROW(infile,
693 : arrow::io::ReadableFile::Open(osFilename));
694 : }
695 :
696 : // Open Parquet file reader
697 661 : std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
698 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
699 1322 : arrow::MemoryPool::CreateDefault().release());
700 661 : auto st = parquet::arrow::OpenFile(std::move(infile),
701 1983 : poMemoryPool.get(), &arrow_reader);
702 661 : if (!st.ok())
703 : {
704 1 : CPLError(CE_Failure, CPLE_AppDefined,
705 : "parquet::arrow::OpenFile() failed");
706 1 : return nullptr;
707 : }
708 :
709 1320 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
710 : auto poLayer = std::make_unique<OGRParquetLayer>(
711 660 : poDS.get(), CPLGetBasename(osFilename.c_str()),
712 1980 : std::move(arrow_reader), poOpenInfo->papszOpenOptions);
713 :
714 : // For debug purposes: return a layer with the extent of each row group
715 660 : if (CPLTestBool(
716 : CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO")))
717 : {
718 1 : return BuildMemDatasetWithRowGroupExtents(poLayer.get());
719 : }
720 :
721 659 : poDS->SetLayer(std::move(poLayer));
722 659 : return poDS.release();
723 : }
724 0 : catch (const std::exception &e)
725 : {
726 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
727 0 : e.what());
728 0 : return nullptr;
729 : }
730 : }
731 :
732 : /************************************************************************/
733 : /* Create() */
734 : /************************************************************************/
735 :
736 199 : static GDALDataset *OGRParquetDriverCreate(const char *pszName, int nXSize,
737 : int nYSize, int nBands,
738 : GDALDataType eType,
739 : char ** /* papszOptions */)
740 : {
741 199 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
742 0 : return nullptr;
743 :
744 : try
745 : {
746 199 : std::shared_ptr<arrow::io::OutputStream> out_file;
747 246 : if (STARTS_WITH(pszName, "/vsi") ||
748 47 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
749 : {
750 199 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
751 199 : if (fp == nullptr)
752 : {
753 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
754 1 : return nullptr;
755 : }
756 198 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
757 : }
758 : else
759 : {
760 0 : PARQUET_ASSIGN_OR_THROW(out_file,
761 : arrow::io::FileOutputStream::Open(pszName));
762 : }
763 :
764 198 : return new OGRParquetWriterDataset(out_file);
765 : }
766 0 : catch (const std::exception &e)
767 : {
768 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
769 0 : e.what());
770 0 : return nullptr;
771 : }
772 : }
773 :
774 : /************************************************************************/
775 : /* OGRParquetDriver() */
776 : /************************************************************************/
777 :
778 : class OGRParquetDriver final : public GDALDriver
779 : {
780 : bool m_bMetadataInitialized = false;
781 : void InitMetadata();
782 :
783 : public:
784 1288 : const char *GetMetadataItem(const char *pszName,
785 : const char *pszDomain) override
786 : {
787 1288 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
788 : {
789 228 : InitMetadata();
790 : }
791 1288 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
792 : }
793 :
794 31 : char **GetMetadata(const char *pszDomain) override
795 : {
796 31 : InitMetadata();
797 31 : return GDALDriver::GetMetadata(pszDomain);
798 : }
799 : };
800 :
801 259 : void OGRParquetDriver::InitMetadata()
802 : {
803 259 : if (m_bMetadataInitialized)
804 251 : return;
805 8 : m_bMetadataInitialized = true;
806 :
807 : CPLXMLTreeCloser oTree(
808 16 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
809 :
810 16 : std::vector<const char *> apszCompressionMethods;
811 8 : bool bHasSnappy = false;
812 56 : for (const char *pszMethod :
813 64 : {"SNAPPY", "GZIP", "BROTLI", "ZSTD", "LZ4_RAW", "LZO", "LZ4_HADOOP"})
814 : {
815 : auto oResult = arrow::util::Codec::GetCompressionType(
816 112 : CPLString(pszMethod).tolower());
817 56 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
818 : {
819 48 : if (EQUAL(pszMethod, "SNAPPY"))
820 8 : bHasSnappy = true;
821 48 : apszCompressionMethods.emplace_back(pszMethod);
822 : }
823 : }
824 :
825 : {
826 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
827 8 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
828 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
829 8 : CPLAddXMLAttributeAndValue(psOption, "description",
830 : "Compression method");
831 8 : CPLAddXMLAttributeAndValue(psOption, "default",
832 : bHasSnappy ? "SNAPPY" : "NONE");
833 : {
834 8 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
835 8 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
836 8 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
837 : }
838 56 : for (const char *pszMethod : apszCompressionMethods)
839 : {
840 48 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
841 48 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
842 : }
843 : }
844 :
845 : {
846 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
847 8 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
848 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
849 8 : CPLAddXMLAttributeAndValue(psOption, "description",
850 : "Encoding of geometry columns");
851 8 : CPLAddXMLAttributeAndValue(psOption, "default", "WKB");
852 32 : for (const char *pszEncoding :
853 40 : {"WKB", "WKT", "GEOARROW", "GEOARROW_INTERLEAVED"})
854 : {
855 32 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
856 32 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
857 32 : if (EQUAL(pszEncoding, "GEOARROW"))
858 8 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
859 : "GEOARROW_STRUCT");
860 : }
861 : }
862 :
863 : {
864 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
865 8 : CPLAddXMLAttributeAndValue(psOption, "name", "ROW_GROUP_SIZE");
866 8 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
867 8 : CPLAddXMLAttributeAndValue(psOption, "description",
868 : "Maximum number of rows per group");
869 8 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
870 : }
871 :
872 : {
873 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
874 8 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
875 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
876 8 : CPLAddXMLAttributeAndValue(psOption, "description",
877 : "Name of geometry column");
878 8 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
879 : }
880 :
881 : {
882 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
883 8 : CPLAddXMLAttributeAndValue(psOption, "name", "COORDINATE_PRECISION");
884 8 : CPLAddXMLAttributeAndValue(psOption, "type", "float");
885 8 : CPLAddXMLAttributeAndValue(psOption, "description",
886 : "Number of decimals for coordinates (only "
887 : "for GEOMETRY_ENCODING=WKT)");
888 : }
889 :
890 : {
891 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
892 8 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
893 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
894 8 : CPLAddXMLAttributeAndValue(psOption, "description",
895 : "Name of the FID column to create");
896 : }
897 :
898 : {
899 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
900 8 : CPLAddXMLAttributeAndValue(psOption, "name", "POLYGON_ORIENTATION");
901 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
902 8 : CPLAddXMLAttributeAndValue(
903 : psOption, "description",
904 : "Which ring orientation to use for polygons");
905 8 : CPLAddXMLAttributeAndValue(psOption, "default", "COUNTERCLOCKWISE");
906 8 : CPLCreateXMLElementAndValue(psOption, "Value", "COUNTERCLOCKWISE");
907 8 : CPLCreateXMLElementAndValue(psOption, "Value", "UNMODIFIED");
908 : }
909 :
910 : {
911 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
912 8 : CPLAddXMLAttributeAndValue(psOption, "name", "EDGES");
913 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
914 8 : CPLAddXMLAttributeAndValue(
915 : psOption, "description",
916 : "Name of the coordinate system for the edges");
917 8 : CPLAddXMLAttributeAndValue(psOption, "default", "PLANAR");
918 8 : CPLCreateXMLElementAndValue(psOption, "Value", "PLANAR");
919 8 : CPLCreateXMLElementAndValue(psOption, "Value", "SPHERICAL");
920 : }
921 :
922 : {
923 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
924 8 : CPLAddXMLAttributeAndValue(psOption, "name", "CREATOR");
925 8 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
926 8 : CPLAddXMLAttributeAndValue(psOption, "description",
927 : "Name of creating application");
928 : }
929 :
930 : {
931 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
932 8 : CPLAddXMLAttributeAndValue(psOption, "name", "WRITE_COVERING_BBOX");
933 8 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
934 8 : CPLAddXMLAttributeAndValue(psOption, "default", "YES");
935 8 : CPLAddXMLAttributeAndValue(psOption, "description",
936 : "Whether to write xmin/ymin/xmax/ymax "
937 : "columns with the bounding box of "
938 : "geometries");
939 : }
940 :
941 : {
942 8 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
943 8 : CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX");
944 8 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
945 8 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
946 8 : CPLAddXMLAttributeAndValue(psOption, "description",
947 : "Whether features should be sorted based on "
948 : "the bounding box of their geometries");
949 : }
950 :
951 8 : char *pszXML = CPLSerializeXMLTree(oTree.get());
952 8 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
953 8 : CPLFree(pszXML);
954 : }
955 :
956 : /************************************************************************/
957 : /* RegisterOGRParquet() */
958 : /************************************************************************/
959 :
960 22 : void RegisterOGRParquet()
961 : {
962 22 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
963 0 : return;
964 :
965 44 : auto poDriver = std::make_unique<OGRParquetDriver>();
966 22 : OGRParquetDriverSetCommonMetadata(poDriver.get());
967 :
968 22 : poDriver->pfnOpen = OGRParquetDriverOpen;
969 22 : poDriver->pfnCreate = OGRParquetDriverCreate;
970 :
971 : #ifdef GDAL_USE_ARROWDATASET
972 22 : poDriver->SetMetadataItem("ARROW_DATASET", "YES");
973 : #endif
974 :
975 22 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
976 : }
|