Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <algorithm>
17 : #include <map>
18 : #include <mutex>
19 : #include <tuple>
20 :
21 : #include "ogr_parquet.h"
22 : #include "ogrparquetdrivercore.h"
23 : #include "memdataset.h"
24 : #include "ogreditablelayer.h"
25 :
26 : #include "../arrow_common/ograrrowrandomaccessfile.h"
27 : #include "../arrow_common/vsiarrowfilesystem.hpp"
28 : #include "../arrow_common/ograrrowwritablefile.h"
29 : #include "../arrow_common/ograrrowdataset.hpp"
30 : #include "../arrow_common/ograrrowlayer.hpp" // for the destructor
31 :
32 : #ifdef GDAL_USE_ARROWDATASET
33 :
34 : /************************************************************************/
35 : /* OpenFromDatasetFactory() */
36 : /************************************************************************/
37 :
38 274 : static GDALDataset *OpenFromDatasetFactory(
39 : const std::string &osBasePath,
40 : const std::shared_ptr<arrow::dataset::DatasetFactory> &factory,
41 : CSLConstList papszOpenOptions,
42 : const std::shared_ptr<arrow::fs::FileSystem> &fs)
43 : {
44 274 : std::shared_ptr<arrow::dataset::Dataset> dataset;
45 548 : PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish());
46 :
47 274 : const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi");
48 548 : auto poDS = std::make_unique<OGRParquetDataset>();
49 : auto poLayer = std::make_unique<OGRParquetDatasetLayer>(
50 548 : poDS.get(), CPLGetBasenameSafe(osBasePath.c_str()).c_str(), bIsVSI,
51 548 : dataset, papszOpenOptions);
52 274 : poDS->SetLayer(std::move(poLayer));
53 274 : poDS->SetFileSystem(fs);
54 548 : return poDS.release();
55 : }
56 :
57 : /************************************************************************/
58 : /* GetFileSystem() */
59 : /************************************************************************/
60 :
61 : static std::tuple<std::shared_ptr<arrow::fs::FileSystem>, std::string>
62 274 : GetFileSystem(std::string &osBasePathInOut,
63 : const std::string &osQueryParameters)
64 : {
65 : // Instantiate file system:
66 : // - VSIArrowFileSystem implementation for /vsi files
67 : // - base implementation for local files (if OGR_PARQUET_USE_VSI set to NO)
68 274 : std::shared_ptr<arrow::fs::FileSystem> fs;
69 274 : const bool bIsVSI = STARTS_WITH(osBasePathInOut.c_str(), "/vsi");
70 : VSIStatBufL sStat;
71 548 : std::string osFSFilename;
72 460 : if ((bIsVSI ||
73 540 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES"))) &&
74 266 : VSIStatL(osBasePathInOut.c_str(), &sStat) == 0)
75 : {
76 265 : osFSFilename = osBasePathInOut;
77 265 : fs = std::make_shared<VSIArrowFileSystem>("PARQUET", osQueryParameters);
78 : }
79 : else
80 : {
81 : // FileSystemFromUriOrPath() doesn't like relative paths
82 : // so transform them to absolute.
83 9 : std::string osPath(osBasePathInOut);
84 9 : if (CPLIsFilenameRelative(osPath.c_str()))
85 : {
86 8 : char *pszCurDir = CPLGetCurrentDir();
87 8 : if (pszCurDir == nullptr)
88 0 : return {nullptr, osFSFilename};
89 8 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
90 8 : CPLFree(pszCurDir);
91 : }
92 9 : PARQUET_ASSIGN_OR_THROW(
93 : fs, arrow::fs::FileSystemFromUriOrPath(osPath, &osFSFilename));
94 : }
95 274 : return {fs, osFSFilename};
96 : }
97 :
98 : /************************************************************************/
99 : /* MakeParquetFileFormat() */
100 : /************************************************************************/
101 :
102 : static std::shared_ptr<arrow::dataset::ParquetFileFormat>
103 274 : MakeParquetFileFormat()
104 : {
105 : auto parquetFileFormat =
106 274 : std::make_shared<arrow::dataset::ParquetFileFormat>();
107 : #if ARROW_VERSION_MAJOR >= 21
108 : auto fragmentScanOptions =
109 : std::dynamic_pointer_cast<arrow::dataset::ParquetFragmentScanOptions>(
110 : parquetFileFormat->default_fragment_scan_options);
111 : CPLAssert(fragmentScanOptions);
112 : fragmentScanOptions->arrow_reader_properties->set_arrow_extensions_enabled(
113 : CPLTestBool(
114 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
115 : #endif
116 274 : return parquetFileFormat;
117 : }
118 :
119 : /************************************************************************/
120 : /* OpenParquetDatasetWithMetadata() */
121 : /************************************************************************/
122 :
123 18 : static GDALDataset *OpenParquetDatasetWithMetadata(
124 : const std::string &osBasePathIn, const char *pszMetadataFile,
125 : const std::string &osQueryParameters, CSLConstList papszOpenOptions)
126 : {
127 36 : std::string osBasePath(osBasePathIn);
128 18 : const auto &[fs, osFSFilename] =
129 36 : GetFileSystem(osBasePath, osQueryParameters);
130 :
131 36 : arrow::dataset::ParquetFactoryOptions options;
132 36 : auto partitioningFactory = arrow::dataset::HivePartitioning::MakeFactory();
133 : options.partitioning =
134 18 : arrow::dataset::PartitioningOrFactory(std::move(partitioningFactory));
135 :
136 18 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
137 54 : PARQUET_ASSIGN_OR_THROW(factory,
138 : arrow::dataset::ParquetDatasetFactory::Make(
139 : osFSFilename + '/' + pszMetadataFile, fs,
140 : MakeParquetFileFormat(), std::move(options)));
141 :
142 36 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
143 : }
144 :
145 : /************************************************************************/
146 : /* OpenParquetDatasetWithoutMetadata() */
147 : /************************************************************************/
148 :
149 : static GDALDataset *
150 256 : OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn,
151 : const std::string &osQueryParameters,
152 : CSLConstList papszOpenOptions)
153 : {
154 512 : std::string osBasePath(osBasePathIn);
155 256 : const auto &[fs, osFSFilename] =
156 512 : GetFileSystem(osBasePath, osQueryParameters);
157 :
158 512 : arrow::dataset::FileSystemFactoryOptions options;
159 256 : std::shared_ptr<arrow::dataset::DatasetFactory> factory;
160 :
161 512 : const auto fileInfo = fs->GetFileInfo(osFSFilename);
162 256 : if (fileInfo->IsFile())
163 : {
164 1008 : PARQUET_ASSIGN_OR_THROW(
165 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
166 : fs, {std::move(osFSFilename)}, MakeParquetFileFormat(),
167 : std::move(options)));
168 : }
169 : else
170 : {
171 : auto partitioningFactory =
172 8 : arrow::dataset::HivePartitioning::MakeFactory();
173 8 : options.partitioning = arrow::dataset::PartitioningOrFactory(
174 8 : std::move(partitioningFactory));
175 :
176 8 : arrow::fs::FileSelector selector;
177 4 : selector.base_dir = std::move(osFSFilename);
178 4 : selector.recursive = true;
179 :
180 8 : PARQUET_ASSIGN_OR_THROW(
181 : factory, arrow::dataset::FileSystemDatasetFactory::Make(
182 : fs, std::move(selector), MakeParquetFileFormat(),
183 : std::move(options)));
184 : }
185 :
186 512 : return OpenFromDatasetFactory(osBasePath, factory, papszOpenOptions, fs);
187 : }
188 :
189 : #endif
190 :
191 : /************************************************************************/
192 : /* BuildMemDatasetWithRowGroupExtents() */
193 : /************************************************************************/
194 :
195 : /** Builds a MEM dataset that contains, for each row-group of the input file,
196 : * the feature count and spatial extent of the features of this row group,
197 : * using Parquet statistics. This assumes that the Parquet file declares
198 : * a "covering":{"bbox":{ ... }} metadata item.
199 : *
200 : * Only for debug purposes.
201 : */
202 1 : static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer)
203 : {
204 1 : int iParquetXMin = -1;
205 1 : int iParquetYMin = -1;
206 1 : int iParquetXMax = -1;
207 1 : int iParquetYMax = -1;
208 1 : if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin,
209 : iParquetXMax, iParquetYMax))
210 : {
211 : auto poMemDS = std::unique_ptr<GDALDataset>(
212 2 : MEMDataset::Create("", 0, 0, 0, GDT_Unknown, nullptr));
213 1 : if (!poMemDS)
214 0 : return nullptr;
215 1 : OGRSpatialReference *poTmpSRS = nullptr;
216 1 : const auto poSrcSRS = poLayer->GetSpatialRef();
217 1 : if (poSrcSRS)
218 0 : poTmpSRS = poSrcSRS->Clone();
219 : auto poMemLayer =
220 1 : poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr);
221 1 : if (poTmpSRS)
222 0 : poTmpSRS->Release();
223 1 : if (!poMemLayer)
224 0 : return nullptr;
225 1 : CPL_IGNORE_RET_VAL(poMemLayer->CreateField(
226 1 : std::make_unique<OGRFieldDefn>("feature_count", OFTInteger64)
227 1 : .get()));
228 :
229 : const auto metadata =
230 2 : poLayer->GetReader()->parquet_reader()->metadata();
231 1 : const int numRowGroups = metadata->num_row_groups();
232 15 : for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup)
233 : {
234 28 : std::string osMinTmp, osMaxTmp;
235 : OGRField unusedF;
236 : bool unusedB;
237 : OGRFieldSubType unusedSubType;
238 :
239 : OGRField sXMin;
240 14 : OGR_RawField_SetNull(&sXMin);
241 14 : bool bFoundXMin = false;
242 14 : OGRFieldType eXMinType = OFTMaxType;
243 :
244 : OGRField sYMin;
245 14 : OGR_RawField_SetNull(&sYMin);
246 14 : bool bFoundYMin = false;
247 14 : OGRFieldType eYMinType = OFTMaxType;
248 :
249 : OGRField sXMax;
250 14 : OGR_RawField_SetNull(&sXMax);
251 14 : bool bFoundXMax = false;
252 14 : OGRFieldType eXMaxType = OFTMaxType;
253 :
254 : OGRField sYMax;
255 14 : OGR_RawField_SetNull(&sYMax);
256 14 : bool bFoundYMax = false;
257 14 : OGRFieldType eYMaxType = OFTMaxType;
258 :
259 14 : if (poLayer->GetMinMaxForParquetCol(
260 : iRowGroup, iParquetXMin, nullptr,
261 : /* bComputeMin = */ true, sXMin, bFoundXMin,
262 : /* bComputeMax = */ false, unusedF, unusedB, eXMinType,
263 8 : unusedSubType, osMinTmp, osMaxTmp) &&
264 8 : bFoundXMin && eXMinType == OFTReal &&
265 22 : poLayer->GetMinMaxForParquetCol(
266 : iRowGroup, iParquetYMin, nullptr,
267 : /* bComputeMin = */ true, sYMin, bFoundYMin,
268 : /* bComputeMax = */ false, unusedF, unusedB, eYMinType,
269 8 : unusedSubType, osMinTmp, osMaxTmp) &&
270 8 : bFoundYMin && eYMinType == OFTReal &&
271 22 : poLayer->GetMinMaxForParquetCol(
272 : iRowGroup, iParquetXMax, nullptr,
273 : /* bComputeMin = */ false, unusedF, unusedB,
274 : /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType,
275 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
276 8 : bFoundXMax && eXMaxType == OFTReal &&
277 22 : poLayer->GetMinMaxForParquetCol(
278 : iRowGroup, iParquetYMax, nullptr,
279 : /* bComputeMin = */ false, unusedF, unusedB,
280 : /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType,
281 8 : unusedSubType, osMaxTmp, osMaxTmp) &&
282 22 : bFoundYMax && eYMaxType == OFTReal)
283 : {
284 16 : OGRFeature oFeat(poMemLayer->GetLayerDefn());
285 8 : oFeat.SetField(0,
286 : static_cast<GIntBig>(
287 8 : metadata->RowGroup(iRowGroup)->num_rows()));
288 16 : auto poPoly = std::make_unique<OGRPolygon>();
289 8 : auto poLR = std::make_unique<OGRLinearRing>();
290 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
291 8 : poLR->addPoint(sXMin.Real, sYMax.Real);
292 8 : poLR->addPoint(sXMax.Real, sYMax.Real);
293 8 : poLR->addPoint(sXMax.Real, sYMin.Real);
294 8 : poLR->addPoint(sXMin.Real, sYMin.Real);
295 8 : poPoly->addRingDirectly(poLR.release());
296 8 : oFeat.SetGeometryDirectly(poPoly.release());
297 8 : CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat));
298 : }
299 : }
300 :
301 1 : return poMemDS.release();
302 : }
303 0 : return nullptr;
304 : }
305 :
306 : /************************************************************************/
307 : /* OGRParquetEditableLayerSynchronizer */
308 : /************************************************************************/
309 :
310 : class OGRParquetEditableLayer;
311 :
312 : class OGRParquetEditableLayerSynchronizer final
313 : : public IOGREditableLayerSynchronizer
314 : {
315 : OGRParquetDataset *const m_poDS;
316 : OGRParquetEditableLayer *const m_poEditableLayer;
317 : const std::string m_osFilename;
318 : const CPLStringList m_aosOpenOptions;
319 :
320 : CPL_DISALLOW_COPY_ASSIGN(OGRParquetEditableLayerSynchronizer)
321 :
322 : public:
323 5 : OGRParquetEditableLayerSynchronizer(
324 : OGRParquetDataset *poDS, OGRParquetEditableLayer *poEditableLayer,
325 : const std::string &osFilename, CSLConstList papszOpenOptions)
326 5 : : m_poDS(poDS), m_poEditableLayer(poEditableLayer),
327 : m_osFilename(osFilename),
328 5 : m_aosOpenOptions(CSLDuplicate(papszOpenOptions))
329 : {
330 5 : }
331 :
332 : OGRErr EditableSyncToDisk(OGRLayer *poEditableLayer,
333 : OGRLayer **ppoDecoratedLayer) override;
334 : };
335 :
336 : /************************************************************************/
337 : /* OGRParquetEditableLayer */
338 : /************************************************************************/
339 :
340 : class OGRParquetEditableLayer final : public IOGRArrowLayer,
341 : public OGREditableLayer
342 : {
343 : public:
344 5 : OGRParquetEditableLayer(OGRParquetDataset *poDS,
345 : const std::string &osFilename,
346 : std::unique_ptr<OGRParquetLayer> poParquetLayer,
347 : CSLConstList papszOpenOptions)
348 5 : : OGREditableLayer(poParquetLayer.get(), false,
349 : new OGRParquetEditableLayerSynchronizer(
350 5 : poDS, this, osFilename, papszOpenOptions),
351 : false),
352 10 : m_poParquetLayer(std::move(poParquetLayer))
353 : {
354 5 : }
355 :
356 10 : ~OGRParquetEditableLayer() override
357 5 : {
358 5 : SyncToDisk();
359 5 : delete m_poSynchronizer;
360 5 : m_poSynchronizer = nullptr;
361 10 : }
362 :
363 : OGRLayer *GetLayer() override;
364 :
365 8 : OGRParquetLayer *GetUnderlyingArrowLayer() override
366 : {
367 8 : return m_poParquetLayer.get();
368 : }
369 :
370 : void
371 8 : SetUnderlyingArrowLayer(std::unique_ptr<OGRParquetLayer> poParquetLayer)
372 : {
373 8 : m_poParquetLayer = std::move(poParquetLayer);
374 8 : }
375 :
376 : private:
377 : std::unique_ptr<OGRParquetLayer> m_poParquetLayer{};
378 : };
379 :
380 : /************************************************************************/
381 : /* OGRParquetEditableLayer::GetLayer() */
382 : /************************************************************************/
383 :
384 30 : OGRLayer *OGRParquetEditableLayer::GetLayer()
385 : {
386 30 : return this;
387 : }
388 :
389 : /************************************************************************/
390 : /* OGRParquetEditableLayerSynchronizer::EditableSyncToDisk() */
391 : /************************************************************************/
392 :
393 4 : OGRErr OGRParquetEditableLayerSynchronizer::EditableSyncToDisk(
394 : OGRLayer *poEditableLayer, OGRLayer **ppoDecoratedLayer)
395 : {
396 4 : CPLAssert(*ppoDecoratedLayer ==
397 : m_poEditableLayer->GetUnderlyingArrowLayer());
398 :
399 8 : const std::string osTmpFilename = m_osFilename + ".tmp.parquet";
400 : try
401 : {
402 0 : std::shared_ptr<arrow::io::OutputStream> out_file;
403 8 : if (STARTS_WITH(osTmpFilename.c_str(), "/vsi") ||
404 4 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
405 : {
406 : VSIVirtualHandleUniquePtr fp =
407 4 : VSIFilesystemHandler::OpenStatic(osTmpFilename.c_str(), "wb");
408 4 : if (fp == nullptr)
409 : {
410 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s",
411 : osTmpFilename.c_str());
412 0 : return OGRERR_FAILURE;
413 : }
414 4 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
415 : }
416 : else
417 : {
418 0 : PARQUET_ASSIGN_OR_THROW(
419 : out_file, arrow::io::FileOutputStream::Open(osTmpFilename));
420 : }
421 :
422 4 : OGRParquetWriterDataset writerDS(out_file);
423 :
424 4 : auto poParquetLayer = m_poEditableLayer->GetUnderlyingArrowLayer();
425 4 : CPLStringList aosCreationOptions(poParquetLayer->GetCreationOptions());
426 4 : const char *pszFIDColumn = poParquetLayer->GetFIDColumn();
427 4 : if (pszFIDColumn[0])
428 1 : aosCreationOptions.SetNameValue("FID", pszFIDColumn);
429 4 : const char *pszEdges = poParquetLayer->GetMetadataItem("EDGES");
430 4 : if (pszEdges)
431 1 : aosCreationOptions.SetNameValue("EDGES", pszEdges);
432 4 : auto poWriterLayer = writerDS.CreateLayer(
433 8 : CPLGetBasenameSafe(m_osFilename.c_str()).c_str(),
434 4 : poParquetLayer->GetGeomType() == wkbNone
435 : ? nullptr
436 4 : : poParquetLayer->GetLayerDefn()->GetGeomFieldDefn(0),
437 4 : aosCreationOptions.List());
438 4 : if (!poWriterLayer)
439 0 : return OGRERR_FAILURE;
440 :
441 : // Create target fields from source fields
442 9 : for (const auto poSrcFieldDefn :
443 22 : poEditableLayer->GetLayerDefn()->GetFields())
444 : {
445 9 : if (poWriterLayer->CreateField(poSrcFieldDefn) != OGRERR_NONE)
446 0 : return OGRERR_FAILURE;
447 : }
448 :
449 : // Disable all filters and backup them.
450 4 : const char *pszQueryStringConst = poEditableLayer->GetAttrQueryString();
451 : const std::string osQueryString =
452 4 : pszQueryStringConst ? pszQueryStringConst : "";
453 4 : poEditableLayer->SetAttributeFilter(nullptr);
454 :
455 4 : const int iFilterGeomIndexBak = poEditableLayer->GetGeomFieldFilter();
456 0 : std::unique_ptr<OGRGeometry> poFilterGeomBak;
457 4 : if (const OGRGeometry *poFilterGeomSrc =
458 4 : poEditableLayer->GetSpatialFilter())
459 0 : poFilterGeomBak.reset(poFilterGeomSrc->clone());
460 4 : poEditableLayer->SetSpatialFilter(nullptr);
461 :
462 4 : bool bError = false;
463 :
464 : // Copy all features
465 12 : for (auto &&poSrcFeature : *poEditableLayer)
466 : {
467 8 : OGRFeature oDstFeature(poWriterLayer->GetLayerDefn());
468 8 : oDstFeature.SetFrom(poSrcFeature.get());
469 8 : oDstFeature.SetFID(poSrcFeature->GetFID());
470 8 : if (poWriterLayer->CreateFeature(&oDstFeature) != OGRERR_NONE)
471 : {
472 0 : bError = true;
473 0 : break;
474 : }
475 : }
476 :
477 : // Restore filters.
478 4 : if (!osQueryString.empty())
479 0 : poEditableLayer->SetAttributeFilter(osQueryString.c_str());
480 4 : poEditableLayer->SetSpatialFilter(iFilterGeomIndexBak,
481 4 : poFilterGeomBak.get());
482 :
483 : // Flush and close file
484 4 : if (bError || writerDS.Close() != CE_None)
485 0 : return OGRERR_FAILURE;
486 : }
487 0 : catch (const std::exception &e)
488 : {
489 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
490 0 : e.what());
491 0 : return OGRERR_FAILURE;
492 : }
493 :
494 : // Close original Parquet file
495 4 : m_poEditableLayer->SetUnderlyingArrowLayer(nullptr);
496 4 : *ppoDecoratedLayer = nullptr;
497 :
498 : // Backup original file, and rename new file into it
499 8 : const std::string osTmpOriFilename = m_osFilename + ".ogr_bak";
500 8 : if (VSIRename(m_osFilename.c_str(), osTmpOriFilename.c_str()) != 0 ||
501 4 : VSIRename(osTmpFilename.c_str(), m_osFilename.c_str()) != 0)
502 : {
503 0 : CPLError(CE_Failure, CPLE_AppDefined, "Cannot rename files");
504 0 : return OGRERR_FAILURE;
505 : }
506 : // Remove backup file
507 4 : VSIUnlink(osTmpOriFilename.c_str());
508 :
509 : // Re-open parquet file
510 4 : VSILFILE *fp = nullptr;
511 : auto poParquetLayer =
512 8 : m_poDS->CreateReaderLayer(m_osFilename, fp, m_aosOpenOptions.List());
513 4 : if (!poParquetLayer)
514 : {
515 0 : return OGRERR_FAILURE;
516 : }
517 :
518 : // Update adapters
519 4 : *ppoDecoratedLayer = poParquetLayer.get();
520 4 : m_poEditableLayer->SetUnderlyingArrowLayer(std::move(poParquetLayer));
521 :
522 4 : return OGRERR_NONE;
523 : }
524 :
525 : /************************************************************************/
526 : /* Open() */
527 : /************************************************************************/
528 :
529 1695 : static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo)
530 : {
531 : #if ARROW_VERSION_MAJOR >= 21
532 : // Register geoarrow.wkb extension if not already done
533 : if (!arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB) &&
534 : CPLTestBool(CPLGetConfigOption(
535 : "OGR_PARQUET_REGISTER_GEOARROW_WKB_EXTENSION", "YES")))
536 : {
537 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
538 : std::make_shared<OGRGeoArrowWkbExtensionType>(
539 : std::move(arrow::binary()), std::string())));
540 : }
541 : #endif
542 :
543 : #ifdef GDAL_USE_ARROWDATASET
544 3390 : std::string osBasePath(poOpenInfo->pszFilename);
545 3390 : std::string osQueryParameters;
546 : const bool bStartedWithParquetPrefix =
547 1695 : STARTS_WITH(osBasePath.c_str(), "PARQUET:");
548 :
549 1695 : if (bStartedWithParquetPrefix)
550 : {
551 262 : osBasePath = osBasePath.substr(strlen("PARQUET:"));
552 : }
553 :
554 : // Little trick to allow using syntax of
555 : // https://github.com/opengeospatial/geoparquet/discussions/101
556 : // ogrinfo
557 : // "/vsicurl/https://ai4edataeuwest.blob.core.windows.net/us-census/2020/cb_2020_us_vtd_500k.parquet?${SAS_TOKEN}"
558 1695 : if (STARTS_WITH(osBasePath.c_str(), "/vsicurl/"))
559 : {
560 2 : const auto nPos = osBasePath.find(".parquet?st=");
561 2 : if (nPos != std::string::npos)
562 : {
563 0 : osQueryParameters = osBasePath.substr(nPos + strlen(".parquet"));
564 0 : osBasePath.resize(nPos + strlen(".parquet"));
565 : }
566 : }
567 :
568 2585 : if (bStartedWithParquetPrefix || poOpenInfo->bIsDirectory ||
569 890 : !osQueryParameters.empty())
570 : {
571 : VSIStatBufL sStat;
572 805 : if (!osBasePath.empty() && osBasePath.back() == '/')
573 0 : osBasePath.pop_back();
574 : const std::string osMetadataPath =
575 805 : CPLFormFilenameSafe(osBasePath.c_str(), "_metadata", nullptr);
576 805 : if (CPLTestBool(
577 2415 : CPLGetConfigOption("OGR_PARQUET_USE_METADATA_FILE", "YES")) &&
578 1610 : VSIStatL((osMetadataPath + osQueryParameters).c_str(), &sStat) == 0)
579 : {
580 : // If there's a _metadata file, then use it to avoid listing files
581 : try
582 : {
583 18 : if (poOpenInfo->eAccess == GA_Update)
584 0 : return nullptr;
585 :
586 36 : return OpenParquetDatasetWithMetadata(
587 : osBasePath, "_metadata", osQueryParameters,
588 18 : poOpenInfo->papszOpenOptions);
589 : }
590 0 : catch (const std::exception &e)
591 : {
592 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
593 0 : e.what());
594 : }
595 0 : return nullptr;
596 : }
597 : else
598 : {
599 787 : bool bLikelyParquetDataset = false;
600 787 : if (poOpenInfo->bIsDirectory)
601 : {
602 : // Detect if the directory contains .parquet files, or
603 : // subdirectories with a name of the form "key=value", typical
604 : // of HIVE partitioning.
605 1066 : const CPLStringList aosFiles(VSIReadDir(osBasePath.c_str()));
606 23861 : for (const char *pszFilename : cpl::Iterate(aosFiles))
607 : {
608 23330 : if (EQUAL(CPLGetExtensionSafe(pszFilename).c_str(),
609 : "parquet"))
610 : {
611 2 : bLikelyParquetDataset = true;
612 2 : break;
613 : }
614 23328 : else if (strchr(pszFilename, '='))
615 : {
616 : // HIVE partitioning
617 0 : if (VSIStatL(CPLFormFilenameSafe(osBasePath.c_str(),
618 : pszFilename, nullptr)
619 : .c_str(),
620 0 : &sStat) == 0 &&
621 0 : VSI_ISDIR(sStat.st_mode))
622 : {
623 0 : bLikelyParquetDataset = true;
624 0 : break;
625 : }
626 : }
627 : }
628 : }
629 :
630 787 : if (bStartedWithParquetPrefix || bLikelyParquetDataset)
631 : {
632 : try
633 : {
634 256 : if (poOpenInfo->eAccess == GA_Update)
635 0 : return nullptr;
636 :
637 512 : return OpenParquetDatasetWithoutMetadata(
638 : osBasePath, osQueryParameters,
639 256 : poOpenInfo->papszOpenOptions);
640 : }
641 0 : catch (const std::exception &e)
642 : {
643 : // If we aren't quite sure that the passed file name is
644 : // a directory, then silently continue
645 0 : if (poOpenInfo->bIsDirectory)
646 : {
647 0 : CPLError(CE_Failure, CPLE_AppDefined,
648 0 : "Parquet exception: %s", e.what());
649 0 : return nullptr;
650 : }
651 : }
652 : }
653 : }
654 : }
655 : #endif
656 :
657 1421 : if (!OGRParquetDriverIdentify(poOpenInfo))
658 : {
659 0 : return nullptr;
660 : }
661 :
662 1421 : if (poOpenInfo->bIsDirectory)
663 531 : return nullptr;
664 :
665 1780 : std::string osFilename(poOpenInfo->pszFilename);
666 890 : if (STARTS_WITH(poOpenInfo->pszFilename, "PARQUET:"))
667 : {
668 0 : osFilename = poOpenInfo->pszFilename + strlen("PARQUET:");
669 : }
670 :
671 1780 : auto poDS = std::make_unique<OGRParquetDataset>();
672 890 : auto poLayer = poDS->CreateReaderLayer(osFilename, poOpenInfo->fpL,
673 1780 : poOpenInfo->papszOpenOptions);
674 890 : if (!poLayer)
675 1 : return nullptr;
676 :
677 : // For debug purposes: return a layer with the extent of each row group
678 889 : if (CPLTestBool(
679 : CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO")))
680 : {
681 1 : return BuildMemDatasetWithRowGroupExtents(poLayer.get());
682 : }
683 :
684 888 : if (poOpenInfo->eAccess == GA_Update)
685 10 : poDS->SetLayer(std::make_unique<OGRParquetEditableLayer>(
686 10 : poDS.get(), osFilename, std::move(poLayer),
687 5 : poOpenInfo->papszOpenOptions));
688 : else
689 883 : poDS->SetLayer(std::move(poLayer));
690 888 : return poDS.release();
691 : }
692 :
693 : /************************************************************************/
694 : /* Create() */
695 : /************************************************************************/
696 :
697 287 : static GDALDataset *OGRParquetDriverCreate(const char *pszName, int nXSize,
698 : int nYSize, int nBands,
699 : GDALDataType eType,
700 : char ** /* papszOptions */)
701 : {
702 287 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
703 0 : return nullptr;
704 :
705 : try
706 : {
707 287 : std::shared_ptr<arrow::io::OutputStream> out_file;
708 373 : if (STARTS_WITH(pszName, "/vsi") ||
709 86 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "YES")))
710 : {
711 : VSIVirtualHandleUniquePtr fp =
712 287 : VSIFilesystemHandler::OpenStatic(pszName, "wb");
713 287 : if (fp == nullptr)
714 : {
715 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
716 1 : return nullptr;
717 : }
718 286 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
719 : }
720 : else
721 : {
722 0 : PARQUET_ASSIGN_OR_THROW(out_file,
723 : arrow::io::FileOutputStream::Open(pszName));
724 : }
725 :
726 286 : return new OGRParquetWriterDataset(out_file);
727 : }
728 0 : catch (const std::exception &e)
729 : {
730 0 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
731 0 : e.what());
732 0 : return nullptr;
733 : }
734 : }
735 :
736 : /************************************************************************/
737 : /* OGRParquetDriver() */
738 : /************************************************************************/
739 :
740 : class OGRParquetDriver final : public GDALDriver
741 : {
742 : std::recursive_mutex m_oMutex{};
743 : bool m_bMetadataInitialized = false;
744 : void InitMetadata();
745 :
746 : public:
747 : const char *GetMetadataItem(const char *pszName,
748 : const char *pszDomain) override;
749 :
750 57 : char **GetMetadata(const char *pszDomain) override
751 : {
752 114 : std::lock_guard oLock(m_oMutex);
753 57 : InitMetadata();
754 114 : return GDALDriver::GetMetadata(pszDomain);
755 : }
756 : };
757 :
758 2358 : const char *OGRParquetDriver::GetMetadataItem(const char *pszName,
759 : const char *pszDomain)
760 : {
761 4716 : std::lock_guard oLock(m_oMutex);
762 2358 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
763 : {
764 341 : InitMetadata();
765 : }
766 4716 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
767 : }
768 :
769 398 : void OGRParquetDriver::InitMetadata()
770 : {
771 398 : if (m_bMetadataInitialized)
772 375 : return;
773 23 : m_bMetadataInitialized = true;
774 :
775 : CPLXMLTreeCloser oTree(
776 46 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
777 :
778 46 : std::vector<const char *> apszCompressionMethods;
779 23 : bool bHasSnappy = false;
780 23 : int minComprLevel = INT_MAX;
781 23 : int maxComprLevel = INT_MIN;
782 46 : std::string osCompressionLevelDesc = "Compression level, codec dependent.";
783 161 : for (const char *pszMethod :
784 184 : {"SNAPPY", "GZIP", "BROTLI", "ZSTD", "LZ4_RAW", "LZO", "LZ4_HADOOP"})
785 : {
786 : auto compressionTypeRes = arrow::util::Codec::GetCompressionType(
787 322 : CPLString(pszMethod).tolower());
788 322 : if (compressionTypeRes.ok() &&
789 161 : arrow::util::Codec::IsAvailable(*compressionTypeRes))
790 : {
791 138 : const auto compressionType = *compressionTypeRes;
792 138 : if (EQUAL(pszMethod, "SNAPPY"))
793 23 : bHasSnappy = true;
794 138 : apszCompressionMethods.emplace_back(pszMethod);
795 :
796 : auto minCompressLevelRes =
797 276 : arrow::util::Codec::MinimumCompressionLevel(compressionType);
798 : auto maxCompressLevelRes =
799 276 : arrow::util::Codec::MaximumCompressionLevel(compressionType);
800 : auto defCompressLevelRes =
801 276 : arrow::util::Codec::DefaultCompressionLevel(compressionType);
802 230 : if (minCompressLevelRes.ok() && maxCompressLevelRes.ok() &&
803 92 : defCompressLevelRes.ok())
804 : {
805 92 : minComprLevel = std::min(minComprLevel, *minCompressLevelRes);
806 92 : maxComprLevel = std::max(maxComprLevel, *maxCompressLevelRes);
807 92 : osCompressionLevelDesc += ' ';
808 92 : osCompressionLevelDesc += pszMethod;
809 92 : osCompressionLevelDesc += ": [";
810 92 : osCompressionLevelDesc += std::to_string(*minCompressLevelRes);
811 92 : osCompressionLevelDesc += ',';
812 92 : osCompressionLevelDesc += std::to_string(*maxCompressLevelRes);
813 92 : osCompressionLevelDesc += "], default=";
814 92 : if (EQUAL(pszMethod, "ZSTD"))
815 46 : osCompressionLevelDesc += std::to_string(
816 23 : OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL);
817 : else
818 : osCompressionLevelDesc +=
819 69 : std::to_string(*defCompressLevelRes);
820 92 : osCompressionLevelDesc += '.';
821 : }
822 : }
823 : }
824 :
825 : {
826 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
827 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
828 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
829 23 : CPLAddXMLAttributeAndValue(psOption, "description",
830 : "Compression method");
831 23 : CPLAddXMLAttributeAndValue(psOption, "default",
832 : bHasSnappy ? "SNAPPY" : "NONE");
833 : {
834 23 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
835 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
836 23 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
837 : }
838 161 : for (const char *pszMethod : apszCompressionMethods)
839 : {
840 138 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
841 138 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
842 : }
843 : }
844 :
845 23 : if (minComprLevel <= maxComprLevel)
846 : {
847 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
848 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION_LEVEL");
849 23 : CPLAddXMLAttributeAndValue(psOption, "type", "int");
850 23 : CPLAddXMLAttributeAndValue(
851 : psOption, "min",
852 : CPLSPrintf("%d",
853 23 : std::min(DEFAULT_COMPRESSION_LEVEL, minComprLevel)));
854 23 : CPLAddXMLAttributeAndValue(psOption, "max",
855 : CPLSPrintf("%d", maxComprLevel));
856 23 : CPLAddXMLAttributeAndValue(psOption, "description",
857 : osCompressionLevelDesc.c_str());
858 23 : CPLAddXMLAttributeAndValue(psOption, "default",
859 : CPLSPrintf("%d", DEFAULT_COMPRESSION_LEVEL));
860 : }
861 :
862 : {
863 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
864 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
865 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
866 23 : CPLAddXMLAttributeAndValue(psOption, "description",
867 : "Encoding of geometry columns");
868 23 : CPLAddXMLAttributeAndValue(psOption, "default", "WKB");
869 92 : for (const char *pszEncoding :
870 115 : {"WKB", "WKT", "GEOARROW", "GEOARROW_INTERLEAVED"})
871 : {
872 92 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
873 92 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
874 92 : if (EQUAL(pszEncoding, "GEOARROW"))
875 23 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
876 : "GEOARROW_STRUCT");
877 : }
878 : }
879 :
880 : {
881 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
882 23 : CPLAddXMLAttributeAndValue(psOption, "name", "ROW_GROUP_SIZE");
883 23 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
884 23 : CPLAddXMLAttributeAndValue(psOption, "description",
885 : "Maximum number of rows per group");
886 23 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
887 : }
888 :
889 : {
890 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
891 23 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
892 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
893 23 : CPLAddXMLAttributeAndValue(psOption, "description",
894 : "Name of geometry column");
895 23 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
896 : }
897 :
898 : {
899 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
900 23 : CPLAddXMLAttributeAndValue(psOption, "name", "COORDINATE_PRECISION");
901 23 : CPLAddXMLAttributeAndValue(psOption, "type", "float");
902 23 : CPLAddXMLAttributeAndValue(psOption, "description",
903 : "Number of decimals for coordinates (only "
904 : "for GEOMETRY_ENCODING=WKT)");
905 : }
906 :
907 : {
908 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
909 23 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
910 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
911 23 : CPLAddXMLAttributeAndValue(psOption, "description",
912 : "Name of the FID column to create");
913 : }
914 :
915 : {
916 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
917 23 : CPLAddXMLAttributeAndValue(psOption, "name", "POLYGON_ORIENTATION");
918 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
919 23 : CPLAddXMLAttributeAndValue(
920 : psOption, "description",
921 : "Which ring orientation to use for polygons");
922 23 : CPLAddXMLAttributeAndValue(psOption, "default", "COUNTERCLOCKWISE");
923 23 : CPLCreateXMLElementAndValue(psOption, "Value", "COUNTERCLOCKWISE");
924 23 : CPLCreateXMLElementAndValue(psOption, "Value", "UNMODIFIED");
925 : }
926 :
927 : {
928 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
929 23 : CPLAddXMLAttributeAndValue(psOption, "name", "EDGES");
930 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
931 23 : CPLAddXMLAttributeAndValue(
932 : psOption, "description",
933 : "Name of the coordinate system for the edges");
934 23 : CPLAddXMLAttributeAndValue(psOption, "default", "PLANAR");
935 23 : CPLCreateXMLElementAndValue(psOption, "Value", "PLANAR");
936 23 : CPLCreateXMLElementAndValue(psOption, "Value", "SPHERICAL");
937 : }
938 :
939 : {
940 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
941 23 : CPLAddXMLAttributeAndValue(psOption, "name", "CREATOR");
942 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
943 23 : CPLAddXMLAttributeAndValue(psOption, "description",
944 : "Name of creating application");
945 : }
946 :
947 : {
948 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
949 23 : CPLAddXMLAttributeAndValue(psOption, "name", "WRITE_COVERING_BBOX");
950 23 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
951 23 : CPLAddXMLAttributeAndValue(psOption, "default", "AUTO");
952 23 : CPLAddXMLAttributeAndValue(psOption, "description",
953 : "Whether to write xmin/ymin/xmax/ymax "
954 : "columns with the bounding box of "
955 : "geometries");
956 23 : CPLCreateXMLElementAndValue(psOption, "Value", "AUTO");
957 23 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
958 23 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
959 : }
960 :
961 : #if ARROW_VERSION_MAJOR >= 21
962 : {
963 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
964 : CPLAddXMLAttributeAndValue(psOption, "name", "USE_PARQUET_GEO_TYPES");
965 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
966 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
967 : CPLAddXMLAttributeAndValue(psOption, "description",
968 : "Whether to use Parquet Geometry/Geography "
969 : "logical types (introduced in libarrow 21), "
970 : "when using GEOMETRY_ENCODING=WKB encoding");
971 : CPLCreateXMLElementAndValue(psOption, "Value", "YES");
972 : CPLCreateXMLElementAndValue(psOption, "Value", "NO");
973 : CPLCreateXMLElementAndValue(psOption, "Value", "ONLY");
974 : }
975 : #endif
976 :
977 : {
978 23 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
979 23 : CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX");
980 23 : CPLAddXMLAttributeAndValue(psOption, "type", "boolean");
981 23 : CPLAddXMLAttributeAndValue(psOption, "default", "NO");
982 23 : CPLAddXMLAttributeAndValue(psOption, "description",
983 : "Whether features should be sorted based on "
984 : "the bounding box of their geometries");
985 : }
986 :
987 23 : char *pszXML = CPLSerializeXMLTree(oTree.get());
988 23 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
989 23 : CPLFree(pszXML);
990 : }
991 :
992 : /************************************************************************/
993 : /* RegisterOGRParquet() */
994 : /************************************************************************/
995 :
996 37 : void RegisterOGRParquet()
997 : {
998 37 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
999 0 : return;
1000 :
1001 74 : auto poDriver = std::make_unique<OGRParquetDriver>();
1002 37 : OGRParquetDriverSetCommonMetadata(poDriver.get());
1003 :
1004 37 : poDriver->pfnOpen = OGRParquetDriverOpen;
1005 37 : poDriver->pfnCreate = OGRParquetDriverCreate;
1006 :
1007 37 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
1008 : #ifdef GDAL_USE_ARROWDATASET
1009 37 : poDriver->SetMetadataItem("ARROW_DATASET", "YES");
1010 : #endif
1011 :
1012 37 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
1013 :
1014 : #if ARROW_VERSION_MAJOR >= 16
1015 : // Mostly for tests
1016 : const char *pszPath =
1017 37 : CPLGetConfigOption("OGR_PARQUET_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
1018 37 : if (pszPath)
1019 : {
1020 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
1021 0 : if (!result.ok())
1022 : {
1023 0 : CPLError(CE_Warning, CPLE_AppDefined,
1024 : "arrow::fs::LoadFileSystemFactories() failed with %s",
1025 0 : result.message().c_str());
1026 : }
1027 : }
1028 : #endif
1029 :
1030 : #if defined(GDAL_USE_ARROWDATASET) && defined(GDAL_USE_ARROWCOMPUTE)
1031 : {
1032 : auto status = arrow::compute::Initialize();
1033 : if (!status.ok())
1034 : {
1035 : CPLError(CE_Warning, CPLE_AppDefined,
1036 : "arrow::compute::Initialize() failed with %s",
1037 : status.message().c_str());
1038 : }
1039 : }
1040 : #endif
1041 : }
|