Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 : #include "cpl_enumerate.h"
16 :
17 : #include <algorithm>
18 : #include <map>
19 : #include <mutex>
20 : #include <tuple>
21 :
22 : #include "gdalalgorithm.h"
23 : #include "ogr_parquet.h"
24 : #include "ogrparquetdrivercore.h"
25 : #include "memdataset.h"
26 : #include "ogreditablelayer.h"
27 :
28 : #include "../arrow_common/ograrrowrandomaccessfile.h"
29 : #include "../arrow_common/vsiarrowfilesystem.hpp"
30 : #include "../arrow_common/ograrrowwritablefile.h"
31 : #include "../arrow_common/ograrrowdataset.hpp"
32 : #include "../arrow_common/ograrrowlayer.hpp" // for the destructor
33 :
34 : #ifdef GDAL_USE_ARROWDATASET
35 :
36 : /************************************************************************/
37 : /* OpenFromDatasetFactory() */
38 : /************************************************************************/
39 :
40 360 : static GDALDataset *OpenFromDatasetFactory(
41 : const std::string &osBasePath,
42 : const std::shared_ptr<arrow::dataset::DatasetFactory> &factory,
43 : CSLConstList papszOpenOptions,
44 : const std::shared_ptr<arrow::fs::FileSystem> &fs)
45 : {
46 360 : std::shared_ptr<arrow::dataset::Dataset> dataset;
47 720 : PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish());
48 :
49 360 : const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi");
50 720 : auto poDS = std::make_unique<OGRParquetDataset>();
51 : auto poLayer = std::make_unique<OGRParquetDatasetLayer>(
52 720 : poDS.get(), CPLGetBasenameSafe(osBasePath.c_str()).c_str(), bIsVSI,
53 720 : dataset, papszOpenOptions);
54 360 : poDS->SetLayer(std::move(poLayer));
55 360 : poDS->SetFileSystem(fs);
56 720 : return poDS.release();
57 : }
58 :
59 : /************************************************************************/
60 : /* GetFileSystem() */
61 : /************************************************************************/
62 :
63 : static std::tuple<std::shared_ptr<arrow::fs::FileSystem>, std::string>
64 360 : GetFileSystem(std::string &osBasePathInOut,
65 : const std::string &osQueryParameters)
66 : {
67 : // Instantiate file system:
68 : // - VSIArrowFileSystem implementation for /vsi files
69 : // - base implementation for local files (if OGR_PARQUET_USE_VSI set to NO)
70 360 : std::shared_ptr<arrow::fs::FileSystem> fs;
71 360 : const bool bIsVSI = STARTS_WITH(osBasePathInOut.c_str(), "/vsi");
72 : VSIStatBufL sStat;
73 720 : std::string osFSFilename;
74 590 : if ((bIsVSI ||
75 712 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES"))) &&
76 352 : VSIStatL(osBasePathInOut.c_str(), &sStat) == 0)
77 : {
78 351 : osFSFilename = osBasePathInOut;
79 351 : fs = std::make_shared<VSIArrowFileSystem>("PARQUET", osQueryParameters);
80 : }
81 : else
82 : {
83 : // FileSystemFromUriOrPath() doesn't like relative paths
84 : // so transform them to absolute.
85 9 : std::string osPath(osBasePathInOut);
86 9 : if (CPLIsFilenameRelative(osPath.c_str()))
87 : {
88 8 : char *pszCurDir = CPLGetCurrentDir();
89 8 : if (pszCurDir == nullptr)
90 0 : return {nullptr, osFSFilename};
91 8 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
92 8 : CPLFree(pszCurDir);
93 : }
94 9 : PARQUET_ASSIGN_OR_THROW(
95 : fs, arrow::fs::FileSystemFromUriOrPath(osPath, &osFSFilename));
96 : }
97 360 : return {fs, osFSFilename};
98 : }
99 :
100 : /************************************************************************/
101 : /* MakeParquetFileFormat() */
102 : /************************************************************************/
103 :
104 : static std::shared_ptr<arrow::dataset::ParquetFileFormat>
105 360 : MakeParquetFileFormat()
106 : {
107 : auto parquetFileFormat =
108 360 : std::make_shared<arrow::dataset::ParquetFileFormat>();
109 : #if ARROW_VERSION_MAJOR >= 21
110 : auto fragmentScanOptions =
111 : std::dynamic_pointer_cast<arrow::dataset::ParquetFragmentScanOptions>(
112 : parquetFileFormat->default_fragment_scan_options);
113 : CPLAssert(fragmentScanOptions);
114 : fragmentScanOptions->arrow_reader_properties->set_arrow_extensions_enabled(
115 : CPLTestBool(
116 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
117 : #endif
118 360 : return parquetFileFormat;
119 : }
120 :
121 : /************************************************************************/
122 : /* OpenParquetDatasetWithMetadata() */
123 : /************************************************************************/
124 :
125 22 : static GDALDataset *OpenParquetDatasetWithMetadata(
126 : const std::string &osBasePathIn, const char *pszMetadataFile,
127 : const std::string &osQueryParameters, CSLConstList papszOpenOptions)
128 : {
129 44 : std::string osBasePath(osBasePathIn);
130 22 : const auto &[fs, osFSFilename] =
131 44 : GetFileSystem(osBasePath, osQueryParameters);
132 :
133 44 : arrow::dataset::ParquetFactoryOptions options;
134 44 : auto partitioningFactory = arrow::dataset::HivePartitioning::MakeFactory();
135 : options.partitioning =
136 22 : arrow::dataset::PartitioningOrFactory(std::move(partitioningFactory));
137 :
138 22 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
139 66 : PARQUET_ASSIGN_OR_THROW(factory,
140 : arrow::dataset::ParquetDatasetFactory::Make(
141 : osFSFilename + '/' + pszMetadataFile, fs,
142 : MakeParquetFileFormat(), std::move(options)));
143 :
144 44 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
145 : }
146 :
147 : /************************************************************************/
148 : /* OpenParquetDatasetWithoutMetadata() */
149 : /************************************************************************/
150 :
151 : static GDALDataset *
152 338 : OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn,
153 : const std::string &osQueryParameters,
154 : CSLConstList papszOpenOptions)
155 : {
156 676 : std::string osBasePath(osBasePathIn);
157 338 : const auto &[fs, osFSFilename] =
158 676 : GetFileSystem(osBasePath, osQueryParameters);
159 :
160 676 : arrow::dataset::FileSystemFactoryOptions options;
161 338 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
162 :
163 676 : const auto fileInfo = fs->GetFileInfo(osFSFilename);
164 338 : if (fileInfo->IsFile())
165 : {
166 1344 : PARQUET_ASSIGN_OR_THROW(
167 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
168 : fs, {std::move(osFSFilename)}, MakeParquetFileFormat(),
169 : std::move(options)));
170 : }
171 : else
172 : {
173 : auto partitioningFactory =
174 4 : arrow::dataset::HivePartitioning::MakeFactory();
175 4 : options.partitioning = arrow::dataset::PartitioningOrFactory(
176 4 : std::move(partitioningFactory));
177 :
178 4 : arrow::fs::FileSelector selector;
179 2 : selector.base_dir = std::move(osFSFilename);
180 2 : selector.recursive = true;
181 :
182 4 : PARQUET_ASSIGN_OR_THROW(
183 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
184 : fs, std::move(selector), MakeParquetFileFormat(),
185 : std::move(options)));
186 : }
187 :
188 676 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
189 : }
190 :
191 : #endif
192 :
193 : /************************************************************************/
194 : /* BuildMemDatasetWithRowGroupExtents() */
195 : /************************************************************************/
196 :
197 : /** Builds a MEM dataset that contains, for each row-group of the input file,
198 : * the feature count and spatial extent of the features of this row group,
199 : * using Parquet statistics. This assumes that the Parquet file declares
200 : * a "covering":{"bbox":{ ... }} metadata item.
201 : *
202 : * Only for debug purposes.
203 : */
204 1 : static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer)
205 : {
206 1 : int iParquetXMin = -1;
207 1 : int iParquetYMin = -1;
208 1 : int iParquetXMax = -1;
209 1 : int iParquetYMax = -1;
210 1 : if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin,
211 : iParquetXMax, iParquetYMax))
212 : {
213 : auto poMemDS = std::unique_ptr<GDALDataset>(
214 2 : MEMDataset::Create("", 0, 0, 0, GDT_Unknown, nullptr));
215 1 : if (!poMemDS)
216 0 : return nullptr;
217 1 : OGRSpatialReference *poTmpSRS = nullptr;
218 1 : const auto poSrcSRS = poLayer->GetSpatialRef();
219 1 : if (poSrcSRS)
220 0 : poTmpSRS = poSrcSRS->Clone();
221 : auto poMemLayer =
222 1 : poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr);
223 1 : if (poTmpSRS)
224 0 : poTmpSRS->Release();
225 1 : if (!poMemLayer)
226 0 : return nullptr;
227 1 : CPL_IGNORE_RET_VAL(poMemLayer->CreateField(
228 1 : std::make_unique<OGRFieldDefn>("feature_count", OFTInteger64)
229 1 : .get()));
230 :
231 : const auto metadata =
232 2 : poLayer->GetReader()->parquet_reader()->metadata();
233 1 : const int numRowGroups = metadata->num_row_groups();
234 15 : for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup)
235 : {
236 28 : std::string osMinTmp, osMaxTmp;
237 : OGRField unusedF;
238 : bool unusedB;
239 : OGRFieldSubType unusedSubType;
240 :
241 : OGRField sXMin;
242 14 : OGR_RawField_SetNull(&sXMin);
243 14 : bool bFoundXMin = false;
244 14 : OGRFieldType eXMinType = OFTMaxType;
245 :
246 : OGRField sYMin;
247 14 : OGR_RawField_SetNull(&sYMin);
248 14 : bool bFoundYMin = false;
249 14 : OGRFieldType eYMinType = OFTMaxType;
250 :
251 : OGRField sXMax;
252 14 : OGR_RawField_SetNull(&sXMax);
253 14 : bool bFoundXMax = false;
254 14 : OGRFieldType eXMaxType = OFTMaxType;
255 :
256 : OGRField sYMax;
257 14 : OGR_RawField_SetNull(&sYMax);
258 14 : bool bFoundYMax = false;
259 14 : OGRFieldType eYMaxType = OFTMaxType;
260 :
261 14 : if (poLayer->GetMinMaxForParquetCol(
262 : iRowGroup, iParquetXMin, nullptr,
263 : /* bComputeMin = */ true, sXMin, bFoundXMin,
264 : /* bComputeMax = */ false, unusedF, unusedB, eXMinType,
265 8 : unusedSubType, osMinTmp, osMaxTmp) &&
266 8 : bFoundXMin && eXMinType == OFTReal &&
267 22 : poLayer->GetMinMaxForParquetCol(
268 : iRowGroup, iParquetYMin, nullptr,
269 : /* bComputeMin = */ true, sYMin, bFoundYMin,
270 : /* bComputeMax = */ false, unusedF, unusedB, eYMinType,
271 8 : unusedSubType, osMinTmp, osMaxTmp) &&
272 8 : bFoundYMin && eYMinType == OFTReal &&
273 22 : poLayer->GetMinMaxForParquetCol(
274 : iRowGroup, iParquetXMax, nullptr,
275 : /* bComputeMin = */ false, unusedF, unusedB,
276 : /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType,
277 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
278 8 : bFoundXMax && eXMaxType == OFTReal &&
279 22 : poLayer->GetMinMaxForParquetCol(
280 : iRowGroup, iParquetYMax, nullptr,
281 : /* bComputeMin = */ false, unusedF, unusedB,
282 : /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType,
283 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
284 22 : bFoundYMax && eYMaxType == OFTReal)
285 : {
286 16 : OGRFeature oFeat(poMemLayer->GetLayerDefn());
287 8 : oFeat.SetField(0,
288 : static_cast<GIntBig>(
289 8 : metadata->RowGroup(iRowGroup)->num_rows()));
290 16 : auto poPoly = std::make_unique<OGRPolygon>();
291 8 : auto poLR = std::make_unique<OGRLinearRing>();
292 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
293 8 : poLR->addPoint(sXMin.Real, sYMax.Real);
294 8 : poLR->addPoint(sXMax.Real, sYMax.Real);
295 8 : poLR->addPoint(sXMax.Real, sYMin.Real);
296 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
297 8 : poPoly->addRingDirectly(poLR.release());
298 8 : oFeat.SetGeometryDirectly(poPoly.release());
299 8 : CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat));
300 : }
301 : }
302 :
303 1 : return poMemDS.release();
304 : }
305 0 : return nullptr;
306 : }
307 :
308 : /************************************************************************/
309 : /* OGRParquetEditableLayerSynchronizer */
310 : /************************************************************************/
311 :
312 : class OGRParquetEditableLayer;
313 :
314 : class OGRParquetEditableLayerSynchronizer final
315 : : public IOGREditableLayerSynchronizer
316 : {
317 : OGRParquetDataset *const m_poDS;
318 : OGRParquetEditableLayer *const m_poEditableLayer;
319 : const std::string m_osFilename;
320 : const CPLStringList m_aosOpenOptions;
321 :
322 : CPL_DISALLOW_COPY_ASSIGN(OGRParquetEditableLayerSynchronizer)
323 :
324 : public:
325 5 : OGRParquetEditableLayerSynchronizer(
326 : OGRParquetDataset *poDS, OGRParquetEditableLayer *poEditableLayer,
327 : const std::string &osFilename, CSLConstList papszOpenOptions)
328 5 : : m_poDS(poDS), m_poEditableLayer(poEditableLayer),
329 : m_osFilename(osFilename),
330 5 : m_aosOpenOptions(CSLDuplicate(papszOpenOptions))
331 : {
332 5 : }
333 :
334 : OGRErr EditableSyncToDisk(OGRLayer *poEditableLayer,
335 : OGRLayer **ppoDecoratedLayer) override;
336 : };
337 :
338 : /************************************************************************/
339 : /* OGRParquetEditableLayer */
340 : /************************************************************************/
341 :
342 : class OGRParquetEditableLayer final : public IOGRArrowLayer,
343 : public OGREditableLayer
344 : {
345 : public:
346 5 : OGRParquetEditableLayer(OGRParquetDataset *poDS,
347 : const std::string &osFilename,
348 : std::unique_ptr<OGRParquetLayer> poParquetLayer,
349 : CSLConstList papszOpenOptions)
350 5 : : OGREditableLayer(poParquetLayer.get(), false,
351 : new OGRParquetEditableLayerSynchronizer(
352 5 : poDS, this, osFilename, papszOpenOptions),
353 : false),
354 10 : m_poParquetLayer(std::move(poParquetLayer))
355 : {
356 5 : }
357 :
358 10 : ~OGRParquetEditableLayer() override
359 5 : {
360 5 : SyncToDisk();
361 5 : delete m_poSynchronizer;
362 5 : m_poSynchronizer = nullptr;
363 10 : }
364 :
365 : OGRLayer *GetLayer() override;
366 :
367 8 : OGRParquetLayer *GetUnderlyingArrowLayer() override
368 : {
369 8 : return m_poParquetLayer.get();
370 : }
371 :
372 : void
373 8 : SetUnderlyingArrowLayer(std::unique_ptr<OGRParquetLayer> poParquetLayer)
374 : {
375 8 : m_poParquetLayer = std::move(poParquetLayer);
376 8 : }
377 :
378 : private:
379 : std::unique_ptr<OGRParquetLayer> m_poParquetLayer{};
380 : };
381 :
382 : /************************************************************************/
383 : /* OGRParquetEditableLayer::GetLayer() */
384 : /************************************************************************/
385 :
386 30 : OGRLayer *OGRParquetEditableLayer::GetLayer()
387 : {
388 30 : return this;
389 : }
390 :
391 : /************************************************************************/
392 : /* OGRParquetEditableLayerSynchronizer::EditableSyncToDisk() */
393 : /************************************************************************/
394 :
395 4 : OGRErr OGRParquetEditableLayerSynchronizer::EditableSyncToDisk(
396 : OGRLayer *poEditableLayer, OGRLayer **ppoDecoratedLayer)
397 : {
398 4 : CPLAssert(*ppoDecoratedLayer ==
399 : m_poEditableLayer->GetUnderlyingArrowLayer());
400 :
401 8 : const std::string osTmpFilename = m_osFilename + ".tmp.parquet";
402 : try
403 : {
404 0 : std::shared_ptr<arrow::io::OutputStream> out_file;
405 8 : if (STARTS_WITH(osTmpFilename.c_str(), "/vsi") ||
406 4 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
407 : {
408 : VSIVirtualHandleUniquePtr fp =
409 4 : VSIFilesystemHandler::OpenStatic(osTmpFilename.c_str(), "wb");
410 4 : if (fp == nullptr)
411 : {
412 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s",
413 : osTmpFilename.c_str());
414 0 : return OGRERR_FAILURE;
415 : }
416 4 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
417 : }
418 : else
419 : {
420 0 : PARQUET_ASSIGN_OR_THROW(
421 : out_file, arrow::io::FileOutputStream::Open(osTmpFilename));
422 : }
423 :
424 4 : OGRParquetWriterDataset writerDS(out_file);
425 :
426 4 : auto poParquetLayer = m_poEditableLayer->GetUnderlyingArrowLayer();
427 4 : CPLStringList aosCreationOptions(poParquetLayer->GetCreationOptions());
428 4 : const char *pszFIDColumn = poParquetLayer->GetFIDColumn();
429 4 : if (pszFIDColumn[0])
430 1 : aosCreationOptions.SetNameValue("FID", pszFIDColumn);
431 4 : const char *pszEdges = poParquetLayer->GetMetadataItem("EDGES");
432 4 : if (pszEdges)
433 1 : aosCreationOptions.SetNameValue("EDGES", pszEdges);
434 4 : auto poWriterLayer = writerDS.CreateLayer(
435 8 : CPLGetBasenameSafe(m_osFilename.c_str()).c_str(),
436 4 : poParquetLayer->GetGeomType() == wkbNone
437 : ? nullptr
438 4 : : poParquetLayer->GetLayerDefn()->GetGeomFieldDefn(0),
439 4 : aosCreationOptions.List());
440 4 : if (!poWriterLayer)
441 0 : return OGRERR_FAILURE;
442 :
443 : // Create target fields from source fields
444 9 : for (const auto poSrcFieldDefn :
445 22 : poEditableLayer->GetLayerDefn()->GetFields())
446 : {
447 9 : if (poWriterLayer->CreateField(poSrcFieldDefn) != OGRERR_NONE)
448 0 : return OGRERR_FAILURE;
449 : }
450 :
451 : // Disable all filters and backup them.
452 4 : const char *pszQueryStringConst = poEditableLayer->GetAttrQueryString();
453 : const std::string osQueryString =
454 4 : pszQueryStringConst ? pszQueryStringConst : "";
455 4 : poEditableLayer->SetAttributeFilter(nullptr);
456 :
457 4 : const int iFilterGeomIndexBak = poEditableLayer->GetGeomFieldFilter();
458 0 : std::unique_ptr<OGRGeometry> poFilterGeomBak;
459 4 : if (const OGRGeometry *poFilterGeomSrc =
460 4 : poEditableLayer->GetSpatialFilter())
461 0 : poFilterGeomBak.reset(poFilterGeomSrc->clone());
462 4 : poEditableLayer->SetSpatialFilter(nullptr);
463 :
464 4 : bool bError = false;
465 :
466 : // Copy all features
467 12 : for (auto &&poSrcFeature : *poEditableLayer)
468 : {
469 8 : OGRFeature oDstFeature(poWriterLayer->GetLayerDefn());
470 8 : oDstFeature.SetFrom(poSrcFeature.get());
471 8 : oDstFeature.SetFID(poSrcFeature->GetFID());
472 8 : if (poWriterLayer->CreateFeature(&oDstFeature) != OGRERR_NONE)
473 : {
474 0 : bError = true;
475 0 : break;
476 : }
477 : }
478 :
479 : // Restore filters.
480 4 : if (!osQueryString.empty())
481 0 : poEditableLayer->SetAttributeFilter(osQueryString.c_str());
482 4 : poEditableLayer->SetSpatialFilter(iFilterGeomIndexBak,
483 4 : poFilterGeomBak.get());
484 :
485 : // Flush and close file
486 4 : if (bError || writerDS.Close() != CE_None)
487 0 : return OGRERR_FAILURE;
488 : }
489 0 : catch (const std::exception &e)
490 : {
491 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
492 0 : e.what());
493 0 : return OGRERR_FAILURE;
494 : }
495 :
496 : // Close original Parquet file
497 4 : m_poEditableLayer->SetUnderlyingArrowLayer(nullptr);
498 4 : *ppoDecoratedLayer = nullptr;
499 :
500 : // Backup original file, and rename new file into it
501 8 : const std::string osTmpOriFilename = m_osFilename + ".ogr_bak";
502 8 : if (VSIRename(m_osFilename.c_str(), osTmpOriFilename.c_str()) != 0 ||
503 4 : VSIRename(osTmpFilename.c_str(), m_osFilename.c_str()) != 0)
504 : {
505 0 : CPLError(CE_Failure, CPLE_AppDefined, "Cannot rename files");
506 0 : return OGRERR_FAILURE;
507 : }
508 : // Remove backup file
509 4 : VSIUnlink(osTmpOriFilename.c_str());
510 :
511 : // Re-open parquet file
512 4 : VSILFILE *fp = nullptr;
513 : auto poParquetLayer =
514 8 : m_poDS->CreateReaderLayer(m_osFilename, fp, m_aosOpenOptions.List());
515 4 : if (!poParquetLayer)
516 : {
517 0 : return OGRERR_FAILURE;
518 : }
519 :
520 : // Update adapters
521 4 : *ppoDecoratedLayer = poParquetLayer.get();
522 4 : m_poEditableLayer->SetUnderlyingArrowLayer(std::move(poParquetLayer));
523 :
524 4 : return OGRERR_NONE;
525 : }
526 :
527 : /************************************************************************/
528 : /* Open() */
529 : /************************************************************************/
530 :
531 1949 : static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo)
532 : {
533 : #if ARROW_VERSION_MAJOR >= 21
534 : // Register geoarrow.wkb extension if not already done
535 : if (!arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB) &&
536 : CPLTestBool(CPLGetConfigOption(
537 : "OGR_PARQUET_REGISTER_GEOARROW_WKB_EXTENSION", "YES")))
538 : {
539 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
540 : std::make_shared<OGRGeoArrowWkbExtensionType>(
541 : std::move(arrow::binary()), std::string())));
542 : }
543 : #endif
544 :
545 : #ifdef GDAL_USE_ARROWDATASET
546 3898 : std::string osBasePath(poOpenInfo->pszFilename);
547 3898 : std::string osQueryParameters;
548 : const bool bStartedWithParquetPrefix =
549 1949 : STARTS_WITH(osBasePath.c_str(), "PARQUET:");
550 :
551 1949 : if (bStartedWithParquetPrefix)
552 : {
553 345 : osBasePath = osBasePath.substr(strlen("PARQUET:"));
554 : }
555 :
556 : // Little trick to allow using syntax of
557 : // https://github.com/opengeospatial/geoparquet/discussions/101
558 : // ogrinfo
559 : // "/vsicurl/https://ai4edataeuwest.blob.core.windows.net/us-census/2020/cb_2020_us_vtd_500k.parquet?${SAS_TOKEN}"
560 1949 : if (STARTS_WITH(osBasePath.c_str(), "/vsicurl/"))
561 : {
562 1 : const auto nPos = osBasePath.find(".parquet?st=");
563 1 : if (nPos != std::string::npos)
564 : {
565 0 : osQueryParameters = osBasePath.substr(nPos + strlen(".parquet"));
566 0 : osBasePath.resize(nPos + strlen(".parquet"));
567 : }
568 : }
569 :
570 2999 : if (bStartedWithParquetPrefix || poOpenInfo->bIsDirectory ||
571 1050 : !osQueryParameters.empty())
572 : {
573 : VSIStatBufL sStat;
574 899 : if (!osBasePath.empty() && osBasePath.back() == '/')
575 0 : osBasePath.pop_back();
576 : const std::string osMetadataPath =
577 899 : CPLFormFilenameSafe(osBasePath.c_str(), "_metadata", nullptr);
578 899 : if (CPLTestBool(
579 2697 : CPLGetConfigOption("OGR_PARQUET_USE_METADATA_FILE", "YES")) &&
580 1798 : VSIStatL((osMetadataPath + osQueryParameters).c_str(), &sStat) == 0)
581 : {
582 : // If there's a _metadata file, then use it to avoid listing files
583 : try
584 : {
585 22 : if (poOpenInfo->eAccess == GA_Update)
586 0 : return nullptr;
587 :
588 44 : return OpenParquetDatasetWithMetadata(
589 : osBasePath, "_metadata", osQueryParameters,
590 22 : poOpenInfo->papszOpenOptions);
591 : }
592 0 : catch (const std::exception &e)
593 : {
594 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
595 0 : e.what());
596 : }
597 0 : return nullptr;
598 : }
599 : else
600 : {
601 877 : bool bLikelyParquetDataset = false;
602 877 : if (poOpenInfo->bIsDirectory)
603 : {
604 : // Detect if the directory contains .parquet files, or
605 : // subdirectories with a name of the form "key=value", typical
606 : // of HIVE partitioning.
607 1082 : const CPLStringList aosFiles(VSIReadDir(osBasePath.c_str()));
608 23894 : for (const char *pszFilename : cpl::Iterate(aosFiles))
609 : {
610 23355 : if (EQUAL(CPLGetExtensionSafe(pszFilename).c_str(),
611 : "parquet"))
612 : {
613 2 : bLikelyParquetDataset = true;
614 2 : break;
615 : }
616 23353 : else if (strchr(pszFilename, '='))
617 : {
618 : // HIVE partitioning
619 0 : if (VSIStatL(CPLFormFilenameSafe(osBasePath.c_str(),
620 : pszFilename, nullptr)
621 : .c_str(),
622 0 : &sStat) == 0 &&
623 0 : VSI_ISDIR(sStat.st_mode))
624 : {
625 0 : bLikelyParquetDataset = true;
626 0 : break;
627 : }
628 : }
629 : }
630 : }
631 :
632 877 : if (bStartedWithParquetPrefix || bLikelyParquetDataset)
633 : {
634 : try
635 : {
636 338 : if (poOpenInfo->eAccess == GA_Update)
637 0 : return nullptr;
638 :
639 676 : return OpenParquetDatasetWithoutMetadata(
640 : osBasePath, osQueryParameters,
641 338 : poOpenInfo->papszOpenOptions);
642 : }
643 0 : catch (const std::exception &e)
644 : {
645 : // If we aren't quite sure that the passed file name is
646 : // a directory, then silently continue
647 0 : if (poOpenInfo->bIsDirectory)
648 : {
649 0 : CPLError(CE_Failure, CPLE_AppDefined,
650 0 : "Parquet exception: %s", e.what());
651 0 : return nullptr;
652 : }
653 : }
654 : }
655 : }
656 : }
657 : #endif
658 :
659 1589 : if (!OGRParquetDriverIdentify(poOpenInfo))
660 : {
661 0 : return nullptr;
662 : }
663 :
664 1589 : if (poOpenInfo->bIsDirectory)
665 539 : return nullptr;
666 :
667 2100 : std::string osFilename(poOpenInfo->pszFilename);
668 1050 : if (STARTS_WITH(poOpenInfo->pszFilename, "PARQUET:"))
669 : {
670 0 : osFilename = poOpenInfo->pszFilename + strlen("PARQUET:");
671 : }
672 :
673 2100 : auto poDS = std::make_unique<OGRParquetDataset>();
674 1050 : auto poLayer = poDS->CreateReaderLayer(osFilename, poOpenInfo->fpL,
675 2100 : poOpenInfo->papszOpenOptions);
676 1050 : if (!poLayer)
677 1 : return nullptr;
678 :
679 : // For debug purposes: return a layer with the extent of each row group
680 1049 : if (CPLTestBool(
681 : CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO")))
682 : {
683 1 : return BuildMemDatasetWithRowGroupExtents(poLayer.get());
684 : }
685 :
686 1048 : if (poOpenInfo->eAccess == GA_Update)
687 10 : poDS->SetLayer(std::make_unique<OGRParquetEditableLayer>(
688 10 : poDS.get(), osFilename, std::move(poLayer),
689 5 : poOpenInfo->papszOpenOptions));
690 : else
691 1043 : poDS->SetLayer(std::move(poLayer));
692 1048 : return poDS.release();
693 : }
694 :
695 : /************************************************************************/
696 : /* Create() */
697 : /************************************************************************/
698 :
699 359 : static GDALDataset *OGRParquetDriverCreate(const char *pszName, int nXSize,
700 : int nYSize, int nBands,
701 : GDALDataType eType,
702 : char ** /* papszOptions */)
703 : {
704 359 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
705 0 : return nullptr;
706 :
707 : try
708 : {
709 359 : std::shared_ptr<arrow::io::OutputStream> out_file;
710 473 : if (STARTS_WITH(pszName, "/vsi") ||
711 114 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
712 : {
713 : VSIVirtualHandleUniquePtr fp =
714 359 : VSIFilesystemHandler::OpenStatic(pszName, "wb");
715 359 : if (fp == nullptr)
716 : {
717 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
718 1 : return nullptr;
719 : }
720 358 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
721 : }
722 : else
723 : {
724 0 : PARQUET_ASSIGN_OR_THROW(out_file,
725 : arrow::io::FileOutputStream::Open(pszName));
726 : }
727 :
728 358 : return new OGRParquetWriterDataset(out_file);
729 : }
730 0 : catch (const std::exception &e)
731 : {
732 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
733 0 : e.what());
734 0 : return nullptr;
735 : }
736 : }
737 :
738 : /************************************************************************/
739 : /* OGRParquetDriver() */
740 : /************************************************************************/
741 :
742 : class OGRParquetDriver final : public GDALDriver
743 : {
744 : std::recursive_mutex m_oMutex{};
745 : bool m_bMetadataInitialized = false;
746 : void InitMetadata();
747 :
748 : public:
749 : const char *GetMetadataItem(const char *pszName,
750 : const char *pszDomain) override;
751 :
752 63 : CSLConstList GetMetadata(const char *pszDomain) override
753 : {
754 126 : std::lock_guard oLock(m_oMutex);
755 63 : InitMetadata();
756 126 : return GDALDriver::GetMetadata(pszDomain);
757 : }
758 : };
759 :
760 2622 : const char *OGRParquetDriver::GetMetadataItem(const char *pszName,
761 : const char *pszDomain)
762 : {
763 5244 : std::lock_guard oLock(m_oMutex);
764 2622 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
765 : {
766 419 : InitMetadata();
767 : }
768 5244 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
769 : }
770 :
771 482 : void OGRParquetDriver::InitMetadata()
772 : {
773 482 : if (m_bMetadataInitialized)
774 459 : return;
775 23 : m_bMetadataInitialized = true;
776 :
777 : CPLXMLTreeCloser oTree(
778 46 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
779 :
780 46 : std::vector<const char *> apszCompressionMethods;
781 23 : bool bHasSnappy = false;
782 23 : int minComprLevel = INT_MAX;
783 23 : int maxComprLevel = INT_MIN;
784 46 : std::string osCompressionLevelDesc = "Compression level, codec dependent.";
785 161 : for (const char *pszMethod :
786 184 : {"SNAPPY", "GZIP", "BROTLI", "ZSTD", "LZ4_RAW", "LZO", "LZ4_HADOOP"})
787 : {
788 : auto compressionTypeRes = arrow::util::Codec::GetCompressionType(
789 322 : CPLString(pszMethod).tolower());
790 322 : if (compressionTypeRes.ok() &&
791 161 : arrow::util::Codec::IsAvailable(*compressionTypeRes))
792 : {
793 138 : const auto compressionType = *compressionTypeRes;
794 138 : if (EQUAL(pszMethod, "SNAPPY"))
795 23 : bHasSnappy = true;
796 138 : apszCompressionMethods.emplace_back(pszMethod);
797 :
798 : auto minCompressLevelRes =
799 276 : arrow::util::Codec::MinimumCompressionLevel(compressionType);
800 : auto maxCompressLevelRes =
801 276 : arrow::util::Codec::MaximumCompressionLevel(compressionType);
802 : auto defCompressLevelRes =
803 276 : arrow::util::Codec::DefaultCompressionLevel(compressionType);
804 230 : if (minCompressLevelRes.ok() && maxCompressLevelRes.ok() &&
805 92 : defCompressLevelRes.ok())
806 : {
807 92 : minComprLevel = std::min(minComprLevel, *minCompressLevelRes);
808 92 : maxComprLevel = std::max(maxComprLevel, *maxCompressLevelRes);
809 92 : osCompressionLevelDesc += ' ';
810 92 : osCompressionLevelDesc += pszMethod;
811 92 : osCompressionLevelDesc += ": [";
812 92 : osCompressionLevelDesc += std::to_string(*minCompressLevelRes);
813 92 : osCompressionLevelDesc += ',';
814 92 : osCompressionLevelDesc += std::to_string(*maxCompressLevelRes);
815 92 : osCompressionLevelDesc += "], default=";
816 92 : if (EQUAL(pszMethod, "ZSTD"))
817 46 : osCompressionLevelDesc += std::to_string(
818 23 : OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL);
819 : else
820 : osCompressionLevelDesc +=
821 69 : std::to_string(*defCompressLevelRes);
822 92 : osCompressionLevelDesc += '.';
823 : }
824 : }
825 : }
826 :
827 : {
828 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
829 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
830 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
831 23 : CPLAddXMLAttributeAndValue(psOption, "description",
832 : "Compression method");
833 23 : CPLAddXMLAttributeAndValue(psOption, "default",
834 : bHasSnappy ? "SNAPPY" : "NONE");
835 : {
836 23 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
837 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
838 23 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
839 : }
840 161 : for (const char *pszMethod : apszCompressionMethods)
841 : {
842 138 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
843 138 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
844 : }
845 : }
846 :
847 23 : if (minComprLevel <= maxComprLevel)
848 : {
849 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
850 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION_LEVEL");
851 23 : CPLAddXMLAttributeAndValue(psOption, "type", "int");
852 23 : CPLAddXMLAttributeAndValue(
853 : psOption, "min",
854 : CPLSPrintf("%d",
855 23 : std::min(DEFAULT_COMPRESSION_LEVEL, minComprLevel)));
856 23 : CPLAddXMLAttributeAndValue(psOption, "max",
857 : CPLSPrintf("%d", maxComprLevel));
858 23 : CPLAddXMLAttributeAndValue(psOption, "description",
859 : osCompressionLevelDesc.c_str());
860 23 : CPLAddXMLAttributeAndValue(psOption, "default",
861 : CPLSPrintf("%d", DEFAULT_COMPRESSION_LEVEL));
862 : }
863 :
864 : {
865 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
866 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
867 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
868 23 : CPLAddXMLAttributeAndValue(psOption, "description",
869 : "Encoding of geometry columns");
870 23 : CPLAddXMLAttributeAndValue(psOption, "default", "WKB");
871 92 : for (const char *pszEncoding :
872 115 : {"WKB", "WKT", "GEOARROW", "GEOARROW_INTERLEAVED"})
873 : {
874 92 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
875 92 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
876 92 : if (EQUAL(pszEncoding, "GEOARROW"))
877 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
878 : "GEOARROW_STRUCT");
879 : }
880 : }
881 :
882 : {
883 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
884 23 : CPLAddXMLAttributeAndValue(psOption, "name", "ROW_GROUP_SIZE");
885 23 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
886 23 : CPLAddXMLAttributeAndValue(psOption, "description",
887 : "Maximum number of rows per group");
888 23 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
889 : }
890 :
891 : {
892 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
893 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
894 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
895 23 : CPLAddXMLAttributeAndValue(psOption, "description",
896 : "Name of geometry column");
897 23 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
898 : }
899 :
900 : {
901 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
902 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COORDINATE_PRECISION");
903 23 : CPLAddXMLAttributeAndValue(psOption, "type", "float");
904 23 : CPLAddXMLAttributeAndValue(psOption, "description",
905 : "Number of decimals for coordinates (only "
906 : "for GEOMETRY_ENCODING=WKT)");
907 : }
908 :
909 : {
910 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
911 23 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
912 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
913 23 : CPLAddXMLAttributeAndValue(psOption, "description",
914 : "Name of the FID column to create");
915 : }
916 :
917 : {
918 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
919 23 : CPLAddXMLAttributeAndValue(psOption, "name", "POLYGON_ORIENTATION");
920 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
921 23 : CPLAddXMLAttributeAndValue(
922 : psOption, "description",
923 : "Which ring orientation to use for polygons");
924 23 : CPLAddXMLAttributeAndValue(psOption, "default", "COUNTERCLOCKWISE");
925 23 : CPLCreateXMLElementAndValue(psOption, "Value", "COUNTERCLOCKWISE");
926 23 : CPLCreateXMLElementAndValue(psOption, "Value", "UNMODIFIED");
927 : }
928 :
929 : {
930 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
931 23 : CPLAddXMLAttributeAndValue(psOption, "name", "EDGES");
932 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
933 23 : CPLAddXMLAttributeAndValue(
934 : psOption, "description",
935 : "Name of the coordinate system for the edges");
936 23 : CPLAddXMLAttributeAndValue(psOption, "default", "PLANAR");
937 23 : CPLCreateXMLElementAndValue(psOption, "Value", "PLANAR");
938 23 : CPLCreateXMLElementAndValue(psOption, "Value", "SPHERICAL");
939 : }
940 :
941 : {
942 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
943 23 : CPLAddXMLAttributeAndValue(psOption, "name", "CREATOR");
944 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
945 23 : CPLAddXMLAttributeAndValue(psOption, "description",
946 : "Name of creating application");
947 : }
948 :
949 : {
950 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
951 23 : CPLAddXMLAttributeAndValue(psOption, "name", "WRITE_COVERING_BBOX");
952 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
953 23 : CPLAddXMLAttributeAndValue(psOption, "default", "AUTO");
954 23 : CPLAddXMLAttributeAndValue(psOption, "description",
955 : "Whether to write xmin/ymin/xmax/ymax "
956 : "columns with the bounding box of "
957 : "geometries");
958 23 : CPLCreateXMLElementAndValue(psOption, "Value", "AUTO");
959 23 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
960 23 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
961 : }
962 :
963 : {
964 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
965 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COVERING_BBOX_NAME");
966 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
967 23 : CPLAddXMLAttributeAndValue(psOption, "description",
968 : "Name of the bounding box of "
969 : "geometries. If not same, "
970 : "equals to {'GEOMETRY_NAME}_bbox'");
971 : }
972 :
973 : #if ARROW_VERSION_MAJOR >= 21
974 : {
975 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
976 : CPLAddXMLAttributeAndValue(psOption, "name", "USE_PARQUET_GEO_TYPES");
977 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
978 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
979 : CPLAddXMLAttributeAndValue(psOption, "description",
980 : "Whether to use Parquet Geometry/Geography "
981 : "logical types (introduced in libarrow 21), "
982 : "when using GEOMETRY_ENCODING=WKB encoding");
983 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
984 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
985 : CPLCreateXMLElementAndValue(psOption, "Value", "ONLY");
986 : }
987 : #endif
988 :
989 : {
990 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
991 23 : CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX");
992 23 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
993 23 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
994 23 : CPLAddXMLAttributeAndValue(psOption, "description",
995 : "Whether features should be sorted based on "
996 : "the bounding box of their geometries");
997 : }
998 :
999 : {
1000 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
1001 23 : CPLAddXMLAttributeAndValue(psOption, "name", "TIMESTAMP_WITH_OFFSET");
1002 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
1003 23 : CPLAddXMLAttributeAndValue(psOption, "default", "AUTO");
1004 23 : CPLAddXMLAttributeAndValue(
1005 : psOption, "description",
1006 : "Whether timestamp with offset fields should be used");
1007 23 : CPLCreateXMLElementAndValue(psOption, "Value", "AUTO");
1008 23 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
1009 23 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
1010 : }
1011 :
1012 23 : char *pszXML = CPLSerializeXMLTree(oTree.get());
1013 23 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
1014 23 : CPLFree(pszXML);
1015 : }
1016 :
1017 : /************************************************************************/
1018 : /* OGRParquetCreateMetadataFileAlgorithm() */
1019 : /************************************************************************/
1020 :
1021 : #ifndef _
1022 : #define _(x) x
1023 : #endif
1024 :
1025 : class OGRParquetCreateMetadataFileAlgorithm final : public GDALAlgorithm
1026 : {
1027 : public:
1028 43 : OGRParquetCreateMetadataFileAlgorithm()
1029 43 : : GDALAlgorithm(
1030 : "create-metadata-file",
1031 : "Create the _metadata file for a partitioned Parquet dataset",
1032 43 : "/programs/gdal_driver_parquet_create_metadata_file.html")
1033 : {
1034 : auto &inputArg =
1035 : AddArg(GDAL_ARG_NAME_INPUT, 0, _("Input Parquet datasets"),
1036 86 : &m_input, GDAL_OF_VECTOR)
1037 43 : .SetPositional()
1038 43 : .SetAutoOpenDataset(false)
1039 43 : .SetDatasetInputFlags(GADV_NAME)
1040 43 : .SetRequired();
1041 43 : SetAutoCompleteFunctionForFilename(inputArg, GDAL_OF_VECTOR);
1042 :
1043 : auto &outputArg =
1044 : AddArg(GDAL_ARG_NAME_OUTPUT, 0, _("Output Parquet dataset"),
1045 86 : &m_output, GDAL_OF_VECTOR)
1046 43 : .SetPositional()
1047 43 : .SetIsOutput(true)
1048 43 : .SetDatasetInputFlags(GADV_NAME)
1049 43 : .SetDatasetOutputFlags(0)
1050 43 : .SetRequired();
1051 43 : SetAutoCompleteFunctionForFilename(outputArg, GDAL_OF_VECTOR);
1052 :
1053 43 : AddOverwriteArg(&m_overwrite);
1054 43 : }
1055 :
1056 : protected:
1057 : bool RunImpl(GDALProgressFunc, void *) override;
1058 :
1059 : private:
1060 : std::vector<GDALArgDatasetValue> m_input{};
1061 : GDALArgDatasetValue m_output{};
1062 : bool m_overwrite = false;
1063 : };
1064 :
1065 : /************************************************************************/
1066 : /* OGRParquetCreateMetadataFileAlgorithm::RunImpl() */
1067 : /************************************************************************/
1068 :
1069 14 : bool OGRParquetCreateMetadataFileAlgorithm::RunImpl(
1070 : GDALProgressFunc pfnProgress, void *pProgressData)
1071 : {
1072 : try
1073 : {
1074 : const std::string osOutputDir =
1075 28 : CPLGetPathSafe(m_output.GetName().c_str());
1076 : auto fs =
1077 28 : std::make_shared<VSIArrowFileSystem>("PARQUET", std::string());
1078 :
1079 14 : std::shared_ptr<parquet::FileMetaData> outputMetadata;
1080 :
1081 : // Iterate over input Parquet files
1082 31 : for (const auto &[i, input] : cpl::enumerate(m_input))
1083 : {
1084 3 : std::shared_ptr<arrow::io::RandomAccessFile> inputFile;
1085 25 : PARQUET_ASSIGN_OR_THROW(inputFile,
1086 : fs->OpenInputFile(input.GetName()));
1087 : auto reader =
1088 45 : parquet::ParquetFileReader::Open(std::move(inputFile));
1089 20 : if (!reader)
1090 : {
1091 0 : ReportError(CE_Failure, CPLE_AppDefined, "Cannot open %s",
1092 0 : input.GetName().c_str());
1093 0 : return false;
1094 : }
1095 :
1096 21 : auto inputMetadata = reader->metadata();
1097 20 : CPLAssert(inputMetadata);
1098 :
1099 20 : if (!outputMetadata)
1100 : {
1101 : // Opens a file descriptor on the output dataset
1102 : VSIVirtualHandleUniquePtr fp = VSIFilesystemHandler::OpenStatic(
1103 12 : m_output.GetName().c_str(), "wb");
1104 12 : if (fp == nullptr)
1105 : {
1106 1 : ReportError(CE_Failure, CPLE_FileIO,
1107 : "OpenStatic() failed: cannot create %s",
1108 1 : m_output.GetName().c_str());
1109 1 : return false;
1110 : }
1111 :
1112 : // We need to create an empty Parquet file to be able to
1113 : // get its ParquetMetadata
1114 : auto schemaNode =
1115 : std::dynamic_pointer_cast<parquet::schema::GroupNode>(
1116 11 : inputMetadata->schema()->schema_root());
1117 11 : CPLAssert(schemaNode);
1118 : auto writer = parquet::ParquetFileWriter::Open(
1119 22 : std::make_shared<OGRArrowWritableFile>(std::move(fp)),
1120 44 : std::move(schemaNode));
1121 11 : if (!writer)
1122 : {
1123 0 : ReportError(
1124 : CE_Failure, CPLE_FileIO,
1125 : "ParquetFileWriter::Open() failed: cannot create %s",
1126 0 : m_output.GetName().c_str());
1127 0 : return false;
1128 : }
1129 : // Close it and now re-open it to gets its metadata object
1130 11 : writer->Close();
1131 :
1132 22 : PARQUET_ASSIGN_OR_THROW(
1133 : inputFile, fs->OpenInputFile(m_output.GetName().c_str()));
1134 : auto readerMetadataFile =
1135 22 : parquet::ParquetFileReader::Open(std::move(inputFile));
1136 11 : if (!readerMetadataFile)
1137 : {
1138 0 : ReportError(CE_Failure, CPLE_AppDefined, "Cannot open %s",
1139 0 : m_output.GetName().c_str());
1140 0 : return false;
1141 : }
1142 :
1143 11 : outputMetadata = readerMetadataFile->metadata();
1144 11 : CPLAssert(outputMetadata);
1145 : }
1146 :
1147 19 : int bGotRelative = false;
1148 : const std::string osRelativePath(CPLExtractRelativePath(
1149 20 : osOutputDir.c_str(), input.GetName().c_str(), &bGotRelative));
1150 19 : if (!bGotRelative)
1151 : {
1152 2 : ReportError(
1153 : CE_Failure, CPLE_AppDefined,
1154 : "Cannot infer relative path of '%s' with respect to '%s'",
1155 1 : input.GetName().c_str(), osOutputDir.c_str());
1156 1 : return false;
1157 : }
1158 :
1159 : // Add the row groups from the current input file to the output
1160 : // metadata, and set the appropriate relative path.
1161 18 : inputMetadata->set_file_path(osRelativePath);
1162 18 : outputMetadata->AppendRowGroups(*inputMetadata);
1163 :
1164 19 : if (pfnProgress &&
1165 2 : !pfnProgress(static_cast<double>(i + 1) /
1166 2 : static_cast<double>(m_input.size()),
1167 : "", pProgressData))
1168 : {
1169 0 : ReportError(CE_Failure, CPLE_UserInterrupt,
1170 : "Interrupted by user");
1171 0 : return false;
1172 : }
1173 : }
1174 :
1175 : auto fp =
1176 18 : VSIFilesystemHandler::OpenStatic(m_output.GetName().c_str(), "wb");
1177 9 : if (fp == nullptr)
1178 : {
1179 0 : ReportError(CE_Failure, CPLE_FileIO, "Cannot create %s",
1180 0 : m_output.GetName().c_str());
1181 0 : return false;
1182 : }
1183 18 : OGRArrowWritableFile out_file(std::move(fp));
1184 9 : parquet::WriteMetaDataFile(*outputMetadata, &out_file);
1185 18 : auto status = out_file.Close();
1186 9 : if (!status.ok())
1187 : {
1188 0 : ReportError(CE_Failure, CPLE_FileIO, "Cannot close %s: %s",
1189 0 : m_output.GetName().c_str(), status.message().c_str());
1190 0 : return false;
1191 : }
1192 :
1193 9 : return true;
1194 : }
1195 3 : catch (const std::exception &e)
1196 : {
1197 3 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
1198 3 : e.what());
1199 3 : return false;
1200 : }
1201 : }
1202 :
1203 : /************************************************************************/
1204 : /* OGRParquetDriverInstantiateAlgorithm() */
1205 : /************************************************************************/
1206 :
1207 : static GDALAlgorithm *
1208 43 : OGRParquetDriverInstantiateAlgorithm(const std::vector<std::string> &aosPath)
1209 : {
1210 43 : if (aosPath.size() == 1 && aosPath[0] == "create-metadata-file")
1211 : {
1212 86 : return std::make_unique<OGRParquetCreateMetadataFileAlgorithm>()
1213 43 : .release();
1214 : }
1215 : else
1216 : {
1217 0 : return nullptr;
1218 : }
1219 : }
1220 :
1221 : /************************************************************************/
1222 : /* RegisterOGRParquet() */
1223 : /************************************************************************/
1224 :
1225 38 : void RegisterOGRParquet()
1226 : {
1227 38 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
1228 0 : return;
1229 :
1230 76 : auto poDriver = std::make_unique<OGRParquetDriver>();
1231 38 : OGRParquetDriverSetCommonMetadata(poDriver.get());
1232 :
1233 38 : poDriver->pfnOpen = OGRParquetDriverOpen;
1234 38 : poDriver->pfnCreate = OGRParquetDriverCreate;
1235 :
1236 38 : poDriver->pfnInstantiateAlgorithm = OGRParquetDriverInstantiateAlgorithm;
1237 :
1238 38 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
1239 : #ifdef GDAL_USE_ARROWDATASET
1240 38 : poDriver->SetMetadataItem("ARROW_DATASET", "YES");
1241 : #endif
1242 :
1243 38 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
1244 :
1245 : #if ARROW_VERSION_MAJOR >= 16
1246 : // Mostly for tests
1247 : const char *pszPath =
1248 38 : CPLGetConfigOption("OGR_PARQUET_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
1249 38 : if (pszPath)
1250 : {
1251 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
1252 0 : if (!result.ok())
1253 : {
1254 0 : CPLError(CE_Warning, CPLE_AppDefined,
1255 : "arrow::fs::LoadFileSystemFactories() failed with %s",
1256 0 : result.message().c_str());
1257 : }
1258 : }
1259 : #endif
1260 :
1261 : #if defined(GDAL_USE_ARROWDATASET) && defined(GDAL_USE_ARROWCOMPUTE)
1262 : {
1263 : auto status = arrow::compute::Initialize();
1264 : if (!status.ok())
1265 : {
1266 : CPLError(CE_Warning, CPLE_AppDefined,
1267 : "arrow::compute::Initialize() failed with %s",
1268 : status.message().c_str());
1269 : }
1270 : }
1271 : #endif
1272 : }
|