Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <algorithm>
17 : #include <map>
18 : #include <mutex>
19 : #include <tuple>
20 :
21 : #include "ogr_parquet.h"
22 : #include "ogrparquetdrivercore.h"
23 : #include "memdataset.h"
24 :
25 : #include "../arrow_common/ograrrowrandomaccessfile.h"
26 : #include "../arrow_common/vsiarrowfilesystem.hpp"
27 : #include "../arrow_common/ograrrowwritablefile.h"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 : #include "../arrow_common/ograrrowlayer.hpp" // for the destructor
30 :
31 : #ifdef GDAL_USE_ARROWDATASET
32 :
33 : /************************************************************************/
34 : /* OpenFromDatasetFactory() */
35 : /************************************************************************/
36 :
37 274 : static GDALDataset *OpenFromDatasetFactory(
38 : const std::string &osBasePath,
39 : const std::shared_ptr<arrow::dataset::DatasetFactory> &factory,
40 : CSLConstList papszOpenOptions,
41 : const std::shared_ptr<arrow::fs::FileSystem> &fs)
42 : {
43 274 : std::shared_ptr<arrow::dataset::Dataset> dataset;
44 548 : PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish());
45 :
46 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
47 548 : arrow::MemoryPool::CreateDefault().release());
48 :
49 274 : const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi");
50 548 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
51 : auto poLayer = std::make_unique<OGRParquetDatasetLayer>(
52 548 : poDS.get(), CPLGetBasenameSafe(osBasePath.c_str()).c_str(), bIsVSI,
53 548 : dataset, papszOpenOptions);
54 274 : poDS->SetLayer(std::move(poLayer));
55 274 : poDS->SetFileSystem(fs);
56 548 : return poDS.release();
57 : }
58 :
59 : /************************************************************************/
60 : /* GetFileSystem() */
61 : /************************************************************************/
62 :
63 : static std::tuple<std::shared_ptr<arrow::fs::FileSystem>, std::string>
64 274 : GetFileSystem(std::string &osBasePathInOut,
65 : const std::string &osQueryParameters)
66 : {
67 : // Instantiate file system:
68 : // - VSIArrowFileSystem implementation for /vsi files
69 : // - base implementation for local files (if OGR_PARQUET_USE_VSI set to NO)
70 274 : std::shared_ptr<arrow::fs::FileSystem> fs;
71 274 : const bool bIsVSI = STARTS_WITH(osBasePathInOut.c_str(), "/vsi");
72 : VSIStatBufL sStat;
73 548 : std::string osFSFilename;
74 460 : if ((bIsVSI ||
75 540 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES"))) &&
76 266 : VSIStatL(osBasePathInOut.c_str(), &sStat) == 0)
77 : {
78 265 : osFSFilename = osBasePathInOut;
79 265 : fs = std::make_shared<VSIArrowFileSystem>("PARQUET", osQueryParameters);
80 : }
81 : else
82 : {
83 : // FileSystemFromUriOrPath() doesn't like relative paths
84 : // so transform them to absolute.
85 9 : std::string osPath(osBasePathInOut);
86 9 : if (CPLIsFilenameRelative(osPath.c_str()))
87 : {
88 8 : char *pszCurDir = CPLGetCurrentDir();
89 8 : if (pszCurDir == nullptr)
90 0 : return {nullptr, osFSFilename};
91 8 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
92 8 : CPLFree(pszCurDir);
93 : }
94 9 : PARQUET_ASSIGN_OR_THROW(
95 : fs, arrow::fs::FileSystemFromUriOrPath(osPath, &osFSFilename));
96 : }
97 274 : return {fs, osFSFilename};
98 : }
99 :
100 : /************************************************************************/
101 : /* MakeParquetFileFormat() */
102 : /************************************************************************/
103 :
104 : static std::shared_ptr<arrow::dataset::ParquetFileFormat>
105 274 : MakeParquetFileFormat()
106 : {
107 : auto parquetFileFormat =
108 274 : std::make_shared<arrow::dataset::ParquetFileFormat>();
109 : #if ARROW_VERSION_MAJOR >= 21
110 : auto fragmentScanOptions =
111 : std::dynamic_pointer_cast<arrow::dataset::ParquetFragmentScanOptions>(
112 : parquetFileFormat->default_fragment_scan_options);
113 : CPLAssert(fragmentScanOptions);
114 : fragmentScanOptions->arrow_reader_properties->set_arrow_extensions_enabled(
115 : CPLTestBool(
116 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
117 : #endif
118 274 : return parquetFileFormat;
119 : }
120 :
121 : /************************************************************************/
122 : /* OpenParquetDatasetWithMetadata() */
123 : /************************************************************************/
124 :
125 18 : static GDALDataset *OpenParquetDatasetWithMetadata(
126 : const std::string &osBasePathIn, const char *pszMetadataFile,
127 : const std::string &osQueryParameters, CSLConstList papszOpenOptions)
128 : {
129 36 : std::string osBasePath(osBasePathIn);
130 18 : const auto &[fs, osFSFilename] =
131 36 : GetFileSystem(osBasePath, osQueryParameters);
132 :
133 36 : arrow::dataset::ParquetFactoryOptions options;
134 36 : auto partitioningFactory = arrow::dataset::HivePartitioning::MakeFactory();
135 : options.partitioning =
136 18 : arrow::dataset::PartitioningOrFactory(std::move(partitioningFactory));
137 :
138 18 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
139 54 : PARQUET_ASSIGN_OR_THROW(factory,
140 : arrow::dataset::ParquetDatasetFactory::Make(
141 : osFSFilename + '/' + pszMetadataFile, fs,
142 : MakeParquetFileFormat(), std::move(options)));
143 :
144 36 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
145 : }
146 :
147 : /************************************************************************/
148 : /* OpenParquetDatasetWithoutMetadata() */
149 : /************************************************************************/
150 :
151 : static GDALDataset *
152 256 : OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn,
153 : const std::string &osQueryParameters,
154 : CSLConstList papszOpenOptions)
155 : {
156 512 : std::string osBasePath(osBasePathIn);
157 256 : const auto &[fs, osFSFilename] =
158 512 : GetFileSystem(osBasePath, osQueryParameters);
159 :
160 512 : arrow::dataset::FileSystemFactoryOptions options;
161 256 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
162 :
163 512 : const auto fileInfo = fs->GetFileInfo(osFSFilename);
164 256 : if (fileInfo->IsFile())
165 : {
166 1008 : PARQUET_ASSIGN_OR_THROW(
167 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
168 : fs, {std::move(osFSFilename)}, MakeParquetFileFormat(),
169 : std::move(options)));
170 : }
171 : else
172 : {
173 : auto partitioningFactory =
174 8 : arrow::dataset::HivePartitioning::MakeFactory();
175 8 : options.partitioning = arrow::dataset::PartitioningOrFactory(
176 8 : std::move(partitioningFactory));
177 :
178 8 : arrow::fs::FileSelector selector;
179 4 : selector.base_dir = std::move(osFSFilename);
180 4 : selector.recursive = true;
181 :
182 8 : PARQUET_ASSIGN_OR_THROW(
183 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
184 : fs, std::move(selector), MakeParquetFileFormat(),
185 : std::move(options)));
186 : }
187 :
188 512 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
189 : }
190 :
191 : #endif
192 :
193 : /************************************************************************/
194 : /* BuildMemDatasetWithRowGroupExtents() */
195 : /************************************************************************/
196 :
197 : /** Builds a MEM dataset that contains, for each row-group of the input file,
198 : * the feature count and spatial extent of the features of this row group,
199 : * using Parquet statistics. This assumes that the Parquet file declares
200 : * a "covering":{"bbox":{ ... }} metadata item.
201 : *
202 : * Only for debug purposes.
203 : */
204 1 : static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer)
205 : {
206 1 : int iParquetXMin = -1;
207 1 : int iParquetYMin = -1;
208 1 : int iParquetXMax = -1;
209 1 : int iParquetYMax = -1;
210 1 : if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin,
211 : iParquetXMax, iParquetYMax))
212 : {
213 : auto poMemDS = std::unique_ptr<GDALDataset>(
214 2 : MEMDataset::Create("", 0, 0, 0, GDT_Unknown, nullptr));
215 1 : if (!poMemDS)
216 0 : return nullptr;
217 1 : OGRSpatialReference *poTmpSRS = nullptr;
218 1 : const auto poSrcSRS = poLayer->GetSpatialRef();
219 1 : if (poSrcSRS)
220 0 : poTmpSRS = poSrcSRS->Clone();
221 : auto poMemLayer =
222 1 : poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr);
223 1 : if (poTmpSRS)
224 0 : poTmpSRS->Release();
225 1 : if (!poMemLayer)
226 0 : return nullptr;
227 1 : poMemLayer->CreateField(
228 1 : std::make_unique<OGRFieldDefn>("feature_count", OFTInteger64)
229 1 : .get());
230 :
231 : const auto metadata =
232 2 : poLayer->GetReader()->parquet_reader()->metadata();
233 1 : const int numRowGroups = metadata->num_row_groups();
234 15 : for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup)
235 : {
236 28 : std::string osMinTmp, osMaxTmp;
237 : OGRField unusedF;
238 : bool unusedB;
239 : OGRFieldSubType unusedSubType;
240 :
241 : OGRField sXMin;
242 14 : OGR_RawField_SetNull(&sXMin);
243 14 : bool bFoundXMin = false;
244 14 : OGRFieldType eXMinType = OFTMaxType;
245 :
246 : OGRField sYMin;
247 14 : OGR_RawField_SetNull(&sYMin);
248 14 : bool bFoundYMin = false;
249 14 : OGRFieldType eYMinType = OFTMaxType;
250 :
251 : OGRField sXMax;
252 14 : OGR_RawField_SetNull(&sXMax);
253 14 : bool bFoundXMax = false;
254 14 : OGRFieldType eXMaxType = OFTMaxType;
255 :
256 : OGRField sYMax;
257 14 : OGR_RawField_SetNull(&sYMax);
258 14 : bool bFoundYMax = false;
259 14 : OGRFieldType eYMaxType = OFTMaxType;
260 :
261 14 : if (poLayer->GetMinMaxForParquetCol(
262 : iRowGroup, iParquetXMin, nullptr,
263 : /* bComputeMin = */ true, sXMin, bFoundXMin,
264 : /* bComputeMax = */ false, unusedF, unusedB, eXMinType,
265 8 : unusedSubType, osMinTmp, osMaxTmp) &&
266 8 : bFoundXMin && eXMinType == OFTReal &&
267 22 : poLayer->GetMinMaxForParquetCol(
268 : iRowGroup, iParquetYMin, nullptr,
269 : /* bComputeMin = */ true, sYMin, bFoundYMin,
270 : /* bComputeMax = */ false, unusedF, unusedB, eYMinType,
271 8 : unusedSubType, osMinTmp, osMaxTmp) &&
272 8 : bFoundYMin && eYMinType == OFTReal &&
273 22 : poLayer->GetMinMaxForParquetCol(
274 : iRowGroup, iParquetXMax, nullptr,
275 : /* bComputeMin = */ false, unusedF, unusedB,
276 : /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType,
277 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
278 8 : bFoundXMax && eXMaxType == OFTReal &&
279 22 : poLayer->GetMinMaxForParquetCol(
280 : iRowGroup, iParquetYMax, nullptr,
281 : /* bComputeMin = */ false, unusedF, unusedB,
282 : /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType,
283 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
284 22 : bFoundYMax && eYMaxType == OFTReal)
285 : {
286 16 : OGRFeature oFeat(poMemLayer->GetLayerDefn());
287 8 : oFeat.SetField(0,
288 : static_cast<GIntBig>(
289 8 : metadata->RowGroup(iRowGroup)->num_rows()));
290 16 : auto poPoly = std::make_unique<OGRPolygon>();
291 8 : auto poLR = std::make_unique<OGRLinearRing>();
292 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
293 8 : poLR->addPoint(sXMin.Real, sYMax.Real);
294 8 : poLR->addPoint(sXMax.Real, sYMax.Real);
295 8 : poLR->addPoint(sXMax.Real, sYMin.Real);
296 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
297 8 : poPoly->addRingDirectly(poLR.release());
298 8 : oFeat.SetGeometryDirectly(poPoly.release());
299 8 : CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat));
300 : }
301 : }
302 :
303 1 : return poMemDS.release();
304 : }
305 0 : return nullptr;
306 : }
307 :
308 : /************************************************************************/
309 : /* Open() */
310 : /************************************************************************/
311 :
312 1679 : static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo)
313 : {
314 1679 : if (poOpenInfo->eAccess == GA_Update)
315 64 : return nullptr;
316 :
317 : #if ARROW_VERSION_MAJOR >= 21
318 : // Register geoarrow.wkb extension if not already done
319 : if (!arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB) &&
320 : CPLTestBool(CPLGetConfigOption(
321 : "OGR_PARQUET_REGISTER_GEOARROW_WKB_EXTENSION", "YES")))
322 : {
323 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
324 : std::make_shared<OGRGeoArrowWkbExtensionType>(
325 : std::move(arrow::binary()), std::string())));
326 : }
327 : #endif
328 :
329 : #ifdef GDAL_USE_ARROWDATASET
330 3230 : std::string osBasePath(poOpenInfo->pszFilename);
331 3230 : std::string osQueryParameters;
332 : const bool bStartedWithParquetPrefix =
333 1615 : STARTS_WITH(osBasePath.c_str(), "PARQUET:");
334 :
335 1615 : if (bStartedWithParquetPrefix)
336 : {
337 262 : osBasePath = osBasePath.substr(strlen("PARQUET:"));
338 : }
339 :
340 : // Little trick to allow using syntax of
341 : // https://github.com/opengeospatial/geoparquet/discussions/101
342 : // ogrinfo
343 : // "/vsicurl/https://ai4edataeuwest.blob.core.windows.net/us-census/2020/cb_2020_us_vtd_500k.parquet?${SAS_TOKEN}"
344 1615 : if (STARTS_WITH(osBasePath.c_str(), "/vsicurl/"))
345 : {
346 2 : const auto nPos = osBasePath.find(".parquet?st=");
347 2 : if (nPos != std::string::npos)
348 : {
349 0 : osQueryParameters = osBasePath.substr(nPos + strlen(".parquet"));
350 0 : osBasePath.resize(nPos + strlen(".parquet"));
351 : }
352 : }
353 :
354 2489 : if (bStartedWithParquetPrefix || poOpenInfo->bIsDirectory ||
355 874 : !osQueryParameters.empty())
356 : {
357 : VSIStatBufL sStat;
358 741 : if (!osBasePath.empty() && osBasePath.back() == '/')
359 0 : osBasePath.pop_back();
360 : const std::string osMetadataPath =
361 741 : CPLFormFilenameSafe(osBasePath.c_str(), "_metadata", nullptr);
362 741 : if (CPLTestBool(
363 2223 : CPLGetConfigOption("OGR_PARQUET_USE_METADATA_FILE", "YES")) &&
364 1482 : VSIStatL((osMetadataPath + osQueryParameters).c_str(), &sStat) == 0)
365 : {
366 : // If there's a _metadata file, then use it to avoid listing files
367 : try
368 : {
369 36 : return OpenParquetDatasetWithMetadata(
370 : osBasePath, "_metadata", osQueryParameters,
371 18 : poOpenInfo->papszOpenOptions);
372 : }
373 0 : catch (const std::exception &e)
374 : {
375 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
376 0 : e.what());
377 : }
378 0 : return nullptr;
379 : }
380 : else
381 : {
382 723 : bool bLikelyParquetDataset = false;
383 723 : if (poOpenInfo->bIsDirectory)
384 : {
385 : // Detect if the directory contains .parquet files, or
386 : // subdirectories with a name of the form "key=value", typical
387 : // of HIVE partitioning.
388 938 : const CPLStringList aosFiles(VSIReadDir(osBasePath.c_str()));
389 22210 : for (const char *pszFilename : cpl::Iterate(aosFiles))
390 : {
391 21743 : if (EQUAL(CPLGetExtensionSafe(pszFilename).c_str(),
392 : "parquet"))
393 : {
394 2 : bLikelyParquetDataset = true;
395 2 : break;
396 : }
397 21741 : else if (strchr(pszFilename, '='))
398 : {
399 : // HIVE partitioning
400 0 : if (VSIStatL(CPLFormFilenameSafe(osBasePath.c_str(),
401 : pszFilename, nullptr)
402 : .c_str(),
403 0 : &sStat) == 0 &&
404 0 : VSI_ISDIR(sStat.st_mode))
405 : {
406 0 : bLikelyParquetDataset = true;
407 0 : break;
408 : }
409 : }
410 : }
411 : }
412 :
413 723 : if (bStartedWithParquetPrefix || bLikelyParquetDataset)
414 : {
415 : try
416 : {
417 512 : return OpenParquetDatasetWithoutMetadata(
418 : osBasePath, osQueryParameters,
419 256 : poOpenInfo->papszOpenOptions);
420 : }
421 0 : catch (const std::exception &e)
422 : {
423 : // If we aren't quite sure that the passed file name is
424 : // a directory, then silently continue
425 0 : if (poOpenInfo->bIsDirectory)
426 : {
427 0 : CPLError(CE_Failure, CPLE_AppDefined,
428 0 : "Parquet exception: %s", e.what());
429 0 : return nullptr;
430 : }
431 : }
432 : }
433 : }
434 : }
435 : #endif
436 :
437 1341 : if (!OGRParquetDriverIdentify(poOpenInfo))
438 : {
439 0 : return nullptr;
440 : }
441 :
442 1341 : if (poOpenInfo->bIsDirectory)
443 467 : return nullptr;
444 :
445 1748 : std::string osFilename(poOpenInfo->pszFilename);
446 874 : if (STARTS_WITH(poOpenInfo->pszFilename, "PARQUET:"))
447 : {
448 0 : osFilename = poOpenInfo->pszFilename + strlen("PARQUET:");
449 : }
450 :
451 : try
452 : {
453 874 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
454 1354 : if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
455 480 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
456 : {
457 394 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
458 394 : poOpenInfo->fpL = nullptr;
459 394 : if (fp == nullptr)
460 : {
461 0 : fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
462 0 : if (fp == nullptr)
463 0 : return nullptr;
464 : }
465 788 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename,
466 788 : std::move(fp));
467 : }
468 : else
469 : {
470 480 : PARQUET_ASSIGN_OR_THROW(infile,
471 : arrow::io::ReadableFile::Open(osFilename));
472 : }
473 :
474 : // Open Parquet file reader
475 874 : std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
476 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
477 1748 : arrow::MemoryPool::CreateDefault().release());
478 :
479 874 : const int nNumCPUs = OGRParquetLayerBase::GetNumCPUs();
480 : const char *pszUseThreads =
481 874 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
482 874 : if (!pszUseThreads && nNumCPUs > 1)
483 : {
484 874 : pszUseThreads = "YES";
485 : }
486 874 : const bool bUseThreads = pszUseThreads && CPLTestBool(pszUseThreads);
487 :
488 : const char *pszParquetBatchSize =
489 874 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
490 :
491 : #if ARROW_VERSION_MAJOR >= 21
492 : parquet::arrow::FileReaderBuilder fileReaderBuilder;
493 : {
494 : auto st = fileReaderBuilder.Open(std::move(infile));
495 : if (!st.ok())
496 : {
497 : CPLError(CE_Failure, CPLE_AppDefined,
498 : "parquet::arrow::FileReaderBuilder::Open() failed: %s",
499 : st.message().c_str());
500 : return nullptr;
501 : }
502 : }
503 : fileReaderBuilder.memory_pool(poMemoryPool.get());
504 : parquet::ArrowReaderProperties fileReaderProperties;
505 : fileReaderProperties.set_arrow_extensions_enabled(CPLTestBool(
506 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
507 : if (pszParquetBatchSize)
508 : {
509 : fileReaderProperties.set_batch_size(
510 : CPLAtoGIntBig(pszParquetBatchSize));
511 : }
512 : if (bUseThreads)
513 : {
514 : fileReaderProperties.set_use_threads(true);
515 : }
516 : fileReaderBuilder.properties(fileReaderProperties);
517 : {
518 : auto res = fileReaderBuilder.Build();
519 : if (!res.ok())
520 : {
521 : CPLError(
522 : CE_Failure, CPLE_AppDefined,
523 : "parquet::arrow::FileReaderBuilder::Build() failed: %s",
524 : res.status().message().c_str());
525 : return nullptr;
526 : }
527 : arrow_reader = std::move(*res);
528 : }
529 : #elif ARROW_VERSION_MAJOR >= 19
530 2622 : PARQUET_ASSIGN_OR_THROW(
531 : arrow_reader,
532 : parquet::arrow::OpenFile(std::move(infile), poMemoryPool.get()));
533 : #else
534 : auto st = parquet::arrow::OpenFile(std::move(infile),
535 : poMemoryPool.get(), &arrow_reader);
536 : if (!st.ok())
537 : {
538 : CPLError(CE_Failure, CPLE_AppDefined,
539 : "parquet::arrow::OpenFile() failed: %s",
540 : st.message().c_str());
541 : return nullptr;
542 : }
543 : #endif
544 :
545 : #if ARROW_VERSION_MAJOR < 21
546 873 : if (pszParquetBatchSize)
547 : {
548 5 : arrow_reader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
549 : }
550 :
551 873 : if (bUseThreads)
552 : {
553 873 : arrow_reader->set_use_threads(true);
554 : }
555 : #endif
556 :
557 1746 : auto poDS = std::make_unique<OGRParquetDataset>(poMemoryPool);
558 : auto poLayer = std::make_unique<OGRParquetLayer>(
559 1746 : poDS.get(), CPLGetBasenameSafe(osFilename.c_str()).c_str(),
560 2619 : std::move(arrow_reader), poOpenInfo->papszOpenOptions);
561 :
562 : // For debug purposes: return a layer with the extent of each row group
563 873 : if (CPLTestBool(
564 : CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO")))
565 : {
566 1 : return BuildMemDatasetWithRowGroupExtents(poLayer.get());
567 : }
568 :
569 872 : poDS->SetLayer(std::move(poLayer));
570 872 : return poDS.release();
571 : }
572 1 : catch (const std::exception &e)
573 : {
574 1 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
575 1 : e.what());
576 1 : return nullptr;
577 : }
578 : }
579 :
580 : /************************************************************************/
581 : /* Create() */
582 : /************************************************************************/
583 :
584 284 : static GDALDataset *OGRParquetDriverCreate(const char *pszName, int nXSize,
585 : int nYSize, int nBands,
586 : GDALDataType eType,
587 : char ** /* papszOptions */)
588 : {
589 284 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
590 0 : return nullptr;
591 :
592 : try
593 : {
594 284 : std::shared_ptr<arrow::io::OutputStream> out_file;
595 367 : if (STARTS_WITH(pszName, "/vsi") ||
596 83 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
597 : {
598 : VSIVirtualHandleUniquePtr fp =
599 284 : VSIFilesystemHandler::OpenStatic(pszName, "wb");
600 284 : if (fp == nullptr)
601 : {
602 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
603 1 : return nullptr;
604 : }
605 283 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
606 : }
607 : else
608 : {
609 0 : PARQUET_ASSIGN_OR_THROW(out_file,
610 : arrow::io::FileOutputStream::Open(pszName));
611 : }
612 :
613 283 : return new OGRParquetWriterDataset(out_file);
614 : }
615 0 : catch (const std::exception &e)
616 : {
617 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
618 0 : e.what());
619 0 : return nullptr;
620 : }
621 : }
622 :
623 : /************************************************************************/
624 : /* OGRParquetDriver() */
625 : /************************************************************************/
626 :
627 : class OGRParquetDriver final : public GDALDriver
628 : {
629 : std::recursive_mutex m_oMutex{};
630 : bool m_bMetadataInitialized = false;
631 : void InitMetadata();
632 :
633 : public:
634 : const char *GetMetadataItem(const char *pszName,
635 : const char *pszDomain) override;
636 :
637 54 : char **GetMetadata(const char *pszDomain) override
638 : {
639 108 : std::lock_guard oLock(m_oMutex);
640 54 : InitMetadata();
641 108 : return GDALDriver::GetMetadata(pszDomain);
642 : }
643 : };
644 :
645 2138 : const char *OGRParquetDriver::GetMetadataItem(const char *pszName,
646 : const char *pszDomain)
647 : {
648 4276 : std::lock_guard oLock(m_oMutex);
649 2138 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
650 : {
651 338 : InitMetadata();
652 : }
653 4276 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
654 : }
655 :
656 392 : void OGRParquetDriver::InitMetadata()
657 : {
658 392 : if (m_bMetadataInitialized)
659 369 : return;
660 23 : m_bMetadataInitialized = true;
661 :
662 : CPLXMLTreeCloser oTree(
663 46 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
664 :
665 46 : std::vector<const char *> apszCompressionMethods;
666 23 : bool bHasSnappy = false;
667 23 : int minComprLevel = INT_MAX;
668 23 : int maxComprLevel = INT_MIN;
669 46 : std::string osCompressionLevelDesc = "Compression level, codec dependent.";
670 161 : for (const char *pszMethod :
671 184 : {"SNAPPY", "GZIP", "BROTLI", "ZSTD", "LZ4_RAW", "LZO", "LZ4_HADOOP"})
672 : {
673 : auto compressionTypeRes = arrow::util::Codec::GetCompressionType(
674 322 : CPLString(pszMethod).tolower());
675 322 : if (compressionTypeRes.ok() &&
676 161 : arrow::util::Codec::IsAvailable(*compressionTypeRes))
677 : {
678 138 : const auto compressionType = *compressionTypeRes;
679 138 : if (EQUAL(pszMethod, "SNAPPY"))
680 23 : bHasSnappy = true;
681 138 : apszCompressionMethods.emplace_back(pszMethod);
682 :
683 : auto minCompressLevelRes =
684 276 : arrow::util::Codec::MinimumCompressionLevel(compressionType);
685 : auto maxCompressLevelRes =
686 276 : arrow::util::Codec::MaximumCompressionLevel(compressionType);
687 : auto defCompressLevelRes =
688 276 : arrow::util::Codec::DefaultCompressionLevel(compressionType);
689 230 : if (minCompressLevelRes.ok() && maxCompressLevelRes.ok() &&
690 92 : defCompressLevelRes.ok())
691 : {
692 92 : minComprLevel = std::min(minComprLevel, *minCompressLevelRes);
693 92 : maxComprLevel = std::max(maxComprLevel, *maxCompressLevelRes);
694 92 : osCompressionLevelDesc += ' ';
695 92 : osCompressionLevelDesc += pszMethod;
696 92 : osCompressionLevelDesc += ": [";
697 92 : osCompressionLevelDesc += std::to_string(*minCompressLevelRes);
698 92 : osCompressionLevelDesc += ',';
699 92 : osCompressionLevelDesc += std::to_string(*maxCompressLevelRes);
700 92 : osCompressionLevelDesc += "], default=";
701 92 : if (EQUAL(pszMethod, "ZSTD"))
702 46 : osCompressionLevelDesc += std::to_string(
703 23 : OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL);
704 : else
705 : osCompressionLevelDesc +=
706 69 : std::to_string(*defCompressLevelRes);
707 92 : osCompressionLevelDesc += '.';
708 : }
709 : }
710 : }
711 :
712 : {
713 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
714 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
715 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
716 23 : CPLAddXMLAttributeAndValue(psOption, "description",
717 : "Compression method");
718 23 : CPLAddXMLAttributeAndValue(psOption, "default",
719 : bHasSnappy ? "SNAPPY" : "NONE");
720 : {
721 23 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
722 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
723 23 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
724 : }
725 161 : for (const char *pszMethod : apszCompressionMethods)
726 : {
727 138 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
728 138 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
729 : }
730 : }
731 :
732 23 : if (minComprLevel <= maxComprLevel)
733 : {
734 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
735 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION_LEVEL");
736 23 : CPLAddXMLAttributeAndValue(psOption, "type", "int");
737 23 : CPLAddXMLAttributeAndValue(
738 : psOption, "min",
739 : CPLSPrintf("%d",
740 23 : std::min(DEFAULT_COMPRESSION_LEVEL, minComprLevel)));
741 23 : CPLAddXMLAttributeAndValue(psOption, "max",
742 : CPLSPrintf("%d", maxComprLevel));
743 23 : CPLAddXMLAttributeAndValue(psOption, "description",
744 : osCompressionLevelDesc.c_str());
745 23 : CPLAddXMLAttributeAndValue(psOption, "default",
746 : CPLSPrintf("%d", DEFAULT_COMPRESSION_LEVEL));
747 : }
748 :
749 : {
750 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
751 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
752 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
753 23 : CPLAddXMLAttributeAndValue(psOption, "description",
754 : "Encoding of geometry columns");
755 23 : CPLAddXMLAttributeAndValue(psOption, "default", "WKB");
756 92 : for (const char *pszEncoding :
757 115 : {"WKB", "WKT", "GEOARROW", "GEOARROW_INTERLEAVED"})
758 : {
759 92 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
760 92 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
761 92 : if (EQUAL(pszEncoding, "GEOARROW"))
762 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
763 : "GEOARROW_STRUCT");
764 : }
765 : }
766 :
767 : {
768 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
769 23 : CPLAddXMLAttributeAndValue(psOption, "name", "ROW_GROUP_SIZE");
770 23 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
771 23 : CPLAddXMLAttributeAndValue(psOption, "description",
772 : "Maximum number of rows per group");
773 23 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
774 : }
775 :
776 : {
777 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
778 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
779 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
780 23 : CPLAddXMLAttributeAndValue(psOption, "description",
781 : "Name of geometry column");
782 23 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
783 : }
784 :
785 : {
786 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
787 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COORDINATE_PRECISION");
788 23 : CPLAddXMLAttributeAndValue(psOption, "type", "float");
789 23 : CPLAddXMLAttributeAndValue(psOption, "description",
790 : "Number of decimals for coordinates (only "
791 : "for GEOMETRY_ENCODING=WKT)");
792 : }
793 :
794 : {
795 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
796 23 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
797 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
798 23 : CPLAddXMLAttributeAndValue(psOption, "description",
799 : "Name of the FID column to create");
800 : }
801 :
802 : {
803 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
804 23 : CPLAddXMLAttributeAndValue(psOption, "name", "POLYGON_ORIENTATION");
805 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
806 23 : CPLAddXMLAttributeAndValue(
807 : psOption, "description",
808 : "Which ring orientation to use for polygons");
809 23 : CPLAddXMLAttributeAndValue(psOption, "default", "COUNTERCLOCKWISE");
810 23 : CPLCreateXMLElementAndValue(psOption, "Value", "COUNTERCLOCKWISE");
811 23 : CPLCreateXMLElementAndValue(psOption, "Value", "UNMODIFIED");
812 : }
813 :
814 : {
815 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
816 23 : CPLAddXMLAttributeAndValue(psOption, "name", "EDGES");
817 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
818 23 : CPLAddXMLAttributeAndValue(
819 : psOption, "description",
820 : "Name of the coordinate system for the edges");
821 23 : CPLAddXMLAttributeAndValue(psOption, "default", "PLANAR");
822 23 : CPLCreateXMLElementAndValue(psOption, "Value", "PLANAR");
823 23 : CPLCreateXMLElementAndValue(psOption, "Value", "SPHERICAL");
824 : }
825 :
826 : {
827 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
828 23 : CPLAddXMLAttributeAndValue(psOption, "name", "CREATOR");
829 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
830 23 : CPLAddXMLAttributeAndValue(psOption, "description",
831 : "Name of creating application");
832 : }
833 :
834 : {
835 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
836 23 : CPLAddXMLAttributeAndValue(psOption, "name", "WRITE_COVERING_BBOX");
837 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
838 23 : CPLAddXMLAttributeAndValue(psOption, "default", "AUTO");
839 23 : CPLAddXMLAttributeAndValue(psOption, "description",
840 : "Whether to write xmin/ymin/xmax/ymax "
841 : "columns with the bounding box of "
842 : "geometries");
843 23 : CPLCreateXMLElementAndValue(psOption, "Value", "AUTO");
844 23 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
845 23 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
846 : }
847 :
848 : #if ARROW_VERSION_MAJOR >= 21
849 : {
850 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
851 : CPLAddXMLAttributeAndValue(psOption, "name", "USE_PARQUET_GEO_TYPES");
852 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
853 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
854 : CPLAddXMLAttributeAndValue(psOption, "description",
855 : "Whether to use Parquet Geometry/Geography "
856 : "logical types (introduced in libarrow 21), "
857 : "when using GEOMETRY_ENCODING=WKB encoding");
858 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
859 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
860 : CPLCreateXMLElementAndValue(psOption, "Value", "ONLY");
861 : }
862 : #endif
863 :
864 : {
865 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
866 23 : CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX");
867 23 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
868 23 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
869 23 : CPLAddXMLAttributeAndValue(psOption, "description",
870 : "Whether features should be sorted based on "
871 : "the bounding box of their geometries");
872 : }
873 :
874 23 : char *pszXML = CPLSerializeXMLTree(oTree.get());
875 23 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
876 23 : CPLFree(pszXML);
877 : }
878 :
879 : /************************************************************************/
880 : /* RegisterOGRParquet() */
881 : /************************************************************************/
882 :
883 36 : void RegisterOGRParquet()
884 : {
885 36 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
886 0 : return;
887 :
888 72 : auto poDriver = std::make_unique<OGRParquetDriver>();
889 36 : OGRParquetDriverSetCommonMetadata(poDriver.get());
890 :
891 36 : poDriver->pfnOpen = OGRParquetDriverOpen;
892 36 : poDriver->pfnCreate = OGRParquetDriverCreate;
893 :
894 36 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
895 : #ifdef GDAL_USE_ARROWDATASET
896 36 : poDriver->SetMetadataItem("ARROW_DATASET", "YES");
897 : #endif
898 :
899 36 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
900 :
901 : #if ARROW_VERSION_MAJOR >= 16
902 : // Mostly for tests
903 : const char *pszPath =
904 36 : CPLGetConfigOption("OGR_PARQUET_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
905 36 : if (pszPath)
906 : {
907 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
908 0 : if (!result.ok())
909 : {
910 0 : CPLError(CE_Warning, CPLE_AppDefined,
911 : "arrow::fs::LoadFileSystemFactories() failed with %s",
912 0 : result.message().c_str());
913 : }
914 : }
915 : #endif
916 :
917 : #if defined(GDAL_USE_ARROWDATASET) && defined(GDAL_USE_ARROWCOMPUTE)
918 : {
919 : auto status = arrow::compute::Initialize();
920 : if (!status.ok())
921 : {
922 : CPLError(CE_Warning, CPLE_AppDefined,
923 : "arrow::compute::Initialize() failed with %s",
924 : status.message().c_str());
925 : }
926 : }
927 : #endif
928 : }
|