Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "cpl_multiproc.h"
16 : #include "gdal_pam.h"
17 : #include "ogrsf_frmts.h"
18 : #include "ogr_p.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <cmath>
23 : #include <limits>
24 : #include <map>
25 : #include <set>
26 : #include <utility>
27 :
28 : #include "ogr_parquet.h"
29 :
30 : #include "../arrow_common/ograrrowlayer.hpp"
31 : #include "../arrow_common/ograrrowdataset.hpp"
32 :
33 : /************************************************************************/
34 : /* OGRParquetLayerBase() */
35 : /************************************************************************/
36 :
37 1167 : OGRParquetLayerBase::OGRParquetLayerBase(OGRParquetDataset *poDS,
38 : const char *pszLayerName,
39 1167 : CSLConstList papszOpenOptions)
40 : : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
41 : m_aosGeomPossibleNames(CSLTokenizeString2(
42 : CSLFetchNameValueDef(papszOpenOptions, "GEOM_POSSIBLE_NAMES",
43 : "geometry,wkb_geometry,wkt_geometry"),
44 : ",", 0)),
45 1167 : m_osCRS(CSLFetchNameValueDef(papszOpenOptions, "CRS", ""))
46 : {
47 1167 : }
48 :
49 : /************************************************************************/
50 : /* GetDataset() */
51 : /************************************************************************/
52 :
53 27 : GDALDataset *OGRParquetLayerBase::GetDataset()
54 : {
55 27 : return m_poDS;
56 : }
57 :
58 : /************************************************************************/
59 : /* ResetReading() */
60 : /************************************************************************/
61 :
62 6432 : void OGRParquetLayerBase::ResetReading()
63 : {
64 6432 : if (m_iRecordBatch != 0)
65 : {
66 6050 : m_poRecordBatchReader.reset();
67 : }
68 6432 : OGRArrowLayer::ResetReading();
69 6432 : }
70 :
71 : /************************************************************************/
72 : /* InvalidateCachedBatches() */
73 : /************************************************************************/
74 :
75 1704 : void OGRParquetLayerBase::InvalidateCachedBatches()
76 : {
77 1704 : m_iRecordBatch = -1;
78 1704 : ResetReading();
79 1704 : }
80 :
81 : /************************************************************************/
82 : /* LoadGeoMetadata() */
83 : /************************************************************************/
84 :
85 1167 : void OGRParquetLayerBase::LoadGeoMetadata(
86 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata)
87 : {
88 1167 : if (kv_metadata && kv_metadata->Contains("geo"))
89 : {
90 2208 : auto geo = kv_metadata->Get("geo");
91 1104 : if (geo.ok())
92 : {
93 1104 : CPLDebug("PARQUET", "geo = %s", geo->c_str());
94 2208 : CPLJSONDocument oDoc;
95 1104 : if (oDoc.LoadMemory(*geo))
96 : {
97 2206 : auto oRoot = oDoc.GetRoot();
98 3309 : const auto osVersion = oRoot.GetString("version");
99 2563 : if (osVersion != "0.1.0" && osVersion != "0.2.0" &&
100 2189 : osVersion != "0.3.0" && osVersion != "0.4.0" &&
101 2185 : osVersion != "1.0.0-beta.1" && osVersion != "1.0.0-rc.1" &&
102 2561 : osVersion != "1.0.0" && osVersion != "1.1.0")
103 : {
104 1 : CPLDebug(
105 : "PARQUET",
106 : "version = %s not explicitly handled by the driver",
107 : osVersion.c_str());
108 : }
109 :
110 3309 : auto oColumns = oRoot.GetObj("columns");
111 1103 : if (oColumns.IsValid())
112 : {
113 2221 : for (const auto &oColumn : oColumns.GetChildren())
114 : {
115 1119 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
116 : }
117 : }
118 : }
119 : else
120 : {
121 1 : CPLError(CE_Warning, CPLE_AppDefined,
122 : "Cannot parse 'geo' metadata");
123 : }
124 : }
125 : }
126 1167 : }
127 :
128 : /************************************************************************/
129 : /* ParseGeometryColumnCovering() */
130 : /************************************************************************/
131 :
132 : //! Parse bounding box column definition
133 : /*static */
134 2228 : bool OGRParquetLayerBase::ParseGeometryColumnCovering(
135 : const CPLJSONObject &oJSONDef, std::string &osBBOXColumn,
136 : std::string &osXMin, std::string &osYMin, std::string &osXMax,
137 : std::string &osYMax)
138 : {
139 6684 : const auto oCovering = oJSONDef["covering"];
140 3139 : if (oCovering.IsValid() &&
141 911 : oCovering.GetType() == CPLJSONObject::Type::Object)
142 : {
143 1822 : const auto oBBOX = oCovering["bbox"];
144 911 : if (oBBOX.IsValid() && oBBOX.GetType() == CPLJSONObject::Type::Object)
145 : {
146 1822 : const auto oXMin = oBBOX["xmin"];
147 1822 : const auto oYMin = oBBOX["ymin"];
148 1822 : const auto oXMax = oBBOX["xmax"];
149 1822 : const auto oYMax = oBBOX["ymax"];
150 1822 : if (oXMin.IsValid() && oYMin.IsValid() && oXMax.IsValid() &&
151 911 : oYMax.IsValid() &&
152 911 : oXMin.GetType() == CPLJSONObject::Type::Array &&
153 911 : oYMin.GetType() == CPLJSONObject::Type::Array &&
154 2733 : oXMax.GetType() == CPLJSONObject::Type::Array &&
155 911 : oYMax.GetType() == CPLJSONObject::Type::Array)
156 : {
157 911 : const auto osXMinArray = oXMin.ToArray();
158 911 : const auto osYMinArray = oYMin.ToArray();
159 911 : const auto osXMaxArray = oXMax.ToArray();
160 911 : const auto osYMaxArray = oYMax.ToArray();
161 911 : if (osXMinArray.Size() == 2 && osYMinArray.Size() == 2 &&
162 911 : osXMaxArray.Size() == 2 && osYMaxArray.Size() == 2 &&
163 1822 : osXMinArray[0].GetType() == CPLJSONObject::Type::String &&
164 1822 : osXMinArray[1].GetType() == CPLJSONObject::Type::String &&
165 1822 : osYMinArray[0].GetType() == CPLJSONObject::Type::String &&
166 1822 : osYMinArray[1].GetType() == CPLJSONObject::Type::String &&
167 1822 : osXMaxArray[0].GetType() == CPLJSONObject::Type::String &&
168 1822 : osXMaxArray[1].GetType() == CPLJSONObject::Type::String &&
169 1822 : osYMaxArray[0].GetType() == CPLJSONObject::Type::String &&
170 2733 : osYMaxArray[1].GetType() == CPLJSONObject::Type::String &&
171 2733 : osXMinArray[0].ToString() == osYMinArray[0].ToString() &&
172 3644 : osXMinArray[0].ToString() == osXMaxArray[0].ToString() &&
173 1822 : osXMinArray[0].ToString() == osYMaxArray[0].ToString())
174 : {
175 911 : osBBOXColumn = osXMinArray[0].ToString();
176 911 : osXMin = osXMinArray[1].ToString();
177 911 : osYMin = osYMinArray[1].ToString();
178 911 : osXMax = osXMaxArray[1].ToString();
179 911 : osYMax = osYMaxArray[1].ToString();
180 911 : return true;
181 : }
182 : }
183 : }
184 : }
185 1317 : return false;
186 : }
187 :
188 : /************************************************************************/
189 : /* DealWithGeometryColumn() */
190 : /************************************************************************/
191 :
192 32026 : bool OGRParquetLayerBase::DealWithGeometryColumn(
193 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
194 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
195 : [[maybe_unused]] const parquet::ColumnDescriptor *parquetColumn,
196 : [[maybe_unused]] const parquet::FileMetaData *metadata,
197 : [[maybe_unused]] int iColumn)
198 : {
199 64052 : const auto &field_kv_metadata = field->metadata();
200 64052 : std::string osExtensionName;
201 32026 : if (field_kv_metadata)
202 : {
203 84 : auto extension_name = field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
204 42 : if (extension_name.ok())
205 : {
206 2 : osExtensionName = *extension_name;
207 : }
208 : #ifdef DEBUG
209 42 : CPLDebug("PARQUET", "Metadata field %s:", field->name().c_str());
210 45 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
211 : {
212 3 : CPLDebug("PARQUET", " %s = %s", keyValue.first.c_str(),
213 : keyValue.second.c_str());
214 : }
215 : #endif
216 : }
217 :
218 32026 : bool bRegularField = true;
219 32026 : auto oIter = m_oMapGeometryColumns.find(field->name());
220 32026 : if (oIter != m_oMapGeometryColumns.end() ||
221 62933 : STARTS_WITH(osExtensionName.c_str(), "ogc.") ||
222 30907 : STARTS_WITH(osExtensionName.c_str(), "geoarrow."))
223 : {
224 2240 : CPLJSONObject oJSONDef;
225 1120 : if (oIter != m_oMapGeometryColumns.end())
226 1119 : oJSONDef = oIter->second;
227 3360 : auto osEncoding = oJSONDef.GetString("encoding");
228 1120 : if (osEncoding.empty() && !osExtensionName.empty())
229 1 : osEncoding = osExtensionName;
230 :
231 1120 : OGRwkbGeometryType eGeomType = wkbUnknown;
232 1120 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
233 1120 : if (IsValidGeometryEncoding(field, osEncoding,
234 2240 : oIter != m_oMapGeometryColumns.end(),
235 : eGeomType, eGeomEncoding))
236 : {
237 1120 : bRegularField = false;
238 2240 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
239 :
240 3360 : auto oCRS = oJSONDef["crs"];
241 1120 : OGRSpatialReference *poSRS = nullptr;
242 1120 : if (!oCRS.IsValid())
243 : {
244 40 : if (!m_oMapGeometryColumns.empty())
245 : {
246 : // WGS 84 is implied if no crs member is found.
247 39 : poSRS = new OGRSpatialReference();
248 39 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
249 39 : poSRS->importFromEPSG(4326);
250 : }
251 : }
252 1080 : else if (oCRS.GetType() == CPLJSONObject::Type::String)
253 : {
254 1116 : const auto osWKT = oCRS.ToString();
255 372 : poSRS = new OGRSpatialReference();
256 372 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
257 :
258 372 : if (poSRS->importFromWkt(osWKT.c_str()) != OGRERR_NONE)
259 : {
260 0 : poSRS->Release();
261 0 : poSRS = nullptr;
262 : }
263 : }
264 708 : else if (oCRS.GetType() == CPLJSONObject::Type::Object)
265 : {
266 : // CRS encoded as PROJJSON (extension)
267 120 : const auto oType = oCRS["type"];
268 80 : if (oType.IsValid() &&
269 40 : oType.GetType() == CPLJSONObject::Type::String)
270 : {
271 120 : const auto osType = oType.ToString();
272 40 : if (osType.find("CRS") != std::string::npos)
273 : {
274 40 : poSRS = new OGRSpatialReference();
275 40 : poSRS->SetAxisMappingStrategy(
276 : OAMS_TRADITIONAL_GIS_ORDER);
277 :
278 80 : if (poSRS->SetFromUserInput(
279 80 : oCRS.ToString().c_str(),
280 : OGRSpatialReference::
281 40 : SET_FROM_USER_INPUT_LIMITATIONS_get()) !=
282 : OGRERR_NONE)
283 : {
284 0 : poSRS->Release();
285 0 : poSRS = nullptr;
286 : }
287 : }
288 : }
289 : }
290 :
291 1120 : if (poSRS)
292 : {
293 451 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
294 451 : if (dfCoordEpoch > 0)
295 4 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
296 :
297 451 : oField.SetSpatialRef(poSRS);
298 :
299 451 : poSRS->Release();
300 : }
301 :
302 1120 : if (!m_osCRS.empty())
303 : {
304 0 : poSRS = new OGRSpatialReference();
305 0 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
306 0 : if (poSRS->SetFromUserInput(
307 : m_osCRS.c_str(),
308 : OGRSpatialReference::
309 0 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
310 : OGRERR_NONE)
311 : {
312 0 : oField.SetSpatialRef(poSRS);
313 : }
314 0 : poSRS->Release();
315 : }
316 :
317 1120 : if (oJSONDef.GetString("edges") == "spherical")
318 : {
319 5 : SetMetadataItem("EDGES", "SPHERICAL");
320 : }
321 :
322 : // m_aeGeomEncoding be filled before calling
323 : // ComputeGeometryColumnType()
324 1120 : m_aeGeomEncoding.push_back(eGeomEncoding);
325 1120 : if (eGeomType == wkbUnknown)
326 : {
327 : // geometry_types since 1.0.0-beta1. Was geometry_type
328 : // before
329 1764 : auto oType = oJSONDef.GetObj("geometry_types");
330 588 : if (!oType.IsValid())
331 376 : oType = oJSONDef.GetObj("geometry_type");
332 588 : if (oType.GetType() == CPLJSONObject::Type::String)
333 : {
334 : // string is no longer valid since 1.0.0-beta1
335 3 : const auto osType = oType.ToString();
336 1 : if (osType != "Unknown")
337 1 : eGeomType = GetGeometryTypeFromString(osType);
338 : }
339 587 : else if (oType.GetType() == CPLJSONObject::Type::Array)
340 : {
341 424 : const auto oTypeArray = oType.ToArray();
342 212 : if (oTypeArray.Size() == 1)
343 : {
344 105 : eGeomType =
345 105 : GetGeometryTypeFromString(oTypeArray[0].ToString());
346 : }
347 107 : else if (oTypeArray.Size() > 1)
348 : {
349 : const auto PromoteToCollection =
350 271 : [](OGRwkbGeometryType eType)
351 : {
352 271 : if (eType == wkbPoint)
353 41 : return wkbMultiPoint;
354 230 : if (eType == wkbLineString)
355 36 : return wkbMultiLineString;
356 194 : if (eType == wkbPolygon)
357 52 : return wkbMultiPolygon;
358 142 : return eType;
359 : };
360 51 : bool bMixed = false;
361 51 : bool bHasMulti = false;
362 51 : bool bHasZ = false;
363 51 : bool bHasM = false;
364 : const auto eFirstType =
365 51 : OGR_GT_Flatten(GetGeometryTypeFromString(
366 102 : oTypeArray[0].ToString()));
367 : const auto eFirstTypeCollection =
368 51 : PromoteToCollection(eFirstType);
369 145 : for (int i = 0; i < oTypeArray.Size(); ++i)
370 : {
371 126 : const auto eThisGeom = GetGeometryTypeFromString(
372 252 : oTypeArray[i].ToString());
373 126 : if (PromoteToCollection(OGR_GT_Flatten(
374 126 : eThisGeom)) != eFirstTypeCollection)
375 : {
376 32 : bMixed = true;
377 32 : break;
378 : }
379 94 : bHasZ |= OGR_GT_HasZ(eThisGeom) != FALSE;
380 94 : bHasM |= OGR_GT_HasM(eThisGeom) != FALSE;
381 94 : bHasMulti |=
382 94 : (PromoteToCollection(OGR_GT_Flatten(
383 94 : eThisGeom)) == OGR_GT_Flatten(eThisGeom));
384 : }
385 51 : if (!bMixed)
386 : {
387 19 : if (eFirstTypeCollection == wkbMultiPolygon ||
388 : eFirstTypeCollection == wkbMultiLineString)
389 : {
390 18 : if (bHasMulti)
391 18 : eGeomType = OGR_GT_SetModifier(
392 : eFirstTypeCollection, bHasZ, bHasM);
393 : else
394 0 : eGeomType = OGR_GT_SetModifier(
395 : eFirstType, bHasZ, bHasM);
396 : }
397 : }
398 : }
399 : }
400 375 : else if (CPLTestBool(CPLGetConfigOption(
401 : "OGR_PARQUET_COMPUTE_GEOMETRY_TYPE", "YES")))
402 : {
403 375 : eGeomType = computeGeometryTypeFun();
404 : }
405 : }
406 :
407 1120 : oField.SetType(eGeomType);
408 1120 : oField.SetNullable(field->nullable());
409 1120 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
410 1120 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
411 : }
412 : }
413 :
414 32026 : std::shared_ptr<arrow::DataType> fieldType = field->type();
415 32026 : auto fieldTypeId = fieldType->id();
416 : #if PARQUET_VERSION_MAJOR >= 21
417 : // Try to detect Arrow >= 21 GEOMETRY/GEOGRAPHY logical type
418 : if (bRegularField && fieldTypeId == arrow::Type::EXTENSION)
419 : {
420 : auto extensionType =
421 : cpl::down_cast<arrow::ExtensionType *>(fieldType.get());
422 : osExtensionName = extensionType->extension_name();
423 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB)
424 : {
425 : const auto arrowWkb =
426 : dynamic_cast<const OGRGeoArrowWkbExtensionType *>(
427 : extensionType);
428 : #ifdef DEBUG
429 : if (arrowWkb)
430 : {
431 : CPLDebug("PARQUET", "arrowWkb = '%s'",
432 : arrowWkb->Serialize().c_str());
433 : }
434 : #endif
435 :
436 : fieldTypeId = extensionType->storage_type()->id();
437 : if (fieldTypeId == arrow::Type::BINARY ||
438 : fieldTypeId == arrow::Type::LARGE_BINARY)
439 : {
440 : OGRwkbGeometryType eGeomType = wkbUnknown;
441 : bool bSkipRowGroups = false;
442 :
443 : // m_aeGeomEncoding be filled before calling
444 : // ComputeGeometryColumnType()
445 : bRegularField = false;
446 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB);
447 :
448 : std::string crs(m_osCRS);
449 : if (parquetColumn && crs.empty())
450 : {
451 : const auto &logicalType = parquetColumn->logical_type();
452 : if (logicalType->is_geometry())
453 : {
454 : crs = static_cast<const parquet::GeometryLogicalType *>(
455 : logicalType.get())
456 : ->crs();
457 : if (crs.empty())
458 : crs = "EPSG:4326";
459 : CPLDebugOnly("PARQUET", "GeometryLogicalType crs=%s",
460 : crs.c_str());
461 : }
462 : else if (logicalType->is_geography())
463 : {
464 : const auto *geographyType =
465 : static_cast<const parquet::GeographyLogicalType *>(
466 : logicalType.get());
467 : crs = geographyType->crs();
468 : if (crs.empty())
469 : crs = "EPSG:4326";
470 :
471 : SetMetadataItem(
472 : "EDGES",
473 : CPLString(
474 : std::string(geographyType->algorithm_name()))
475 : .toupper());
476 : CPLDebugOnly("PARQUET", "GeographyLogicalType crs=%s",
477 : crs.c_str());
478 : }
479 : else
480 : {
481 : CPLDebug("PARQUET", "geoarrow.wkb column is neither a "
482 : "geometry or geography one");
483 :
484 : // This might be an old geoarrow.wkb extension...
485 : if (CPLTestBool(CPLGetConfigOption(
486 : "OGR_PARQUET_COMPUTE_GEOMETRY_TYPE", "YES")))
487 : {
488 : eGeomType = computeGeometryTypeFun();
489 : bSkipRowGroups = true;
490 : }
491 : }
492 :
493 : // Cf https://github.com/apache/parquet-format/blob/master/Geospatial.md#crs-customization
494 : // "projjson: PROJJSON, identifier is the name of a table property or a file property where the projjson string is stored."
495 : // Here the property is interpreted as the key of a file metadata (as done in libarrow)
496 : constexpr const char *PROJJSON_PREFIX = "projjson:";
497 : if (cpl::starts_with(crs, PROJJSON_PREFIX) && metadata)
498 : {
499 : auto projjson_value =
500 : metadata->key_value_metadata()->Get(
501 : crs.substr(strlen(PROJJSON_PREFIX)));
502 : if (projjson_value.ok())
503 : {
504 : crs = *projjson_value;
505 : }
506 : else
507 : {
508 : CPLDebug("PARQUET",
509 : "Cannot find file metadata for %s",
510 : crs.c_str());
511 : }
512 : }
513 : }
514 : else if (!parquetColumn && arrowWkb)
515 : {
516 : // For a OGRParquetDatasetLayer for example
517 : const std::string arrowWkbMetadata = arrowWkb->Serialize();
518 : if (arrowWkbMetadata.empty() || arrowWkbMetadata == "{}")
519 : {
520 : crs = "EPSG:4326";
521 : }
522 : else if (arrowWkbMetadata[0] == '{')
523 : {
524 : CPLJSONDocument oDoc;
525 : if (oDoc.LoadMemory(arrowWkbMetadata))
526 : {
527 : auto jCrs = oDoc.GetRoot()["crs"];
528 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
529 : {
530 : crs = jCrs.Format(
531 : CPLJSONObject::PrettyFormat::Plain);
532 : }
533 : else if (jCrs.GetType() ==
534 : CPLJSONObject::Type::String)
535 : {
536 : crs = jCrs.ToString();
537 : }
538 : if (oDoc.GetRoot()["edges"].ToString() ==
539 : "spherical")
540 : {
541 : SetMetadataItem("EDGES", "SPHERICAL");
542 : }
543 : }
544 : }
545 : }
546 :
547 : bool bGeomTypeInvalid = false;
548 : bool bHasMulti = false;
549 : bool bHasZ = false;
550 : bool bHasM = false;
551 : bool bFirst = true;
552 : OGRwkbGeometryType eFirstType = wkbUnknown;
553 : OGRwkbGeometryType eFirstTypeCollection = wkbUnknown;
554 : const auto numRowGroups =
555 : metadata ? metadata->num_row_groups() : 0;
556 : bool bEnvelopeValid = true;
557 : OGREnvelope sEnvelope;
558 : bool bEnvelope3DValid = true;
559 : OGREnvelope3D sEnvelope3D;
560 : for (int iRowGroup = 0;
561 : !bSkipRowGroups && iRowGroup < numRowGroups; ++iRowGroup)
562 : {
563 : const auto columnChunk =
564 : metadata->RowGroup(iRowGroup)->ColumnChunk(iColumn);
565 : if (auto geostats = columnChunk->geo_statistics())
566 : {
567 : double dfMinX =
568 : std::numeric_limits<double>::quiet_NaN();
569 : double dfMinY =
570 : std::numeric_limits<double>::quiet_NaN();
571 : double dfMinZ =
572 : std::numeric_limits<double>::quiet_NaN();
573 : double dfMaxX =
574 : std::numeric_limits<double>::quiet_NaN();
575 : double dfMaxY =
576 : std::numeric_limits<double>::quiet_NaN();
577 : double dfMaxZ =
578 : std::numeric_limits<double>::quiet_NaN();
579 : if (bEnvelopeValid && geostats->dimension_valid()[0] &&
580 : geostats->dimension_valid()[1])
581 : {
582 : dfMinX = geostats->lower_bound()[0];
583 : dfMaxX = geostats->upper_bound()[0];
584 : dfMinY = geostats->lower_bound()[1];
585 : dfMaxY = geostats->upper_bound()[1];
586 :
587 : // Deal as best as we can with wrap around bounding box
588 : if (dfMinX > dfMaxX && std::fabs(dfMinX) <= 180 &&
589 : std::fabs(dfMaxX) <= 180)
590 : {
591 : dfMinX = -180;
592 : dfMaxX = 180;
593 : }
594 :
595 : if (std::isfinite(dfMinX) &&
596 : std::isfinite(dfMaxX) &&
597 : std::isfinite(dfMinY) && std::isfinite(dfMaxY))
598 : {
599 : sEnvelope.Merge(dfMinX, dfMinY);
600 : sEnvelope.Merge(dfMaxX, dfMaxY);
601 : if (bEnvelope3DValid &&
602 : geostats->dimension_valid()[2])
603 : {
604 : dfMinZ = geostats->lower_bound()[2];
605 : dfMaxZ = geostats->upper_bound()[2];
606 : if (std::isfinite(dfMinZ) &&
607 : std::isfinite(dfMaxZ))
608 : {
609 : sEnvelope3D.Merge(dfMinX, dfMinY,
610 : dfMinZ);
611 : sEnvelope3D.Merge(dfMaxX, dfMaxY,
612 : dfMaxZ);
613 : }
614 : }
615 : }
616 : }
617 :
618 : bEnvelopeValid =
619 : bEnvelopeValid && std::isfinite(dfMinX) &&
620 : std::isfinite(dfMaxX) && std::isfinite(dfMinY) &&
621 : std::isfinite(dfMaxY);
622 :
623 : bEnvelope3DValid = bEnvelope3DValid &&
624 : std::isfinite(dfMinZ) &&
625 : std::isfinite(dfMaxZ);
626 :
627 : if (auto geometry_types = geostats->geometry_types())
628 : {
629 : const auto PromoteToCollection =
630 : [](OGRwkbGeometryType eType)
631 : {
632 : if (eType == wkbPoint)
633 : return wkbMultiPoint;
634 : if (eType == wkbLineString)
635 : return wkbMultiLineString;
636 : if (eType == wkbPolygon)
637 : return wkbMultiPolygon;
638 : return eType;
639 : };
640 :
641 : for (int nGeomType : *geometry_types)
642 : {
643 : OGRwkbGeometryType eThisGeom = wkbUnknown;
644 : if ((nGeomType > 0 && nGeomType <= 17) ||
645 : (nGeomType > 2000 && nGeomType <= 2017) ||
646 : (nGeomType > 3000 && nGeomType <= 3017))
647 : {
648 : eThisGeom = static_cast<OGRwkbGeometryType>(
649 : nGeomType);
650 : }
651 : else if (nGeomType > 1000 && nGeomType <= 1017)
652 : {
653 : eThisGeom = OGR_GT_SetZ(
654 : static_cast<OGRwkbGeometryType>(
655 : nGeomType - 1000));
656 : ;
657 : }
658 : else
659 : {
660 : CPLDebug("PARQUET",
661 : "Unknown geometry type: %d",
662 : nGeomType);
663 : bGeomTypeInvalid = true;
664 : break;
665 : }
666 : if (bFirst)
667 : {
668 : bFirst = false;
669 : eFirstType = eThisGeom;
670 : eFirstTypeCollection =
671 : PromoteToCollection(eFirstType);
672 : }
673 : else if (PromoteToCollection(
674 : OGR_GT_Flatten(eThisGeom)) !=
675 : eFirstTypeCollection)
676 : {
677 : bGeomTypeInvalid = true;
678 : break;
679 : }
680 : bHasZ |= OGR_GT_HasZ(eThisGeom) != FALSE;
681 : bHasM |= OGR_GT_HasM(eThisGeom) != FALSE;
682 : bHasMulti |= (PromoteToCollection(
683 : OGR_GT_Flatten(eThisGeom)) ==
684 : OGR_GT_Flatten(eThisGeom));
685 : }
686 : }
687 : }
688 : else
689 : {
690 : bEnvelopeValid = false;
691 : bEnvelope3DValid = false;
692 : bGeomTypeInvalid = true;
693 : }
694 : }
695 :
696 : if (bEnvelopeValid && sEnvelope.IsInit())
697 : {
698 : CPLDebug("PARQUET", "Got bounding box from geo_statistics");
699 : m_geoStatsWithBBOXAvailable.insert(
700 : m_poFeatureDefn->GetGeomFieldCount());
701 : m_oMapExtents[m_poFeatureDefn->GetGeomFieldCount()] =
702 : std::move(sEnvelope);
703 :
704 : if (bEnvelope3DValid && sEnvelope3D.IsInit())
705 : {
706 : CPLDebug("PARQUET",
707 : "Got bounding box 3D from geo_statistics");
708 : m_oMapExtents3D[m_poFeatureDefn->GetGeomFieldCount()] =
709 : std::move(sEnvelope3D);
710 : }
711 : }
712 :
713 : if (!bSkipRowGroups && !bGeomTypeInvalid)
714 : {
715 : if (eFirstTypeCollection == wkbMultiPoint ||
716 : eFirstTypeCollection == wkbMultiPolygon ||
717 : eFirstTypeCollection == wkbMultiLineString)
718 : {
719 : if (bHasMulti)
720 : eGeomType = OGR_GT_SetModifier(eFirstTypeCollection,
721 : bHasZ, bHasM);
722 : else
723 : eGeomType =
724 : OGR_GT_SetModifier(eFirstType, bHasZ, bHasM);
725 : }
726 : }
727 :
728 : OGRGeomFieldDefn oField(field->name().c_str(), eGeomType);
729 : oField.SetNullable(field->nullable());
730 :
731 : if (!crs.empty())
732 : {
733 : // Cf https://github.com/apache/parquet-format/blob/master/Geospatial.md#crs-customization
734 : // "srid: Spatial reference identifier, identifier is the SRID itself.."
735 : constexpr const char *SRID_PREFIX = "srid:";
736 : if (cpl::starts_with(crs, SRID_PREFIX))
737 : {
738 : // When getting the value from the GeometryLogicalType::crs() method
739 : crs = crs.substr(strlen(SRID_PREFIX));
740 : }
741 : if (CPLGetValueType(crs.c_str()) == CPL_VALUE_INTEGER)
742 : {
743 : // Getting here from above if, or if reading the ArrowWkb
744 : // metadata directly (typically from a OGRParquetDatasetLayer)
745 :
746 : // Assumes a SRID code is an EPSG code...
747 : crs = std::string("EPSG:") + crs;
748 : }
749 :
750 : auto poSRS = new OGRSpatialReference();
751 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
752 : if (poSRS->SetFromUserInput(
753 : crs.c_str(),
754 : OGRSpatialReference::
755 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
756 : OGRERR_NONE)
757 : {
758 : const char *pszAuthName =
759 : poSRS->GetAuthorityName(nullptr);
760 : const char *pszAuthCode =
761 : poSRS->GetAuthorityCode(nullptr);
762 : if (pszAuthName && pszAuthCode &&
763 : EQUAL(pszAuthName, "OGC") &&
764 : EQUAL(pszAuthCode, "CRS84"))
765 : poSRS->importFromEPSG(4326);
766 : oField.SetSpatialRef(poSRS);
767 : }
768 : poSRS->Release();
769 : }
770 :
771 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
772 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
773 : }
774 : }
775 : }
776 : #endif
777 :
778 : // Try to autodetect a (WKB) geometry column from the GEOM_POSSIBLE_NAMES
779 : // open option
780 61811 : if (bRegularField && osExtensionName.empty() &&
781 93837 : m_oMapGeometryColumns.empty() &&
782 245 : m_aosGeomPossibleNames.FindString(field->name().c_str()) >= 0)
783 : {
784 11 : if (fieldTypeId == arrow::Type::BINARY ||
785 : fieldTypeId == arrow::Type::LARGE_BINARY)
786 : {
787 5 : CPLDebug("PARQUET",
788 : "Field %s detected as likely WKB geometry field",
789 5 : field->name().c_str());
790 5 : bRegularField = false;
791 5 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB);
792 : }
793 0 : else if ((fieldTypeId == arrow::Type::STRING ||
794 16 : fieldTypeId == arrow::Type::LARGE_STRING) &&
795 10 : (field->name().find("wkt") != std::string::npos ||
796 4 : field->name().find("WKT") != std::string::npos))
797 : {
798 2 : CPLDebug("PARQUET",
799 : "Field %s detected as likely WKT geometry field",
800 2 : field->name().c_str());
801 2 : bRegularField = false;
802 2 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKT);
803 : }
804 11 : if (!bRegularField)
805 : {
806 14 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
807 7 : oField.SetNullable(field->nullable());
808 :
809 7 : if (!m_osCRS.empty())
810 : {
811 2 : auto poSRS = new OGRSpatialReference();
812 2 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
813 2 : if (poSRS->SetFromUserInput(
814 : m_osCRS.c_str(),
815 : OGRSpatialReference::
816 2 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
817 : OGRERR_NONE)
818 : {
819 2 : oField.SetSpatialRef(poSRS);
820 : }
821 2 : poSRS->Release();
822 : }
823 :
824 7 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
825 7 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
826 : }
827 : }
828 :
829 64052 : return !bRegularField;
830 : }
831 :
832 : /************************************************************************/
833 : /* TestCapability() */
834 : /************************************************************************/
835 :
836 739 : int OGRParquetLayerBase::TestCapability(const char *pszCap) const
837 : {
838 739 : if (EQUAL(pszCap, OLCMeasuredGeometries))
839 32 : return true;
840 :
841 707 : if (EQUAL(pszCap, OLCFastSetNextByIndex))
842 0 : return true;
843 :
844 707 : if (EQUAL(pszCap, OLCFastSpatialFilter))
845 : {
846 49 : if (m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter) !=
847 98 : m_oMapGeomFieldIndexToGeomColBBOX.end())
848 : {
849 25 : return true;
850 : }
851 24 : return false;
852 : }
853 :
854 658 : return OGRArrowLayer::TestCapability(pszCap);
855 : }
856 :
857 : /************************************************************************/
858 : /* GetNumCPUs() */
859 : /************************************************************************/
860 :
861 : /* static */
862 1589 : int OGRParquetLayerBase::GetNumCPUs()
863 : {
864 1589 : const char *pszNumThreads = CPLGetConfigOption("GDAL_NUM_THREADS", nullptr);
865 1589 : int nNumThreads = 0;
866 1589 : if (pszNumThreads == nullptr)
867 1589 : nNumThreads = std::min(4, CPLGetNumCPUs());
868 : else
869 0 : nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") ? CPLGetNumCPUs()
870 0 : : atoi(pszNumThreads);
871 1589 : if (nNumThreads > 1)
872 : {
873 1589 : CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads));
874 : }
875 1589 : return nNumThreads;
876 : }
877 :
878 : /************************************************************************/
879 : /* OGRParquetLayer() */
880 : /************************************************************************/
881 :
882 893 : OGRParquetLayer::OGRParquetLayer(
883 : OGRParquetDataset *poDS, const char *pszLayerName,
884 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
885 893 : CSLConstList papszOpenOptions)
886 : : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions),
887 893 : m_poArrowReader(std::move(arrow_reader))
888 : {
889 893 : EstablishFeatureDefn();
890 893 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
891 : m_poFeatureDefn->GetGeomFieldCount());
892 :
893 893 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
894 893 : }
895 :
896 : /************************************************************************/
897 : /* EstablishFeatureDefn() */
898 : /************************************************************************/
899 :
900 893 : void OGRParquetLayer::EstablishFeatureDefn()
901 : {
902 893 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
903 893 : const auto &kv_metadata = metadata->key_value_metadata();
904 :
905 893 : LoadGeoMetadata(kv_metadata);
906 : const auto oMapFieldNameToGDALSchemaFieldDefn =
907 893 : LoadGDALSchema(kv_metadata.get());
908 :
909 893 : LoadGDALMetadata(kv_metadata.get());
910 :
911 893 : if (kv_metadata && kv_metadata->Contains("gdal:creation-options"))
912 : {
913 796 : auto co = kv_metadata->Get("gdal:creation-options");
914 398 : if (co.ok())
915 : {
916 398 : CPLDebugOnly("PARQUET", "gdal:creation-options = %s", co->c_str());
917 796 : CPLJSONDocument oDoc;
918 398 : if (oDoc.LoadMemory(*co))
919 : {
920 796 : auto oRoot = oDoc.GetRoot();
921 398 : if (oRoot.GetType() == CPLJSONObject::Type::Object)
922 : {
923 1164 : for (const auto &oChild : oRoot.GetChildren())
924 : {
925 766 : if (oChild.GetType() == CPLJSONObject::Type::String)
926 : {
927 : m_aosCreationOptions.SetNameValue(
928 1532 : oChild.GetName().c_str(),
929 2298 : oChild.ToString().c_str());
930 : }
931 : }
932 : }
933 : }
934 : }
935 : }
936 :
937 893 : if (!m_poArrowReader->GetSchema(&m_poSchema).ok())
938 : {
939 0 : return;
940 : }
941 :
942 : const bool bUseBBOX =
943 893 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES"));
944 :
945 : // Keep track of declared bounding box columns in GeoParquet JSON metadata,
946 : // in order not to expose them as regular fields.
947 1786 : std::set<std::string> oSetBBOXColumns;
948 893 : if (bUseBBOX)
949 : {
950 1747 : for (const auto &iter : m_oMapGeometryColumns)
951 : {
952 1718 : std::string osBBOXColumn;
953 1718 : std::string osXMin, osYMin, osXMax, osYMax;
954 859 : if (ParseGeometryColumnCovering(iter.second, osBBOXColumn, osXMin,
955 : osYMin, osXMax, osYMax))
956 : {
957 364 : oSetBBOXColumns.insert(std::move(osBBOXColumn));
958 : }
959 : }
960 : }
961 :
962 893 : const auto &fields = m_poSchema->fields();
963 893 : const auto poParquetSchema = metadata->schema();
964 :
965 : // Map from Parquet column name (with dot separator) to Parquet index
966 1786 : std::map<std::string, int> oMapParquetColumnNameToIdx;
967 893 : const int nParquetColumns = poParquetSchema->num_columns();
968 35539 : for (int iParquetCol = 0; iParquetCol < nParquetColumns; ++iParquetCol)
969 : {
970 34646 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
971 34646 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
972 34646 : oMapParquetColumnNameToIdx[parquetColumnName] = iParquetCol;
973 : }
974 :
975 : // Synthetize a GeoParquet bounding box column definition when detecting
976 : // a Overture Map dataset < 2024-04-16-beta.0
977 846 : if ((m_oMapGeometryColumns.empty() ||
978 : // Below is for release 2024-01-17-alpha.0
979 1739 : (m_oMapGeometryColumns.find("geometry") !=
980 1739 : m_oMapGeometryColumns.end() &&
981 2221 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
982 1746 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB")) &&
983 356 : bUseBBOX &&
984 1249 : oMapParquetColumnNameToIdx.find("geometry") !=
985 1566 : oMapParquetColumnNameToIdx.end() &&
986 1210 : oMapParquetColumnNameToIdx.find("bbox.minx") !=
987 1211 : oMapParquetColumnNameToIdx.end() &&
988 894 : oMapParquetColumnNameToIdx.find("bbox.miny") !=
989 895 : oMapParquetColumnNameToIdx.end() &&
990 894 : oMapParquetColumnNameToIdx.find("bbox.maxx") !=
991 2680 : oMapParquetColumnNameToIdx.end() &&
992 894 : oMapParquetColumnNameToIdx.find("bbox.maxy") !=
993 894 : oMapParquetColumnNameToIdx.end())
994 : {
995 2 : CPLJSONObject oDef;
996 1 : if (m_oMapGeometryColumns.find("geometry") !=
997 2 : m_oMapGeometryColumns.end())
998 : {
999 0 : oDef = m_oMapGeometryColumns["geometry"];
1000 : }
1001 2 : CPLJSONObject oCovering;
1002 1 : oDef.Add("covering", oCovering);
1003 1 : CPLJSONObject oBBOX;
1004 1 : oCovering.Add("bbox", oBBOX);
1005 : {
1006 1 : CPLJSONArray oArray;
1007 1 : oArray.Add("bbox");
1008 1 : oArray.Add("minx");
1009 1 : oBBOX.Add("xmin", oArray);
1010 : }
1011 : {
1012 1 : CPLJSONArray oArray;
1013 1 : oArray.Add("bbox");
1014 1 : oArray.Add("miny");
1015 1 : oBBOX.Add("ymin", oArray);
1016 : }
1017 : {
1018 1 : CPLJSONArray oArray;
1019 1 : oArray.Add("bbox");
1020 1 : oArray.Add("maxx");
1021 1 : oBBOX.Add("xmax", oArray);
1022 : }
1023 : {
1024 1 : CPLJSONArray oArray;
1025 1 : oArray.Add("bbox");
1026 1 : oArray.Add("maxy");
1027 1 : oBBOX.Add("ymax", oArray);
1028 : }
1029 1 : oSetBBOXColumns.insert("bbox");
1030 1 : oDef.Add("encoding", "WKB");
1031 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
1032 : }
1033 : // Overture Maps 2024-04-16-beta.0 almost follows GeoParquet 1.1, except
1034 : // they don't declare the "covering" element in the GeoParquet JSON metadata
1035 1784 : else if (m_oMapGeometryColumns.find("geometry") !=
1036 1725 : m_oMapGeometryColumns.end() &&
1037 1658 : bUseBBOX &&
1038 2215 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
1039 1696 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB" &&
1040 1202 : oMapParquetColumnNameToIdx.find("geometry") !=
1041 1512 : oMapParquetColumnNameToIdx.end() &&
1042 1202 : oMapParquetColumnNameToIdx.find("bbox.xmin") !=
1043 1203 : oMapParquetColumnNameToIdx.end() &&
1044 893 : oMapParquetColumnNameToIdx.find("bbox.ymin") !=
1045 894 : oMapParquetColumnNameToIdx.end() &&
1046 893 : oMapParquetColumnNameToIdx.find("bbox.xmax") !=
1047 3510 : oMapParquetColumnNameToIdx.end() &&
1048 893 : oMapParquetColumnNameToIdx.find("bbox.ymax") !=
1049 893 : oMapParquetColumnNameToIdx.end())
1050 : {
1051 3 : CPLJSONObject oDef = m_oMapGeometryColumns["geometry"];
1052 2 : CPLJSONObject oCovering;
1053 1 : oDef.Add("covering", oCovering);
1054 1 : CPLJSONObject oBBOX;
1055 1 : oCovering.Add("bbox", oBBOX);
1056 : {
1057 1 : CPLJSONArray oArray;
1058 1 : oArray.Add("bbox");
1059 1 : oArray.Add("xmin");
1060 1 : oBBOX.Add("xmin", oArray);
1061 : }
1062 : {
1063 1 : CPLJSONArray oArray;
1064 1 : oArray.Add("bbox");
1065 1 : oArray.Add("ymin");
1066 1 : oBBOX.Add("ymin", oArray);
1067 : }
1068 : {
1069 1 : CPLJSONArray oArray;
1070 1 : oArray.Add("bbox");
1071 1 : oArray.Add("xmax");
1072 1 : oBBOX.Add("xmax", oArray);
1073 : }
1074 : {
1075 1 : CPLJSONArray oArray;
1076 1 : oArray.Add("bbox");
1077 1 : oArray.Add("ymax");
1078 1 : oBBOX.Add("ymax", oArray);
1079 : }
1080 1 : oSetBBOXColumns.insert("bbox");
1081 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
1082 : }
1083 :
1084 893 : int iParquetCol = 0;
1085 26497 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
1086 : {
1087 25604 : const auto &field = fields[i];
1088 :
1089 : bool bParquetColValid =
1090 25604 : CheckMatchArrowParquetColumnNames(iParquetCol, field);
1091 25604 : if (!bParquetColValid)
1092 0 : m_bHasMissingMappingToParquet = true;
1093 :
1094 25646 : if (!m_osFIDColumn.empty() && field->name() == m_osFIDColumn &&
1095 42 : (field->type()->id() == arrow::Type::INT32 ||
1096 21 : field->type()->id() == arrow::Type::INT64))
1097 : {
1098 21 : m_poFIDType = field->type();
1099 21 : m_iFIDArrowColumn = i;
1100 21 : if (bParquetColValid)
1101 : {
1102 21 : m_iFIDParquetColumn = iParquetCol;
1103 21 : iParquetCol++;
1104 : }
1105 387 : continue;
1106 : }
1107 :
1108 25583 : if (oSetBBOXColumns.find(field->name()) != oSetBBOXColumns.end())
1109 : {
1110 366 : m_oSetBBoxArrowColumns.insert(i);
1111 366 : if (bParquetColValid)
1112 366 : iParquetCol++;
1113 366 : continue;
1114 : }
1115 :
1116 : const auto ComputeGeometryColumnTypeLambda =
1117 888 : [this, bParquetColValid, iParquetCol, &poParquetSchema]()
1118 : {
1119 : // only with GeoParquet < 0.2.0
1120 592 : if (bParquetColValid &&
1121 296 : poParquetSchema->Column(iParquetCol)->physical_type() ==
1122 : parquet::Type::BYTE_ARRAY)
1123 : {
1124 296 : return ComputeGeometryColumnType(
1125 592 : m_poFeatureDefn->GetGeomFieldCount(), iParquetCol);
1126 : }
1127 0 : return wkbUnknown;
1128 25217 : };
1129 :
1130 75651 : const bool bGeometryField = DealWithGeometryColumn(
1131 : i, field, ComputeGeometryColumnTypeLambda,
1132 25217 : bParquetColValid ? poParquetSchema->Column(iParquetCol) : nullptr,
1133 25217 : metadata.get(), bParquetColValid ? iParquetCol : -1);
1134 25217 : if (bGeometryField)
1135 : {
1136 871 : const auto oIter = m_oMapGeometryColumns.find(field->name());
1137 871 : if (bUseBBOX && oIter != m_oMapGeometryColumns.end())
1138 : {
1139 859 : ProcessGeometryColumnCovering(field, oIter->second,
1140 : oMapParquetColumnNameToIdx);
1141 : }
1142 :
1143 2573 : if (bParquetColValid &&
1144 1702 : (field->type()->id() == arrow::Type::STRUCT ||
1145 831 : field->type()->id() == arrow::Type::LIST))
1146 : {
1147 : // GeoArrow types
1148 352 : std::vector<int> anParquetCols;
1149 2160 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
1150 : {
1151 1808 : if (STARTS_WITH(
1152 : iterParquetCols.first.c_str(),
1153 : std::string(field->name()).append(".").c_str()))
1154 : {
1155 752 : iParquetCol =
1156 752 : std::max(iParquetCol, iterParquetCols.second);
1157 752 : anParquetCols.push_back(iterParquetCols.second);
1158 : }
1159 : }
1160 352 : m_anMapGeomFieldIndexToParquetColumns.push_back(
1161 352 : std::move(anParquetCols));
1162 352 : ++iParquetCol;
1163 : }
1164 : else
1165 : {
1166 519 : m_anMapGeomFieldIndexToParquetColumns.push_back(
1167 519 : {bParquetColValid ? iParquetCol : -1});
1168 519 : if (bParquetColValid)
1169 519 : iParquetCol++;
1170 : }
1171 : }
1172 : else
1173 : {
1174 24346 : CreateFieldFromSchema(field, bParquetColValid, iParquetCol, {i},
1175 : oMapFieldNameToGDALSchemaFieldDefn);
1176 : }
1177 : }
1178 :
1179 893 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
1180 : m_poFeatureDefn->GetFieldCount());
1181 893 : CPLAssert(static_cast<int>(m_anMapFieldIndexToParquetColumn.size()) ==
1182 : m_poFeatureDefn->GetFieldCount());
1183 893 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
1184 : m_poFeatureDefn->GetGeomFieldCount());
1185 893 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToParquetColumns.size()) ==
1186 : m_poFeatureDefn->GetGeomFieldCount());
1187 :
1188 893 : if (!fields.empty())
1189 : {
1190 : try
1191 : {
1192 1732 : auto poRowGroup = m_poArrowReader->parquet_reader()->RowGroup(0);
1193 840 : if (poRowGroup)
1194 : {
1195 1680 : auto poColumn = poRowGroup->metadata()->ColumnChunk(0);
1196 840 : CPLDebug("PARQUET", "Compression (of first column): %s",
1197 : arrow::util::Codec::GetCodecAsString(
1198 840 : poColumn->compression())
1199 : .c_str());
1200 : }
1201 : }
1202 52 : catch (const std::exception &)
1203 : {
1204 : }
1205 : }
1206 : }
1207 :
1208 : /************************************************************************/
1209 : /* ProcessGeometryColumnCovering() */
1210 : /************************************************************************/
1211 :
1212 : /** Process GeoParquet JSON geometry field object to extract information about
1213 : * its bounding box column, and appropriately fill m_oMapGeomFieldIndexToGeomColBBOX
1214 : * and m_oMapGeomFieldIndexToGeomColBBOXParquet members with information on that
1215 : * bounding box column.
1216 : */
1217 859 : void OGRParquetLayer::ProcessGeometryColumnCovering(
1218 : const std::shared_ptr<arrow::Field> &field,
1219 : const CPLJSONObject &oJSONGeometryColumn,
1220 : const std::map<std::string, int> &oMapParquetColumnNameToIdx)
1221 : {
1222 1718 : std::string osBBOXColumn;
1223 1718 : std::string osXMin, osYMin, osXMax, osYMax;
1224 859 : if (ParseGeometryColumnCovering(oJSONGeometryColumn, osBBOXColumn, osXMin,
1225 : osYMin, osXMax, osYMax))
1226 : {
1227 366 : OGRArrowLayer::GeomColBBOX sDesc;
1228 366 : sDesc.iArrowCol = m_poSchema->GetFieldIndex(osBBOXColumn);
1229 732 : const auto fieldBBOX = m_poSchema->GetFieldByName(osBBOXColumn);
1230 732 : if (sDesc.iArrowCol >= 0 && fieldBBOX &&
1231 366 : fieldBBOX->type()->id() == arrow::Type::STRUCT)
1232 : {
1233 : const auto fieldBBOXStruct =
1234 732 : std::static_pointer_cast<arrow::StructType>(fieldBBOX->type());
1235 732 : const auto fieldXMin = fieldBBOXStruct->GetFieldByName(osXMin);
1236 732 : const auto fieldYMin = fieldBBOXStruct->GetFieldByName(osYMin);
1237 732 : const auto fieldXMax = fieldBBOXStruct->GetFieldByName(osXMax);
1238 732 : const auto fieldYMax = fieldBBOXStruct->GetFieldByName(osYMax);
1239 366 : const int nXMinIdx = fieldBBOXStruct->GetFieldIndex(osXMin);
1240 366 : const int nYMinIdx = fieldBBOXStruct->GetFieldIndex(osYMin);
1241 366 : const int nXMaxIdx = fieldBBOXStruct->GetFieldIndex(osXMax);
1242 366 : const int nYMaxIdx = fieldBBOXStruct->GetFieldIndex(osYMax);
1243 : const auto oIterParquetIdxXMin = oMapParquetColumnNameToIdx.find(
1244 366 : std::string(osBBOXColumn).append(".").append(osXMin));
1245 : const auto oIterParquetIdxYMin = oMapParquetColumnNameToIdx.find(
1246 366 : std::string(osBBOXColumn).append(".").append(osYMin));
1247 : const auto oIterParquetIdxXMax = oMapParquetColumnNameToIdx.find(
1248 366 : std::string(osBBOXColumn).append(".").append(osXMax));
1249 : const auto oIterParquetIdxYMax = oMapParquetColumnNameToIdx.find(
1250 366 : std::string(osBBOXColumn).append(".").append(osYMax));
1251 366 : if (nXMinIdx >= 0 && nYMinIdx >= 0 && nXMaxIdx >= 0 &&
1252 732 : nYMaxIdx >= 0 && fieldXMin && fieldYMin && fieldXMax &&
1253 732 : fieldYMax &&
1254 732 : oIterParquetIdxXMin != oMapParquetColumnNameToIdx.end() &&
1255 732 : oIterParquetIdxYMin != oMapParquetColumnNameToIdx.end() &&
1256 732 : oIterParquetIdxXMax != oMapParquetColumnNameToIdx.end() &&
1257 732 : oIterParquetIdxYMax != oMapParquetColumnNameToIdx.end() &&
1258 367 : (fieldXMin->type()->id() == arrow::Type::FLOAT ||
1259 1 : fieldXMin->type()->id() == arrow::Type::DOUBLE) &&
1260 366 : fieldXMin->type()->id() == fieldYMin->type()->id() &&
1261 1098 : fieldXMin->type()->id() == fieldXMax->type()->id() &&
1262 366 : fieldXMin->type()->id() == fieldYMax->type()->id())
1263 : {
1264 366 : CPLDebug("PARQUET",
1265 : "Bounding box column '%s' detected for "
1266 : "geometry column '%s'",
1267 366 : osBBOXColumn.c_str(), field->name().c_str());
1268 366 : sDesc.iArrowSubfieldXMin = nXMinIdx;
1269 366 : sDesc.iArrowSubfieldYMin = nYMinIdx;
1270 366 : sDesc.iArrowSubfieldXMax = nXMaxIdx;
1271 366 : sDesc.iArrowSubfieldYMax = nYMaxIdx;
1272 366 : sDesc.bIsFloat =
1273 366 : (fieldXMin->type()->id() == arrow::Type::FLOAT);
1274 :
1275 : m_oMapGeomFieldIndexToGeomColBBOX
1276 366 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
1277 366 : std::move(sDesc);
1278 :
1279 366 : GeomColBBOXParquet sDescParquet;
1280 366 : sDescParquet.iParquetXMin = oIterParquetIdxXMin->second;
1281 366 : sDescParquet.iParquetYMin = oIterParquetIdxYMin->second;
1282 366 : sDescParquet.iParquetXMax = oIterParquetIdxXMax->second;
1283 366 : sDescParquet.iParquetYMax = oIterParquetIdxYMax->second;
1284 3933 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
1285 : {
1286 3567 : if (STARTS_WITH(
1287 : iterParquetCols.first.c_str(),
1288 : std::string(osBBOXColumn).append(".").c_str()))
1289 : {
1290 1464 : sDescParquet.anParquetCols.push_back(
1291 1464 : iterParquetCols.second);
1292 : }
1293 : }
1294 : m_oMapGeomFieldIndexToGeomColBBOXParquet
1295 732 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
1296 732 : std::move(sDescParquet);
1297 : }
1298 : }
1299 : }
1300 859 : }
1301 :
1302 : /************************************************************************/
1303 : /* CheckMatchArrowParquetColumnNames() */
1304 : /************************************************************************/
1305 :
1306 27799 : bool OGRParquetLayer::CheckMatchArrowParquetColumnNames(
1307 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const
1308 : {
1309 55598 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
1310 27799 : const auto poParquetSchema = metadata->schema();
1311 27799 : const int nParquetColumns = poParquetSchema->num_columns();
1312 27799 : const auto &fieldName = field->name();
1313 27799 : const int iParquetColBefore = iParquetCol;
1314 :
1315 28490 : while (iParquetCol < nParquetColumns)
1316 : {
1317 28490 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
1318 28490 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
1319 60598 : if (fieldName == parquetColumnName ||
1320 16054 : (parquetColumnName.size() > fieldName.size() &&
1321 16054 : STARTS_WITH(parquetColumnName.c_str(), fieldName.c_str()) &&
1322 15363 : parquetColumnName[fieldName.size()] == '.'))
1323 : {
1324 27799 : return true;
1325 : }
1326 : else
1327 : {
1328 691 : iParquetCol++;
1329 : }
1330 : }
1331 :
1332 0 : CPLError(CE_Warning, CPLE_AppDefined,
1333 : "Cannot match Arrow column name %s with a Parquet one",
1334 : fieldName.c_str());
1335 0 : iParquetCol = iParquetColBefore;
1336 0 : return false;
1337 : }
1338 :
1339 : /************************************************************************/
1340 : /* CreateFieldFromSchema() */
1341 : /************************************************************************/
1342 :
1343 26541 : void OGRParquetLayer::CreateFieldFromSchema(
1344 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
1345 : int &iParquetCol, const std::vector<int> &path,
1346 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
1347 : &oMapFieldNameToGDALSchemaFieldDefn)
1348 : {
1349 26541 : OGRFieldDefn oField(field->name().c_str(), OFTString);
1350 26541 : OGRFieldType eType = OFTString;
1351 26541 : OGRFieldSubType eSubType = OFSTNone;
1352 26541 : bool bTypeOK = true;
1353 :
1354 26541 : auto type = field->type();
1355 26541 : if (type->id() == arrow::Type::DICTIONARY && path.size() == 1)
1356 : {
1357 : const auto dictionaryType =
1358 596 : std::static_pointer_cast<arrow::DictionaryType>(field->type());
1359 596 : auto indexType = dictionaryType->index_type();
1360 596 : if (dictionaryType->value_type()->id() == arrow::Type::STRING &&
1361 298 : IsIntegerArrowType(indexType->id()))
1362 : {
1363 298 : if (bParquetColValid)
1364 : {
1365 596 : std::string osDomainName(field->name() + "Domain");
1366 298 : m_poDS->RegisterDomainName(osDomainName,
1367 298 : m_poFeatureDefn->GetFieldCount());
1368 298 : oField.SetDomainName(osDomainName);
1369 : }
1370 298 : type = std::move(indexType);
1371 : }
1372 : else
1373 : {
1374 0 : bTypeOK = false;
1375 : }
1376 : }
1377 :
1378 26541 : int nParquetColIncrement = 1;
1379 26541 : switch (type->id())
1380 : {
1381 624 : case arrow::Type::STRUCT:
1382 : {
1383 1248 : const auto subfields = field->Flatten();
1384 1248 : auto newpath = path;
1385 624 : newpath.push_back(0);
1386 2819 : for (int j = 0; j < static_cast<int>(subfields.size()); j++)
1387 : {
1388 2195 : const auto &subfield = subfields[j];
1389 : bParquetColValid =
1390 2195 : CheckMatchArrowParquetColumnNames(iParquetCol, subfield);
1391 2195 : if (!bParquetColValid)
1392 0 : m_bHasMissingMappingToParquet = true;
1393 2195 : newpath.back() = j;
1394 2195 : CreateFieldFromSchema(subfield, bParquetColValid, iParquetCol,
1395 : newpath,
1396 : oMapFieldNameToGDALSchemaFieldDefn);
1397 : }
1398 624 : return; // return intended, not break
1399 : }
1400 :
1401 5335 : case arrow::Type::MAP:
1402 : {
1403 : // A arrow map maps to 2 Parquet columns
1404 5335 : nParquetColIncrement = 2;
1405 5335 : break;
1406 : }
1407 :
1408 20582 : default:
1409 20582 : break;
1410 : }
1411 :
1412 25917 : if (bTypeOK)
1413 : {
1414 25917 : bTypeOK = MapArrowTypeToOGR(type, field, oField, eType, eSubType, path,
1415 : oMapFieldNameToGDALSchemaFieldDefn);
1416 25917 : if (bTypeOK)
1417 : {
1418 25628 : m_apoArrowDataTypes.push_back(std::move(type));
1419 51256 : m_anMapFieldIndexToParquetColumn.push_back(
1420 25628 : bParquetColValid ? iParquetCol : -1);
1421 : }
1422 : }
1423 :
1424 25917 : if (bParquetColValid)
1425 25917 : iParquetCol += nParquetColIncrement;
1426 : }
1427 :
1428 : /************************************************************************/
1429 : /* BuildDomain() */
1430 : /************************************************************************/
1431 :
1432 : std::unique_ptr<OGRFieldDomain>
1433 16 : OGRParquetLayer::BuildDomain(const std::string &osDomainName,
1434 : int iFieldIndex) const
1435 : {
1436 : #ifdef DEBUG
1437 16 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
1438 : (void)iArrowCol;
1439 16 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
1440 : arrow::Type::DICTIONARY);
1441 : #endif
1442 16 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[iFieldIndex];
1443 16 : CPLAssert(iParquetCol >= 0);
1444 16 : const auto oldBatchSize = m_poArrowReader->properties().batch_size();
1445 16 : m_poArrowReader->set_batch_size(1);
1446 : #if PARQUET_VERSION_MAJOR >= 21
1447 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1448 : auto result = m_poArrowReader->GetRecordBatchReader({0}, {iParquetCol});
1449 : if (result.ok())
1450 : poRecordBatchReader = std::move(*result);
1451 : #else
1452 16 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1453 16 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1454 : {0}, {iParquetCol}, &poRecordBatchReader));
1455 : #endif
1456 16 : if (poRecordBatchReader != nullptr)
1457 : {
1458 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1459 16 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1460 16 : if (!status.ok())
1461 : {
1462 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1463 0 : status.message().c_str());
1464 : }
1465 16 : else if (poBatch)
1466 : {
1467 16 : m_poArrowReader->set_batch_size(oldBatchSize);
1468 16 : return BuildDomainFromBatch(osDomainName, poBatch, 0);
1469 : }
1470 : }
1471 0 : m_poArrowReader->set_batch_size(oldBatchSize);
1472 0 : return nullptr;
1473 : }
1474 :
1475 : /************************************************************************/
1476 : /* ComputeGeometryColumnType() */
1477 : /************************************************************************/
1478 :
1479 : OGRwkbGeometryType
1480 296 : OGRParquetLayer::ComputeGeometryColumnType(int iGeomCol, int iParquetCol) const
1481 : {
1482 : // Compute type of geometry column by iterating over each geometry, and
1483 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
1484 :
1485 296 : OGRwkbGeometryType eGeomType = wkbNone;
1486 :
1487 592 : std::vector<int> anRowGroups;
1488 296 : const int nNumGroups = m_poArrowReader->num_row_groups();
1489 296 : anRowGroups.reserve(nNumGroups);
1490 881 : for (int i = 0; i < nNumGroups; ++i)
1491 585 : anRowGroups.push_back(i);
1492 : #if PARQUET_VERSION_MAJOR >= 21
1493 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1494 : auto result =
1495 : m_poArrowReader->GetRecordBatchReader(anRowGroups, {iParquetCol});
1496 : if (result.ok())
1497 : poRecordBatchReader = std::move(*result);
1498 : #else
1499 0 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1500 296 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1501 : anRowGroups, {iParquetCol}, &poRecordBatchReader));
1502 : #endif
1503 296 : if (poRecordBatchReader != nullptr)
1504 : {
1505 592 : std::shared_ptr<arrow::RecordBatch> poBatch;
1506 : while (true)
1507 : {
1508 594 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1509 594 : if (!status.ok())
1510 : {
1511 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1512 0 : status.message().c_str());
1513 0 : break;
1514 : }
1515 594 : else if (!poBatch)
1516 294 : break;
1517 :
1518 300 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
1519 : 0, eGeomType);
1520 300 : if (eGeomType == wkbUnknown)
1521 2 : break;
1522 298 : }
1523 : }
1524 :
1525 592 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
1526 : }
1527 :
1528 : /************************************************************************/
1529 : /* GetFeatureExplicitFID() */
1530 : /************************************************************************/
1531 :
1532 4 : OGRFeature *OGRParquetLayer::GetFeatureExplicitFID(GIntBig nFID)
1533 : {
1534 8 : std::vector<int> anRowGroups;
1535 4 : const int nNumGroups = m_poArrowReader->num_row_groups();
1536 4 : anRowGroups.reserve(nNumGroups);
1537 16 : for (int i = 0; i < nNumGroups; ++i)
1538 12 : anRowGroups.push_back(i);
1539 : #if PARQUET_VERSION_MAJOR >= 21
1540 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1541 : auto result = m_bIgnoredFields
1542 : ? m_poArrowReader->GetRecordBatchReader(
1543 : anRowGroups, m_anRequestedParquetColumns)
1544 : : m_poArrowReader->GetRecordBatchReader(anRowGroups);
1545 : if (result.ok())
1546 : {
1547 : poRecordBatchReader = std::move(*result);
1548 : }
1549 : #else
1550 4 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1551 4 : if (m_bIgnoredFields)
1552 : {
1553 4 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1554 2 : anRowGroups, m_anRequestedParquetColumns, &poRecordBatchReader));
1555 : }
1556 : else
1557 : {
1558 2 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1559 : anRowGroups, &poRecordBatchReader));
1560 : }
1561 : #endif
1562 4 : if (poRecordBatchReader != nullptr)
1563 : {
1564 4 : std::shared_ptr<arrow::RecordBatch> poBatch;
1565 : while (true)
1566 : {
1567 14 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1568 14 : if (!status.ok())
1569 : {
1570 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1571 0 : status.message().c_str());
1572 0 : break;
1573 : }
1574 14 : else if (!poBatch)
1575 2 : break;
1576 :
1577 12 : const auto array = poBatch->column(
1578 12 : m_bIgnoredFields ? m_nRequestedFIDColumn : m_iFIDArrowColumn);
1579 12 : const auto arrayPtr = array.get();
1580 12 : const auto arrayTypeId = array->type_id();
1581 30 : for (int64_t nIdxInBatch = 0; nIdxInBatch < poBatch->num_rows();
1582 : nIdxInBatch++)
1583 : {
1584 20 : if (!array->IsNull(nIdxInBatch))
1585 : {
1586 20 : if (arrayTypeId == arrow::Type::INT64)
1587 : {
1588 20 : const auto castArray =
1589 : static_cast<const arrow::Int64Array *>(arrayPtr);
1590 20 : if (castArray->Value(nIdxInBatch) == nFID)
1591 : {
1592 2 : return ReadFeature(nIdxInBatch, poBatch->columns());
1593 : }
1594 : }
1595 0 : else if (arrayTypeId == arrow::Type::INT32)
1596 : {
1597 0 : const auto castArray =
1598 : static_cast<const arrow::Int32Array *>(arrayPtr);
1599 0 : if (castArray->Value(nIdxInBatch) == nFID)
1600 : {
1601 0 : return ReadFeature(nIdxInBatch, poBatch->columns());
1602 : }
1603 : }
1604 : }
1605 : }
1606 10 : }
1607 : }
1608 2 : return nullptr;
1609 : }
1610 :
1611 : /************************************************************************/
1612 : /* GetFeatureByIndex() */
1613 : /************************************************************************/
1614 :
1615 63 : OGRFeature *OGRParquetLayer::GetFeatureByIndex(GIntBig nFID)
1616 : {
1617 :
1618 63 : if (nFID < 0)
1619 5 : return nullptr;
1620 :
1621 116 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
1622 58 : const int nNumGroups = m_poArrowReader->num_row_groups();
1623 58 : int64_t nAccRows = 0;
1624 70 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
1625 : {
1626 : const int64_t nNextAccRows =
1627 61 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
1628 61 : if (nFID < nNextAccRows)
1629 : {
1630 : #if PARQUET_VERSION_MAJOR >= 21
1631 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1632 : auto result = m_bIgnoredFields
1633 : ? m_poArrowReader->GetRecordBatchReader(
1634 : {iGroup}, m_anRequestedParquetColumns)
1635 : : m_poArrowReader->GetRecordBatchReader({iGroup});
1636 : if (result.ok())
1637 : {
1638 : poRecordBatchReader = std::move(*result);
1639 : }
1640 : else
1641 : {
1642 : CPLError(CE_Failure, CPLE_AppDefined,
1643 : "GetRecordBatchReader() failed: %s",
1644 : result.status().message().c_str());
1645 : return nullptr;
1646 : }
1647 : #else
1648 49 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1649 : {
1650 0 : arrow::Status status;
1651 49 : if (m_bIgnoredFields)
1652 : {
1653 0 : status = m_poArrowReader->GetRecordBatchReader(
1654 0 : {iGroup}, m_anRequestedParquetColumns,
1655 0 : &poRecordBatchReader);
1656 : }
1657 : else
1658 : {
1659 98 : status = m_poArrowReader->GetRecordBatchReader(
1660 49 : {iGroup}, &poRecordBatchReader);
1661 : }
1662 49 : if (poRecordBatchReader == nullptr)
1663 : {
1664 0 : CPLError(CE_Failure, CPLE_AppDefined,
1665 : "GetRecordBatchReader() failed: %s",
1666 0 : status.message().c_str());
1667 0 : return nullptr;
1668 : }
1669 : }
1670 : #endif
1671 :
1672 49 : const int64_t nExpectedIdxInGroup = nFID - nAccRows;
1673 49 : int64_t nIdxInGroup = 0;
1674 : while (true)
1675 : {
1676 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1677 49 : arrow::Status status = poRecordBatchReader->ReadNext(&poBatch);
1678 49 : if (!status.ok())
1679 : {
1680 0 : CPLError(CE_Failure, CPLE_AppDefined,
1681 0 : "ReadNext() failed: %s", status.message().c_str());
1682 0 : return nullptr;
1683 : }
1684 49 : if (poBatch == nullptr)
1685 : {
1686 0 : return nullptr;
1687 : }
1688 49 : if (nExpectedIdxInGroup < nIdxInGroup + poBatch->num_rows())
1689 : {
1690 49 : const auto nIdxInBatch = nExpectedIdxInGroup - nIdxInGroup;
1691 : auto poFeature =
1692 49 : ReadFeature(nIdxInBatch, poBatch->columns());
1693 49 : poFeature->SetFID(nFID);
1694 49 : return poFeature;
1695 : }
1696 0 : nIdxInGroup += poBatch->num_rows();
1697 0 : }
1698 : }
1699 12 : nAccRows = nNextAccRows;
1700 : }
1701 9 : return nullptr;
1702 : }
1703 :
1704 : /************************************************************************/
1705 : /* GetFeature() */
1706 : /************************************************************************/
1707 :
1708 67 : OGRFeature *OGRParquetLayer::GetFeature(GIntBig nFID)
1709 : {
1710 67 : if (!m_osFIDColumn.empty())
1711 : {
1712 4 : return GetFeatureExplicitFID(nFID);
1713 : }
1714 : else
1715 : {
1716 63 : return GetFeatureByIndex(nFID);
1717 : }
1718 : }
1719 :
1720 : /************************************************************************/
1721 : /* ResetReading() */
1722 : /************************************************************************/
1723 :
1724 3842 : void OGRParquetLayer::ResetReading()
1725 : {
1726 3842 : OGRParquetLayerBase::ResetReading();
1727 3842 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
1728 3842 : m_nFeatureIdxSelected = 0;
1729 3842 : if (!m_asFeatureIdxRemapping.empty())
1730 : {
1731 1655 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1732 1655 : ++m_oFeatureIdxRemappingIter;
1733 : }
1734 3842 : }
1735 :
1736 : /************************************************************************/
1737 : /* CreateRecordBatchReader() */
1738 : /************************************************************************/
1739 :
1740 613 : bool OGRParquetLayer::CreateRecordBatchReader(int iStartingRowGroup)
1741 : {
1742 1226 : std::vector<int> anRowGroups;
1743 613 : const int nNumGroups = m_poArrowReader->num_row_groups();
1744 613 : anRowGroups.reserve(nNumGroups - iStartingRowGroup);
1745 1576 : for (int i = iStartingRowGroup; i < nNumGroups; ++i)
1746 963 : anRowGroups.push_back(i);
1747 1226 : return CreateRecordBatchReader(anRowGroups);
1748 : }
1749 :
1750 868 : bool OGRParquetLayer::CreateRecordBatchReader(
1751 : const std::vector<int> &anRowGroups)
1752 : {
1753 : #if PARQUET_VERSION_MAJOR >= 21
1754 : auto result = m_bIgnoredFields
1755 : ? m_poArrowReader->GetRecordBatchReader(
1756 : anRowGroups, m_anRequestedParquetColumns)
1757 : : m_poArrowReader->GetRecordBatchReader(anRowGroups);
1758 : if (result.ok())
1759 : {
1760 : m_poRecordBatchReader = std::move(*result);
1761 : return true;
1762 : }
1763 : else
1764 : {
1765 : CPLError(CE_Failure, CPLE_AppDefined,
1766 : "GetRecordBatchReader() failed: %s",
1767 : result.status().message().c_str());
1768 : return false;
1769 : }
1770 : #else
1771 868 : arrow::Status status;
1772 868 : if (m_bIgnoredFields)
1773 : {
1774 412 : status = m_poArrowReader->GetRecordBatchReader(
1775 206 : anRowGroups, m_anRequestedParquetColumns, &m_poRecordBatchReader);
1776 : }
1777 : else
1778 : {
1779 1324 : status = m_poArrowReader->GetRecordBatchReader(anRowGroups,
1780 662 : &m_poRecordBatchReader);
1781 : }
1782 868 : if (m_poRecordBatchReader == nullptr)
1783 : {
1784 0 : CPLError(CE_Failure, CPLE_AppDefined,
1785 0 : "GetRecordBatchReader() failed: %s", status.message().c_str());
1786 0 : return false;
1787 : }
1788 868 : return true;
1789 : #endif
1790 : }
1791 :
1792 : /************************************************************************/
1793 : /* IsConstraintPossible() */
1794 : /************************************************************************/
1795 :
1796 : enum class IsConstraintPossibleRes
1797 : {
1798 : YES,
1799 : NO,
1800 : UNKNOWN
1801 : };
1802 :
1803 : template <class T>
1804 228 : static IsConstraintPossibleRes IsConstraintPossible(int nOperation, T v, T min,
1805 : T max)
1806 : {
1807 228 : if (nOperation == SWQ_EQ)
1808 : {
1809 150 : if (v < min || v > max)
1810 : {
1811 62 : return IsConstraintPossibleRes::NO;
1812 : }
1813 : }
1814 78 : else if (nOperation == SWQ_NE)
1815 : {
1816 38 : if (v == min && v == max)
1817 : {
1818 0 : return IsConstraintPossibleRes::NO;
1819 : }
1820 : }
1821 40 : else if (nOperation == SWQ_LE)
1822 : {
1823 10 : if (v < min)
1824 : {
1825 4 : return IsConstraintPossibleRes::NO;
1826 : }
1827 : }
1828 30 : else if (nOperation == SWQ_LT)
1829 : {
1830 10 : if (v <= min)
1831 : {
1832 4 : return IsConstraintPossibleRes::NO;
1833 : }
1834 : }
1835 20 : else if (nOperation == SWQ_GE)
1836 : {
1837 10 : if (v > max)
1838 : {
1839 4 : return IsConstraintPossibleRes::NO;
1840 : }
1841 : }
1842 10 : else if (nOperation == SWQ_GT)
1843 : {
1844 10 : if (v >= max)
1845 : {
1846 6 : return IsConstraintPossibleRes::NO;
1847 : }
1848 : }
1849 : else
1850 : {
1851 0 : CPLDebug("PARQUET",
1852 : "IsConstraintPossible: Unhandled operation type: %d",
1853 : nOperation);
1854 0 : return IsConstraintPossibleRes::UNKNOWN;
1855 : }
1856 148 : return IsConstraintPossibleRes::YES;
1857 : }
1858 :
1859 : /************************************************************************/
1860 : /* IncrFeatureIdx() */
1861 : /************************************************************************/
1862 :
1863 7553 : void OGRParquetLayer::IncrFeatureIdx()
1864 : {
1865 7553 : ++m_nFeatureIdxSelected;
1866 7553 : ++m_nFeatureIdx;
1867 8509 : if (m_iFIDArrowColumn < 0 && !m_asFeatureIdxRemapping.empty() &&
1868 8509 : m_oFeatureIdxRemappingIter != m_asFeatureIdxRemapping.end())
1869 : {
1870 140 : if (m_nFeatureIdxSelected == m_oFeatureIdxRemappingIter->first)
1871 : {
1872 48 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1873 48 : ++m_oFeatureIdxRemappingIter;
1874 : }
1875 : }
1876 7553 : }
1877 :
1878 : /************************************************************************/
1879 : /* ReadNextBatch() */
1880 : /************************************************************************/
1881 :
1882 1917 : bool OGRParquetLayer::ReadNextBatch()
1883 : {
1884 1917 : m_nIdxInBatch = 0;
1885 :
1886 1917 : const int nNumGroups = m_poArrowReader->num_row_groups();
1887 1917 : if (nNumGroups == 0)
1888 2 : return false;
1889 :
1890 1915 : if (m_bSingleBatch)
1891 : {
1892 32 : CPLAssert(m_iRecordBatch == 0);
1893 32 : CPLAssert(m_poBatch != nullptr);
1894 32 : return false;
1895 : }
1896 :
1897 1883 : CPLAssert((m_iRecordBatch == -1 && m_poRecordBatchReader == nullptr) ||
1898 : (m_iRecordBatch >= 0 && m_poRecordBatchReader != nullptr));
1899 :
1900 1883 : if (m_poRecordBatchReader == nullptr)
1901 : {
1902 875 : m_asFeatureIdxRemapping.clear();
1903 :
1904 875 : bool bIterateEverything = false;
1905 875 : std::vector<int> anSelectedGroups;
1906 : const auto oIterToGeomColBBOX =
1907 875 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(m_iGeomFieldFilter);
1908 : const bool bUSEBBOXFields =
1909 197 : (m_poFilterGeom &&
1910 197 : oIterToGeomColBBOX !=
1911 1072 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
1912 95 : CPLTestBool(CPLGetConfigOption(
1913 970 : ("OGR_" + GetDriverUCName() + "_USE_BBOX").c_str(), "YES")));
1914 : const bool bIsGeoArrowStruct =
1915 1750 : (m_iGeomFieldFilter >= 0 &&
1916 875 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
1917 867 : m_iGeomFieldFilter <
1918 : static_cast<int>(
1919 1734 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
1920 867 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() >=
1921 1750 : 2 &&
1922 204 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]));
1923 : #if PARQUET_VERSION_MAJOR >= 21
1924 : const bool bUseParquetGeoStat =
1925 : (m_poFilterGeom && m_iGeomFieldFilter >= 0 &&
1926 : m_geoStatsWithBBOXAvailable.find(m_iGeomFieldFilter) !=
1927 : m_geoStatsWithBBOXAvailable.end() &&
1928 : m_iGeomFieldFilter <
1929 : static_cast<int>(
1930 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
1931 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() ==
1932 : 1 &&
1933 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter][0] >= 0);
1934 : #endif
1935 1522 : if (m_asAttributeFilterConstraints.empty() && !bUSEBBOXFields &&
1936 647 : !(bIsGeoArrowStruct && m_poFilterGeom)
1937 : #if PARQUET_VERSION_MAJOR >= 21
1938 : && !bUseParquetGeoStat
1939 : #endif
1940 : )
1941 : {
1942 601 : bIterateEverything = true;
1943 : }
1944 : else
1945 : {
1946 : OGRField sMin;
1947 : OGRField sMax;
1948 274 : OGR_RawField_SetNull(&sMin);
1949 274 : OGR_RawField_SetNull(&sMax);
1950 274 : bool bFoundMin = false;
1951 274 : bool bFoundMax = false;
1952 274 : OGRFieldType eType = OFTMaxType;
1953 274 : OGRFieldSubType eSubType = OFSTNone;
1954 548 : std::string osMinTmp, osMaxTmp;
1955 274 : int64_t nFeatureIdxSelected = 0;
1956 274 : int64_t nFeatureIdxTotal = 0;
1957 :
1958 274 : int iXMinField = -1;
1959 274 : int iYMinField = -1;
1960 274 : int iXMaxField = -1;
1961 274 : int iYMaxField = -1;
1962 :
1963 274 : if (bIsGeoArrowStruct)
1964 : {
1965 : const auto metadata =
1966 184 : m_poArrowReader->parquet_reader()->metadata();
1967 92 : const auto poParquetSchema = metadata->schema();
1968 228 : for (int iParquetCol :
1969 548 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter])
1970 : {
1971 : const auto parquetColumn =
1972 228 : poParquetSchema->Column(iParquetCol);
1973 : const auto parquetColumnName =
1974 456 : parquetColumn->path()->ToDotString();
1975 456 : if (parquetColumnName.size() > 2 &&
1976 228 : parquetColumnName.find(".x") ==
1977 228 : parquetColumnName.size() - 2)
1978 : {
1979 92 : iXMinField = iParquetCol;
1980 92 : iXMaxField = iParquetCol;
1981 : }
1982 272 : else if (parquetColumnName.size() > 2 &&
1983 136 : parquetColumnName.find(".y") ==
1984 136 : parquetColumnName.size() - 2)
1985 : {
1986 92 : iYMinField = iParquetCol;
1987 92 : iYMaxField = iParquetCol;
1988 : }
1989 : }
1990 : }
1991 182 : else if (bUSEBBOXFields)
1992 : {
1993 49 : iXMinField = oIterToGeomColBBOX->second.iParquetXMin;
1994 49 : iYMinField = oIterToGeomColBBOX->second.iParquetYMin;
1995 49 : iXMaxField = oIterToGeomColBBOX->second.iParquetXMax;
1996 49 : iYMaxField = oIterToGeomColBBOX->second.iParquetYMax;
1997 : }
1998 :
1999 675 : for (int iRowGroup = 0;
2000 675 : iRowGroup < nNumGroups && !bIterateEverything; ++iRowGroup)
2001 : {
2002 401 : bool bSelectGroup = true;
2003 : auto poRowGroup =
2004 401 : GetReader()->parquet_reader()->RowGroup(iRowGroup);
2005 :
2006 401 : if (iXMinField >= 0 && iYMinField >= 0 && iXMaxField >= 0 &&
2007 : iYMaxField >= 0)
2008 : {
2009 149 : if (GetMinMaxForParquetCol(iRowGroup, iXMinField, nullptr,
2010 : true, sMin, bFoundMin, false,
2011 : sMax, bFoundMax, eType, eSubType,
2012 148 : osMinTmp, osMaxTmp) &&
2013 297 : bFoundMin && eType == OFTReal)
2014 : {
2015 148 : const double dfGroupMinX = sMin.Real;
2016 148 : if (dfGroupMinX > m_sFilterEnvelope.MaxX)
2017 : {
2018 1 : bSelectGroup = false;
2019 : }
2020 147 : else if (GetMinMaxForParquetCol(
2021 : iRowGroup, iYMinField, nullptr, true, sMin,
2022 : bFoundMin, false, sMax, bFoundMax, eType,
2023 147 : eSubType, osMinTmp, osMaxTmp) &&
2024 294 : bFoundMin && eType == OFTReal)
2025 : {
2026 147 : const double dfGroupMinY = sMin.Real;
2027 147 : if (dfGroupMinY > m_sFilterEnvelope.MaxY)
2028 : {
2029 1 : bSelectGroup = false;
2030 : }
2031 146 : else if (GetMinMaxForParquetCol(
2032 : iRowGroup, iXMaxField, nullptr, false,
2033 : sMin, bFoundMin, true, sMax, bFoundMax,
2034 146 : eType, eSubType, osMinTmp, osMaxTmp) &&
2035 292 : bFoundMax && eType == OFTReal)
2036 : {
2037 146 : const double dfGroupMaxX = sMax.Real;
2038 146 : if (dfGroupMaxX < m_sFilterEnvelope.MinX)
2039 : {
2040 1 : bSelectGroup = false;
2041 : }
2042 145 : else if (GetMinMaxForParquetCol(
2043 : iRowGroup, iYMaxField, nullptr,
2044 : false, sMin, bFoundMin, true, sMax,
2045 : bFoundMax, eType, eSubType,
2046 145 : osMinTmp, osMaxTmp) &&
2047 290 : bFoundMax && eType == OFTReal)
2048 : {
2049 145 : const double dfGroupMaxY = sMax.Real;
2050 145 : if (dfGroupMaxY < m_sFilterEnvelope.MinY)
2051 : {
2052 1 : bSelectGroup = false;
2053 : }
2054 : }
2055 : }
2056 : }
2057 : }
2058 : }
2059 : #if PARQUET_VERSION_MAJOR >= 21
2060 : else if (bUseParquetGeoStat)
2061 : {
2062 : const int iParquetCol =
2063 : m_anMapGeomFieldIndexToParquetColumns
2064 : [m_iGeomFieldFilter][0];
2065 : CPLAssert(iParquetCol >= 0);
2066 :
2067 : const auto metadata =
2068 : m_poArrowReader->parquet_reader()->metadata();
2069 : const auto columnChunk =
2070 : metadata->RowGroup(iRowGroup)->ColumnChunk(iParquetCol);
2071 : if (auto geostats = columnChunk->geo_statistics())
2072 : {
2073 : if (geostats->dimension_valid()[0] &&
2074 : geostats->dimension_valid()[1])
2075 : {
2076 : double dfMinX = geostats->lower_bound()[0];
2077 : double dfMaxX = geostats->upper_bound()[0];
2078 : double dfMinY = geostats->lower_bound()[1];
2079 : double dfMaxY = geostats->upper_bound()[1];
2080 :
2081 : // Deal as best as we can with wrap around bounding box
2082 : if (dfMinX > dfMaxX && std::fabs(dfMinX) <= 180 &&
2083 : std::fabs(dfMaxX) <= 180)
2084 : {
2085 : dfMinX = -180;
2086 : dfMaxX = 180;
2087 : }
2088 :
2089 : // Check if there is an intersection between
2090 : // the geostatistics for this rowgroup and
2091 : // the bbox of interest
2092 : if (dfMinX > m_sFilterEnvelope.MaxX ||
2093 : dfMaxX < m_sFilterEnvelope.MinX ||
2094 : dfMinY > m_sFilterEnvelope.MaxY ||
2095 : dfMaxY < m_sFilterEnvelope.MinY)
2096 : {
2097 : bSelectGroup = false;
2098 : }
2099 : }
2100 : }
2101 : }
2102 : #endif
2103 :
2104 401 : if (bSelectGroup)
2105 : {
2106 561 : for (auto &constraint : m_asAttributeFilterConstraints)
2107 : {
2108 256 : int iOGRField = constraint.iField;
2109 512 : if (constraint.iField ==
2110 256 : m_poFeatureDefn->GetFieldCount() + SPF_FID)
2111 : {
2112 9 : iOGRField = OGR_FID_INDEX;
2113 : }
2114 256 : if (constraint.nOperation != SWQ_ISNULL &&
2115 247 : constraint.nOperation != SWQ_ISNOTNULL)
2116 : {
2117 234 : if (iOGRField == OGR_FID_INDEX &&
2118 9 : m_iFIDParquetColumn < 0)
2119 : {
2120 6 : sMin.Integer64 = nFeatureIdxTotal;
2121 6 : sMax.Integer64 =
2122 6 : nFeatureIdxTotal +
2123 6 : poRowGroup->metadata()->num_rows() - 1;
2124 6 : eType = OFTInteger64;
2125 : }
2126 228 : else if (!GetMinMaxForOGRField(
2127 : iRowGroup, iOGRField, true, sMin,
2128 : bFoundMin, true, sMax, bFoundMax,
2129 225 : eType, eSubType, osMinTmp, osMaxTmp) ||
2130 228 : !bFoundMin || !bFoundMax)
2131 : {
2132 3 : bIterateEverything = true;
2133 3 : break;
2134 : }
2135 : }
2136 :
2137 253 : IsConstraintPossibleRes res =
2138 : IsConstraintPossibleRes::UNKNOWN;
2139 253 : if (constraint.eType ==
2140 147 : OGRArrowLayer::Constraint::Type::Integer &&
2141 147 : eType == OFTInteger)
2142 : {
2143 : #if 0
2144 : CPLDebug("PARQUET",
2145 : "Group %d, field %s, min = %d, max = %d",
2146 : iRowGroup,
2147 : iOGRField == OGR_FID_INDEX
2148 : ? m_osFIDColumn.c_str()
2149 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2150 : ->GetNameRef(),
2151 : sMin.Integer, sMax.Integer);
2152 : #endif
2153 125 : res = IsConstraintPossible(
2154 : constraint.nOperation,
2155 : constraint.sValue.Integer, sMin.Integer,
2156 : sMax.Integer);
2157 : }
2158 128 : else if (constraint.eType == OGRArrowLayer::Constraint::
2159 39 : Type::Integer64 &&
2160 39 : eType == OFTInteger64)
2161 : {
2162 : #if 0
2163 : CPLDebug("PARQUET",
2164 : "Group %d, field %s, min = " CPL_FRMT_GIB
2165 : ", max = " CPL_FRMT_GIB,
2166 : iRowGroup,
2167 : iOGRField == OGR_FID_INDEX
2168 : ? m_osFIDColumn.c_str()
2169 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2170 : ->GetNameRef(),
2171 : static_cast<GIntBig>(sMin.Integer64),
2172 : static_cast<GIntBig>(sMax.Integer64));
2173 : #endif
2174 39 : res = IsConstraintPossible(
2175 : constraint.nOperation,
2176 : constraint.sValue.Integer64, sMin.Integer64,
2177 : sMax.Integer64);
2178 : }
2179 89 : else if (constraint.eType ==
2180 29 : OGRArrowLayer::Constraint::Type::Real &&
2181 29 : eType == OFTReal)
2182 : {
2183 : #if 0
2184 : CPLDebug("PARQUET",
2185 : "Group %d, field %s, min = %g, max = %g",
2186 : iRowGroup,
2187 : iOGRField == OGR_FID_INDEX
2188 : ? m_osFIDColumn.c_str()
2189 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2190 : ->GetNameRef(),
2191 : sMin.Real, sMax.Real);
2192 : #endif
2193 26 : res = IsConstraintPossible(constraint.nOperation,
2194 : constraint.sValue.Real,
2195 : sMin.Real, sMax.Real);
2196 : }
2197 63 : else if (constraint.eType ==
2198 38 : OGRArrowLayer::Constraint::Type::String &&
2199 38 : eType == OFTString)
2200 : {
2201 : #if 0
2202 : CPLDebug("PARQUET",
2203 : "Group %d, field %s, min = %s, max = %s",
2204 : iRowGroup,
2205 : iOGRField == OGR_FID_INDEX
2206 : ? m_osFIDColumn.c_str()
2207 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2208 : ->GetNameRef(),
2209 : sMin.String, sMax.String);
2210 : #endif
2211 38 : res = IsConstraintPossible(
2212 : constraint.nOperation,
2213 76 : std::string(constraint.sValue.String),
2214 76 : std::string(sMin.String),
2215 76 : std::string(sMax.String));
2216 : }
2217 25 : else if (constraint.nOperation == SWQ_ISNULL ||
2218 16 : constraint.nOperation == SWQ_ISNOTNULL)
2219 : {
2220 : const int iCol =
2221 : iOGRField == OGR_FID_INDEX
2222 44 : ? m_iFIDParquetColumn
2223 22 : : GetMapFieldIndexToParquetColumn()
2224 22 : [iOGRField];
2225 22 : if (iCol >= 0)
2226 : {
2227 : const auto metadata =
2228 22 : m_poArrowReader->parquet_reader()
2229 44 : ->metadata();
2230 : const auto rowGroupColumnChunk =
2231 22 : metadata->RowGroup(iRowGroup)->ColumnChunk(
2232 44 : iCol);
2233 : const auto rowGroupStats =
2234 44 : rowGroupColumnChunk->statistics();
2235 44 : if (rowGroupColumnChunk->is_stats_set() &&
2236 22 : rowGroupStats)
2237 : {
2238 22 : res = IsConstraintPossibleRes::YES;
2239 31 : if (constraint.nOperation == SWQ_ISNULL &&
2240 9 : rowGroupStats->num_values() ==
2241 9 : poRowGroup->metadata()->num_rows())
2242 : {
2243 5 : res = IsConstraintPossibleRes::NO;
2244 : }
2245 34 : else if (constraint.nOperation ==
2246 30 : SWQ_ISNOTNULL &&
2247 13 : rowGroupStats->num_values() == 0)
2248 : {
2249 1 : res = IsConstraintPossibleRes::NO;
2250 : }
2251 : }
2252 22 : }
2253 : }
2254 : else
2255 : {
2256 3 : CPLDebug(
2257 : "PARQUET",
2258 : "Unhandled combination of constraint.eType "
2259 : "(%d) and eType (%d)",
2260 3 : static_cast<int>(constraint.eType), eType);
2261 : }
2262 :
2263 253 : if (res == IsConstraintPossibleRes::NO)
2264 : {
2265 86 : bSelectGroup = false;
2266 86 : break;
2267 : }
2268 167 : else if (res == IsConstraintPossibleRes::UNKNOWN)
2269 : {
2270 3 : bIterateEverything = true;
2271 3 : break;
2272 : }
2273 : }
2274 : }
2275 :
2276 401 : if (bSelectGroup)
2277 : {
2278 : // CPLDebug("PARQUET", "Selecting row group %d", iRowGroup);
2279 : m_asFeatureIdxRemapping.emplace_back(
2280 311 : std::make_pair(nFeatureIdxSelected, nFeatureIdxTotal));
2281 311 : anSelectedGroups.push_back(iRowGroup);
2282 311 : nFeatureIdxSelected += poRowGroup->metadata()->num_rows();
2283 : }
2284 :
2285 401 : nFeatureIdxTotal += poRowGroup->metadata()->num_rows();
2286 : }
2287 : }
2288 :
2289 875 : if (bIterateEverything)
2290 : {
2291 607 : m_asFeatureIdxRemapping.clear();
2292 607 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
2293 607 : if (!CreateRecordBatchReader(0))
2294 0 : return false;
2295 : }
2296 : else
2297 : {
2298 268 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
2299 268 : if (anSelectedGroups.empty())
2300 : {
2301 13 : return false;
2302 : }
2303 255 : CPLDebug("PARQUET", "%d/%d row groups selected",
2304 255 : int(anSelectedGroups.size()),
2305 255 : m_poArrowReader->num_row_groups());
2306 255 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
2307 255 : ++m_oFeatureIdxRemappingIter;
2308 255 : if (!CreateRecordBatchReader(anSelectedGroups))
2309 : {
2310 0 : return false;
2311 : }
2312 : }
2313 : }
2314 :
2315 3740 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
2316 :
2317 0 : do
2318 : {
2319 1870 : ++m_iRecordBatch;
2320 1870 : poNextBatch.reset();
2321 1870 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
2322 1870 : if (!status.ok())
2323 : {
2324 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
2325 0 : status.message().c_str());
2326 0 : poNextBatch.reset();
2327 : }
2328 1870 : if (poNextBatch == nullptr)
2329 : {
2330 962 : if (m_iRecordBatch == 1 && m_poBatch && m_poAttrQuery == nullptr &&
2331 290 : m_poFilterGeom == nullptr)
2332 : {
2333 58 : m_iRecordBatch = 0;
2334 58 : m_bSingleBatch = true;
2335 : }
2336 : else
2337 614 : m_poBatch.reset();
2338 672 : return false;
2339 : }
2340 1198 : } while (poNextBatch->num_rows() == 0);
2341 :
2342 1198 : SetBatch(poNextBatch);
2343 :
2344 1198 : return true;
2345 : }
2346 :
2347 : /************************************************************************/
2348 : /* InvalidateCachedBatches() */
2349 : /************************************************************************/
2350 :
2351 794 : void OGRParquetLayer::InvalidateCachedBatches()
2352 : {
2353 794 : m_bSingleBatch = false;
2354 794 : OGRParquetLayerBase::InvalidateCachedBatches();
2355 794 : }
2356 :
2357 : /************************************************************************/
2358 : /* SetIgnoredFields() */
2359 : /************************************************************************/
2360 :
2361 231 : OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields)
2362 : {
2363 231 : m_bIgnoredFields = false;
2364 231 : m_anRequestedParquetColumns.clear();
2365 231 : m_anMapFieldIndexToArrayIndex.clear();
2366 231 : m_anMapGeomFieldIndexToArrayIndex.clear();
2367 231 : m_nRequestedFIDColumn = -1;
2368 231 : OGRErr eErr = OGRLayer::SetIgnoredFields(papszFields);
2369 231 : int nBatchColumns = 0;
2370 231 : if (!m_bHasMissingMappingToParquet && eErr == OGRERR_NONE)
2371 : {
2372 231 : m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr;
2373 231 : if (m_bIgnoredFields)
2374 : {
2375 168 : if (m_iFIDParquetColumn >= 0)
2376 : {
2377 6 : m_nRequestedFIDColumn = nBatchColumns;
2378 6 : nBatchColumns++;
2379 6 : m_anRequestedParquetColumns.push_back(m_iFIDParquetColumn);
2380 : }
2381 :
2382 5912 : for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i)
2383 : {
2384 : const auto eArrowType =
2385 5744 : m_poSchema->fields()[m_anMapFieldIndexToArrowColumn[i][0]]
2386 5744 : ->type()
2387 5744 : ->id();
2388 5744 : if (eArrowType == arrow::Type::STRUCT)
2389 : {
2390 : // For a struct, for the sake of simplicity in
2391 : // GetNextRawFeature(), as soon as one of the member if
2392 : // requested, request all Parquet columns, so that the Arrow
2393 : // type doesn't change
2394 69 : bool bFoundNotIgnored = false;
2395 296 : for (int j = i; j < m_poFeatureDefn->GetFieldCount() &&
2396 294 : m_anMapFieldIndexToArrowColumn[i][0] ==
2397 147 : m_anMapFieldIndexToArrowColumn[j][0];
2398 : ++j)
2399 : {
2400 136 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
2401 : {
2402 56 : bFoundNotIgnored = true;
2403 56 : break;
2404 : }
2405 : }
2406 69 : if (bFoundNotIgnored)
2407 : {
2408 : int j;
2409 784 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
2410 784 : m_anMapFieldIndexToArrowColumn[i][0] ==
2411 392 : m_anMapFieldIndexToArrowColumn[j][0];
2412 : ++j)
2413 : {
2414 : const int iParquetCol =
2415 336 : m_anMapFieldIndexToParquetColumn[j];
2416 336 : CPLAssert(iParquetCol >= 0);
2417 336 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
2418 : {
2419 330 : m_anMapFieldIndexToArrayIndex.push_back(
2420 : nBatchColumns);
2421 : }
2422 : else
2423 : {
2424 6 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2425 : }
2426 336 : m_anRequestedParquetColumns.push_back(iParquetCol);
2427 : }
2428 56 : i = j - 1;
2429 56 : nBatchColumns++;
2430 : }
2431 : else
2432 : {
2433 : int j;
2434 172 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
2435 170 : m_anMapFieldIndexToArrowColumn[i][0] ==
2436 85 : m_anMapFieldIndexToArrowColumn[j][0];
2437 : ++j)
2438 : {
2439 74 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2440 : }
2441 13 : i = j - 1;
2442 : }
2443 : }
2444 5675 : else if (!m_poFeatureDefn->GetFieldDefn(i)->IsIgnored())
2445 : {
2446 4181 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[i];
2447 4181 : CPLAssert(iParquetCol >= 0);
2448 4181 : m_anMapFieldIndexToArrayIndex.push_back(nBatchColumns);
2449 4181 : nBatchColumns++;
2450 4181 : m_anRequestedParquetColumns.push_back(iParquetCol);
2451 4181 : if (eArrowType == arrow::Type::MAP)
2452 : {
2453 : // For a map, request both keys and items Parquet
2454 : // columns
2455 338 : m_anRequestedParquetColumns.push_back(iParquetCol + 1);
2456 : }
2457 : }
2458 : else
2459 : {
2460 1494 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2461 : }
2462 : }
2463 :
2464 168 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrayIndex.size()) ==
2465 : m_poFeatureDefn->GetFieldCount());
2466 :
2467 348 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
2468 : {
2469 180 : if (!m_poFeatureDefn->GetGeomFieldDefn(i)->IsIgnored())
2470 : {
2471 : const auto &anVals =
2472 157 : m_anMapGeomFieldIndexToParquetColumns[i];
2473 157 : CPLAssert(!anVals.empty() && anVals[0] >= 0);
2474 : m_anRequestedParquetColumns.insert(
2475 157 : m_anRequestedParquetColumns.end(), anVals.begin(),
2476 314 : anVals.end());
2477 157 : m_anMapGeomFieldIndexToArrayIndex.push_back(nBatchColumns);
2478 157 : nBatchColumns++;
2479 :
2480 157 : auto oIter = m_oMapGeomFieldIndexToGeomColBBOX.find(i);
2481 : const auto oIterParquet =
2482 157 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(i);
2483 219 : if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end() &&
2484 62 : oIterParquet !=
2485 219 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end())
2486 : {
2487 62 : oIter->second.iArrayIdx = nBatchColumns++;
2488 : m_anRequestedParquetColumns.insert(
2489 62 : m_anRequestedParquetColumns.end(),
2490 62 : oIterParquet->second.anParquetCols.begin(),
2491 186 : oIterParquet->second.anParquetCols.end());
2492 : }
2493 : }
2494 : else
2495 : {
2496 23 : m_anMapGeomFieldIndexToArrayIndex.push_back(-1);
2497 : }
2498 : }
2499 :
2500 168 : CPLAssert(
2501 : static_cast<int>(m_anMapGeomFieldIndexToArrayIndex.size()) ==
2502 : m_poFeatureDefn->GetGeomFieldCount());
2503 : }
2504 : }
2505 :
2506 231 : m_nExpectedBatchColumns = m_bIgnoredFields ? nBatchColumns : -1;
2507 :
2508 231 : ComputeConstraintsArrayIdx();
2509 :
2510 : // Full invalidation
2511 231 : InvalidateCachedBatches();
2512 :
2513 231 : return eErr;
2514 : }
2515 :
2516 : /************************************************************************/
2517 : /* GetFeatureCount() */
2518 : /************************************************************************/
2519 :
2520 870 : GIntBig OGRParquetLayer::GetFeatureCount(int bForce)
2521 : {
2522 870 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
2523 : {
2524 54 : auto metadata = m_poArrowReader->parquet_reader()->metadata();
2525 54 : if (metadata)
2526 54 : return metadata->num_rows();
2527 : }
2528 816 : return OGRLayer::GetFeatureCount(bForce);
2529 : }
2530 :
2531 : /************************************************************************/
2532 : /* FastGetExtent() */
2533 : /************************************************************************/
2534 :
2535 645 : bool OGRParquetLayer::FastGetExtent(int iGeomField, OGREnvelope *psExtent) const
2536 : {
2537 645 : if (OGRParquetLayerBase::FastGetExtent(iGeomField, psExtent))
2538 630 : return true;
2539 :
2540 : const auto oIterToGeomColBBOX =
2541 15 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeomField);
2542 16 : if (oIterToGeomColBBOX != m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
2543 1 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES")))
2544 : {
2545 1 : OGREnvelope sExtent;
2546 : OGRField sMin, sMax;
2547 1 : OGR_RawField_SetNull(&sMin);
2548 1 : OGR_RawField_SetNull(&sMax);
2549 : bool bFoundMin, bFoundMax;
2550 1 : OGRFieldType eType = OFTMaxType;
2551 1 : OGRFieldSubType eSubType = OFSTNone;
2552 1 : std::string osMinTmp, osMaxTmp;
2553 2 : if (GetMinMaxForParquetCol(-1, oIterToGeomColBBOX->second.iParquetXMin,
2554 : nullptr, true, sMin, bFoundMin, false, sMax,
2555 : bFoundMax, eType, eSubType, osMinTmp,
2556 3 : osMaxTmp) &&
2557 1 : eType == OFTReal)
2558 : {
2559 1 : sExtent.MinX = sMin.Real;
2560 :
2561 1 : if (GetMinMaxForParquetCol(
2562 1 : -1, oIterToGeomColBBOX->second.iParquetYMin, nullptr, true,
2563 : sMin, bFoundMin, false, sMax, bFoundMax, eType, eSubType,
2564 3 : osMinTmp, osMaxTmp) &&
2565 1 : eType == OFTReal)
2566 : {
2567 1 : sExtent.MinY = sMin.Real;
2568 :
2569 1 : if (GetMinMaxForParquetCol(
2570 1 : -1, oIterToGeomColBBOX->second.iParquetXMax, nullptr,
2571 : false, sMin, bFoundMin, true, sMax, bFoundMax, eType,
2572 3 : eSubType, osMinTmp, osMaxTmp) &&
2573 1 : eType == OFTReal)
2574 : {
2575 1 : sExtent.MaxX = sMax.Real;
2576 :
2577 1 : if (GetMinMaxForParquetCol(
2578 1 : -1, oIterToGeomColBBOX->second.iParquetYMax,
2579 : nullptr, false, sMin, bFoundMin, true, sMax,
2580 3 : bFoundMax, eType, eSubType, osMinTmp, osMaxTmp) &&
2581 1 : eType == OFTReal)
2582 : {
2583 1 : sExtent.MaxY = sMax.Real;
2584 :
2585 1 : CPLDebug("PARQUET",
2586 : "Using statistics of bbox.minx, bbox.miny, "
2587 : "bbox.maxx, bbox.maxy columns to get extent");
2588 1 : m_oMapExtents[iGeomField] = sExtent;
2589 1 : *psExtent = sExtent;
2590 1 : return true;
2591 : }
2592 : }
2593 : }
2594 : }
2595 : }
2596 :
2597 14 : return false;
2598 : }
2599 :
2600 : /************************************************************************/
2601 : /* TestCapability() */
2602 : /************************************************************************/
2603 :
2604 659 : int OGRParquetLayer::TestCapability(const char *pszCap) const
2605 : {
2606 659 : if (EQUAL(pszCap, OLCFastFeatureCount))
2607 79 : return m_poAttrQuery == nullptr && m_poFilterGeom == nullptr;
2608 :
2609 580 : if (EQUAL(pszCap, OLCIgnoreFields))
2610 9 : return !m_bHasMissingMappingToParquet;
2611 :
2612 571 : if (EQUAL(pszCap, OLCFastSpatialFilter))
2613 : {
2614 168 : if (m_iGeomFieldFilter >= 0 &&
2615 112 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
2616 56 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]))
2617 : {
2618 56 : return true;
2619 : }
2620 :
2621 : #if PARQUET_VERSION_MAJOR >= 21
2622 : if (m_iGeomFieldFilter >= 0 &&
2623 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
2624 : m_aeGeomEncoding[m_iGeomFieldFilter] == OGRArrowGeomEncoding::WKB &&
2625 : m_iGeomFieldFilter <
2626 : static_cast<int>(
2627 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
2628 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() ==
2629 : 1)
2630 : {
2631 : const int iParquetCol =
2632 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter][0];
2633 : if (iParquetCol >= 0)
2634 : {
2635 : const auto metadata =
2636 : m_poArrowReader->parquet_reader()->metadata();
2637 :
2638 : int nCountRowGroupsStatsValid = 0;
2639 : const int nNumGroups = m_poArrowReader->num_row_groups();
2640 : for (int iRowGroup = 0; iRowGroup < nNumGroups &&
2641 : nCountRowGroupsStatsValid == iRowGroup;
2642 : ++iRowGroup)
2643 : {
2644 : const auto columnChunk =
2645 : metadata->RowGroup(iRowGroup)->ColumnChunk(iParquetCol);
2646 : if (auto geostats = columnChunk->geo_statistics())
2647 : {
2648 : if (geostats->dimension_valid()[0] &&
2649 : geostats->dimension_valid()[1])
2650 : {
2651 : const double dfMinX = geostats->lower_bound()[0];
2652 : const double dfMaxX = geostats->upper_bound()[0];
2653 : const double dfMinY = geostats->lower_bound()[1];
2654 : const double dfMaxY = geostats->upper_bound()[1];
2655 : if (std::isfinite(dfMinX) &&
2656 : std::isfinite(dfMaxX) &&
2657 : std::isfinite(dfMinY) && std::isfinite(dfMaxY))
2658 : {
2659 : nCountRowGroupsStatsValid++;
2660 : }
2661 : }
2662 : }
2663 : }
2664 : if (nCountRowGroupsStatsValid == nNumGroups)
2665 : {
2666 : return true;
2667 : }
2668 : }
2669 : }
2670 : #endif
2671 :
2672 : // fallback to base method
2673 : }
2674 :
2675 515 : return OGRParquetLayerBase::TestCapability(pszCap);
2676 : }
2677 :
2678 : /************************************************************************/
2679 : /* GetMetadataItem() */
2680 : /************************************************************************/
2681 :
2682 322 : const char *OGRParquetLayer::GetMetadataItem(const char *pszName,
2683 : const char *pszDomain)
2684 : {
2685 : // Mostly for unit test purposes
2686 322 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_"))
2687 : {
2688 11 : int nRowGroupIdx = -1;
2689 11 : int nColumn = -1;
2690 11 : if (EQUAL(pszName, "NUM_ROW_GROUPS"))
2691 : {
2692 3 : return CPLSPrintf("%d", m_poArrowReader->num_row_groups());
2693 : }
2694 8 : if (EQUAL(pszName, "CREATOR"))
2695 : {
2696 4 : return CPLSPrintf("%s", m_poArrowReader->parquet_reader()
2697 4 : ->metadata()
2698 2 : ->created_by()
2699 2 : .c_str());
2700 : }
2701 12 : else if (sscanf(pszName, "ROW_GROUPS[%d]", &nRowGroupIdx) == 1 &&
2702 6 : strstr(pszName, ".NUM_ROWS"))
2703 : {
2704 : try
2705 : {
2706 : auto poRowGroup =
2707 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2708 3 : if (poRowGroup == nullptr)
2709 0 : return nullptr;
2710 3 : return CPLSPrintf("%" PRId64,
2711 3 : poRowGroup->metadata()->num_rows());
2712 : }
2713 0 : catch (const std::exception &)
2714 : {
2715 : }
2716 : }
2717 6 : else if (sscanf(pszName, "ROW_GROUPS[%d].COLUMNS[%d]", &nRowGroupIdx,
2718 6 : &nColumn) == 2 &&
2719 3 : strstr(pszName, ".COMPRESSION"))
2720 : {
2721 : try
2722 : {
2723 : auto poRowGroup =
2724 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2725 3 : if (poRowGroup == nullptr)
2726 0 : return nullptr;
2727 6 : auto poColumn = poRowGroup->metadata()->ColumnChunk(nColumn);
2728 3 : return CPLSPrintf("%s", arrow::util::Codec::GetCodecAsString(
2729 3 : poColumn->compression())
2730 3 : .c_str());
2731 : }
2732 0 : catch (const std::exception &)
2733 : {
2734 : }
2735 : }
2736 0 : return nullptr;
2737 : }
2738 311 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2739 : {
2740 404 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2741 202 : const auto &kv_metadata = metadata->key_value_metadata();
2742 202 : if (kv_metadata && kv_metadata->Contains(pszName))
2743 : {
2744 199 : auto metadataItem = kv_metadata->Get(pszName);
2745 199 : if (metadataItem.ok())
2746 : {
2747 199 : return CPLSPrintf("%s", metadataItem->c_str());
2748 : }
2749 : }
2750 3 : return nullptr;
2751 : }
2752 109 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
2753 : }
2754 :
2755 : /************************************************************************/
2756 : /* GetMetadata() */
2757 : /************************************************************************/
2758 :
2759 57 : char **OGRParquetLayer::GetMetadata(const char *pszDomain)
2760 : {
2761 : // Mostly for unit test purposes
2762 57 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2763 : {
2764 2 : m_aosFeatherMetadata.Clear();
2765 4 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2766 2 : const auto &kv_metadata = metadata->key_value_metadata();
2767 2 : if (kv_metadata)
2768 : {
2769 8 : for (const auto &kv : kv_metadata->sorted_pairs())
2770 : {
2771 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
2772 6 : kv.second.c_str());
2773 : }
2774 : }
2775 2 : return m_aosFeatherMetadata.List();
2776 : }
2777 :
2778 : // Mostly for unit test purposes
2779 55 : if (pszDomain != nullptr && EQUAL(pszDomain, "_GDAL_CREATION_OPTIONS_"))
2780 : {
2781 6 : return m_aosCreationOptions.List();
2782 : }
2783 :
2784 49 : return OGRLayer::GetMetadata(pszDomain);
2785 : }
2786 :
2787 : /************************************************************************/
2788 : /* GetArrowStream() */
2789 : /************************************************************************/
2790 :
2791 134 : bool OGRParquetLayer::GetArrowStream(struct ArrowArrayStream *out_stream,
2792 : CSLConstList papszOptions)
2793 : {
2794 : const char *pszMaxFeaturesInBatch =
2795 134 : CSLFetchNameValue(papszOptions, "MAX_FEATURES_IN_BATCH");
2796 134 : if (pszMaxFeaturesInBatch)
2797 : {
2798 14 : int nMaxBatchSize = atoi(pszMaxFeaturesInBatch);
2799 14 : if (nMaxBatchSize <= 0)
2800 0 : nMaxBatchSize = 1;
2801 14 : if (nMaxBatchSize > INT_MAX - 1)
2802 0 : nMaxBatchSize = INT_MAX - 1;
2803 14 : m_poArrowReader->set_batch_size(nMaxBatchSize);
2804 : }
2805 134 : return OGRArrowLayer::GetArrowStream(out_stream, papszOptions);
2806 : }
2807 :
2808 : /************************************************************************/
2809 : /* SetNextByIndex() */
2810 : /************************************************************************/
2811 :
2812 14 : OGRErr OGRParquetLayer::SetNextByIndex(GIntBig nIndex)
2813 : {
2814 14 : if (nIndex < 0)
2815 : {
2816 4 : m_bEOF = true;
2817 4 : return OGRERR_NON_EXISTING_FEATURE;
2818 : }
2819 :
2820 20 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2821 10 : if (nIndex >= metadata->num_rows())
2822 : {
2823 4 : m_bEOF = true;
2824 4 : return OGRERR_NON_EXISTING_FEATURE;
2825 : }
2826 :
2827 6 : m_bEOF = false;
2828 :
2829 6 : if (m_bSingleBatch)
2830 : {
2831 0 : ResetReading();
2832 0 : m_nIdxInBatch = nIndex;
2833 0 : m_nFeatureIdx = nIndex;
2834 0 : return OGRERR_NONE;
2835 : }
2836 :
2837 6 : const int nNumGroups = m_poArrowReader->num_row_groups();
2838 6 : int64_t nAccRows = 0;
2839 6 : const auto nBatchSize = m_poArrowReader->properties().batch_size();
2840 6 : m_iRecordBatch = -1;
2841 6 : ResetReading();
2842 6 : m_iRecordBatch = 0;
2843 7 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
2844 : {
2845 : const int64_t nNextAccRows =
2846 7 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
2847 7 : if (nIndex < nNextAccRows)
2848 : {
2849 6 : if (!CreateRecordBatchReader(iGroup))
2850 0 : return OGRERR_FAILURE;
2851 :
2852 12 : std::shared_ptr<arrow::RecordBatch> poBatch;
2853 : while (true)
2854 : {
2855 6 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
2856 6 : if (!status.ok())
2857 : {
2858 0 : CPLError(CE_Failure, CPLE_AppDefined,
2859 0 : "ReadNext() failed: %s", status.message().c_str());
2860 0 : m_iRecordBatch = -1;
2861 0 : ResetReading();
2862 0 : return OGRERR_FAILURE;
2863 : }
2864 6 : if (poBatch == nullptr)
2865 : {
2866 0 : m_iRecordBatch = -1;
2867 0 : ResetReading();
2868 0 : return OGRERR_FAILURE;
2869 : }
2870 6 : if (nIndex < nAccRows + poBatch->num_rows())
2871 : {
2872 6 : break;
2873 : }
2874 0 : nAccRows += poBatch->num_rows();
2875 0 : m_iRecordBatch++;
2876 0 : }
2877 6 : m_nIdxInBatch = nIndex - nAccRows;
2878 6 : m_nFeatureIdx = nIndex;
2879 6 : SetBatch(poBatch);
2880 6 : return OGRERR_NONE;
2881 : }
2882 1 : nAccRows = nNextAccRows;
2883 1 : m_iRecordBatch +=
2884 1 : (metadata->RowGroup(iGroup)->num_rows() + nBatchSize - 1) /
2885 : nBatchSize;
2886 : }
2887 :
2888 0 : m_iRecordBatch = -1;
2889 0 : ResetReading();
2890 0 : return OGRERR_FAILURE;
2891 : }
2892 :
2893 : /***********************************************************************/
2894 : /* GetStats() */
2895 : /***********************************************************************/
2896 :
2897 : template <class STAT_TYPE> struct GetStats
2898 : {
2899 : using T = typename STAT_TYPE::T;
2900 :
2901 521 : static T min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2902 : const int iRowGroup, const int numRowGroups, const int iCol,
2903 : bool &bFound)
2904 : {
2905 521 : T v{};
2906 521 : bFound = false;
2907 1050 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2908 : {
2909 565 : const auto columnChunk =
2910 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2911 : ->ColumnChunk(iCol);
2912 535 : const auto colStats = columnChunk->statistics();
2913 1067 : if (columnChunk->is_stats_set() && colStats &&
2914 532 : colStats->HasMinMax())
2915 : {
2916 526 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2917 526 : const auto rowGroupVal = castStats->min();
2918 526 : if (i == 0 || rowGroupVal < v)
2919 : {
2920 514 : bFound = true;
2921 514 : v = rowGroupVal;
2922 : }
2923 : }
2924 9 : else if (columnChunk->num_values() > 0)
2925 : {
2926 6 : bFound = false;
2927 6 : break;
2928 : }
2929 : }
2930 521 : return v;
2931 : }
2932 :
2933 510 : static T max(const std::shared_ptr<parquet::FileMetaData> &metadata,
2934 : const int iRowGroup, const int numRowGroups, const int iCol,
2935 : bool &bFound)
2936 : {
2937 510 : T v{};
2938 510 : bFound = false;
2939 1034 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2940 : {
2941 554 : const auto columnChunk =
2942 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2943 : ->ColumnChunk(iCol);
2944 524 : const auto colStats = columnChunk->statistics();
2945 1046 : if (columnChunk->is_stats_set() && colStats &&
2946 522 : colStats->HasMinMax())
2947 : {
2948 522 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2949 522 : const auto rowGroupVal = castStats->max();
2950 522 : if (i == 0 || rowGroupVal > v)
2951 : {
2952 520 : bFound = true;
2953 520 : v = rowGroupVal;
2954 : }
2955 : }
2956 2 : else if (columnChunk->num_values() > 0)
2957 : {
2958 0 : bFound = false;
2959 0 : break;
2960 : }
2961 : }
2962 510 : return v;
2963 : }
2964 : };
2965 :
2966 : template <> struct GetStats<parquet::ByteArrayStatistics>
2967 : {
2968 : static std::string
2969 39 : min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2970 : const int iRowGroup, const int numRowGroups, const int iCol,
2971 : bool &bFound)
2972 : {
2973 39 : std::string v{};
2974 39 : bFound = false;
2975 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2976 : {
2977 : const auto columnChunk =
2978 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2979 80 : ->ColumnChunk(iCol);
2980 80 : const auto colStats = columnChunk->statistics();
2981 80 : if (columnChunk->is_stats_set() && colStats &&
2982 40 : colStats->HasMinMax())
2983 : {
2984 : auto castStats =
2985 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
2986 40 : const auto rowGroupValRaw = castStats->min();
2987 : std::string rowGroupVal(
2988 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
2989 80 : rowGroupValRaw.len);
2990 40 : if (i == 0 || rowGroupVal < v)
2991 : {
2992 39 : bFound = true;
2993 39 : v = std::move(rowGroupVal);
2994 : }
2995 : }
2996 : }
2997 39 : return v;
2998 : }
2999 :
3000 : static std::string
3001 39 : max(const std::shared_ptr<parquet::FileMetaData> &metadata,
3002 : const int iRowGroup, const int numRowGroups, const int iCol,
3003 : bool &bFound)
3004 : {
3005 39 : std::string v{};
3006 39 : bFound = false;
3007 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
3008 : {
3009 : const auto columnChunk =
3010 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
3011 40 : ->ColumnChunk(iCol);
3012 40 : const auto colStats = columnChunk->statistics();
3013 80 : if (columnChunk->is_stats_set() && colStats &&
3014 40 : colStats->HasMinMax())
3015 : {
3016 : auto castStats =
3017 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
3018 40 : const auto rowGroupValRaw = castStats->max();
3019 : std::string rowGroupVal(
3020 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
3021 80 : rowGroupValRaw.len);
3022 40 : if (i == 0 || rowGroupVal > v)
3023 : {
3024 40 : bFound = true;
3025 40 : v = std::move(rowGroupVal);
3026 : }
3027 : }
3028 : else
3029 : {
3030 0 : bFound = false;
3031 0 : break;
3032 : }
3033 : }
3034 39 : return v;
3035 : }
3036 : };
3037 :
3038 : /************************************************************************/
3039 : /* GetMinMaxForOGRField() */
3040 : /************************************************************************/
3041 :
3042 258 : bool OGRParquetLayer::GetMinMaxForOGRField(int iRowGroup, // -1 for all
3043 : int iOGRField, bool bComputeMin,
3044 : OGRField &sMin, bool &bFoundMin,
3045 : bool bComputeMax, OGRField &sMax,
3046 : bool &bFoundMax, OGRFieldType &eType,
3047 : OGRFieldSubType &eSubType,
3048 : std::string &osMinTmp,
3049 : std::string &osMaxTmp) const
3050 : {
3051 258 : OGR_RawField_SetNull(&sMin);
3052 258 : OGR_RawField_SetNull(&sMax);
3053 258 : eType = OFTReal;
3054 258 : eSubType = OFSTNone;
3055 258 : bFoundMin = false;
3056 258 : bFoundMax = false;
3057 :
3058 : const int iCol = iOGRField == OGR_FID_INDEX
3059 511 : ? m_iFIDParquetColumn
3060 253 : : GetMapFieldIndexToParquetColumn()[iOGRField];
3061 258 : if (iCol < 0)
3062 0 : return false;
3063 : const auto &arrowType = iOGRField == OGR_FID_INDEX
3064 258 : ? m_poFIDType
3065 253 : : GetArrowFieldTypes()[iOGRField];
3066 :
3067 258 : const bool bRet = GetMinMaxForParquetCol(
3068 : iRowGroup, iCol, arrowType, bComputeMin, sMin, bFoundMin, bComputeMax,
3069 : sMax, bFoundMax, eType, eSubType, osMinTmp, osMaxTmp);
3070 :
3071 258 : if (eType == OFTInteger64 && arrowType->id() == arrow::Type::TIMESTAMP)
3072 : {
3073 : const OGRFieldDefn oDummyFIDFieldDefn(m_osFIDColumn.c_str(),
3074 4 : OFTInteger64);
3075 : const OGRFieldDefn *poFieldDefn =
3076 2 : iOGRField == OGR_FID_INDEX ? &oDummyFIDFieldDefn
3077 : : const_cast<OGRParquetLayer *>(this)
3078 2 : ->GetLayerDefn()
3079 2 : ->GetFieldDefn(iOGRField);
3080 2 : if (poFieldDefn->GetType() == OFTDateTime)
3081 : {
3082 : const auto timestampType =
3083 2 : static_cast<arrow::TimestampType *>(arrowType.get());
3084 2 : if (bFoundMin)
3085 : {
3086 1 : const int64_t timestamp = sMin.Integer64;
3087 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
3088 : poFieldDefn->GetTZFlag(), &sMin);
3089 : }
3090 2 : if (bFoundMax)
3091 : {
3092 1 : const int64_t timestamp = sMax.Integer64;
3093 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
3094 : poFieldDefn->GetTZFlag(), &sMax);
3095 : }
3096 2 : eType = OFTDateTime;
3097 : }
3098 : }
3099 :
3100 258 : return bRet;
3101 : }
3102 :
3103 : /************************************************************************/
3104 : /* GetMinMaxForParquetCol() */
3105 : /************************************************************************/
3106 :
3107 887 : bool OGRParquetLayer::GetMinMaxForParquetCol(
3108 : int iRowGroup, // -1 for all
3109 : int iCol,
3110 : const std::shared_ptr<arrow::DataType> &arrowType, // potentially nullptr
3111 : bool bComputeMin, OGRField &sMin, bool &bFoundMin, bool bComputeMax,
3112 : OGRField &sMax, bool &bFoundMax, OGRFieldType &eType,
3113 : OGRFieldSubType &eSubType, std::string &osMinTmp,
3114 : std::string &osMaxTmp) const
3115 : {
3116 887 : OGR_RawField_SetNull(&sMin);
3117 887 : OGR_RawField_SetNull(&sMax);
3118 887 : eType = OFTReal;
3119 887 : eSubType = OFSTNone;
3120 887 : bFoundMin = false;
3121 887 : bFoundMax = false;
3122 :
3123 1774 : const auto metadata = GetReader()->parquet_reader()->metadata();
3124 887 : const auto numRowGroups = metadata->num_row_groups();
3125 :
3126 887 : if (numRowGroups == 0)
3127 0 : return false;
3128 :
3129 1774 : const auto rowGroup0 = metadata->RowGroup(0);
3130 887 : if (iCol < 0 || iCol >= rowGroup0->num_columns())
3131 : {
3132 0 : CPLError(CE_Failure, CPLE_AppDefined,
3133 : "GetMinMaxForParquetCol(): invalid iCol=%d", iCol);
3134 0 : return false;
3135 : }
3136 1774 : const auto rowGroup0columnChunk = rowGroup0->ColumnChunk(iCol);
3137 1774 : const auto rowGroup0Stats = rowGroup0columnChunk->statistics();
3138 887 : if (!(rowGroup0columnChunk->is_stats_set() && rowGroup0Stats))
3139 : {
3140 0 : CPLDebug("PARQUET", "Statistics not available for field %s",
3141 0 : rowGroup0columnChunk->path_in_schema()->ToDotString().c_str());
3142 0 : return false;
3143 : }
3144 :
3145 887 : const auto physicalType = rowGroup0Stats->physical_type();
3146 :
3147 887 : if (bComputeMin)
3148 : {
3149 563 : if (physicalType == parquet::Type::BOOLEAN)
3150 : {
3151 54 : eType = OFTInteger;
3152 54 : eSubType = OFSTBoolean;
3153 54 : sMin.Integer = GetStats<parquet::BoolStatistics>::min(
3154 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3155 : }
3156 509 : else if (physicalType == parquet::Type::INT32)
3157 : {
3158 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
3159 : {
3160 : // With parquet file version 2.0,
3161 : // statistics of uint32 fields are
3162 : // stored as signed int32 values...
3163 1 : eType = OFTInteger64;
3164 1 : int nVal = GetStats<parquet::Int32Statistics>::min(
3165 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3166 1 : if (bFoundMin)
3167 : {
3168 1 : sMin.Integer64 = static_cast<uint32_t>(nVal);
3169 : }
3170 : }
3171 : else
3172 : {
3173 77 : eType = OFTInteger;
3174 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
3175 1 : eSubType = OFSTInt16;
3176 77 : sMin.Integer = GetStats<parquet::Int32Statistics>::min(
3177 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3178 : }
3179 : }
3180 431 : else if (physicalType == parquet::Type::INT64)
3181 : {
3182 41 : eType = OFTInteger64;
3183 41 : sMin.Integer64 = GetStats<parquet::Int64Statistics>::min(
3184 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3185 : }
3186 390 : else if (physicalType == parquet::Type::FLOAT)
3187 : {
3188 138 : eType = OFTReal;
3189 138 : eSubType = OFSTFloat32;
3190 138 : sMin.Real = GetStats<parquet::FloatStatistics>::min(
3191 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3192 : }
3193 252 : else if (physicalType == parquet::Type::DOUBLE)
3194 : {
3195 210 : eType = OFTReal;
3196 210 : sMin.Real = GetStats<parquet::DoubleStatistics>::min(
3197 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3198 : }
3199 42 : else if (arrowType &&
3200 53 : (arrowType->id() == arrow::Type::STRING ||
3201 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
3202 : physicalType == parquet::Type::BYTE_ARRAY)
3203 : {
3204 78 : osMinTmp = GetStats<parquet::ByteArrayStatistics>::min(
3205 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3206 39 : if (bFoundMin)
3207 : {
3208 39 : eType = OFTString;
3209 39 : sMin.String = &osMinTmp[0];
3210 : }
3211 : }
3212 : }
3213 :
3214 887 : if (bComputeMax)
3215 : {
3216 552 : if (physicalType == parquet::Type::BOOLEAN)
3217 : {
3218 54 : eType = OFTInteger;
3219 54 : eSubType = OFSTBoolean;
3220 54 : sMax.Integer = GetStats<parquet::BoolStatistics>::max(
3221 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3222 : }
3223 498 : else if (physicalType == parquet::Type::INT32)
3224 : {
3225 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
3226 : {
3227 : // With parquet file version 2.0,
3228 : // statistics of uint32 fields are
3229 : // stored as signed int32 values...
3230 1 : eType = OFTInteger64;
3231 1 : int nVal = GetStats<parquet::Int32Statistics>::max(
3232 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3233 1 : if (bFoundMax)
3234 : {
3235 1 : sMax.Integer64 = static_cast<uint32_t>(nVal);
3236 : }
3237 : }
3238 : else
3239 : {
3240 77 : eType = OFTInteger;
3241 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
3242 1 : eSubType = OFSTInt16;
3243 77 : sMax.Integer = GetStats<parquet::Int32Statistics>::max(
3244 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3245 : }
3246 : }
3247 420 : else if (physicalType == parquet::Type::INT64)
3248 : {
3249 41 : eType = OFTInteger64;
3250 41 : sMax.Integer64 = GetStats<parquet::Int64Statistics>::max(
3251 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3252 : }
3253 379 : else if (physicalType == parquet::Type::FLOAT)
3254 : {
3255 128 : eType = OFTReal;
3256 128 : eSubType = OFSTFloat32;
3257 128 : sMax.Real = GetStats<parquet::FloatStatistics>::max(
3258 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3259 : }
3260 251 : else if (physicalType == parquet::Type::DOUBLE)
3261 : {
3262 209 : eType = OFTReal;
3263 209 : sMax.Real = GetStats<parquet::DoubleStatistics>::max(
3264 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3265 : }
3266 42 : else if (arrowType &&
3267 53 : (arrowType->id() == arrow::Type::STRING ||
3268 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
3269 : physicalType == parquet::Type::BYTE_ARRAY)
3270 : {
3271 78 : osMaxTmp = GetStats<parquet::ByteArrayStatistics>::max(
3272 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3273 39 : if (bFoundMax)
3274 : {
3275 39 : eType = OFTString;
3276 39 : sMax.String = &osMaxTmp[0];
3277 : }
3278 : }
3279 : }
3280 :
3281 887 : return bFoundMin || bFoundMax;
3282 : }
3283 :
3284 : /************************************************************************/
3285 : /* GeomColsBBOXParquet() */
3286 : /************************************************************************/
3287 :
3288 : /** Return for a given geometry column (iGeom: in [0, GetGeomFieldCount()-1] range),
3289 : * the Parquet column number of the corresponding xmin,ymin,xmax,ymax bounding
3290 : * box columns, if existing.
3291 : */
3292 1 : bool OGRParquetLayer::GeomColsBBOXParquet(int iGeom, int &iParquetXMin,
3293 : int &iParquetYMin, int &iParquetXMax,
3294 : int &iParquetYMax) const
3295 : {
3296 1 : const auto oIter = m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeom);
3297 : const bool bFound =
3298 1 : (oIter != m_oMapGeomFieldIndexToGeomColBBOXParquet.end());
3299 1 : if (bFound)
3300 : {
3301 1 : iParquetXMin = oIter->second.iParquetXMin;
3302 1 : iParquetYMin = oIter->second.iParquetYMin;
3303 1 : iParquetXMax = oIter->second.iParquetXMax;
3304 1 : iParquetYMax = oIter->second.iParquetYMax;
3305 : }
3306 1 : return bFound;
3307 : }
|