Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <algorithm>
17 : #include <map>
18 : #include <mutex>
19 : #include <tuple>
20 :
21 : #include "ogr_parquet.h"
22 : #include "ogrparquetdrivercore.h"
23 : #include "memdataset.h"
24 :
25 : #include "../arrow_common/ograrrowrandomaccessfile.h"
26 : #include "../arrow_common/vsiarrowfilesystem.hpp"
27 : #include "../arrow_common/ograrrowwritablefile.h"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 : #include "../arrow_common/ograrrowlayer.hpp" // for the destructor
30 :
31 : #ifdef GDAL_USE_ARROWDATASET
32 :
33 : /************************************************************************/
34 : /* OpenFromDatasetFactory() */
35 : /************************************************************************/
36 :
37 273 : static GDALDataset *OpenFromDatasetFactory(
38 : const std::string &osBasePath,
39 : const std::shared_ptr<arrow::dataset::DatasetFactory> &factory,
40 : CSLConstList papszOpenOptions,
41 : const std::shared_ptr<arrow::fs::FileSystem> &fs)
42 : {
43 273 : std::shared_ptr<arrow::dataset::Dataset> dataset;
44 546 : PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish());
45 :
46 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
47 546 : arrow::MemoryPool::CreateDefault().release());
48 :
49 273 : const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi");
50 546 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
51 : auto poLayer = std::make_unique<OGRParquetDatasetLayer>(
52 546 : poDS.get(), CPLGetBasenameSafe(osBasePath.c_str()).c_str(), bIsVSI,
53 546 : dataset, papszOpenOptions);
54 273 : poDS->SetLayer(std::move(poLayer));
55 273 : poDS->SetFileSystem(fs);
56 546 : return poDS.release();
57 : }
58 :
59 : /************************************************************************/
60 : /* GetFileSystem() */
61 : /************************************************************************/
62 :
63 : static std::tuple<std::shared_ptr<arrow::fs::FileSystem>, std::string>
64 273 : GetFileSystem(std::string &osBasePathInOut,
65 : const std::string &osQueryParameters)
66 : {
67 : // Instantiate file system:
68 : // - VSIArrowFileSystem implementation for /vsi files
69 : // - base implementation for local files (if OGR_PARQUET_USE_VSI set to NO)
70 273 : std::shared_ptr<arrow::fs::FileSystem> fs;
71 273 : const bool bIsVSI = STARTS_WITH(osBasePathInOut.c_str(), "/vsi");
72 : VSIStatBufL sStat;
73 546 : std::string osFSFilename;
74 459 : if ((bIsVSI ||
75 538 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES"))) &&
76 265 : VSIStatL(osBasePathInOut.c_str(), &sStat) == 0)
77 : {
78 264 : osFSFilename = osBasePathInOut;
79 264 : fs = std::make_shared<VSIArrowFileSystem>("PARQUET", osQueryParameters);
80 : }
81 : else
82 : {
83 : // FileSystemFromUriOrPath() doesn't like relative paths
84 : // so transform them to absolute.
85 9 : std::string osPath(osBasePathInOut);
86 9 : if (CPLIsFilenameRelative(osPath.c_str()))
87 : {
88 8 : char *pszCurDir = CPLGetCurrentDir();
89 8 : if (pszCurDir == nullptr)
90 0 : return {nullptr, osFSFilename};
91 8 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
92 8 : CPLFree(pszCurDir);
93 : }
94 9 : PARQUET_ASSIGN_OR_THROW(
95 : fs, arrow::fs::FileSystemFromUriOrPath(osPath, &osFSFilename));
96 : }
97 273 : return {fs, osFSFilename};
98 : }
99 :
100 : /************************************************************************/
101 : /* MakeParquetFileFormat() */
102 : /************************************************************************/
103 :
104 : static std::shared_ptr<arrow::dataset::ParquetFileFormat>
105 273 : MakeParquetFileFormat()
106 : {
107 : auto parquetFileFormat =
108 273 : std::make_shared<arrow::dataset::ParquetFileFormat>();
109 : #if ARROW_VERSION_MAJOR >= 21
110 : auto fragmentScanOptions =
111 : std::dynamic_pointer_cast<arrow::dataset::ParquetFragmentScanOptions>(
112 : parquetFileFormat->default_fragment_scan_options);
113 : CPLAssert(fragmentScanOptions);
114 : fragmentScanOptions->arrow_reader_properties->set_arrow_extensions_enabled(
115 : CPLTestBool(
116 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
117 : #endif
118 273 : return parquetFileFormat;
119 : }
120 :
121 : /************************************************************************/
122 : /* OpenParquetDatasetWithMetadata() */
123 : /************************************************************************/
124 :
125 18 : static GDALDataset *OpenParquetDatasetWithMetadata(
126 : const std::string &osBasePathIn, const char *pszMetadataFile,
127 : const std::string &osQueryParameters, CSLConstList papszOpenOptions)
128 : {
129 36 : std::string osBasePath(osBasePathIn);
130 18 : const auto &[fs, osFSFilename] =
131 36 : GetFileSystem(osBasePath, osQueryParameters);
132 :
133 36 : arrow::dataset::ParquetFactoryOptions options;
134 36 : auto partitioningFactory = arrow::dataset::HivePartitioning::MakeFactory();
135 : options.partitioning =
136 18 : arrow::dataset::PartitioningOrFactory(std::move(partitioningFactory));
137 :
138 18 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
139 54 : PARQUET_ASSIGN_OR_THROW(factory,
140 : arrow::dataset::ParquetDatasetFactory::Make(
141 : osFSFilename + '/' + pszMetadataFile, fs,
142 : MakeParquetFileFormat(), std::move(options)));
143 :
144 36 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
145 : }
146 :
147 : /************************************************************************/
148 : /* OpenParquetDatasetWithoutMetadata() */
149 : /************************************************************************/
150 :
151 : static GDALDataset *
152 255 : OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn,
153 : const std::string &osQueryParameters,
154 : CSLConstList papszOpenOptions)
155 : {
156 510 : std::string osBasePath(osBasePathIn);
157 255 : const auto &[fs, osFSFilename] =
158 510 : GetFileSystem(osBasePath, osQueryParameters);
159 :
160 510 : arrow::dataset::FileSystemFactoryOptions options;
161 255 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
162 :
163 510 : const auto fileInfo = fs->GetFileInfo(osFSFilename);
164 255 : if (fileInfo->IsFile())
165 : {
166 1008 : PARQUET_ASSIGN_OR_THROW(
167 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
168 : fs, {std::move(osFSFilename)}, MakeParquetFileFormat(),
169 : std::move(options)));
170 : }
171 : else
172 : {
173 : auto partitioningFactory =
174 6 : arrow::dataset::HivePartitioning::MakeFactory();
175 6 : options.partitioning = arrow::dataset::PartitioningOrFactory(
176 6 : std::move(partitioningFactory));
177 :
178 6 : arrow::fs::FileSelector selector;
179 3 : selector.base_dir = std::move(osFSFilename);
180 3 : selector.recursive = true;
181 :
182 6 : PARQUET_ASSIGN_OR_THROW(
183 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
184 : fs, std::move(selector), MakeParquetFileFormat(),
185 : std::move(options)));
186 : }
187 :
188 510 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
189 : }
190 :
191 : #endif
192 :
193 : /************************************************************************/
194 : /* BuildMemDatasetWithRowGroupExtents() */
195 : /************************************************************************/
196 :
197 : /** Builds a MEM dataset that contains, for each row-group of the input file,
198 : * the feature count and spatial extent of the features of this row group,
199 : * using Parquet statistics. This assumes that the Parquet file declares
200 : * a "covering":{"bbox":{ ... }} metadata item.
201 : *
202 : * Only for debug purposes.
203 : */
204 1 : static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer)
205 : {
206 1 : int iParquetXMin = -1;
207 1 : int iParquetYMin = -1;
208 1 : int iParquetXMax = -1;
209 1 : int iParquetYMax = -1;
210 1 : if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin,
211 : iParquetXMax, iParquetYMax))
212 : {
213 : auto poMemDS = std::unique_ptr<GDALDataset>(
214 2 : MEMDataset::Create("", 0, 0, 0, GDT_Unknown, nullptr));
215 1 : if (!poMemDS)
216 0 : return nullptr;
217 1 : OGRSpatialReference *poTmpSRS = nullptr;
218 1 : const auto poSrcSRS = poLayer->GetSpatialRef();
219 1 : if (poSrcSRS)
220 0 : poTmpSRS = poSrcSRS->Clone();
221 : auto poMemLayer =
222 1 : poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr);
223 1 : if (poTmpSRS)
224 0 : poTmpSRS->Release();
225 1 : if (!poMemLayer)
226 0 : return nullptr;
227 1 : poMemLayer->CreateField(
228 1 : std::make_unique<OGRFieldDefn>("feature_count", OFTInteger64)
229 1 : .get());
230 :
231 : const auto metadata =
232 2 : poLayer->GetReader()->parquet_reader()->metadata();
233 1 : const int numRowGroups = metadata->num_row_groups();
234 15 : for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup)
235 : {
236 28 : std::string osMinTmp, osMaxTmp;
237 : OGRField unusedF;
238 : bool unusedB;
239 : OGRFieldSubType unusedSubType;
240 :
241 : OGRField sXMin;
242 14 : OGR_RawField_SetNull(&sXMin);
243 14 : bool bFoundXMin = false;
244 14 : OGRFieldType eXMinType = OFTMaxType;
245 :
246 : OGRField sYMin;
247 14 : OGR_RawField_SetNull(&sYMin);
248 14 : bool bFoundYMin = false;
249 14 : OGRFieldType eYMinType = OFTMaxType;
250 :
251 : OGRField sXMax;
252 14 : OGR_RawField_SetNull(&sXMax);
253 14 : bool bFoundXMax = false;
254 14 : OGRFieldType eXMaxType = OFTMaxType;
255 :
256 : OGRField sYMax;
257 14 : OGR_RawField_SetNull(&sYMax);
258 14 : bool bFoundYMax = false;
259 14 : OGRFieldType eYMaxType = OFTMaxType;
260 :
261 14 : if (poLayer->GetMinMaxForParquetCol(
262 : iRowGroup, iParquetXMin, nullptr,
263 : /* bComputeMin = */ true, sXMin, bFoundXMin,
264 : /* bComputeMax = */ false, unusedF, unusedB, eXMinType,
265 8 : unusedSubType, osMinTmp, osMaxTmp) &&
266 8 : bFoundXMin && eXMinType == OFTReal &&
267 22 : poLayer->GetMinMaxForParquetCol(
268 : iRowGroup, iParquetYMin, nullptr,
269 : /* bComputeMin = */ true, sYMin, bFoundYMin,
270 : /* bComputeMax = */ false, unusedF, unusedB, eYMinType,
271 8 : unusedSubType, osMinTmp, osMaxTmp) &&
272 8 : bFoundYMin && eYMinType == OFTReal &&
273 22 : poLayer->GetMinMaxForParquetCol(
274 : iRowGroup, iParquetXMax, nullptr,
275 : /* bComputeMin = */ false, unusedF, unusedB,
276 : /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType,
277 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
278 8 : bFoundXMax && eXMaxType == OFTReal &&
279 22 : poLayer->GetMinMaxForParquetCol(
280 : iRowGroup, iParquetYMax, nullptr,
281 : /* bComputeMin = */ false, unusedF, unusedB,
282 : /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType,
283 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
284 22 : bFoundYMax && eYMaxType == OFTReal)
285 : {
286 16 : OGRFeature oFeat(poMemLayer->GetLayerDefn());
287 8 : oFeat.SetField(0,
288 : static_cast<GIntBig>(
289 8 : metadata->RowGroup(iRowGroup)->num_rows()));
290 16 : auto poPoly = std::make_unique<OGRPolygon>();
291 8 : auto poLR = std::make_unique<OGRLinearRing>();
292 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
293 8 : poLR->addPoint(sXMin.Real, sYMax.Real);
294 8 : poLR->addPoint(sXMax.Real, sYMax.Real);
295 8 : poLR->addPoint(sXMax.Real, sYMin.Real);
296 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
297 8 : poPoly->addRingDirectly(poLR.release());
298 8 : oFeat.SetGeometryDirectly(poPoly.release());
299 8 : CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat));
300 : }
301 : }
302 :
303 1 : return poMemDS.release();
304 : }
305 0 : return nullptr;
306 : }
307 :
308 : /************************************************************************/
309 : /* Open() */
310 : /************************************************************************/
311 :
312 1667 : static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo)
313 : {
314 1667 : if (poOpenInfo->eAccess == GA_Update)
315 64 : return nullptr;
316 :
317 : #if ARROW_VERSION_MAJOR >= 21
318 : // Register geoarrow.wkb extension if not already done
319 : if (!arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB) &&
320 : CPLTestBool(CPLGetConfigOption(
321 : "OGR_PARQUET_REGISTER_GEOARROW_WKB_EXTENSION", "YES")))
322 : {
323 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
324 : std::make_shared<OGRGeoArrowWkbExtensionType>(
325 : std::move(arrow::binary()), std::string())));
326 : }
327 : #endif
328 :
329 : #ifdef GDAL_USE_ARROWDATASET
330 3206 : std::string osBasePath(poOpenInfo->pszFilename);
331 3206 : std::string osQueryParameters;
332 : const bool bStartedWithParquetPrefix =
333 1603 : STARTS_WITH(osBasePath.c_str(), "PARQUET:");
334 :
335 1603 : if (bStartedWithParquetPrefix)
336 : {
337 261 : osBasePath = osBasePath.substr(strlen("PARQUET:"));
338 : }
339 :
340 : // Little trick to allow using syntax of
341 : // https://github.com/opengeospatial/geoparquet/discussions/101
342 : // ogrinfo
343 : // "/vsicurl/https://ai4edataeuwest.blob.core.windows.net/us-census/2020/cb_2020_us_vtd_500k.parquet?${SAS_TOKEN}"
344 1603 : if (STARTS_WITH(osBasePath.c_str(), "/vsicurl/"))
345 : {
346 2 : const auto nPos = osBasePath.find(".parquet?st=");
347 2 : if (nPos != std::string::npos)
348 : {
349 0 : osQueryParameters = osBasePath.substr(nPos + strlen(".parquet"));
350 0 : osBasePath.resize(nPos + strlen(".parquet"));
351 : }
352 : }
353 :
354 2467 : if (bStartedWithParquetPrefix || poOpenInfo->bIsDirectory ||
355 864 : !osQueryParameters.empty())
356 : {
357 : VSIStatBufL sStat;
358 739 : if (!osBasePath.empty() && osBasePath.back() == '/')
359 0 : osBasePath.pop_back();
360 : const std::string osMetadataPath =
361 739 : CPLFormFilenameSafe(osBasePath.c_str(), "_metadata", nullptr);
362 739 : if (CPLTestBool(
363 2217 : CPLGetConfigOption("OGR_PARQUET_USE_METADATA_FILE", "YES")) &&
364 1478 : VSIStatL((osMetadataPath + osQueryParameters).c_str(), &sStat) == 0)
365 : {
366 : // If there's a _metadata file, then use it to avoid listing files
367 : try
368 : {
369 36 : return OpenParquetDatasetWithMetadata(
370 : osBasePath, "_metadata", osQueryParameters,
371 18 : poOpenInfo->papszOpenOptions);
372 : }
373 0 : catch (const std::exception &e)
374 : {
375 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
376 0 : e.what());
377 : }
378 0 : return nullptr;
379 : }
380 : else
381 : {
382 721 : bool bLikelyParquetDataset = false;
383 721 : if (poOpenInfo->bIsDirectory)
384 : {
385 : // Detect if the directory contains .parquet files, or
386 : // subdirectories with a name of the form "key=value", typical
387 : // of HIVE partitioning.
388 936 : const CPLStringList aosFiles(VSIReadDir(osBasePath.c_str()));
389 22206 : for (const char *pszFilename : cpl::Iterate(aosFiles))
390 : {
391 21740 : if (EQUAL(CPLGetExtensionSafe(pszFilename).c_str(),
392 : "parquet"))
393 : {
394 2 : bLikelyParquetDataset = true;
395 2 : break;
396 : }
397 21738 : else if (strchr(pszFilename, '='))
398 : {
399 : // HIVE partitioning
400 0 : if (VSIStatL(CPLFormFilenameSafe(osBasePath.c_str(),
401 : pszFilename, nullptr)
402 : .c_str(),
403 0 : &sStat) == 0 &&
404 0 : VSI_ISDIR(sStat.st_mode))
405 : {
406 0 : bLikelyParquetDataset = true;
407 0 : break;
408 : }
409 : }
410 : }
411 : }
412 :
413 721 : if (bStartedWithParquetPrefix || bLikelyParquetDataset)
414 : {
415 : try
416 : {
417 510 : return OpenParquetDatasetWithoutMetadata(
418 : osBasePath, osQueryParameters,
419 255 : poOpenInfo->papszOpenOptions);
420 : }
421 0 : catch (const std::exception &e)
422 : {
423 : // If we aren't quite sure that the passed file name is
424 : // a directory, then silently continue
425 0 : if (poOpenInfo->bIsDirectory)
426 : {
427 0 : CPLError(CE_Failure, CPLE_AppDefined,
428 0 : "Parquet exception: %s", e.what());
429 0 : return nullptr;
430 : }
431 : }
432 : }
433 : }
434 : }
435 : #endif
436 :
437 1330 : if (!OGRParquetDriverIdentify(poOpenInfo))
438 : {
439 0 : return nullptr;
440 : }
441 :
442 1330 : if (poOpenInfo->bIsDirectory)
443 466 : return nullptr;
444 :
445 1728 : std::string osFilename(poOpenInfo->pszFilename);
446 864 : if (STARTS_WITH(poOpenInfo->pszFilename, "PARQUET:"))
447 : {
448 0 : osFilename = poOpenInfo->pszFilename + strlen("PARQUET:");
449 : }
450 :
451 : try
452 : {
453 864 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
454 1343 : if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
455 479 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
456 : {
457 385 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
458 385 : poOpenInfo->fpL = nullptr;
459 385 : if (fp == nullptr)
460 : {
461 0 : fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
462 0 : if (fp == nullptr)
463 0 : return nullptr;
464 : }
465 770 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename,
466 770 : std::move(fp));
467 : }
468 : else
469 : {
470 479 : PARQUET_ASSIGN_OR_THROW(infile,
471 : arrow::io::ReadableFile::Open(osFilename));
472 : }
473 :
474 : // Open Parquet file reader
475 864 : std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
476 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
477 1728 : arrow::MemoryPool::CreateDefault().release());
478 :
479 864 : const int nNumCPUs = OGRParquetLayerBase::GetNumCPUs();
480 : const char *pszUseThreads =
481 864 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
482 864 : if (!pszUseThreads && nNumCPUs > 1)
483 : {
484 864 : pszUseThreads = "YES";
485 : }
486 864 : const bool bUseThreads = pszUseThreads && CPLTestBool(pszUseThreads);
487 :
488 : const char *pszParquetBatchSize =
489 864 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
490 :
491 : #if ARROW_VERSION_MAJOR >= 21
492 : parquet::arrow::FileReaderBuilder fileReaderBuilder;
493 : {
494 : auto st = fileReaderBuilder.Open(std::move(infile));
495 : if (!st.ok())
496 : {
497 : CPLError(CE_Failure, CPLE_AppDefined,
498 : "parquet::arrow::FileReaderBuilder::Open() failed: %s",
499 : st.message().c_str());
500 : return nullptr;
501 : }
502 : }
503 : fileReaderBuilder.memory_pool(poMemoryPool.get());
504 : parquet::ArrowReaderProperties fileReaderProperties;
505 : fileReaderProperties.set_arrow_extensions_enabled(CPLTestBool(
506 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
507 : if (pszParquetBatchSize)
508 : {
509 : fileReaderProperties.set_batch_size(
510 : CPLAtoGIntBig(pszParquetBatchSize));
511 : }
512 : if (bUseThreads)
513 : {
514 : fileReaderProperties.set_use_threads(true);
515 : }
516 : fileReaderBuilder.properties(fileReaderProperties);
517 : {
518 : auto res = fileReaderBuilder.Build();
519 : if (!res.ok())
520 : {
521 : CPLError(
522 : CE_Failure, CPLE_AppDefined,
523 : "parquet::arrow::FileReaderBuilder::Build() failed: %s",
524 : res.status().message().c_str());
525 : return nullptr;
526 : }
527 : arrow_reader = std::move(*res);
528 : }
529 : #elif ARROW_VERSION_MAJOR >= 19
530 2592 : PARQUET_ASSIGN_OR_THROW(
531 : arrow_reader,
532 : parquet::arrow::OpenFile(std::move(infile), poMemoryPool.get()));
533 : #else
534 : auto st = parquet::arrow::OpenFile(std::move(infile),
535 : poMemoryPool.get(), &arrow_reader);
536 : if (!st.ok())
537 : {
538 : CPLError(CE_Failure, CPLE_AppDefined,
539 : "parquet::arrow::OpenFile() failed: %s",
540 : st.message().c_str());
541 : return nullptr;
542 : }
543 : #endif
544 :
545 : #if ARROW_VERSION_MAJOR < 21
546 863 : if (pszParquetBatchSize)
547 : {
548 5 : arrow_reader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
549 : }
550 :
551 863 : if (bUseThreads)
552 : {
553 863 : arrow_reader->set_use_threads(true);
554 : }
555 : #endif
556 :
557 1726 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
558 : auto poLayer = std::make_unique<OGRParquetLayer>(
559 1726 : poDS.get(), CPLGetBasenameSafe(osFilename.c_str()).c_str(),
560 2589 : std::move(arrow_reader), poOpenInfo->papszOpenOptions);
561 :
562 : // For debug purposes: return a layer with the extent of each row group
563 863 : if (CPLTestBool(
564 : CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO")))
565 : {
566 1 : return BuildMemDatasetWithRowGroupExtents(poLayer.get());
567 : }
568 :
569 862 : poDS->SetLayer(std::move(poLayer));
570 862 : return poDS.release();
571 : }
572 1 : catch (const std::exception &e)
573 : {
574 1 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
575 1 : e.what());
576 1 : return nullptr;
577 : }
578 : }
579 :
580 : /************************************************************************/
581 : /* Create() */
582 : /************************************************************************/
583 :
584 271 : static GDALDataset *OGRParquetDriverCreate(const char *pszName, int nXSize,
585 : int nYSize, int nBands,
586 : GDALDataType eType,
587 : char ** /* papszOptions */)
588 : {
589 271 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
590 0 : return nullptr;
591 :
592 : try
593 : {
594 271 : std::shared_ptr<arrow::io::OutputStream> out_file;
595 353 : if (STARTS_WITH(pszName, "/vsi") ||
596 82 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
597 : {
598 271 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
599 271 : if (fp == nullptr)
600 : {
601 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
602 1 : return nullptr;
603 : }
604 270 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
605 : }
606 : else
607 : {
608 0 : PARQUET_ASSIGN_OR_THROW(out_file,
609 : arrow::io::FileOutputStream::Open(pszName));
610 : }
611 :
612 270 : return new OGRParquetWriterDataset(out_file);
613 : }
614 0 : catch (const std::exception &e)
615 : {
616 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
617 0 : e.what());
618 0 : return nullptr;
619 : }
620 : }
621 :
622 : /************************************************************************/
623 : /* OGRParquetDriver() */
624 : /************************************************************************/
625 :
626 : class OGRParquetDriver final : public GDALDriver
627 : {
628 : std::mutex m_oMutex{};
629 : bool m_bMetadataInitialized = false;
630 : void InitMetadata();
631 :
632 : public:
633 : const char *GetMetadataItem(const char *pszName,
634 : const char *pszDomain) override;
635 :
636 53 : char **GetMetadata(const char *pszDomain) override
637 : {
638 106 : std::lock_guard oLock(m_oMutex);
639 53 : InitMetadata();
640 106 : return GDALDriver::GetMetadata(pszDomain);
641 : }
642 : };
643 :
644 2014 : const char *OGRParquetDriver::GetMetadataItem(const char *pszName,
645 : const char *pszDomain)
646 : {
647 4028 : std::lock_guard oLock(m_oMutex);
648 2014 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
649 : {
650 315 : InitMetadata();
651 : }
652 4028 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
653 : }
654 :
655 368 : void OGRParquetDriver::InitMetadata()
656 : {
657 368 : if (m_bMetadataInitialized)
658 345 : return;
659 23 : m_bMetadataInitialized = true;
660 :
661 : CPLXMLTreeCloser oTree(
662 46 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
663 :
664 46 : std::vector<const char *> apszCompressionMethods;
665 23 : bool bHasSnappy = false;
666 23 : int minComprLevel = INT_MAX;
667 23 : int maxComprLevel = INT_MIN;
668 46 : std::string osCompressionLevelDesc = "Compression level, codec dependent.";
669 161 : for (const char *pszMethod :
670 184 : {"SNAPPY", "GZIP", "BROTLI", "ZSTD", "LZ4_RAW", "LZO", "LZ4_HADOOP"})
671 : {
672 : auto compressionTypeRes = arrow::util::Codec::GetCompressionType(
673 322 : CPLString(pszMethod).tolower());
674 322 : if (compressionTypeRes.ok() &&
675 161 : arrow::util::Codec::IsAvailable(*compressionTypeRes))
676 : {
677 138 : const auto compressionType = *compressionTypeRes;
678 138 : if (EQUAL(pszMethod, "SNAPPY"))
679 23 : bHasSnappy = true;
680 138 : apszCompressionMethods.emplace_back(pszMethod);
681 :
682 : auto minCompressLevelRes =
683 276 : arrow::util::Codec::MinimumCompressionLevel(compressionType);
684 : auto maxCompressLevelRes =
685 276 : arrow::util::Codec::MaximumCompressionLevel(compressionType);
686 : auto defCompressLevelRes =
687 276 : arrow::util::Codec::DefaultCompressionLevel(compressionType);
688 230 : if (minCompressLevelRes.ok() && maxCompressLevelRes.ok() &&
689 92 : defCompressLevelRes.ok())
690 : {
691 92 : minComprLevel = std::min(minComprLevel, *minCompressLevelRes);
692 92 : maxComprLevel = std::max(maxComprLevel, *maxCompressLevelRes);
693 92 : osCompressionLevelDesc += ' ';
694 92 : osCompressionLevelDesc += pszMethod;
695 92 : osCompressionLevelDesc += ": [";
696 92 : osCompressionLevelDesc += std::to_string(*minCompressLevelRes);
697 92 : osCompressionLevelDesc += ',';
698 92 : osCompressionLevelDesc += std::to_string(*maxCompressLevelRes);
699 92 : osCompressionLevelDesc += "], default=";
700 92 : osCompressionLevelDesc += std::to_string(*defCompressLevelRes);
701 92 : osCompressionLevelDesc += '.';
702 : }
703 : }
704 : }
705 :
706 : {
707 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
708 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
709 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
710 23 : CPLAddXMLAttributeAndValue(psOption, "description",
711 : "Compression method");
712 23 : CPLAddXMLAttributeAndValue(psOption, "default",
713 : bHasSnappy ? "SNAPPY" : "NONE");
714 : {
715 23 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
716 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
717 23 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
718 : }
719 161 : for (const char *pszMethod : apszCompressionMethods)
720 : {
721 138 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
722 138 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
723 : }
724 : }
725 :
726 23 : if (minComprLevel <= maxComprLevel)
727 : {
728 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
729 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION_LEVEL");
730 23 : CPLAddXMLAttributeAndValue(psOption, "type", "int");
731 23 : CPLAddXMLAttributeAndValue(
732 : psOption, "min",
733 : CPLSPrintf("%d",
734 23 : std::min(DEFAULT_COMPRESSION_LEVEL, minComprLevel)));
735 23 : CPLAddXMLAttributeAndValue(psOption, "max",
736 : CPLSPrintf("%d", maxComprLevel));
737 23 : CPLAddXMLAttributeAndValue(psOption, "description",
738 : osCompressionLevelDesc.c_str());
739 23 : CPLAddXMLAttributeAndValue(psOption, "default",
740 : CPLSPrintf("%d", DEFAULT_COMPRESSION_LEVEL));
741 : }
742 :
743 : {
744 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
745 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
746 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
747 23 : CPLAddXMLAttributeAndValue(psOption, "description",
748 : "Encoding of geometry columns");
749 23 : CPLAddXMLAttributeAndValue(psOption, "default", "WKB");
750 92 : for (const char *pszEncoding :
751 115 : {"WKB", "WKT", "GEOARROW", "GEOARROW_INTERLEAVED"})
752 : {
753 92 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
754 92 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
755 92 : if (EQUAL(pszEncoding, "GEOARROW"))
756 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
757 : "GEOARROW_STRUCT");
758 : }
759 : }
760 :
761 : {
762 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
763 23 : CPLAddXMLAttributeAndValue(psOption, "name", "ROW_GROUP_SIZE");
764 23 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
765 23 : CPLAddXMLAttributeAndValue(psOption, "description",
766 : "Maximum number of rows per group");
767 23 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
768 : }
769 :
770 : {
771 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
772 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
773 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
774 23 : CPLAddXMLAttributeAndValue(psOption, "description",
775 : "Name of geometry column");
776 23 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
777 : }
778 :
779 : {
780 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
781 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COORDINATE_PRECISION");
782 23 : CPLAddXMLAttributeAndValue(psOption, "type", "float");
783 23 : CPLAddXMLAttributeAndValue(psOption, "description",
784 : "Number of decimals for coordinates (only "
785 : "for GEOMETRY_ENCODING=WKT)");
786 : }
787 :
788 : {
789 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
790 23 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
791 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
792 23 : CPLAddXMLAttributeAndValue(psOption, "description",
793 : "Name of the FID column to create");
794 : }
795 :
796 : {
797 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
798 23 : CPLAddXMLAttributeAndValue(psOption, "name", "POLYGON_ORIENTATION");
799 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
800 23 : CPLAddXMLAttributeAndValue(
801 : psOption, "description",
802 : "Which ring orientation to use for polygons");
803 23 : CPLAddXMLAttributeAndValue(psOption, "default", "COUNTERCLOCKWISE");
804 23 : CPLCreateXMLElementAndValue(psOption, "Value", "COUNTERCLOCKWISE");
805 23 : CPLCreateXMLElementAndValue(psOption, "Value", "UNMODIFIED");
806 : }
807 :
808 : {
809 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
810 23 : CPLAddXMLAttributeAndValue(psOption, "name", "EDGES");
811 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
812 23 : CPLAddXMLAttributeAndValue(
813 : psOption, "description",
814 : "Name of the coordinate system for the edges");
815 23 : CPLAddXMLAttributeAndValue(psOption, "default", "PLANAR");
816 23 : CPLCreateXMLElementAndValue(psOption, "Value", "PLANAR");
817 23 : CPLCreateXMLElementAndValue(psOption, "Value", "SPHERICAL");
818 : }
819 :
820 : {
821 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
822 23 : CPLAddXMLAttributeAndValue(psOption, "name", "CREATOR");
823 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
824 23 : CPLAddXMLAttributeAndValue(psOption, "description",
825 : "Name of creating application");
826 : }
827 :
828 : {
829 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
830 23 : CPLAddXMLAttributeAndValue(psOption, "name", "WRITE_COVERING_BBOX");
831 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
832 23 : CPLAddXMLAttributeAndValue(psOption, "default", "AUTO");
833 23 : CPLAddXMLAttributeAndValue(psOption, "description",
834 : "Whether to write xmin/ymin/xmax/ymax "
835 : "columns with the bounding box of "
836 : "geometries");
837 23 : CPLCreateXMLElementAndValue(psOption, "Value", "AUTO");
838 23 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
839 23 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
840 : }
841 :
842 : #if ARROW_VERSION_MAJOR >= 21
843 : {
844 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
845 : CPLAddXMLAttributeAndValue(psOption, "name", "USE_PARQUET_GEO_TYPES");
846 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
847 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
848 : CPLAddXMLAttributeAndValue(psOption, "description",
849 : "Whether to use Parquet Geometry/Geography "
850 : "logical types (introduced in libarrow 21), "
851 : "when using GEOMETRY_ENCODING=WKB encoding");
852 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
853 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
854 : CPLCreateXMLElementAndValue(psOption, "Value", "ONLY");
855 : }
856 : #endif
857 :
858 : {
859 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
860 23 : CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX");
861 23 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
862 23 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
863 23 : CPLAddXMLAttributeAndValue(psOption, "description",
864 : "Whether features should be sorted based on "
865 : "the bounding box of their geometries");
866 : }
867 :
868 23 : char *pszXML = CPLSerializeXMLTree(oTree.get());
869 23 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
870 23 : CPLFree(pszXML);
871 : }
872 :
873 : /************************************************************************/
874 : /* RegisterOGRParquet() */
875 : /************************************************************************/
876 :
877 36 : void RegisterOGRParquet()
878 : {
879 36 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
880 0 : return;
881 :
882 72 : auto poDriver = std::make_unique<OGRParquetDriver>();
883 36 : OGRParquetDriverSetCommonMetadata(poDriver.get());
884 :
885 36 : poDriver->pfnOpen = OGRParquetDriverOpen;
886 36 : poDriver->pfnCreate = OGRParquetDriverCreate;
887 :
888 36 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
889 : #ifdef GDAL_USE_ARROWDATASET
890 36 : poDriver->SetMetadataItem("ARROW_DATASET", "YES");
891 : #endif
892 :
893 36 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
894 :
895 : #if ARROW_VERSION_MAJOR >= 16
896 : // Mostly for tests
897 : const char *pszPath =
898 36 : CPLGetConfigOption("OGR_PARQUET_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
899 36 : if (pszPath)
900 : {
901 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
902 0 : if (!result.ok())
903 : {
904 0 : CPLError(CE_Warning, CPLE_AppDefined,
905 : "arrow::fs::LoadFileSystemFactories() failed with %s",
906 0 : result.message().c_str());
907 : }
908 : }
909 : #endif
910 :
911 : #if defined(GDAL_USE_ARROWDATASET) && defined(GDAL_USE_ARROWCOMPUTE)
912 : {
913 : auto status = arrow::compute::Initialize();
914 : if (!status.ok())
915 : {
916 : CPLError(CE_Warning, CPLE_AppDefined,
917 : "arrow::compute::Initialize() failed with %s",
918 : status.message().c_str());
919 : }
920 : }
921 : #endif
922 : }
|