Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "cpl_multiproc.h"
16 : #include "gdal_pam.h"
17 : #include "ogrsf_frmts.h"
18 : #include "ogr_p.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <cmath>
23 : #include <limits>
24 : #include <map>
25 : #include <set>
26 : #include <utility>
27 :
28 : #include "ogr_parquet.h"
29 :
30 : #include "../arrow_common/ograrrowlayer.hpp"
31 : #include "../arrow_common/ograrrowdataset.hpp"
32 :
33 : /************************************************************************/
34 : /* OGRParquetLayerBase() */
35 : /************************************************************************/
36 :
37 1136 : OGRParquetLayerBase::OGRParquetLayerBase(OGRParquetDataset *poDS,
38 : const char *pszLayerName,
39 1136 : CSLConstList papszOpenOptions)
40 : : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
41 : m_aosGeomPossibleNames(CSLTokenizeString2(
42 : CSLFetchNameValueDef(papszOpenOptions, "GEOM_POSSIBLE_NAMES",
43 : "geometry,wkb_geometry,wkt_geometry"),
44 : ",", 0)),
45 1136 : m_osCRS(CSLFetchNameValueDef(papszOpenOptions, "CRS", ""))
46 : {
47 1136 : }
48 :
49 : /************************************************************************/
50 : /* GetDataset() */
51 : /************************************************************************/
52 :
53 27 : GDALDataset *OGRParquetLayerBase::GetDataset()
54 : {
55 27 : return m_poDS;
56 : }
57 :
58 : /************************************************************************/
59 : /* ResetReading() */
60 : /************************************************************************/
61 :
62 6297 : void OGRParquetLayerBase::ResetReading()
63 : {
64 6297 : if (m_iRecordBatch != 0)
65 : {
66 5976 : m_poRecordBatchReader.reset();
67 : }
68 6297 : OGRArrowLayer::ResetReading();
69 6297 : }
70 :
71 : /************************************************************************/
72 : /* InvalidateCachedBatches() */
73 : /************************************************************************/
74 :
75 1685 : void OGRParquetLayerBase::InvalidateCachedBatches()
76 : {
77 1685 : m_iRecordBatch = -1;
78 1685 : ResetReading();
79 1685 : }
80 :
81 : /************************************************************************/
82 : /* LoadGeoMetadata() */
83 : /************************************************************************/
84 :
85 1136 : void OGRParquetLayerBase::LoadGeoMetadata(
86 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata)
87 : {
88 1136 : if (kv_metadata && kv_metadata->Contains("geo"))
89 : {
90 2158 : auto geo = kv_metadata->Get("geo");
91 1079 : if (geo.ok())
92 : {
93 1079 : CPLDebug("PARQUET", "geo = %s", geo->c_str());
94 2158 : CPLJSONDocument oDoc;
95 1079 : if (oDoc.LoadMemory(*geo))
96 : {
97 2156 : auto oRoot = oDoc.GetRoot();
98 3234 : const auto osVersion = oRoot.GetString("version");
99 2488 : if (osVersion != "0.1.0" && osVersion != "0.2.0" &&
100 2114 : osVersion != "0.3.0" && osVersion != "0.4.0" &&
101 2110 : osVersion != "1.0.0-beta.1" && osVersion != "1.0.0-rc.1" &&
102 2486 : osVersion != "1.0.0" && osVersion != "1.1.0")
103 : {
104 1 : CPLDebug(
105 : "PARQUET",
106 : "version = %s not explicitly handled by the driver",
107 : osVersion.c_str());
108 : }
109 :
110 3234 : auto oColumns = oRoot.GetObj("columns");
111 1078 : if (oColumns.IsValid())
112 : {
113 2171 : for (const auto &oColumn : oColumns.GetChildren())
114 : {
115 1094 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
116 : }
117 : }
118 : }
119 : else
120 : {
121 1 : CPLError(CE_Warning, CPLE_AppDefined,
122 : "Cannot parse 'geo' metadata");
123 : }
124 : }
125 : }
126 1136 : }
127 :
128 : /************************************************************************/
129 : /* ParseGeometryColumnCovering() */
130 : /************************************************************************/
131 :
132 : //! Parse bounding box column definition
133 : /*static */
134 2178 : bool OGRParquetLayerBase::ParseGeometryColumnCovering(
135 : const CPLJSONObject &oJSONDef, std::string &osBBOXColumn,
136 : std::string &osXMin, std::string &osYMin, std::string &osXMax,
137 : std::string &osYMax)
138 : {
139 6534 : const auto oCovering = oJSONDef["covering"];
140 3039 : if (oCovering.IsValid() &&
141 861 : oCovering.GetType() == CPLJSONObject::Type::Object)
142 : {
143 1722 : const auto oBBOX = oCovering["bbox"];
144 861 : if (oBBOX.IsValid() && oBBOX.GetType() == CPLJSONObject::Type::Object)
145 : {
146 1722 : const auto oXMin = oBBOX["xmin"];
147 1722 : const auto oYMin = oBBOX["ymin"];
148 1722 : const auto oXMax = oBBOX["xmax"];
149 1722 : const auto oYMax = oBBOX["ymax"];
150 1722 : if (oXMin.IsValid() && oYMin.IsValid() && oXMax.IsValid() &&
151 861 : oYMax.IsValid() &&
152 861 : oXMin.GetType() == CPLJSONObject::Type::Array &&
153 861 : oYMin.GetType() == CPLJSONObject::Type::Array &&
154 2583 : oXMax.GetType() == CPLJSONObject::Type::Array &&
155 861 : oYMax.GetType() == CPLJSONObject::Type::Array)
156 : {
157 861 : const auto osXMinArray = oXMin.ToArray();
158 861 : const auto osYMinArray = oYMin.ToArray();
159 861 : const auto osXMaxArray = oXMax.ToArray();
160 861 : const auto osYMaxArray = oYMax.ToArray();
161 861 : if (osXMinArray.Size() == 2 && osYMinArray.Size() == 2 &&
162 861 : osXMaxArray.Size() == 2 && osYMaxArray.Size() == 2 &&
163 1722 : osXMinArray[0].GetType() == CPLJSONObject::Type::String &&
164 1722 : osXMinArray[1].GetType() == CPLJSONObject::Type::String &&
165 1722 : osYMinArray[0].GetType() == CPLJSONObject::Type::String &&
166 1722 : osYMinArray[1].GetType() == CPLJSONObject::Type::String &&
167 1722 : osXMaxArray[0].GetType() == CPLJSONObject::Type::String &&
168 1722 : osXMaxArray[1].GetType() == CPLJSONObject::Type::String &&
169 1722 : osYMaxArray[0].GetType() == CPLJSONObject::Type::String &&
170 2583 : osYMaxArray[1].GetType() == CPLJSONObject::Type::String &&
171 2583 : osXMinArray[0].ToString() == osYMinArray[0].ToString() &&
172 3444 : osXMinArray[0].ToString() == osXMaxArray[0].ToString() &&
173 1722 : osXMinArray[0].ToString() == osYMaxArray[0].ToString())
174 : {
175 861 : osBBOXColumn = osXMinArray[0].ToString();
176 861 : osXMin = osXMinArray[1].ToString();
177 861 : osYMin = osYMinArray[1].ToString();
178 861 : osXMax = osXMaxArray[1].ToString();
179 861 : osYMax = osYMaxArray[1].ToString();
180 861 : return true;
181 : }
182 : }
183 : }
184 : }
185 1317 : return false;
186 : }
187 :
188 : /************************************************************************/
189 : /* DealWithGeometryColumn() */
190 : /************************************************************************/
191 :
192 31926 : bool OGRParquetLayerBase::DealWithGeometryColumn(
193 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
194 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
195 : [[maybe_unused]] const parquet::ColumnDescriptor *parquetColumn,
196 : [[maybe_unused]] const parquet::FileMetaData *metadata,
197 : [[maybe_unused]] int iColumn)
198 : {
199 63852 : const auto &field_kv_metadata = field->metadata();
200 63852 : std::string osExtensionName;
201 31926 : if (field_kv_metadata)
202 : {
203 84 : auto extension_name = field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
204 42 : if (extension_name.ok())
205 : {
206 2 : osExtensionName = *extension_name;
207 : }
208 : #ifdef DEBUG
209 42 : CPLDebug("PARQUET", "Metadata field %s:", field->name().c_str());
210 45 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
211 : {
212 3 : CPLDebug("PARQUET", " %s = %s", keyValue.first.c_str(),
213 : keyValue.second.c_str());
214 : }
215 : #endif
216 : }
217 :
218 31926 : bool bRegularField = true;
219 31926 : auto oIter = m_oMapGeometryColumns.find(field->name());
220 31926 : if (oIter != m_oMapGeometryColumns.end() ||
221 62758 : STARTS_WITH(osExtensionName.c_str(), "ogc.") ||
222 30832 : STARTS_WITH(osExtensionName.c_str(), "geoarrow."))
223 : {
224 2190 : CPLJSONObject oJSONDef;
225 1095 : if (oIter != m_oMapGeometryColumns.end())
226 1094 : oJSONDef = oIter->second;
227 3285 : auto osEncoding = oJSONDef.GetString("encoding");
228 1095 : if (osEncoding.empty() && !osExtensionName.empty())
229 1 : osEncoding = osExtensionName;
230 :
231 1095 : OGRwkbGeometryType eGeomType = wkbUnknown;
232 1095 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
233 1095 : if (IsValidGeometryEncoding(field, osEncoding,
234 2190 : oIter != m_oMapGeometryColumns.end(),
235 : eGeomType, eGeomEncoding))
236 : {
237 1095 : bRegularField = false;
238 2190 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
239 :
240 3285 : auto oCRS = oJSONDef["crs"];
241 1095 : OGRSpatialReference *poSRS = nullptr;
242 1095 : if (!oCRS.IsValid())
243 : {
244 40 : if (!m_oMapGeometryColumns.empty())
245 : {
246 : // WGS 84 is implied if no crs member is found.
247 39 : poSRS = new OGRSpatialReference();
248 39 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
249 39 : poSRS->importFromEPSG(4326);
250 : }
251 : }
252 1055 : else if (oCRS.GetType() == CPLJSONObject::Type::String)
253 : {
254 1116 : const auto osWKT = oCRS.ToString();
255 372 : poSRS = new OGRSpatialReference();
256 372 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
257 :
258 372 : if (poSRS->importFromWkt(osWKT.c_str()) != OGRERR_NONE)
259 : {
260 0 : poSRS->Release();
261 0 : poSRS = nullptr;
262 : }
263 : }
264 683 : else if (oCRS.GetType() == CPLJSONObject::Type::Object)
265 : {
266 : // CRS encoded as PROJJSON (extension)
267 75 : const auto oType = oCRS["type"];
268 50 : if (oType.IsValid() &&
269 25 : oType.GetType() == CPLJSONObject::Type::String)
270 : {
271 75 : const auto osType = oType.ToString();
272 25 : if (osType.find("CRS") != std::string::npos)
273 : {
274 25 : poSRS = new OGRSpatialReference();
275 25 : poSRS->SetAxisMappingStrategy(
276 : OAMS_TRADITIONAL_GIS_ORDER);
277 :
278 50 : if (poSRS->SetFromUserInput(
279 50 : oCRS.ToString().c_str(),
280 : OGRSpatialReference::
281 25 : SET_FROM_USER_INPUT_LIMITATIONS_get()) !=
282 : OGRERR_NONE)
283 : {
284 0 : poSRS->Release();
285 0 : poSRS = nullptr;
286 : }
287 : }
288 : }
289 : }
290 :
291 1095 : if (poSRS)
292 : {
293 436 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
294 436 : if (dfCoordEpoch > 0)
295 4 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
296 :
297 436 : oField.SetSpatialRef(poSRS);
298 :
299 436 : poSRS->Release();
300 : }
301 :
302 1095 : if (!m_osCRS.empty())
303 : {
304 0 : poSRS = new OGRSpatialReference();
305 0 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
306 0 : if (poSRS->SetFromUserInput(
307 : m_osCRS.c_str(),
308 : OGRSpatialReference::
309 0 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
310 : OGRERR_NONE)
311 : {
312 0 : oField.SetSpatialRef(poSRS);
313 : }
314 0 : poSRS->Release();
315 : }
316 :
317 1095 : if (oJSONDef.GetString("edges") == "spherical")
318 : {
319 2 : SetMetadataItem("EDGES", "SPHERICAL");
320 : }
321 :
322 : // m_aeGeomEncoding be filled before calling
323 : // ComputeGeometryColumnType()
324 1095 : m_aeGeomEncoding.push_back(eGeomEncoding);
325 1095 : if (eGeomType == wkbUnknown)
326 : {
327 : // geometry_types since 1.0.0-beta1. Was geometry_type
328 : // before
329 1689 : auto oType = oJSONDef.GetObj("geometry_types");
330 563 : if (!oType.IsValid())
331 376 : oType = oJSONDef.GetObj("geometry_type");
332 563 : if (oType.GetType() == CPLJSONObject::Type::String)
333 : {
334 : // string is no longer valid since 1.0.0-beta1
335 3 : const auto osType = oType.ToString();
336 1 : if (osType != "Unknown")
337 1 : eGeomType = GetGeometryTypeFromString(osType);
338 : }
339 562 : else if (oType.GetType() == CPLJSONObject::Type::Array)
340 : {
341 374 : const auto oTypeArray = oType.ToArray();
342 187 : if (oTypeArray.Size() == 1)
343 : {
344 85 : eGeomType =
345 85 : GetGeometryTypeFromString(oTypeArray[0].ToString());
346 : }
347 102 : else if (oTypeArray.Size() > 1)
348 : {
349 : const auto PromoteToCollection =
350 271 : [](OGRwkbGeometryType eType)
351 : {
352 271 : if (eType == wkbPoint)
353 41 : return wkbMultiPoint;
354 230 : if (eType == wkbLineString)
355 36 : return wkbMultiLineString;
356 194 : if (eType == wkbPolygon)
357 52 : return wkbMultiPolygon;
358 142 : return eType;
359 : };
360 51 : bool bMixed = false;
361 51 : bool bHasMulti = false;
362 51 : bool bHasZ = false;
363 51 : bool bHasM = false;
364 : const auto eFirstType =
365 51 : OGR_GT_Flatten(GetGeometryTypeFromString(
366 102 : oTypeArray[0].ToString()));
367 : const auto eFirstTypeCollection =
368 51 : PromoteToCollection(eFirstType);
369 145 : for (int i = 0; i < oTypeArray.Size(); ++i)
370 : {
371 126 : const auto eThisGeom = GetGeometryTypeFromString(
372 252 : oTypeArray[i].ToString());
373 126 : if (PromoteToCollection(OGR_GT_Flatten(
374 126 : eThisGeom)) != eFirstTypeCollection)
375 : {
376 32 : bMixed = true;
377 32 : break;
378 : }
379 94 : bHasZ |= OGR_GT_HasZ(eThisGeom) != FALSE;
380 94 : bHasM |= OGR_GT_HasM(eThisGeom) != FALSE;
381 94 : bHasMulti |=
382 94 : (PromoteToCollection(OGR_GT_Flatten(
383 94 : eThisGeom)) == OGR_GT_Flatten(eThisGeom));
384 : }
385 51 : if (!bMixed)
386 : {
387 19 : if (eFirstTypeCollection == wkbMultiPolygon ||
388 : eFirstTypeCollection == wkbMultiLineString)
389 : {
390 18 : if (bHasMulti)
391 18 : eGeomType = OGR_GT_SetModifier(
392 : eFirstTypeCollection, bHasZ, bHasM);
393 : else
394 0 : eGeomType = OGR_GT_SetModifier(
395 : eFirstType, bHasZ, bHasM);
396 : }
397 : }
398 : }
399 : }
400 375 : else if (CPLTestBool(CPLGetConfigOption(
401 : "OGR_PARQUET_COMPUTE_GEOMETRY_TYPE", "YES")))
402 : {
403 375 : eGeomType = computeGeometryTypeFun();
404 : }
405 : }
406 :
407 1095 : oField.SetType(eGeomType);
408 1095 : oField.SetNullable(field->nullable());
409 1095 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
410 1095 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
411 : }
412 : }
413 :
414 31926 : std::shared_ptr<arrow::DataType> fieldType = field->type();
415 31926 : auto fieldTypeId = fieldType->id();
416 : #if PARQUET_VERSION_MAJOR >= 21
417 : // Try to detect Arrow >= 21 GEOMETRY/GEOGRAPHY logical type
418 : if (bRegularField && fieldTypeId == arrow::Type::EXTENSION)
419 : {
420 : auto extensionType =
421 : cpl::down_cast<arrow::ExtensionType *>(fieldType.get());
422 : osExtensionName = extensionType->extension_name();
423 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB)
424 : {
425 : const auto arrowWkb =
426 : dynamic_cast<const OGRGeoArrowWkbExtensionType *>(
427 : extensionType);
428 : #ifdef DEBUG
429 : if (arrowWkb)
430 : {
431 : CPLDebug("PARQUET", "arrowWkb = '%s'",
432 : arrowWkb->Serialize().c_str());
433 : }
434 : #endif
435 :
436 : fieldTypeId = extensionType->storage_type()->id();
437 : if (fieldTypeId == arrow::Type::BINARY ||
438 : fieldTypeId == arrow::Type::LARGE_BINARY)
439 : {
440 : OGRwkbGeometryType eGeomType = wkbUnknown;
441 : bool bSkipRowGroups = false;
442 :
443 : // m_aeGeomEncoding be filled before calling
444 : // ComputeGeometryColumnType()
445 : bRegularField = false;
446 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB);
447 :
448 : std::string crs(m_osCRS);
449 : if (parquetColumn && crs.empty())
450 : {
451 : const auto &logicalType = parquetColumn->logical_type();
452 : if (logicalType->is_geometry())
453 : {
454 : crs = static_cast<const parquet::GeometryLogicalType *>(
455 : logicalType.get())
456 : ->crs();
457 : if (crs.empty())
458 : crs = "EPSG:4326";
459 : CPLDebugOnly("PARQUET", "GeometryLogicalType crs=%s",
460 : crs.c_str());
461 : }
462 : else if (logicalType->is_geography())
463 : {
464 : const auto *geographyType =
465 : static_cast<const parquet::GeographyLogicalType *>(
466 : logicalType.get());
467 : crs = geographyType->crs();
468 : if (crs.empty())
469 : crs = "EPSG:4326";
470 :
471 : SetMetadataItem(
472 : "EDGES",
473 : CPLString(
474 : std::string(geographyType->algorithm_name()))
475 : .toupper());
476 : CPLDebugOnly("PARQUET", "GeographyLogicalType crs=%s",
477 : crs.c_str());
478 : }
479 : else
480 : {
481 : CPLDebug("PARQUET", "geoarrow.wkb column is neither a "
482 : "geometry or geography one");
483 :
484 : // This might be an old geoarrow.wkb extension...
485 : if (CPLTestBool(CPLGetConfigOption(
486 : "OGR_PARQUET_COMPUTE_GEOMETRY_TYPE", "YES")))
487 : {
488 : eGeomType = computeGeometryTypeFun();
489 : bSkipRowGroups = true;
490 : }
491 : }
492 :
493 : // Cf https://github.com/apache/parquet-format/blob/master/Geospatial.md#crs-customization
494 : // "projjson: PROJJSON, identifier is the name of a table property or a file property where the projjson string is stored."
495 : // Here the property is interpreted as the key of a file metadata (as done in libarrow)
496 : constexpr const char *PROJJSON_PREFIX = "projjson:";
497 : if (cpl::starts_with(crs, PROJJSON_PREFIX) && metadata)
498 : {
499 : auto projjson_value =
500 : metadata->key_value_metadata()->Get(
501 : crs.substr(strlen(PROJJSON_PREFIX)));
502 : if (projjson_value.ok())
503 : {
504 : crs = *projjson_value;
505 : }
506 : else
507 : {
508 : CPLDebug("PARQUET",
509 : "Cannot find file metadata for %s",
510 : crs.c_str());
511 : }
512 : }
513 : }
514 : else if (!parquetColumn && arrowWkb)
515 : {
516 : // For a OGRParquetDatasetLayer for example
517 : const std::string arrowWkbMetadata = arrowWkb->Serialize();
518 : if (arrowWkbMetadata.empty() || arrowWkbMetadata == "{}")
519 : {
520 : crs = "EPSG:4326";
521 : }
522 : else if (arrowWkbMetadata[0] == '{')
523 : {
524 : CPLJSONDocument oDoc;
525 : if (oDoc.LoadMemory(arrowWkbMetadata))
526 : {
527 : auto jCrs = oDoc.GetRoot()["crs"];
528 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
529 : {
530 : crs = jCrs.Format(
531 : CPLJSONObject::PrettyFormat::Plain);
532 : }
533 : else if (jCrs.GetType() ==
534 : CPLJSONObject::Type::String)
535 : {
536 : crs = jCrs.ToString();
537 : }
538 : if (oDoc.GetRoot()["edges"].ToString() ==
539 : "spherical")
540 : {
541 : SetMetadataItem("EDGES", "SPHERICAL");
542 : }
543 : }
544 : }
545 : }
546 :
547 : bool bGeomTypeInvalid = false;
548 : bool bHasMulti = false;
549 : bool bHasZ = false;
550 : bool bHasM = false;
551 : bool bFirst = true;
552 : OGRwkbGeometryType eFirstType = wkbUnknown;
553 : OGRwkbGeometryType eFirstTypeCollection = wkbUnknown;
554 : const auto numRowGroups =
555 : metadata ? metadata->num_row_groups() : 0;
556 : bool bEnvelopeValid = true;
557 : OGREnvelope sEnvelope;
558 : bool bEnvelope3DValid = true;
559 : OGREnvelope3D sEnvelope3D;
560 : for (int iRowGroup = 0;
561 : !bSkipRowGroups && iRowGroup < numRowGroups; ++iRowGroup)
562 : {
563 : const auto columnChunk =
564 : metadata->RowGroup(iRowGroup)->ColumnChunk(iColumn);
565 : if (auto geostats = columnChunk->geo_statistics())
566 : {
567 : double dfMinX =
568 : std::numeric_limits<double>::quiet_NaN();
569 : double dfMinY =
570 : std::numeric_limits<double>::quiet_NaN();
571 : double dfMinZ =
572 : std::numeric_limits<double>::quiet_NaN();
573 : double dfMaxX =
574 : std::numeric_limits<double>::quiet_NaN();
575 : double dfMaxY =
576 : std::numeric_limits<double>::quiet_NaN();
577 : double dfMaxZ =
578 : std::numeric_limits<double>::quiet_NaN();
579 : if (bEnvelopeValid && geostats->dimension_valid()[0] &&
580 : geostats->dimension_valid()[1])
581 : {
582 : dfMinX = geostats->lower_bound()[0];
583 : dfMaxX = geostats->upper_bound()[0];
584 : dfMinY = geostats->lower_bound()[1];
585 : dfMaxY = geostats->upper_bound()[1];
586 :
587 : // Deal as best as we can with wrap around bounding box
588 : if (dfMinX > dfMaxX && std::fabs(dfMinX) <= 180 &&
589 : std::fabs(dfMaxX) <= 180)
590 : {
591 : dfMinX = -180;
592 : dfMaxX = 180;
593 : }
594 :
595 : if (std::isfinite(dfMinX) &&
596 : std::isfinite(dfMaxX) &&
597 : std::isfinite(dfMinY) && std::isfinite(dfMaxY))
598 : {
599 : sEnvelope.Merge(dfMinX, dfMinY);
600 : sEnvelope.Merge(dfMaxX, dfMaxY);
601 : if (bEnvelope3DValid &&
602 : geostats->dimension_valid()[2])
603 : {
604 : dfMinZ = geostats->lower_bound()[2];
605 : dfMaxZ = geostats->upper_bound()[2];
606 : if (std::isfinite(dfMinZ) &&
607 : std::isfinite(dfMaxZ))
608 : {
609 : sEnvelope3D.Merge(dfMinX, dfMinY,
610 : dfMinZ);
611 : sEnvelope3D.Merge(dfMaxX, dfMaxY,
612 : dfMaxZ);
613 : }
614 : }
615 : }
616 : }
617 :
618 : bEnvelopeValid =
619 : bEnvelopeValid && std::isfinite(dfMinX) &&
620 : std::isfinite(dfMaxX) && std::isfinite(dfMinY) &&
621 : std::isfinite(dfMaxY);
622 :
623 : bEnvelope3DValid = bEnvelope3DValid &&
624 : std::isfinite(dfMinZ) &&
625 : std::isfinite(dfMaxZ);
626 :
627 : if (auto geometry_types = geostats->geometry_types())
628 : {
629 : const auto PromoteToCollection =
630 : [](OGRwkbGeometryType eType)
631 : {
632 : if (eType == wkbPoint)
633 : return wkbMultiPoint;
634 : if (eType == wkbLineString)
635 : return wkbMultiLineString;
636 : if (eType == wkbPolygon)
637 : return wkbMultiPolygon;
638 : return eType;
639 : };
640 :
641 : for (int nGeomType : *geometry_types)
642 : {
643 : OGRwkbGeometryType eThisGeom = wkbUnknown;
644 : if ((nGeomType > 0 && nGeomType <= 17) ||
645 : (nGeomType > 2000 && nGeomType <= 2017) ||
646 : (nGeomType > 3000 && nGeomType <= 3017))
647 : {
648 : eThisGeom = static_cast<OGRwkbGeometryType>(
649 : nGeomType);
650 : }
651 : else if (nGeomType > 1000 && nGeomType <= 1017)
652 : {
653 : eThisGeom = OGR_GT_SetZ(
654 : static_cast<OGRwkbGeometryType>(
655 : nGeomType - 1000));
656 : ;
657 : }
658 : else
659 : {
660 : CPLDebug("PARQUET",
661 : "Unknown geometry type: %d",
662 : nGeomType);
663 : bGeomTypeInvalid = true;
664 : break;
665 : }
666 : if (bFirst)
667 : {
668 : bFirst = false;
669 : eFirstType = eThisGeom;
670 : eFirstTypeCollection =
671 : PromoteToCollection(eFirstType);
672 : }
673 : else if (PromoteToCollection(
674 : OGR_GT_Flatten(eThisGeom)) !=
675 : eFirstTypeCollection)
676 : {
677 : bGeomTypeInvalid = true;
678 : break;
679 : }
680 : bHasZ |= OGR_GT_HasZ(eThisGeom) != FALSE;
681 : bHasM |= OGR_GT_HasM(eThisGeom) != FALSE;
682 : bHasMulti |= (PromoteToCollection(
683 : OGR_GT_Flatten(eThisGeom)) ==
684 : OGR_GT_Flatten(eThisGeom));
685 : }
686 : }
687 : }
688 : else
689 : {
690 : bEnvelopeValid = false;
691 : bEnvelope3DValid = false;
692 : bGeomTypeInvalid = true;
693 : }
694 : }
695 :
696 : if (bEnvelopeValid && sEnvelope.IsInit())
697 : {
698 : CPLDebug("PARQUET", "Got bounding box from geo_statistics");
699 : m_geoStatsWithBBOXAvailable.insert(
700 : m_poFeatureDefn->GetGeomFieldCount());
701 : m_oMapExtents[m_poFeatureDefn->GetGeomFieldCount()] =
702 : std::move(sEnvelope);
703 :
704 : if (bEnvelope3DValid && sEnvelope3D.IsInit())
705 : {
706 : CPLDebug("PARQUET",
707 : "Got bounding box 3D from geo_statistics");
708 : m_oMapExtents3D[m_poFeatureDefn->GetGeomFieldCount()] =
709 : std::move(sEnvelope3D);
710 : }
711 : }
712 :
713 : if (!bSkipRowGroups && !bGeomTypeInvalid)
714 : {
715 : if (eFirstTypeCollection == wkbMultiPoint ||
716 : eFirstTypeCollection == wkbMultiPolygon ||
717 : eFirstTypeCollection == wkbMultiLineString)
718 : {
719 : if (bHasMulti)
720 : eGeomType = OGR_GT_SetModifier(eFirstTypeCollection,
721 : bHasZ, bHasM);
722 : else
723 : eGeomType =
724 : OGR_GT_SetModifier(eFirstType, bHasZ, bHasM);
725 : }
726 : }
727 :
728 : OGRGeomFieldDefn oField(field->name().c_str(), eGeomType);
729 : oField.SetNullable(field->nullable());
730 :
731 : if (!crs.empty())
732 : {
733 : // Cf https://github.com/apache/parquet-format/blob/master/Geospatial.md#crs-customization
734 : // "srid: Spatial reference identifier, identifier is the SRID itself.."
735 : constexpr const char *SRID_PREFIX = "srid:";
736 : if (cpl::starts_with(crs, SRID_PREFIX))
737 : {
738 : // When getting the value from the GeometryLogicalType::crs() method
739 : crs = crs.substr(strlen(SRID_PREFIX));
740 : }
741 : if (CPLGetValueType(crs.c_str()) == CPL_VALUE_INTEGER)
742 : {
743 : // Getting here from above if, or if reading the ArrowWkb
744 : // metadata directly (typically from a OGRParquetDatasetLayer)
745 :
746 : // Assumes a SRID code is an EPSG code...
747 : crs = std::string("EPSG:") + crs;
748 : }
749 :
750 : auto poSRS = new OGRSpatialReference();
751 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
752 : if (poSRS->SetFromUserInput(
753 : crs.c_str(),
754 : OGRSpatialReference::
755 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
756 : OGRERR_NONE)
757 : {
758 : const char *pszAuthName =
759 : poSRS->GetAuthorityName(nullptr);
760 : const char *pszAuthCode =
761 : poSRS->GetAuthorityCode(nullptr);
762 : if (pszAuthName && pszAuthCode &&
763 : EQUAL(pszAuthName, "OGC") &&
764 : EQUAL(pszAuthCode, "CRS84"))
765 : poSRS->importFromEPSG(4326);
766 : oField.SetSpatialRef(poSRS);
767 : }
768 : poSRS->Release();
769 : }
770 :
771 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
772 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
773 : }
774 : }
775 : }
776 : #endif
777 :
778 : // Try to autodetect a (WKB) geometry column from the GEOM_POSSIBLE_NAMES
779 : // open option
780 61661 : if (bRegularField && osExtensionName.empty() &&
781 93587 : m_oMapGeometryColumns.empty() &&
782 221 : m_aosGeomPossibleNames.FindString(field->name().c_str()) >= 0)
783 : {
784 11 : if (fieldTypeId == arrow::Type::BINARY ||
785 : fieldTypeId == arrow::Type::LARGE_BINARY)
786 : {
787 5 : CPLDebug("PARQUET",
788 : "Field %s detected as likely WKB geometry field",
789 5 : field->name().c_str());
790 5 : bRegularField = false;
791 5 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB);
792 : }
793 0 : else if ((fieldTypeId == arrow::Type::STRING ||
794 16 : fieldTypeId == arrow::Type::LARGE_STRING) &&
795 10 : (field->name().find("wkt") != std::string::npos ||
796 4 : field->name().find("WKT") != std::string::npos))
797 : {
798 2 : CPLDebug("PARQUET",
799 : "Field %s detected as likely WKT geometry field",
800 2 : field->name().c_str());
801 2 : bRegularField = false;
802 2 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKT);
803 : }
804 11 : if (!bRegularField)
805 : {
806 14 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
807 7 : oField.SetNullable(field->nullable());
808 :
809 7 : if (!m_osCRS.empty())
810 : {
811 2 : auto poSRS = new OGRSpatialReference();
812 2 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
813 2 : if (poSRS->SetFromUserInput(
814 : m_osCRS.c_str(),
815 : OGRSpatialReference::
816 2 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
817 : OGRERR_NONE)
818 : {
819 2 : oField.SetSpatialRef(poSRS);
820 : }
821 2 : poSRS->Release();
822 : }
823 :
824 7 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
825 7 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
826 : }
827 : }
828 :
829 63852 : return !bRegularField;
830 : }
831 :
832 : /************************************************************************/
833 : /* TestCapability() */
834 : /************************************************************************/
835 :
836 700 : int OGRParquetLayerBase::TestCapability(const char *pszCap)
837 : {
838 700 : if (EQUAL(pszCap, OLCMeasuredGeometries))
839 24 : return true;
840 :
841 676 : if (EQUAL(pszCap, OLCFastSetNextByIndex))
842 0 : return true;
843 :
844 676 : if (EQUAL(pszCap, OLCFastSpatialFilter))
845 : {
846 49 : if (m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter) !=
847 98 : m_oMapGeomFieldIndexToGeomColBBOX.end())
848 : {
849 25 : return true;
850 : }
851 24 : return false;
852 : }
853 :
854 627 : return OGRArrowLayer::TestCapability(pszCap);
855 : }
856 :
857 : /************************************************************************/
858 : /* GetNumCPUs() */
859 : /************************************************************************/
860 :
861 : /* static */
862 1558 : int OGRParquetLayerBase::GetNumCPUs()
863 : {
864 1558 : const char *pszNumThreads = CPLGetConfigOption("GDAL_NUM_THREADS", nullptr);
865 1558 : int nNumThreads = 0;
866 1558 : if (pszNumThreads == nullptr)
867 1558 : nNumThreads = std::min(4, CPLGetNumCPUs());
868 : else
869 0 : nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") ? CPLGetNumCPUs()
870 0 : : atoi(pszNumThreads);
871 1558 : if (nNumThreads > 1)
872 : {
873 1558 : CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads));
874 : }
875 1558 : return nNumThreads;
876 : }
877 :
878 : /************************************************************************/
879 : /* OGRParquetLayer() */
880 : /************************************************************************/
881 :
882 863 : OGRParquetLayer::OGRParquetLayer(
883 : OGRParquetDataset *poDS, const char *pszLayerName,
884 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
885 863 : CSLConstList papszOpenOptions)
886 : : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions),
887 863 : m_poArrowReader(std::move(arrow_reader))
888 : {
889 863 : EstablishFeatureDefn();
890 863 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
891 : m_poFeatureDefn->GetGeomFieldCount());
892 :
893 863 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
894 863 : }
895 :
896 : /************************************************************************/
897 : /* EstablishFeatureDefn() */
898 : /************************************************************************/
899 :
900 863 : void OGRParquetLayer::EstablishFeatureDefn()
901 : {
902 863 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
903 863 : const auto &kv_metadata = metadata->key_value_metadata();
904 :
905 863 : LoadGeoMetadata(kv_metadata);
906 : const auto oMapFieldNameToGDALSchemaFieldDefn =
907 863 : LoadGDALSchema(kv_metadata.get());
908 :
909 863 : LoadGDALMetadata(kv_metadata.get());
910 :
911 863 : if (!m_poArrowReader->GetSchema(&m_poSchema).ok())
912 : {
913 0 : return;
914 : }
915 :
916 : const bool bUseBBOX =
917 863 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES"));
918 :
919 : // Keep track of declared bounding box columns in GeoParquet JSON metadata,
920 : // in order not to expose them as regular fields.
921 1726 : std::set<std::string> oSetBBOXColumns;
922 863 : if (bUseBBOX)
923 : {
924 1693 : for (const auto &iter : m_oMapGeometryColumns)
925 : {
926 1670 : std::string osBBOXColumn;
927 1670 : std::string osXMin, osYMin, osXMax, osYMax;
928 835 : if (ParseGeometryColumnCovering(iter.second, osBBOXColumn, osXMin,
929 : osYMin, osXMax, osYMax))
930 : {
931 340 : oSetBBOXColumns.insert(std::move(osBBOXColumn));
932 : }
933 : }
934 : }
935 :
936 863 : const auto &fields = m_poSchema->fields();
937 863 : const auto poParquetSchema = metadata->schema();
938 :
939 : // Map from Parquet column name (with dot separator) to Parquet index
940 1726 : std::map<std::string, int> oMapParquetColumnNameToIdx;
941 863 : const int nParquetColumns = poParquetSchema->num_columns();
942 35306 : for (int iParquetCol = 0; iParquetCol < nParquetColumns; ++iParquetCol)
943 : {
944 34443 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
945 34443 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
946 34443 : oMapParquetColumnNameToIdx[parquetColumnName] = iParquetCol;
947 : }
948 :
949 : // Synthetize a GeoParquet bounding box column definition when detecting
950 : // a Overture Map dataset < 2024-04-16-beta.0
951 822 : if ((m_oMapGeometryColumns.empty() ||
952 : // Below is for release 2024-01-17-alpha.0
953 1685 : (m_oMapGeometryColumns.find("geometry") !=
954 1685 : m_oMapGeometryColumns.end() &&
955 2170 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
956 1710 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB")) &&
957 350 : bUseBBOX &&
958 1213 : oMapParquetColumnNameToIdx.find("geometry") !=
959 1530 : oMapParquetColumnNameToIdx.end() &&
960 1180 : oMapParquetColumnNameToIdx.find("bbox.minx") !=
961 1181 : oMapParquetColumnNameToIdx.end() &&
962 864 : oMapParquetColumnNameToIdx.find("bbox.miny") !=
963 865 : oMapParquetColumnNameToIdx.end() &&
964 864 : oMapParquetColumnNameToIdx.find("bbox.maxx") !=
965 2590 : oMapParquetColumnNameToIdx.end() &&
966 864 : oMapParquetColumnNameToIdx.find("bbox.maxy") !=
967 864 : oMapParquetColumnNameToIdx.end())
968 : {
969 2 : CPLJSONObject oDef;
970 1 : if (m_oMapGeometryColumns.find("geometry") !=
971 2 : m_oMapGeometryColumns.end())
972 : {
973 0 : oDef = m_oMapGeometryColumns["geometry"];
974 : }
975 2 : CPLJSONObject oCovering;
976 1 : oDef.Add("covering", oCovering);
977 1 : CPLJSONObject oBBOX;
978 1 : oCovering.Add("bbox", oBBOX);
979 : {
980 1 : CPLJSONArray oArray;
981 1 : oArray.Add("bbox");
982 1 : oArray.Add("minx");
983 1 : oBBOX.Add("xmin", oArray);
984 : }
985 : {
986 1 : CPLJSONArray oArray;
987 1 : oArray.Add("bbox");
988 1 : oArray.Add("miny");
989 1 : oBBOX.Add("ymin", oArray);
990 : }
991 : {
992 1 : CPLJSONArray oArray;
993 1 : oArray.Add("bbox");
994 1 : oArray.Add("maxx");
995 1 : oBBOX.Add("xmax", oArray);
996 : }
997 : {
998 1 : CPLJSONArray oArray;
999 1 : oArray.Add("bbox");
1000 1 : oArray.Add("maxy");
1001 1 : oBBOX.Add("ymax", oArray);
1002 : }
1003 1 : oSetBBOXColumns.insert("bbox");
1004 1 : oDef.Add("encoding", "WKB");
1005 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
1006 : }
1007 : // Overture Maps 2024-04-16-beta.0 almost follows GeoParquet 1.1, except
1008 : // they don't declare the "covering" element in the GeoParquet JSON metadata
1009 1724 : else if (m_oMapGeometryColumns.find("geometry") !=
1010 1674 : m_oMapGeometryColumns.end() &&
1011 1616 : bUseBBOX &&
1012 2164 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
1013 1666 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB" &&
1014 1172 : oMapParquetColumnNameToIdx.find("geometry") !=
1015 1482 : oMapParquetColumnNameToIdx.end() &&
1016 1172 : oMapParquetColumnNameToIdx.find("bbox.xmin") !=
1017 1173 : oMapParquetColumnNameToIdx.end() &&
1018 863 : oMapParquetColumnNameToIdx.find("bbox.ymin") !=
1019 864 : oMapParquetColumnNameToIdx.end() &&
1020 863 : oMapParquetColumnNameToIdx.find("bbox.xmax") !=
1021 3399 : oMapParquetColumnNameToIdx.end() &&
1022 863 : oMapParquetColumnNameToIdx.find("bbox.ymax") !=
1023 863 : oMapParquetColumnNameToIdx.end())
1024 : {
1025 3 : CPLJSONObject oDef = m_oMapGeometryColumns["geometry"];
1026 2 : CPLJSONObject oCovering;
1027 1 : oDef.Add("covering", oCovering);
1028 1 : CPLJSONObject oBBOX;
1029 1 : oCovering.Add("bbox", oBBOX);
1030 : {
1031 1 : CPLJSONArray oArray;
1032 1 : oArray.Add("bbox");
1033 1 : oArray.Add("xmin");
1034 1 : oBBOX.Add("xmin", oArray);
1035 : }
1036 : {
1037 1 : CPLJSONArray oArray;
1038 1 : oArray.Add("bbox");
1039 1 : oArray.Add("ymin");
1040 1 : oBBOX.Add("ymin", oArray);
1041 : }
1042 : {
1043 1 : CPLJSONArray oArray;
1044 1 : oArray.Add("bbox");
1045 1 : oArray.Add("xmax");
1046 1 : oBBOX.Add("xmax", oArray);
1047 : }
1048 : {
1049 1 : CPLJSONArray oArray;
1050 1 : oArray.Add("bbox");
1051 1 : oArray.Add("ymax");
1052 1 : oBBOX.Add("ymax", oArray);
1053 : }
1054 1 : oSetBBOXColumns.insert("bbox");
1055 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
1056 : }
1057 :
1058 863 : int iParquetCol = 0;
1059 26336 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
1060 : {
1061 25473 : const auto &field = fields[i];
1062 :
1063 : bool bParquetColValid =
1064 25473 : CheckMatchArrowParquetColumnNames(iParquetCol, field);
1065 25473 : if (!bParquetColValid)
1066 0 : m_bHasMissingMappingToParquet = true;
1067 :
1068 25509 : if (!m_osFIDColumn.empty() && field->name() == m_osFIDColumn &&
1069 36 : (field->type()->id() == arrow::Type::INT32 ||
1070 18 : field->type()->id() == arrow::Type::INT64))
1071 : {
1072 18 : m_poFIDType = field->type();
1073 18 : m_iFIDArrowColumn = i;
1074 18 : if (bParquetColValid)
1075 : {
1076 18 : m_iFIDParquetColumn = iParquetCol;
1077 18 : iParquetCol++;
1078 : }
1079 360 : continue;
1080 : }
1081 :
1082 25455 : if (oSetBBOXColumns.find(field->name()) != oSetBBOXColumns.end())
1083 : {
1084 342 : m_oSetBBoxArrowColumns.insert(i);
1085 342 : if (bParquetColValid)
1086 342 : iParquetCol++;
1087 342 : continue;
1088 : }
1089 :
1090 : const auto ComputeGeometryColumnTypeLambda =
1091 888 : [this, bParquetColValid, iParquetCol, &poParquetSchema]()
1092 : {
1093 : // only with GeoParquet < 0.2.0
1094 592 : if (bParquetColValid &&
1095 296 : poParquetSchema->Column(iParquetCol)->physical_type() ==
1096 : parquet::Type::BYTE_ARRAY)
1097 : {
1098 296 : return ComputeGeometryColumnType(
1099 592 : m_poFeatureDefn->GetGeomFieldCount(), iParquetCol);
1100 : }
1101 0 : return wkbUnknown;
1102 25113 : };
1103 :
1104 75339 : const bool bGeometryField = DealWithGeometryColumn(
1105 : i, field, ComputeGeometryColumnTypeLambda,
1106 25113 : bParquetColValid ? poParquetSchema->Column(iParquetCol) : nullptr,
1107 25113 : metadata.get(), bParquetColValid ? iParquetCol : -1);
1108 25113 : if (bGeometryField)
1109 : {
1110 847 : const auto oIter = m_oMapGeometryColumns.find(field->name());
1111 847 : if (bUseBBOX && oIter != m_oMapGeometryColumns.end())
1112 : {
1113 835 : ProcessGeometryColumnCovering(field, oIter->second,
1114 : oMapParquetColumnNameToIdx);
1115 : }
1116 :
1117 2501 : if (bParquetColValid &&
1118 1654 : (field->type()->id() == arrow::Type::STRUCT ||
1119 807 : field->type()->id() == arrow::Type::LIST))
1120 : {
1121 : // GeoArrow types
1122 352 : std::vector<int> anParquetCols;
1123 2160 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
1124 : {
1125 1808 : if (STARTS_WITH(
1126 : iterParquetCols.first.c_str(),
1127 : std::string(field->name()).append(".").c_str()))
1128 : {
1129 752 : iParquetCol =
1130 752 : std::max(iParquetCol, iterParquetCols.second);
1131 752 : anParquetCols.push_back(iterParquetCols.second);
1132 : }
1133 : }
1134 352 : m_anMapGeomFieldIndexToParquetColumns.push_back(
1135 352 : std::move(anParquetCols));
1136 352 : ++iParquetCol;
1137 : }
1138 : else
1139 : {
1140 495 : m_anMapGeomFieldIndexToParquetColumns.push_back(
1141 495 : {bParquetColValid ? iParquetCol : -1});
1142 495 : if (bParquetColValid)
1143 495 : iParquetCol++;
1144 : }
1145 : }
1146 : else
1147 : {
1148 24266 : CreateFieldFromSchema(field, bParquetColValid, iParquetCol, {i},
1149 : oMapFieldNameToGDALSchemaFieldDefn);
1150 : }
1151 : }
1152 :
1153 863 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
1154 : m_poFeatureDefn->GetFieldCount());
1155 863 : CPLAssert(static_cast<int>(m_anMapFieldIndexToParquetColumn.size()) ==
1156 : m_poFeatureDefn->GetFieldCount());
1157 863 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
1158 : m_poFeatureDefn->GetGeomFieldCount());
1159 863 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToParquetColumns.size()) ==
1160 : m_poFeatureDefn->GetGeomFieldCount());
1161 :
1162 863 : if (!fields.empty())
1163 : {
1164 : try
1165 : {
1166 1672 : auto poRowGroup = m_poArrowReader->parquet_reader()->RowGroup(0);
1167 810 : if (poRowGroup)
1168 : {
1169 1620 : auto poColumn = poRowGroup->metadata()->ColumnChunk(0);
1170 810 : CPLDebug("PARQUET", "Compression (of first column): %s",
1171 : arrow::util::Codec::GetCodecAsString(
1172 810 : poColumn->compression())
1173 : .c_str());
1174 : }
1175 : }
1176 52 : catch (const std::exception &)
1177 : {
1178 : }
1179 : }
1180 : }
1181 :
1182 : /************************************************************************/
1183 : /* ProcessGeometryColumnCovering() */
1184 : /************************************************************************/
1185 :
1186 : /** Process GeoParquet JSON geometry field object to extract information about
1187 : * its bounding box column, and appropriately fill m_oMapGeomFieldIndexToGeomColBBOX
1188 : * and m_oMapGeomFieldIndexToGeomColBBOXParquet members with information on that
1189 : * bounding box column.
1190 : */
1191 835 : void OGRParquetLayer::ProcessGeometryColumnCovering(
1192 : const std::shared_ptr<arrow::Field> &field,
1193 : const CPLJSONObject &oJSONGeometryColumn,
1194 : const std::map<std::string, int> &oMapParquetColumnNameToIdx)
1195 : {
1196 1670 : std::string osBBOXColumn;
1197 1670 : std::string osXMin, osYMin, osXMax, osYMax;
1198 835 : if (ParseGeometryColumnCovering(oJSONGeometryColumn, osBBOXColumn, osXMin,
1199 : osYMin, osXMax, osYMax))
1200 : {
1201 342 : OGRArrowLayer::GeomColBBOX sDesc;
1202 342 : sDesc.iArrowCol = m_poSchema->GetFieldIndex(osBBOXColumn);
1203 684 : const auto fieldBBOX = m_poSchema->GetFieldByName(osBBOXColumn);
1204 684 : if (sDesc.iArrowCol >= 0 && fieldBBOX &&
1205 342 : fieldBBOX->type()->id() == arrow::Type::STRUCT)
1206 : {
1207 : const auto fieldBBOXStruct =
1208 684 : std::static_pointer_cast<arrow::StructType>(fieldBBOX->type());
1209 684 : const auto fieldXMin = fieldBBOXStruct->GetFieldByName(osXMin);
1210 684 : const auto fieldYMin = fieldBBOXStruct->GetFieldByName(osYMin);
1211 684 : const auto fieldXMax = fieldBBOXStruct->GetFieldByName(osXMax);
1212 684 : const auto fieldYMax = fieldBBOXStruct->GetFieldByName(osYMax);
1213 342 : const int nXMinIdx = fieldBBOXStruct->GetFieldIndex(osXMin);
1214 342 : const int nYMinIdx = fieldBBOXStruct->GetFieldIndex(osYMin);
1215 342 : const int nXMaxIdx = fieldBBOXStruct->GetFieldIndex(osXMax);
1216 342 : const int nYMaxIdx = fieldBBOXStruct->GetFieldIndex(osYMax);
1217 : const auto oIterParquetIdxXMin = oMapParquetColumnNameToIdx.find(
1218 342 : std::string(osBBOXColumn).append(".").append(osXMin));
1219 : const auto oIterParquetIdxYMin = oMapParquetColumnNameToIdx.find(
1220 342 : std::string(osBBOXColumn).append(".").append(osYMin));
1221 : const auto oIterParquetIdxXMax = oMapParquetColumnNameToIdx.find(
1222 342 : std::string(osBBOXColumn).append(".").append(osXMax));
1223 : const auto oIterParquetIdxYMax = oMapParquetColumnNameToIdx.find(
1224 342 : std::string(osBBOXColumn).append(".").append(osYMax));
1225 342 : if (nXMinIdx >= 0 && nYMinIdx >= 0 && nXMaxIdx >= 0 &&
1226 684 : nYMaxIdx >= 0 && fieldXMin && fieldYMin && fieldXMax &&
1227 684 : fieldYMax &&
1228 684 : oIterParquetIdxXMin != oMapParquetColumnNameToIdx.end() &&
1229 684 : oIterParquetIdxYMin != oMapParquetColumnNameToIdx.end() &&
1230 684 : oIterParquetIdxXMax != oMapParquetColumnNameToIdx.end() &&
1231 684 : oIterParquetIdxYMax != oMapParquetColumnNameToIdx.end() &&
1232 343 : (fieldXMin->type()->id() == arrow::Type::FLOAT ||
1233 1 : fieldXMin->type()->id() == arrow::Type::DOUBLE) &&
1234 342 : fieldXMin->type()->id() == fieldYMin->type()->id() &&
1235 1026 : fieldXMin->type()->id() == fieldXMax->type()->id() &&
1236 342 : fieldXMin->type()->id() == fieldYMax->type()->id())
1237 : {
1238 342 : CPLDebug("PARQUET",
1239 : "Bounding box column '%s' detected for "
1240 : "geometry column '%s'",
1241 342 : osBBOXColumn.c_str(), field->name().c_str());
1242 342 : sDesc.iArrowSubfieldXMin = nXMinIdx;
1243 342 : sDesc.iArrowSubfieldYMin = nYMinIdx;
1244 342 : sDesc.iArrowSubfieldXMax = nXMaxIdx;
1245 342 : sDesc.iArrowSubfieldYMax = nYMaxIdx;
1246 342 : sDesc.bIsFloat =
1247 342 : (fieldXMin->type()->id() == arrow::Type::FLOAT);
1248 :
1249 : m_oMapGeomFieldIndexToGeomColBBOX
1250 342 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
1251 342 : std::move(sDesc);
1252 :
1253 342 : GeomColBBOXParquet sDescParquet;
1254 342 : sDescParquet.iParquetXMin = oIterParquetIdxXMin->second;
1255 342 : sDescParquet.iParquetYMin = oIterParquetIdxYMin->second;
1256 342 : sDescParquet.iParquetXMax = oIterParquetIdxXMax->second;
1257 342 : sDescParquet.iParquetYMax = oIterParquetIdxYMax->second;
1258 3730 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
1259 : {
1260 3388 : if (STARTS_WITH(
1261 : iterParquetCols.first.c_str(),
1262 : std::string(osBBOXColumn).append(".").c_str()))
1263 : {
1264 1368 : sDescParquet.anParquetCols.push_back(
1265 1368 : iterParquetCols.second);
1266 : }
1267 : }
1268 : m_oMapGeomFieldIndexToGeomColBBOXParquet
1269 684 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
1270 684 : std::move(sDescParquet);
1271 : }
1272 : }
1273 : }
1274 835 : }
1275 :
1276 : /************************************************************************/
1277 : /* CheckMatchArrowParquetColumnNames() */
1278 : /************************************************************************/
1279 :
1280 27668 : bool OGRParquetLayer::CheckMatchArrowParquetColumnNames(
1281 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const
1282 : {
1283 55336 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
1284 27668 : const auto poParquetSchema = metadata->schema();
1285 27668 : const int nParquetColumns = poParquetSchema->num_columns();
1286 27668 : const auto &fieldName = field->name();
1287 27668 : const int iParquetColBefore = iParquetCol;
1288 :
1289 28359 : while (iParquetCol < nParquetColumns)
1290 : {
1291 28359 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
1292 28359 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
1293 60419 : if (fieldName == parquetColumnName ||
1294 16030 : (parquetColumnName.size() > fieldName.size() &&
1295 16030 : STARTS_WITH(parquetColumnName.c_str(), fieldName.c_str()) &&
1296 15339 : parquetColumnName[fieldName.size()] == '.'))
1297 : {
1298 27668 : return true;
1299 : }
1300 : else
1301 : {
1302 691 : iParquetCol++;
1303 : }
1304 : }
1305 :
1306 0 : CPLError(CE_Warning, CPLE_AppDefined,
1307 : "Cannot match Arrow column name %s with a Parquet one",
1308 : fieldName.c_str());
1309 0 : iParquetCol = iParquetColBefore;
1310 0 : return false;
1311 : }
1312 :
1313 : /************************************************************************/
1314 : /* CreateFieldFromSchema() */
1315 : /************************************************************************/
1316 :
1317 26461 : void OGRParquetLayer::CreateFieldFromSchema(
1318 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
1319 : int &iParquetCol, const std::vector<int> &path,
1320 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
1321 : &oMapFieldNameToGDALSchemaFieldDefn)
1322 : {
1323 26461 : OGRFieldDefn oField(field->name().c_str(), OFTString);
1324 26461 : OGRFieldType eType = OFTString;
1325 26461 : OGRFieldSubType eSubType = OFSTNone;
1326 26461 : bool bTypeOK = true;
1327 :
1328 26461 : auto type = field->type();
1329 26461 : if (type->id() == arrow::Type::DICTIONARY && path.size() == 1)
1330 : {
1331 : const auto dictionaryType =
1332 596 : std::static_pointer_cast<arrow::DictionaryType>(field->type());
1333 596 : auto indexType = dictionaryType->index_type();
1334 596 : if (dictionaryType->value_type()->id() == arrow::Type::STRING &&
1335 298 : IsIntegerArrowType(indexType->id()))
1336 : {
1337 298 : if (bParquetColValid)
1338 : {
1339 596 : std::string osDomainName(field->name() + "Domain");
1340 298 : m_poDS->RegisterDomainName(osDomainName,
1341 298 : m_poFeatureDefn->GetFieldCount());
1342 298 : oField.SetDomainName(osDomainName);
1343 : }
1344 298 : type = std::move(indexType);
1345 : }
1346 : else
1347 : {
1348 0 : bTypeOK = false;
1349 : }
1350 : }
1351 :
1352 26461 : int nParquetColIncrement = 1;
1353 26461 : switch (type->id())
1354 : {
1355 624 : case arrow::Type::STRUCT:
1356 : {
1357 1248 : const auto subfields = field->Flatten();
1358 1248 : auto newpath = path;
1359 624 : newpath.push_back(0);
1360 2819 : for (int j = 0; j < static_cast<int>(subfields.size()); j++)
1361 : {
1362 2195 : const auto &subfield = subfields[j];
1363 : bParquetColValid =
1364 2195 : CheckMatchArrowParquetColumnNames(iParquetCol, subfield);
1365 2195 : if (!bParquetColValid)
1366 0 : m_bHasMissingMappingToParquet = true;
1367 2195 : newpath.back() = j;
1368 2195 : CreateFieldFromSchema(subfield, bParquetColValid, iParquetCol,
1369 : newpath,
1370 : oMapFieldNameToGDALSchemaFieldDefn);
1371 : }
1372 624 : return; // return intended, not break
1373 : }
1374 :
1375 5335 : case arrow::Type::MAP:
1376 : {
1377 : // A arrow map maps to 2 Parquet columns
1378 5335 : nParquetColIncrement = 2;
1379 5335 : break;
1380 : }
1381 :
1382 20502 : default:
1383 20502 : break;
1384 : }
1385 :
1386 25837 : if (bTypeOK)
1387 : {
1388 25837 : bTypeOK = MapArrowTypeToOGR(type, field, oField, eType, eSubType, path,
1389 : oMapFieldNameToGDALSchemaFieldDefn);
1390 25837 : if (bTypeOK)
1391 : {
1392 25548 : m_apoArrowDataTypes.push_back(std::move(type));
1393 51096 : m_anMapFieldIndexToParquetColumn.push_back(
1394 25548 : bParquetColValid ? iParquetCol : -1);
1395 : }
1396 : }
1397 :
1398 25837 : if (bParquetColValid)
1399 25837 : iParquetCol += nParquetColIncrement;
1400 : }
1401 :
1402 : /************************************************************************/
1403 : /* BuildDomain() */
1404 : /************************************************************************/
1405 :
1406 : std::unique_ptr<OGRFieldDomain>
1407 16 : OGRParquetLayer::BuildDomain(const std::string &osDomainName,
1408 : int iFieldIndex) const
1409 : {
1410 : #ifdef DEBUG
1411 16 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
1412 : (void)iArrowCol;
1413 16 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
1414 : arrow::Type::DICTIONARY);
1415 : #endif
1416 16 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[iFieldIndex];
1417 16 : CPLAssert(iParquetCol >= 0);
1418 16 : const auto oldBatchSize = m_poArrowReader->properties().batch_size();
1419 16 : m_poArrowReader->set_batch_size(1);
1420 : #if PARQUET_VERSION_MAJOR >= 21
1421 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1422 : auto result = m_poArrowReader->GetRecordBatchReader({0}, {iParquetCol});
1423 : if (result.ok())
1424 : poRecordBatchReader = std::move(*result);
1425 : #else
1426 16 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1427 16 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1428 : {0}, {iParquetCol}, &poRecordBatchReader));
1429 : #endif
1430 16 : if (poRecordBatchReader != nullptr)
1431 : {
1432 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1433 16 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1434 16 : if (!status.ok())
1435 : {
1436 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1437 0 : status.message().c_str());
1438 : }
1439 16 : else if (poBatch)
1440 : {
1441 16 : m_poArrowReader->set_batch_size(oldBatchSize);
1442 16 : return BuildDomainFromBatch(osDomainName, poBatch, 0);
1443 : }
1444 : }
1445 0 : m_poArrowReader->set_batch_size(oldBatchSize);
1446 0 : return nullptr;
1447 : }
1448 :
1449 : /************************************************************************/
1450 : /* ComputeGeometryColumnType() */
1451 : /************************************************************************/
1452 :
1453 : OGRwkbGeometryType
1454 296 : OGRParquetLayer::ComputeGeometryColumnType(int iGeomCol, int iParquetCol) const
1455 : {
1456 : // Compute type of geometry column by iterating over each geometry, and
1457 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
1458 :
1459 296 : OGRwkbGeometryType eGeomType = wkbNone;
1460 :
1461 592 : std::vector<int> anRowGroups;
1462 296 : const int nNumGroups = m_poArrowReader->num_row_groups();
1463 296 : anRowGroups.reserve(nNumGroups);
1464 881 : for (int i = 0; i < nNumGroups; ++i)
1465 585 : anRowGroups.push_back(i);
1466 : #if PARQUET_VERSION_MAJOR >= 21
1467 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1468 : auto result =
1469 : m_poArrowReader->GetRecordBatchReader(anRowGroups, {iParquetCol});
1470 : if (result.ok())
1471 : poRecordBatchReader = std::move(*result);
1472 : #else
1473 0 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1474 296 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1475 : anRowGroups, {iParquetCol}, &poRecordBatchReader));
1476 : #endif
1477 296 : if (poRecordBatchReader != nullptr)
1478 : {
1479 592 : std::shared_ptr<arrow::RecordBatch> poBatch;
1480 : while (true)
1481 : {
1482 594 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1483 594 : if (!status.ok())
1484 : {
1485 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1486 0 : status.message().c_str());
1487 0 : break;
1488 : }
1489 594 : else if (!poBatch)
1490 294 : break;
1491 :
1492 300 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
1493 : 0, eGeomType);
1494 300 : if (eGeomType == wkbUnknown)
1495 2 : break;
1496 298 : }
1497 : }
1498 :
1499 592 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
1500 : }
1501 :
1502 : /************************************************************************/
1503 : /* GetFeatureExplicitFID() */
1504 : /************************************************************************/
1505 :
1506 4 : OGRFeature *OGRParquetLayer::GetFeatureExplicitFID(GIntBig nFID)
1507 : {
1508 8 : std::vector<int> anRowGroups;
1509 4 : const int nNumGroups = m_poArrowReader->num_row_groups();
1510 4 : anRowGroups.reserve(nNumGroups);
1511 16 : for (int i = 0; i < nNumGroups; ++i)
1512 12 : anRowGroups.push_back(i);
1513 : #if PARQUET_VERSION_MAJOR >= 21
1514 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1515 : auto result = m_bIgnoredFields
1516 : ? m_poArrowReader->GetRecordBatchReader(
1517 : anRowGroups, m_anRequestedParquetColumns)
1518 : : m_poArrowReader->GetRecordBatchReader(anRowGroups);
1519 : if (result.ok())
1520 : {
1521 : poRecordBatchReader = std::move(*result);
1522 : }
1523 : #else
1524 4 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1525 4 : if (m_bIgnoredFields)
1526 : {
1527 4 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1528 2 : anRowGroups, m_anRequestedParquetColumns, &poRecordBatchReader));
1529 : }
1530 : else
1531 : {
1532 2 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1533 : anRowGroups, &poRecordBatchReader));
1534 : }
1535 : #endif
1536 4 : if (poRecordBatchReader != nullptr)
1537 : {
1538 4 : std::shared_ptr<arrow::RecordBatch> poBatch;
1539 : while (true)
1540 : {
1541 14 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1542 14 : if (!status.ok())
1543 : {
1544 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1545 0 : status.message().c_str());
1546 0 : break;
1547 : }
1548 14 : else if (!poBatch)
1549 2 : break;
1550 :
1551 12 : const auto array = poBatch->column(
1552 12 : m_bIgnoredFields ? m_nRequestedFIDColumn : m_iFIDArrowColumn);
1553 12 : const auto arrayPtr = array.get();
1554 12 : const auto arrayTypeId = array->type_id();
1555 30 : for (int64_t nIdxInBatch = 0; nIdxInBatch < poBatch->num_rows();
1556 : nIdxInBatch++)
1557 : {
1558 20 : if (!array->IsNull(nIdxInBatch))
1559 : {
1560 20 : if (arrayTypeId == arrow::Type::INT64)
1561 : {
1562 20 : const auto castArray =
1563 : static_cast<const arrow::Int64Array *>(arrayPtr);
1564 20 : if (castArray->Value(nIdxInBatch) == nFID)
1565 : {
1566 2 : return ReadFeature(nIdxInBatch, poBatch->columns());
1567 : }
1568 : }
1569 0 : else if (arrayTypeId == arrow::Type::INT32)
1570 : {
1571 0 : const auto castArray =
1572 : static_cast<const arrow::Int32Array *>(arrayPtr);
1573 0 : if (castArray->Value(nIdxInBatch) == nFID)
1574 : {
1575 0 : return ReadFeature(nIdxInBatch, poBatch->columns());
1576 : }
1577 : }
1578 : }
1579 : }
1580 10 : }
1581 : }
1582 2 : return nullptr;
1583 : }
1584 :
1585 : /************************************************************************/
1586 : /* GetFeatureByIndex() */
1587 : /************************************************************************/
1588 :
1589 47 : OGRFeature *OGRParquetLayer::GetFeatureByIndex(GIntBig nFID)
1590 : {
1591 :
1592 47 : if (nFID < 0)
1593 3 : return nullptr;
1594 :
1595 88 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
1596 44 : const int nNumGroups = m_poArrowReader->num_row_groups();
1597 44 : int64_t nAccRows = 0;
1598 53 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
1599 : {
1600 : const int64_t nNextAccRows =
1601 47 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
1602 47 : if (nFID < nNextAccRows)
1603 : {
1604 : #if PARQUET_VERSION_MAJOR >= 21
1605 : std::unique_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1606 : auto result = m_bIgnoredFields
1607 : ? m_poArrowReader->GetRecordBatchReader(
1608 : {iGroup}, m_anRequestedParquetColumns)
1609 : : m_poArrowReader->GetRecordBatchReader({iGroup});
1610 : if (result.ok())
1611 : {
1612 : poRecordBatchReader = std::move(*result);
1613 : }
1614 : else
1615 : {
1616 : CPLError(CE_Failure, CPLE_AppDefined,
1617 : "GetRecordBatchReader() failed: %s",
1618 : result.status().message().c_str());
1619 : return nullptr;
1620 : }
1621 : #else
1622 38 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1623 : {
1624 0 : arrow::Status status;
1625 38 : if (m_bIgnoredFields)
1626 : {
1627 0 : status = m_poArrowReader->GetRecordBatchReader(
1628 0 : {iGroup}, m_anRequestedParquetColumns,
1629 0 : &poRecordBatchReader);
1630 : }
1631 : else
1632 : {
1633 76 : status = m_poArrowReader->GetRecordBatchReader(
1634 38 : {iGroup}, &poRecordBatchReader);
1635 : }
1636 38 : if (poRecordBatchReader == nullptr)
1637 : {
1638 0 : CPLError(CE_Failure, CPLE_AppDefined,
1639 : "GetRecordBatchReader() failed: %s",
1640 0 : status.message().c_str());
1641 0 : return nullptr;
1642 : }
1643 : }
1644 : #endif
1645 :
1646 38 : const int64_t nExpectedIdxInGroup = nFID - nAccRows;
1647 38 : int64_t nIdxInGroup = 0;
1648 : while (true)
1649 : {
1650 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1651 38 : arrow::Status status = poRecordBatchReader->ReadNext(&poBatch);
1652 38 : if (!status.ok())
1653 : {
1654 0 : CPLError(CE_Failure, CPLE_AppDefined,
1655 0 : "ReadNext() failed: %s", status.message().c_str());
1656 0 : return nullptr;
1657 : }
1658 38 : if (poBatch == nullptr)
1659 : {
1660 0 : return nullptr;
1661 : }
1662 38 : if (nExpectedIdxInGroup < nIdxInGroup + poBatch->num_rows())
1663 : {
1664 38 : const auto nIdxInBatch = nExpectedIdxInGroup - nIdxInGroup;
1665 : auto poFeature =
1666 38 : ReadFeature(nIdxInBatch, poBatch->columns());
1667 38 : poFeature->SetFID(nFID);
1668 38 : return poFeature;
1669 : }
1670 0 : nIdxInGroup += poBatch->num_rows();
1671 0 : }
1672 : }
1673 9 : nAccRows = nNextAccRows;
1674 : }
1675 6 : return nullptr;
1676 : }
1677 :
1678 : /************************************************************************/
1679 : /* GetFeature() */
1680 : /************************************************************************/
1681 :
1682 51 : OGRFeature *OGRParquetLayer::GetFeature(GIntBig nFID)
1683 : {
1684 51 : if (!m_osFIDColumn.empty())
1685 : {
1686 4 : return GetFeatureExplicitFID(nFID);
1687 : }
1688 : else
1689 : {
1690 47 : return GetFeatureByIndex(nFID);
1691 : }
1692 : }
1693 :
1694 : /************************************************************************/
1695 : /* ResetReading() */
1696 : /************************************************************************/
1697 :
1698 3713 : void OGRParquetLayer::ResetReading()
1699 : {
1700 3713 : OGRParquetLayerBase::ResetReading();
1701 3713 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
1702 3713 : m_nFeatureIdxSelected = 0;
1703 3713 : if (!m_asFeatureIdxRemapping.empty())
1704 : {
1705 1627 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1706 1627 : ++m_oFeatureIdxRemappingIter;
1707 : }
1708 3713 : }
1709 :
1710 : /************************************************************************/
1711 : /* CreateRecordBatchReader() */
1712 : /************************************************************************/
1713 :
1714 594 : bool OGRParquetLayer::CreateRecordBatchReader(int iStartingRowGroup)
1715 : {
1716 1188 : std::vector<int> anRowGroups;
1717 594 : const int nNumGroups = m_poArrowReader->num_row_groups();
1718 594 : anRowGroups.reserve(nNumGroups - iStartingRowGroup);
1719 1537 : for (int i = iStartingRowGroup; i < nNumGroups; ++i)
1720 943 : anRowGroups.push_back(i);
1721 1188 : return CreateRecordBatchReader(anRowGroups);
1722 : }
1723 :
1724 844 : bool OGRParquetLayer::CreateRecordBatchReader(
1725 : const std::vector<int> &anRowGroups)
1726 : {
1727 : #if PARQUET_VERSION_MAJOR >= 21
1728 : auto result = m_bIgnoredFields
1729 : ? m_poArrowReader->GetRecordBatchReader(
1730 : anRowGroups, m_anRequestedParquetColumns)
1731 : : m_poArrowReader->GetRecordBatchReader(anRowGroups);
1732 : if (result.ok())
1733 : {
1734 : m_poRecordBatchReader = std::move(*result);
1735 : return true;
1736 : }
1737 : else
1738 : {
1739 : CPLError(CE_Failure, CPLE_AppDefined,
1740 : "GetRecordBatchReader() failed: %s",
1741 : result.status().message().c_str());
1742 : return false;
1743 : }
1744 : #else
1745 844 : arrow::Status status;
1746 844 : if (m_bIgnoredFields)
1747 : {
1748 410 : status = m_poArrowReader->GetRecordBatchReader(
1749 205 : anRowGroups, m_anRequestedParquetColumns, &m_poRecordBatchReader);
1750 : }
1751 : else
1752 : {
1753 1278 : status = m_poArrowReader->GetRecordBatchReader(anRowGroups,
1754 639 : &m_poRecordBatchReader);
1755 : }
1756 844 : if (m_poRecordBatchReader == nullptr)
1757 : {
1758 0 : CPLError(CE_Failure, CPLE_AppDefined,
1759 0 : "GetRecordBatchReader() failed: %s", status.message().c_str());
1760 0 : return false;
1761 : }
1762 844 : return true;
1763 : #endif
1764 : }
1765 :
1766 : /************************************************************************/
1767 : /* IsConstraintPossible() */
1768 : /************************************************************************/
1769 :
1770 : enum class IsConstraintPossibleRes
1771 : {
1772 : YES,
1773 : NO,
1774 : UNKNOWN
1775 : };
1776 :
1777 : template <class T>
1778 228 : static IsConstraintPossibleRes IsConstraintPossible(int nOperation, T v, T min,
1779 : T max)
1780 : {
1781 228 : if (nOperation == SWQ_EQ)
1782 : {
1783 150 : if (v < min || v > max)
1784 : {
1785 62 : return IsConstraintPossibleRes::NO;
1786 : }
1787 : }
1788 78 : else if (nOperation == SWQ_NE)
1789 : {
1790 38 : if (v == min && v == max)
1791 : {
1792 0 : return IsConstraintPossibleRes::NO;
1793 : }
1794 : }
1795 40 : else if (nOperation == SWQ_LE)
1796 : {
1797 10 : if (v < min)
1798 : {
1799 4 : return IsConstraintPossibleRes::NO;
1800 : }
1801 : }
1802 30 : else if (nOperation == SWQ_LT)
1803 : {
1804 10 : if (v <= min)
1805 : {
1806 4 : return IsConstraintPossibleRes::NO;
1807 : }
1808 : }
1809 20 : else if (nOperation == SWQ_GE)
1810 : {
1811 10 : if (v > max)
1812 : {
1813 4 : return IsConstraintPossibleRes::NO;
1814 : }
1815 : }
1816 10 : else if (nOperation == SWQ_GT)
1817 : {
1818 10 : if (v >= max)
1819 : {
1820 6 : return IsConstraintPossibleRes::NO;
1821 : }
1822 : }
1823 : else
1824 : {
1825 0 : CPLDebug("PARQUET",
1826 : "IsConstraintPossible: Unhandled operation type: %d",
1827 : nOperation);
1828 0 : return IsConstraintPossibleRes::UNKNOWN;
1829 : }
1830 148 : return IsConstraintPossibleRes::YES;
1831 : }
1832 :
1833 : /************************************************************************/
1834 : /* IncrFeatureIdx() */
1835 : /************************************************************************/
1836 :
1837 7476 : void OGRParquetLayer::IncrFeatureIdx()
1838 : {
1839 7476 : ++m_nFeatureIdxSelected;
1840 7476 : ++m_nFeatureIdx;
1841 8422 : if (m_iFIDArrowColumn < 0 && !m_asFeatureIdxRemapping.empty() &&
1842 8422 : m_oFeatureIdxRemappingIter != m_asFeatureIdxRemapping.end())
1843 : {
1844 140 : if (m_nFeatureIdxSelected == m_oFeatureIdxRemappingIter->first)
1845 : {
1846 48 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1847 48 : ++m_oFeatureIdxRemappingIter;
1848 : }
1849 : }
1850 7476 : }
1851 :
1852 : /************************************************************************/
1853 : /* ReadNextBatch() */
1854 : /************************************************************************/
1855 :
1856 1862 : bool OGRParquetLayer::ReadNextBatch()
1857 : {
1858 1862 : m_nIdxInBatch = 0;
1859 :
1860 1862 : const int nNumGroups = m_poArrowReader->num_row_groups();
1861 1862 : if (nNumGroups == 0)
1862 2 : return false;
1863 :
1864 1860 : if (m_bSingleBatch)
1865 : {
1866 19 : CPLAssert(m_iRecordBatch == 0);
1867 19 : CPLAssert(m_poBatch != nullptr);
1868 19 : return false;
1869 : }
1870 :
1871 1841 : CPLAssert((m_iRecordBatch == -1 && m_poRecordBatchReader == nullptr) ||
1872 : (m_iRecordBatch >= 0 && m_poRecordBatchReader != nullptr));
1873 :
1874 1841 : if (m_poRecordBatchReader == nullptr)
1875 : {
1876 851 : m_asFeatureIdxRemapping.clear();
1877 :
1878 851 : bool bIterateEverything = false;
1879 851 : std::vector<int> anSelectedGroups;
1880 : const auto oIterToGeomColBBOX =
1881 851 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(m_iGeomFieldFilter);
1882 : const bool bUSEBBOXFields =
1883 192 : (m_poFilterGeom &&
1884 192 : oIterToGeomColBBOX !=
1885 1043 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
1886 90 : CPLTestBool(CPLGetConfigOption(
1887 941 : ("OGR_" + GetDriverUCName() + "_USE_BBOX").c_str(), "YES")));
1888 : const bool bIsGeoArrowStruct =
1889 1702 : (m_iGeomFieldFilter >= 0 &&
1890 851 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
1891 843 : m_iGeomFieldFilter <
1892 : static_cast<int>(
1893 1686 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
1894 843 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() >=
1895 1702 : 2 &&
1896 204 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]));
1897 : #if PARQUET_VERSION_MAJOR >= 21
1898 : const bool bUseParquetGeoStat =
1899 : (m_poFilterGeom && m_iGeomFieldFilter >= 0 &&
1900 : m_geoStatsWithBBOXAvailable.find(m_iGeomFieldFilter) !=
1901 : m_geoStatsWithBBOXAvailable.end() &&
1902 : m_iGeomFieldFilter <
1903 : static_cast<int>(
1904 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
1905 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() ==
1906 : 1 &&
1907 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter][0] >= 0);
1908 : #endif
1909 1479 : if (m_asAttributeFilterConstraints.empty() && !bUSEBBOXFields &&
1910 628 : !(bIsGeoArrowStruct && m_poFilterGeom)
1911 : #if PARQUET_VERSION_MAJOR >= 21
1912 : && !bUseParquetGeoStat
1913 : #endif
1914 : )
1915 : {
1916 582 : bIterateEverything = true;
1917 : }
1918 : else
1919 : {
1920 : OGRField sMin;
1921 : OGRField sMax;
1922 269 : OGR_RawField_SetNull(&sMin);
1923 269 : OGR_RawField_SetNull(&sMax);
1924 269 : bool bFoundMin = false;
1925 269 : bool bFoundMax = false;
1926 269 : OGRFieldType eType = OFTMaxType;
1927 269 : OGRFieldSubType eSubType = OFSTNone;
1928 538 : std::string osMinTmp, osMaxTmp;
1929 269 : int64_t nFeatureIdxSelected = 0;
1930 269 : int64_t nFeatureIdxTotal = 0;
1931 :
1932 269 : int iXMinField = -1;
1933 269 : int iYMinField = -1;
1934 269 : int iXMaxField = -1;
1935 269 : int iYMaxField = -1;
1936 :
1937 269 : if (bIsGeoArrowStruct)
1938 : {
1939 : const auto metadata =
1940 184 : m_poArrowReader->parquet_reader()->metadata();
1941 92 : const auto poParquetSchema = metadata->schema();
1942 228 : for (int iParquetCol :
1943 548 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter])
1944 : {
1945 : const auto parquetColumn =
1946 228 : poParquetSchema->Column(iParquetCol);
1947 : const auto parquetColumnName =
1948 456 : parquetColumn->path()->ToDotString();
1949 456 : if (parquetColumnName.size() > 2 &&
1950 228 : parquetColumnName.find(".x") ==
1951 228 : parquetColumnName.size() - 2)
1952 : {
1953 92 : iXMinField = iParquetCol;
1954 92 : iXMaxField = iParquetCol;
1955 : }
1956 272 : else if (parquetColumnName.size() > 2 &&
1957 136 : parquetColumnName.find(".y") ==
1958 136 : parquetColumnName.size() - 2)
1959 : {
1960 92 : iYMinField = iParquetCol;
1961 92 : iYMaxField = iParquetCol;
1962 : }
1963 : }
1964 : }
1965 177 : else if (bUSEBBOXFields)
1966 : {
1967 44 : iXMinField = oIterToGeomColBBOX->second.iParquetXMin;
1968 44 : iYMinField = oIterToGeomColBBOX->second.iParquetYMin;
1969 44 : iXMaxField = oIterToGeomColBBOX->second.iParquetXMax;
1970 44 : iYMaxField = oIterToGeomColBBOX->second.iParquetYMax;
1971 : }
1972 :
1973 665 : for (int iRowGroup = 0;
1974 665 : iRowGroup < nNumGroups && !bIterateEverything; ++iRowGroup)
1975 : {
1976 396 : bool bSelectGroup = true;
1977 : auto poRowGroup =
1978 396 : GetReader()->parquet_reader()->RowGroup(iRowGroup);
1979 :
1980 396 : if (iXMinField >= 0 && iYMinField >= 0 && iXMaxField >= 0 &&
1981 : iYMaxField >= 0)
1982 : {
1983 144 : if (GetMinMaxForParquetCol(iRowGroup, iXMinField, nullptr,
1984 : true, sMin, bFoundMin, false,
1985 : sMax, bFoundMax, eType, eSubType,
1986 143 : osMinTmp, osMaxTmp) &&
1987 287 : bFoundMin && eType == OFTReal)
1988 : {
1989 143 : const double dfGroupMinX = sMin.Real;
1990 143 : if (dfGroupMinX > m_sFilterEnvelope.MaxX)
1991 : {
1992 1 : bSelectGroup = false;
1993 : }
1994 142 : else if (GetMinMaxForParquetCol(
1995 : iRowGroup, iYMinField, nullptr, true, sMin,
1996 : bFoundMin, false, sMax, bFoundMax, eType,
1997 142 : eSubType, osMinTmp, osMaxTmp) &&
1998 284 : bFoundMin && eType == OFTReal)
1999 : {
2000 142 : const double dfGroupMinY = sMin.Real;
2001 142 : if (dfGroupMinY > m_sFilterEnvelope.MaxY)
2002 : {
2003 1 : bSelectGroup = false;
2004 : }
2005 141 : else if (GetMinMaxForParquetCol(
2006 : iRowGroup, iXMaxField, nullptr, false,
2007 : sMin, bFoundMin, true, sMax, bFoundMax,
2008 141 : eType, eSubType, osMinTmp, osMaxTmp) &&
2009 282 : bFoundMax && eType == OFTReal)
2010 : {
2011 141 : const double dfGroupMaxX = sMax.Real;
2012 141 : if (dfGroupMaxX < m_sFilterEnvelope.MinX)
2013 : {
2014 1 : bSelectGroup = false;
2015 : }
2016 140 : else if (GetMinMaxForParquetCol(
2017 : iRowGroup, iYMaxField, nullptr,
2018 : false, sMin, bFoundMin, true, sMax,
2019 : bFoundMax, eType, eSubType,
2020 140 : osMinTmp, osMaxTmp) &&
2021 280 : bFoundMax && eType == OFTReal)
2022 : {
2023 140 : const double dfGroupMaxY = sMax.Real;
2024 140 : if (dfGroupMaxY < m_sFilterEnvelope.MinY)
2025 : {
2026 1 : bSelectGroup = false;
2027 : }
2028 : }
2029 : }
2030 : }
2031 : }
2032 : }
2033 : #if PARQUET_VERSION_MAJOR >= 21
2034 : else if (bUseParquetGeoStat)
2035 : {
2036 : const int iParquetCol =
2037 : m_anMapGeomFieldIndexToParquetColumns
2038 : [m_iGeomFieldFilter][0];
2039 : CPLAssert(iParquetCol >= 0);
2040 :
2041 : const auto metadata =
2042 : m_poArrowReader->parquet_reader()->metadata();
2043 : const auto columnChunk =
2044 : metadata->RowGroup(iRowGroup)->ColumnChunk(iParquetCol);
2045 : if (auto geostats = columnChunk->geo_statistics())
2046 : {
2047 : if (geostats->dimension_valid()[0] &&
2048 : geostats->dimension_valid()[1])
2049 : {
2050 : double dfMinX = geostats->lower_bound()[0];
2051 : double dfMaxX = geostats->upper_bound()[0];
2052 : double dfMinY = geostats->lower_bound()[1];
2053 : double dfMaxY = geostats->upper_bound()[1];
2054 :
2055 : // Deal as best as we can with wrap around bounding box
2056 : if (dfMinX > dfMaxX && std::fabs(dfMinX) <= 180 &&
2057 : std::fabs(dfMaxX) <= 180)
2058 : {
2059 : dfMinX = -180;
2060 : dfMaxX = 180;
2061 : }
2062 :
2063 : // Check if there is an intersection between
2064 : // the geostatistics for this rowgroup and
2065 : // the bbox of interest
2066 : if (dfMinX > m_sFilterEnvelope.MaxX ||
2067 : dfMaxX < m_sFilterEnvelope.MinX ||
2068 : dfMinY > m_sFilterEnvelope.MaxY ||
2069 : dfMaxY < m_sFilterEnvelope.MinY)
2070 : {
2071 : bSelectGroup = false;
2072 : }
2073 : }
2074 : }
2075 : }
2076 : #endif
2077 :
2078 396 : if (bSelectGroup)
2079 : {
2080 556 : for (auto &constraint : m_asAttributeFilterConstraints)
2081 : {
2082 256 : int iOGRField = constraint.iField;
2083 512 : if (constraint.iField ==
2084 256 : m_poFeatureDefn->GetFieldCount() + SPF_FID)
2085 : {
2086 9 : iOGRField = OGR_FID_INDEX;
2087 : }
2088 256 : if (constraint.nOperation != SWQ_ISNULL &&
2089 247 : constraint.nOperation != SWQ_ISNOTNULL)
2090 : {
2091 234 : if (iOGRField == OGR_FID_INDEX &&
2092 9 : m_iFIDParquetColumn < 0)
2093 : {
2094 6 : sMin.Integer64 = nFeatureIdxTotal;
2095 6 : sMax.Integer64 =
2096 6 : nFeatureIdxTotal +
2097 6 : poRowGroup->metadata()->num_rows() - 1;
2098 6 : eType = OFTInteger64;
2099 : }
2100 228 : else if (!GetMinMaxForOGRField(
2101 : iRowGroup, iOGRField, true, sMin,
2102 : bFoundMin, true, sMax, bFoundMax,
2103 225 : eType, eSubType, osMinTmp, osMaxTmp) ||
2104 228 : !bFoundMin || !bFoundMax)
2105 : {
2106 3 : bIterateEverything = true;
2107 3 : break;
2108 : }
2109 : }
2110 :
2111 253 : IsConstraintPossibleRes res =
2112 : IsConstraintPossibleRes::UNKNOWN;
2113 253 : if (constraint.eType ==
2114 147 : OGRArrowLayer::Constraint::Type::Integer &&
2115 147 : eType == OFTInteger)
2116 : {
2117 : #if 0
2118 : CPLDebug("PARQUET",
2119 : "Group %d, field %s, min = %d, max = %d",
2120 : iRowGroup,
2121 : iOGRField == OGR_FID_INDEX
2122 : ? m_osFIDColumn.c_str()
2123 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2124 : ->GetNameRef(),
2125 : sMin.Integer, sMax.Integer);
2126 : #endif
2127 125 : res = IsConstraintPossible(
2128 : constraint.nOperation,
2129 : constraint.sValue.Integer, sMin.Integer,
2130 : sMax.Integer);
2131 : }
2132 128 : else if (constraint.eType == OGRArrowLayer::Constraint::
2133 39 : Type::Integer64 &&
2134 39 : eType == OFTInteger64)
2135 : {
2136 : #if 0
2137 : CPLDebug("PARQUET",
2138 : "Group %d, field %s, min = " CPL_FRMT_GIB
2139 : ", max = " CPL_FRMT_GIB,
2140 : iRowGroup,
2141 : iOGRField == OGR_FID_INDEX
2142 : ? m_osFIDColumn.c_str()
2143 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2144 : ->GetNameRef(),
2145 : static_cast<GIntBig>(sMin.Integer64),
2146 : static_cast<GIntBig>(sMax.Integer64));
2147 : #endif
2148 39 : res = IsConstraintPossible(
2149 : constraint.nOperation,
2150 : constraint.sValue.Integer64, sMin.Integer64,
2151 : sMax.Integer64);
2152 : }
2153 89 : else if (constraint.eType ==
2154 29 : OGRArrowLayer::Constraint::Type::Real &&
2155 29 : eType == OFTReal)
2156 : {
2157 : #if 0
2158 : CPLDebug("PARQUET",
2159 : "Group %d, field %s, min = %g, max = %g",
2160 : iRowGroup,
2161 : iOGRField == OGR_FID_INDEX
2162 : ? m_osFIDColumn.c_str()
2163 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2164 : ->GetNameRef(),
2165 : sMin.Real, sMax.Real);
2166 : #endif
2167 26 : res = IsConstraintPossible(constraint.nOperation,
2168 : constraint.sValue.Real,
2169 : sMin.Real, sMax.Real);
2170 : }
2171 63 : else if (constraint.eType ==
2172 38 : OGRArrowLayer::Constraint::Type::String &&
2173 38 : eType == OFTString)
2174 : {
2175 : #if 0
2176 : CPLDebug("PARQUET",
2177 : "Group %d, field %s, min = %s, max = %s",
2178 : iRowGroup,
2179 : iOGRField == OGR_FID_INDEX
2180 : ? m_osFIDColumn.c_str()
2181 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
2182 : ->GetNameRef(),
2183 : sMin.String, sMax.String);
2184 : #endif
2185 38 : res = IsConstraintPossible(
2186 : constraint.nOperation,
2187 76 : std::string(constraint.sValue.String),
2188 76 : std::string(sMin.String),
2189 76 : std::string(sMax.String));
2190 : }
2191 25 : else if (constraint.nOperation == SWQ_ISNULL ||
2192 16 : constraint.nOperation == SWQ_ISNOTNULL)
2193 : {
2194 : const int iCol =
2195 : iOGRField == OGR_FID_INDEX
2196 44 : ? m_iFIDParquetColumn
2197 22 : : GetMapFieldIndexToParquetColumn()
2198 22 : [iOGRField];
2199 22 : if (iCol >= 0)
2200 : {
2201 : const auto metadata =
2202 22 : m_poArrowReader->parquet_reader()
2203 44 : ->metadata();
2204 : const auto rowGroupColumnChunk =
2205 22 : metadata->RowGroup(iRowGroup)->ColumnChunk(
2206 44 : iCol);
2207 : const auto rowGroupStats =
2208 44 : rowGroupColumnChunk->statistics();
2209 44 : if (rowGroupColumnChunk->is_stats_set() &&
2210 22 : rowGroupStats)
2211 : {
2212 22 : res = IsConstraintPossibleRes::YES;
2213 31 : if (constraint.nOperation == SWQ_ISNULL &&
2214 9 : rowGroupStats->num_values() ==
2215 9 : poRowGroup->metadata()->num_rows())
2216 : {
2217 5 : res = IsConstraintPossibleRes::NO;
2218 : }
2219 34 : else if (constraint.nOperation ==
2220 30 : SWQ_ISNOTNULL &&
2221 13 : rowGroupStats->num_values() == 0)
2222 : {
2223 1 : res = IsConstraintPossibleRes::NO;
2224 : }
2225 : }
2226 22 : }
2227 : }
2228 : else
2229 : {
2230 3 : CPLDebug(
2231 : "PARQUET",
2232 : "Unhandled combination of constraint.eType "
2233 : "(%d) and eType (%d)",
2234 3 : static_cast<int>(constraint.eType), eType);
2235 : }
2236 :
2237 253 : if (res == IsConstraintPossibleRes::NO)
2238 : {
2239 86 : bSelectGroup = false;
2240 86 : break;
2241 : }
2242 167 : else if (res == IsConstraintPossibleRes::UNKNOWN)
2243 : {
2244 3 : bIterateEverything = true;
2245 3 : break;
2246 : }
2247 : }
2248 : }
2249 :
2250 396 : if (bSelectGroup)
2251 : {
2252 : // CPLDebug("PARQUET", "Selecting row group %d", iRowGroup);
2253 : m_asFeatureIdxRemapping.emplace_back(
2254 306 : std::make_pair(nFeatureIdxSelected, nFeatureIdxTotal));
2255 306 : anSelectedGroups.push_back(iRowGroup);
2256 306 : nFeatureIdxSelected += poRowGroup->metadata()->num_rows();
2257 : }
2258 :
2259 396 : nFeatureIdxTotal += poRowGroup->metadata()->num_rows();
2260 : }
2261 : }
2262 :
2263 851 : if (bIterateEverything)
2264 : {
2265 588 : m_asFeatureIdxRemapping.clear();
2266 588 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
2267 588 : if (!CreateRecordBatchReader(0))
2268 0 : return false;
2269 : }
2270 : else
2271 : {
2272 263 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
2273 263 : if (anSelectedGroups.empty())
2274 : {
2275 13 : return false;
2276 : }
2277 250 : CPLDebug("PARQUET", "%d/%d row groups selected",
2278 250 : int(anSelectedGroups.size()),
2279 250 : m_poArrowReader->num_row_groups());
2280 250 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
2281 250 : ++m_oFeatureIdxRemappingIter;
2282 250 : if (!CreateRecordBatchReader(anSelectedGroups))
2283 : {
2284 0 : return false;
2285 : }
2286 : }
2287 : }
2288 :
2289 3656 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
2290 :
2291 0 : do
2292 : {
2293 1828 : ++m_iRecordBatch;
2294 1828 : poNextBatch.reset();
2295 1828 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
2296 1828 : if (!status.ok())
2297 : {
2298 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
2299 0 : status.message().c_str());
2300 0 : poNextBatch.reset();
2301 : }
2302 1828 : if (poNextBatch == nullptr)
2303 : {
2304 926 : if (m_iRecordBatch == 1 && m_poBatch && m_poAttrQuery == nullptr &&
2305 272 : m_poFilterGeom == nullptr)
2306 : {
2307 46 : m_iRecordBatch = 0;
2308 46 : m_bSingleBatch = true;
2309 : }
2310 : else
2311 608 : m_poBatch.reset();
2312 654 : return false;
2313 : }
2314 1174 : } while (poNextBatch->num_rows() == 0);
2315 :
2316 1174 : SetBatch(poNextBatch);
2317 :
2318 1174 : return true;
2319 : }
2320 :
2321 : /************************************************************************/
2322 : /* InvalidateCachedBatches() */
2323 : /************************************************************************/
2324 :
2325 775 : void OGRParquetLayer::InvalidateCachedBatches()
2326 : {
2327 775 : m_bSingleBatch = false;
2328 775 : OGRParquetLayerBase::InvalidateCachedBatches();
2329 775 : }
2330 :
2331 : /************************************************************************/
2332 : /* SetIgnoredFields() */
2333 : /************************************************************************/
2334 :
2335 219 : OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields)
2336 : {
2337 219 : m_bIgnoredFields = false;
2338 219 : m_anRequestedParquetColumns.clear();
2339 219 : m_anMapFieldIndexToArrayIndex.clear();
2340 219 : m_anMapGeomFieldIndexToArrayIndex.clear();
2341 219 : m_nRequestedFIDColumn = -1;
2342 219 : OGRErr eErr = OGRLayer::SetIgnoredFields(papszFields);
2343 219 : int nBatchColumns = 0;
2344 219 : if (!m_bHasMissingMappingToParquet && eErr == OGRERR_NONE)
2345 : {
2346 219 : m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr;
2347 219 : if (m_bIgnoredFields)
2348 : {
2349 167 : if (m_iFIDParquetColumn >= 0)
2350 : {
2351 6 : m_nRequestedFIDColumn = nBatchColumns;
2352 6 : nBatchColumns++;
2353 6 : m_anRequestedParquetColumns.push_back(m_iFIDParquetColumn);
2354 : }
2355 :
2356 5908 : for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i)
2357 : {
2358 : const auto eArrowType =
2359 5741 : m_poSchema->fields()[m_anMapFieldIndexToArrowColumn[i][0]]
2360 5741 : ->type()
2361 5741 : ->id();
2362 5741 : if (eArrowType == arrow::Type::STRUCT)
2363 : {
2364 : // For a struct, for the sake of simplicity in
2365 : // GetNextRawFeature(), as soon as one of the member if
2366 : // requested, request all Parquet columns, so that the Arrow
2367 : // type doesn't change
2368 69 : bool bFoundNotIgnored = false;
2369 296 : for (int j = i; j < m_poFeatureDefn->GetFieldCount() &&
2370 294 : m_anMapFieldIndexToArrowColumn[i][0] ==
2371 147 : m_anMapFieldIndexToArrowColumn[j][0];
2372 : ++j)
2373 : {
2374 136 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
2375 : {
2376 56 : bFoundNotIgnored = true;
2377 56 : break;
2378 : }
2379 : }
2380 69 : if (bFoundNotIgnored)
2381 : {
2382 : int j;
2383 784 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
2384 784 : m_anMapFieldIndexToArrowColumn[i][0] ==
2385 392 : m_anMapFieldIndexToArrowColumn[j][0];
2386 : ++j)
2387 : {
2388 : const int iParquetCol =
2389 336 : m_anMapFieldIndexToParquetColumn[j];
2390 336 : CPLAssert(iParquetCol >= 0);
2391 336 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
2392 : {
2393 330 : m_anMapFieldIndexToArrayIndex.push_back(
2394 : nBatchColumns);
2395 : }
2396 : else
2397 : {
2398 6 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2399 : }
2400 336 : m_anRequestedParquetColumns.push_back(iParquetCol);
2401 : }
2402 56 : i = j - 1;
2403 56 : nBatchColumns++;
2404 : }
2405 : else
2406 : {
2407 : int j;
2408 172 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
2409 170 : m_anMapFieldIndexToArrowColumn[i][0] ==
2410 85 : m_anMapFieldIndexToArrowColumn[j][0];
2411 : ++j)
2412 : {
2413 74 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2414 : }
2415 13 : i = j - 1;
2416 : }
2417 : }
2418 5672 : else if (!m_poFeatureDefn->GetFieldDefn(i)->IsIgnored())
2419 : {
2420 4179 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[i];
2421 4179 : CPLAssert(iParquetCol >= 0);
2422 4179 : m_anMapFieldIndexToArrayIndex.push_back(nBatchColumns);
2423 4179 : nBatchColumns++;
2424 4179 : m_anRequestedParquetColumns.push_back(iParquetCol);
2425 4179 : if (eArrowType == arrow::Type::MAP)
2426 : {
2427 : // For a map, request both keys and items Parquet
2428 : // columns
2429 338 : m_anRequestedParquetColumns.push_back(iParquetCol + 1);
2430 : }
2431 : }
2432 : else
2433 : {
2434 1493 : m_anMapFieldIndexToArrayIndex.push_back(-1);
2435 : }
2436 : }
2437 :
2438 167 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrayIndex.size()) ==
2439 : m_poFeatureDefn->GetFieldCount());
2440 :
2441 346 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
2442 : {
2443 179 : if (!m_poFeatureDefn->GetGeomFieldDefn(i)->IsIgnored())
2444 : {
2445 : const auto &anVals =
2446 157 : m_anMapGeomFieldIndexToParquetColumns[i];
2447 157 : CPLAssert(!anVals.empty() && anVals[0] >= 0);
2448 : m_anRequestedParquetColumns.insert(
2449 157 : m_anRequestedParquetColumns.end(), anVals.begin(),
2450 314 : anVals.end());
2451 157 : m_anMapGeomFieldIndexToArrayIndex.push_back(nBatchColumns);
2452 157 : nBatchColumns++;
2453 :
2454 157 : auto oIter = m_oMapGeomFieldIndexToGeomColBBOX.find(i);
2455 : const auto oIterParquet =
2456 157 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(i);
2457 219 : if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end() &&
2458 62 : oIterParquet !=
2459 219 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end())
2460 : {
2461 62 : oIter->second.iArrayIdx = nBatchColumns++;
2462 : m_anRequestedParquetColumns.insert(
2463 62 : m_anRequestedParquetColumns.end(),
2464 62 : oIterParquet->second.anParquetCols.begin(),
2465 186 : oIterParquet->second.anParquetCols.end());
2466 : }
2467 : }
2468 : else
2469 : {
2470 22 : m_anMapGeomFieldIndexToArrayIndex.push_back(-1);
2471 : }
2472 : }
2473 :
2474 167 : CPLAssert(
2475 : static_cast<int>(m_anMapGeomFieldIndexToArrayIndex.size()) ==
2476 : m_poFeatureDefn->GetGeomFieldCount());
2477 : }
2478 : }
2479 :
2480 219 : m_nExpectedBatchColumns = m_bIgnoredFields ? nBatchColumns : -1;
2481 :
2482 219 : ComputeConstraintsArrayIdx();
2483 :
2484 : // Full invalidation
2485 219 : InvalidateCachedBatches();
2486 :
2487 219 : return eErr;
2488 : }
2489 :
2490 : /************************************************************************/
2491 : /* GetFeatureCount() */
2492 : /************************************************************************/
2493 :
2494 843 : GIntBig OGRParquetLayer::GetFeatureCount(int bForce)
2495 : {
2496 843 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
2497 : {
2498 27 : auto metadata = m_poArrowReader->parquet_reader()->metadata();
2499 27 : if (metadata)
2500 27 : return metadata->num_rows();
2501 : }
2502 816 : return OGRLayer::GetFeatureCount(bForce);
2503 : }
2504 :
2505 : /************************************************************************/
2506 : /* FastGetExtent() */
2507 : /************************************************************************/
2508 :
2509 633 : bool OGRParquetLayer::FastGetExtent(int iGeomField, OGREnvelope *psExtent) const
2510 : {
2511 633 : if (OGRParquetLayerBase::FastGetExtent(iGeomField, psExtent))
2512 618 : return true;
2513 :
2514 : const auto oIterToGeomColBBOX =
2515 15 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeomField);
2516 16 : if (oIterToGeomColBBOX != m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
2517 1 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES")))
2518 : {
2519 1 : OGREnvelope sExtent;
2520 : OGRField sMin, sMax;
2521 1 : OGR_RawField_SetNull(&sMin);
2522 1 : OGR_RawField_SetNull(&sMax);
2523 : bool bFoundMin, bFoundMax;
2524 1 : OGRFieldType eType = OFTMaxType;
2525 1 : OGRFieldSubType eSubType = OFSTNone;
2526 1 : std::string osMinTmp, osMaxTmp;
2527 2 : if (GetMinMaxForParquetCol(-1, oIterToGeomColBBOX->second.iParquetXMin,
2528 : nullptr, true, sMin, bFoundMin, false, sMax,
2529 : bFoundMax, eType, eSubType, osMinTmp,
2530 3 : osMaxTmp) &&
2531 1 : eType == OFTReal)
2532 : {
2533 1 : sExtent.MinX = sMin.Real;
2534 :
2535 1 : if (GetMinMaxForParquetCol(
2536 1 : -1, oIterToGeomColBBOX->second.iParquetYMin, nullptr, true,
2537 : sMin, bFoundMin, false, sMax, bFoundMax, eType, eSubType,
2538 3 : osMinTmp, osMaxTmp) &&
2539 1 : eType == OFTReal)
2540 : {
2541 1 : sExtent.MinY = sMin.Real;
2542 :
2543 1 : if (GetMinMaxForParquetCol(
2544 1 : -1, oIterToGeomColBBOX->second.iParquetXMax, nullptr,
2545 : false, sMin, bFoundMin, true, sMax, bFoundMax, eType,
2546 3 : eSubType, osMinTmp, osMaxTmp) &&
2547 1 : eType == OFTReal)
2548 : {
2549 1 : sExtent.MaxX = sMax.Real;
2550 :
2551 1 : if (GetMinMaxForParquetCol(
2552 1 : -1, oIterToGeomColBBOX->second.iParquetYMax,
2553 : nullptr, false, sMin, bFoundMin, true, sMax,
2554 3 : bFoundMax, eType, eSubType, osMinTmp, osMaxTmp) &&
2555 1 : eType == OFTReal)
2556 : {
2557 1 : sExtent.MaxY = sMax.Real;
2558 :
2559 1 : CPLDebug("PARQUET",
2560 : "Using statistics of bbox.minx, bbox.miny, "
2561 : "bbox.maxx, bbox.maxy columns to get extent");
2562 1 : m_oMapExtents[iGeomField] = sExtent;
2563 1 : *psExtent = sExtent;
2564 1 : return true;
2565 : }
2566 : }
2567 : }
2568 : }
2569 : }
2570 :
2571 14 : return false;
2572 : }
2573 :
2574 : /************************************************************************/
2575 : /* TestCapability() */
2576 : /************************************************************************/
2577 :
2578 622 : int OGRParquetLayer::TestCapability(const char *pszCap)
2579 : {
2580 622 : if (EQUAL(pszCap, OLCFastFeatureCount))
2581 79 : return m_poAttrQuery == nullptr && m_poFilterGeom == nullptr;
2582 :
2583 543 : if (EQUAL(pszCap, OLCIgnoreFields))
2584 8 : return !m_bHasMissingMappingToParquet;
2585 :
2586 535 : if (EQUAL(pszCap, OLCFastSpatialFilter))
2587 : {
2588 168 : if (m_iGeomFieldFilter >= 0 &&
2589 112 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
2590 56 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]))
2591 : {
2592 56 : return true;
2593 : }
2594 :
2595 : #if PARQUET_VERSION_MAJOR >= 21
2596 : if (m_iGeomFieldFilter >= 0 &&
2597 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
2598 : m_aeGeomEncoding[m_iGeomFieldFilter] == OGRArrowGeomEncoding::WKB &&
2599 : m_iGeomFieldFilter <
2600 : static_cast<int>(
2601 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
2602 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() ==
2603 : 1)
2604 : {
2605 : const int iParquetCol =
2606 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter][0];
2607 : if (iParquetCol >= 0)
2608 : {
2609 : const auto metadata =
2610 : m_poArrowReader->parquet_reader()->metadata();
2611 :
2612 : int nCountRowGroupsStatsValid = 0;
2613 : const int nNumGroups = m_poArrowReader->num_row_groups();
2614 : for (int iRowGroup = 0; iRowGroup < nNumGroups &&
2615 : nCountRowGroupsStatsValid == iRowGroup;
2616 : ++iRowGroup)
2617 : {
2618 : const auto columnChunk =
2619 : metadata->RowGroup(iRowGroup)->ColumnChunk(iParquetCol);
2620 : if (auto geostats = columnChunk->geo_statistics())
2621 : {
2622 : if (geostats->dimension_valid()[0] &&
2623 : geostats->dimension_valid()[1])
2624 : {
2625 : const double dfMinX = geostats->lower_bound()[0];
2626 : const double dfMaxX = geostats->upper_bound()[0];
2627 : const double dfMinY = geostats->lower_bound()[1];
2628 : const double dfMaxY = geostats->upper_bound()[1];
2629 : if (std::isfinite(dfMinX) &&
2630 : std::isfinite(dfMaxX) &&
2631 : std::isfinite(dfMinY) && std::isfinite(dfMaxY))
2632 : {
2633 : nCountRowGroupsStatsValid++;
2634 : }
2635 : }
2636 : }
2637 : }
2638 : if (nCountRowGroupsStatsValid == nNumGroups)
2639 : {
2640 : return true;
2641 : }
2642 : }
2643 : }
2644 : #endif
2645 :
2646 : // fallback to base method
2647 : }
2648 :
2649 479 : return OGRParquetLayerBase::TestCapability(pszCap);
2650 : }
2651 :
2652 : /************************************************************************/
2653 : /* GetMetadataItem() */
2654 : /************************************************************************/
2655 :
2656 308 : const char *OGRParquetLayer::GetMetadataItem(const char *pszName,
2657 : const char *pszDomain)
2658 : {
2659 : // Mostly for unit test purposes
2660 308 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_"))
2661 : {
2662 9 : int nRowGroupIdx = -1;
2663 9 : int nColumn = -1;
2664 9 : if (EQUAL(pszName, "NUM_ROW_GROUPS"))
2665 : {
2666 1 : return CPLSPrintf("%d", m_poArrowReader->num_row_groups());
2667 : }
2668 8 : if (EQUAL(pszName, "CREATOR"))
2669 : {
2670 4 : return CPLSPrintf("%s", m_poArrowReader->parquet_reader()
2671 4 : ->metadata()
2672 2 : ->created_by()
2673 2 : .c_str());
2674 : }
2675 12 : else if (sscanf(pszName, "ROW_GROUPS[%d]", &nRowGroupIdx) == 1 &&
2676 6 : strstr(pszName, ".NUM_ROWS"))
2677 : {
2678 : try
2679 : {
2680 : auto poRowGroup =
2681 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2682 3 : if (poRowGroup == nullptr)
2683 0 : return nullptr;
2684 3 : return CPLSPrintf("%" PRId64,
2685 3 : poRowGroup->metadata()->num_rows());
2686 : }
2687 0 : catch (const std::exception &)
2688 : {
2689 : }
2690 : }
2691 6 : else if (sscanf(pszName, "ROW_GROUPS[%d].COLUMNS[%d]", &nRowGroupIdx,
2692 6 : &nColumn) == 2 &&
2693 3 : strstr(pszName, ".COMPRESSION"))
2694 : {
2695 : try
2696 : {
2697 : auto poRowGroup =
2698 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2699 3 : if (poRowGroup == nullptr)
2700 0 : return nullptr;
2701 6 : auto poColumn = poRowGroup->metadata()->ColumnChunk(nColumn);
2702 3 : return CPLSPrintf("%s", arrow::util::Codec::GetCodecAsString(
2703 3 : poColumn->compression())
2704 3 : .c_str());
2705 : }
2706 0 : catch (const std::exception &)
2707 : {
2708 : }
2709 : }
2710 0 : return nullptr;
2711 : }
2712 299 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2713 : {
2714 404 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2715 202 : const auto &kv_metadata = metadata->key_value_metadata();
2716 202 : if (kv_metadata && kv_metadata->Contains(pszName))
2717 : {
2718 199 : auto metadataItem = kv_metadata->Get(pszName);
2719 199 : if (metadataItem.ok())
2720 : {
2721 199 : return CPLSPrintf("%s", metadataItem->c_str());
2722 : }
2723 : }
2724 3 : return nullptr;
2725 : }
2726 97 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
2727 : }
2728 :
2729 : /************************************************************************/
2730 : /* GetMetadata() */
2731 : /************************************************************************/
2732 :
2733 31 : char **OGRParquetLayer::GetMetadata(const char *pszDomain)
2734 : {
2735 : // Mostly for unit test purposes
2736 31 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2737 : {
2738 2 : m_aosFeatherMetadata.Clear();
2739 4 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2740 2 : const auto &kv_metadata = metadata->key_value_metadata();
2741 2 : if (kv_metadata)
2742 : {
2743 7 : for (const auto &kv : kv_metadata->sorted_pairs())
2744 : {
2745 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
2746 5 : kv.second.c_str());
2747 : }
2748 : }
2749 2 : return m_aosFeatherMetadata.List();
2750 : }
2751 29 : return OGRLayer::GetMetadata(pszDomain);
2752 : }
2753 :
2754 : /************************************************************************/
2755 : /* GetArrowStream() */
2756 : /************************************************************************/
2757 :
2758 134 : bool OGRParquetLayer::GetArrowStream(struct ArrowArrayStream *out_stream,
2759 : CSLConstList papszOptions)
2760 : {
2761 : const char *pszMaxFeaturesInBatch =
2762 134 : CSLFetchNameValue(papszOptions, "MAX_FEATURES_IN_BATCH");
2763 134 : if (pszMaxFeaturesInBatch)
2764 : {
2765 14 : int nMaxBatchSize = atoi(pszMaxFeaturesInBatch);
2766 14 : if (nMaxBatchSize <= 0)
2767 0 : nMaxBatchSize = 1;
2768 14 : if (nMaxBatchSize > INT_MAX - 1)
2769 0 : nMaxBatchSize = INT_MAX - 1;
2770 14 : m_poArrowReader->set_batch_size(nMaxBatchSize);
2771 : }
2772 134 : return OGRArrowLayer::GetArrowStream(out_stream, papszOptions);
2773 : }
2774 :
2775 : /************************************************************************/
2776 : /* SetNextByIndex() */
2777 : /************************************************************************/
2778 :
2779 12 : OGRErr OGRParquetLayer::SetNextByIndex(GIntBig nIndex)
2780 : {
2781 12 : if (nIndex < 0)
2782 3 : return OGRERR_FAILURE;
2783 :
2784 18 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2785 9 : if (nIndex >= metadata->num_rows())
2786 3 : return OGRERR_FAILURE;
2787 :
2788 6 : if (m_bSingleBatch)
2789 : {
2790 0 : ResetReading();
2791 0 : m_nIdxInBatch = nIndex;
2792 0 : m_nFeatureIdx = nIndex;
2793 0 : return OGRERR_NONE;
2794 : }
2795 :
2796 6 : const int nNumGroups = m_poArrowReader->num_row_groups();
2797 6 : int64_t nAccRows = 0;
2798 6 : const auto nBatchSize = m_poArrowReader->properties().batch_size();
2799 6 : m_iRecordBatch = -1;
2800 6 : ResetReading();
2801 6 : m_iRecordBatch = 0;
2802 7 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
2803 : {
2804 : const int64_t nNextAccRows =
2805 7 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
2806 7 : if (nIndex < nNextAccRows)
2807 : {
2808 6 : if (!CreateRecordBatchReader(iGroup))
2809 0 : return OGRERR_FAILURE;
2810 :
2811 12 : std::shared_ptr<arrow::RecordBatch> poBatch;
2812 : while (true)
2813 : {
2814 6 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
2815 6 : if (!status.ok())
2816 : {
2817 0 : CPLError(CE_Failure, CPLE_AppDefined,
2818 0 : "ReadNext() failed: %s", status.message().c_str());
2819 0 : m_iRecordBatch = -1;
2820 0 : ResetReading();
2821 0 : return OGRERR_FAILURE;
2822 : }
2823 6 : if (poBatch == nullptr)
2824 : {
2825 0 : m_iRecordBatch = -1;
2826 0 : ResetReading();
2827 0 : return OGRERR_FAILURE;
2828 : }
2829 6 : if (nIndex < nAccRows + poBatch->num_rows())
2830 : {
2831 6 : break;
2832 : }
2833 0 : nAccRows += poBatch->num_rows();
2834 0 : m_iRecordBatch++;
2835 0 : }
2836 6 : m_nIdxInBatch = nIndex - nAccRows;
2837 6 : m_nFeatureIdx = nIndex;
2838 6 : SetBatch(poBatch);
2839 6 : return OGRERR_NONE;
2840 : }
2841 1 : nAccRows = nNextAccRows;
2842 1 : m_iRecordBatch +=
2843 1 : (metadata->RowGroup(iGroup)->num_rows() + nBatchSize - 1) /
2844 : nBatchSize;
2845 : }
2846 :
2847 0 : m_iRecordBatch = -1;
2848 0 : ResetReading();
2849 0 : return OGRERR_FAILURE;
2850 : }
2851 :
2852 : /***********************************************************************/
2853 : /* GetStats() */
2854 : /***********************************************************************/
2855 :
2856 : template <class STAT_TYPE> struct GetStats
2857 : {
2858 : using T = typename STAT_TYPE::T;
2859 :
2860 511 : static T min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2861 : const int iRowGroup, const int numRowGroups, const int iCol,
2862 : bool &bFound)
2863 : {
2864 511 : T v{};
2865 511 : bFound = false;
2866 1030 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2867 : {
2868 555 : const auto columnChunk =
2869 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2870 : ->ColumnChunk(iCol);
2871 525 : const auto colStats = columnChunk->statistics();
2872 1047 : if (columnChunk->is_stats_set() && colStats &&
2873 522 : colStats->HasMinMax())
2874 : {
2875 516 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2876 516 : const auto rowGroupVal = castStats->min();
2877 516 : if (i == 0 || rowGroupVal < v)
2878 : {
2879 504 : bFound = true;
2880 504 : v = rowGroupVal;
2881 : }
2882 : }
2883 9 : else if (columnChunk->num_values() > 0)
2884 : {
2885 6 : bFound = false;
2886 6 : break;
2887 : }
2888 : }
2889 511 : return v;
2890 : }
2891 :
2892 500 : static T max(const std::shared_ptr<parquet::FileMetaData> &metadata,
2893 : const int iRowGroup, const int numRowGroups, const int iCol,
2894 : bool &bFound)
2895 : {
2896 500 : T v{};
2897 500 : bFound = false;
2898 1014 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2899 : {
2900 544 : const auto columnChunk =
2901 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2902 : ->ColumnChunk(iCol);
2903 514 : const auto colStats = columnChunk->statistics();
2904 1026 : if (columnChunk->is_stats_set() && colStats &&
2905 512 : colStats->HasMinMax())
2906 : {
2907 512 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2908 512 : const auto rowGroupVal = castStats->max();
2909 512 : if (i == 0 || rowGroupVal > v)
2910 : {
2911 510 : bFound = true;
2912 510 : v = rowGroupVal;
2913 : }
2914 : }
2915 2 : else if (columnChunk->num_values() > 0)
2916 : {
2917 0 : bFound = false;
2918 0 : break;
2919 : }
2920 : }
2921 500 : return v;
2922 : }
2923 : };
2924 :
2925 : template <> struct GetStats<parquet::ByteArrayStatistics>
2926 : {
2927 : static std::string
2928 39 : min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2929 : const int iRowGroup, const int numRowGroups, const int iCol,
2930 : bool &bFound)
2931 : {
2932 39 : std::string v{};
2933 39 : bFound = false;
2934 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2935 : {
2936 : const auto columnChunk =
2937 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2938 80 : ->ColumnChunk(iCol);
2939 80 : const auto colStats = columnChunk->statistics();
2940 80 : if (columnChunk->is_stats_set() && colStats &&
2941 40 : colStats->HasMinMax())
2942 : {
2943 : auto castStats =
2944 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
2945 40 : const auto rowGroupValRaw = castStats->min();
2946 : std::string rowGroupVal(
2947 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
2948 80 : rowGroupValRaw.len);
2949 40 : if (i == 0 || rowGroupVal < v)
2950 : {
2951 39 : bFound = true;
2952 39 : v = std::move(rowGroupVal);
2953 : }
2954 : }
2955 : }
2956 39 : return v;
2957 : }
2958 :
2959 : static std::string
2960 39 : max(const std::shared_ptr<parquet::FileMetaData> &metadata,
2961 : const int iRowGroup, const int numRowGroups, const int iCol,
2962 : bool &bFound)
2963 : {
2964 39 : std::string v{};
2965 39 : bFound = false;
2966 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2967 : {
2968 : const auto columnChunk =
2969 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2970 40 : ->ColumnChunk(iCol);
2971 40 : const auto colStats = columnChunk->statistics();
2972 80 : if (columnChunk->is_stats_set() && colStats &&
2973 40 : colStats->HasMinMax())
2974 : {
2975 : auto castStats =
2976 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
2977 40 : const auto rowGroupValRaw = castStats->max();
2978 : std::string rowGroupVal(
2979 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
2980 80 : rowGroupValRaw.len);
2981 40 : if (i == 0 || rowGroupVal > v)
2982 : {
2983 40 : bFound = true;
2984 40 : v = std::move(rowGroupVal);
2985 : }
2986 : }
2987 : else
2988 : {
2989 0 : bFound = false;
2990 0 : break;
2991 : }
2992 : }
2993 39 : return v;
2994 : }
2995 : };
2996 :
2997 : /************************************************************************/
2998 : /* GetMinMaxForOGRField() */
2999 : /************************************************************************/
3000 :
3001 258 : bool OGRParquetLayer::GetMinMaxForOGRField(int iRowGroup, // -1 for all
3002 : int iOGRField, bool bComputeMin,
3003 : OGRField &sMin, bool &bFoundMin,
3004 : bool bComputeMax, OGRField &sMax,
3005 : bool &bFoundMax, OGRFieldType &eType,
3006 : OGRFieldSubType &eSubType,
3007 : std::string &osMinTmp,
3008 : std::string &osMaxTmp) const
3009 : {
3010 258 : OGR_RawField_SetNull(&sMin);
3011 258 : OGR_RawField_SetNull(&sMax);
3012 258 : eType = OFTReal;
3013 258 : eSubType = OFSTNone;
3014 258 : bFoundMin = false;
3015 258 : bFoundMax = false;
3016 :
3017 : const int iCol = iOGRField == OGR_FID_INDEX
3018 511 : ? m_iFIDParquetColumn
3019 253 : : GetMapFieldIndexToParquetColumn()[iOGRField];
3020 258 : if (iCol < 0)
3021 0 : return false;
3022 : const auto &arrowType = iOGRField == OGR_FID_INDEX
3023 258 : ? m_poFIDType
3024 253 : : GetArrowFieldTypes()[iOGRField];
3025 :
3026 258 : const bool bRet = GetMinMaxForParquetCol(
3027 : iRowGroup, iCol, arrowType, bComputeMin, sMin, bFoundMin, bComputeMax,
3028 : sMax, bFoundMax, eType, eSubType, osMinTmp, osMaxTmp);
3029 :
3030 258 : if (eType == OFTInteger64 && arrowType->id() == arrow::Type::TIMESTAMP)
3031 : {
3032 : const OGRFieldDefn oDummyFIDFieldDefn(m_osFIDColumn.c_str(),
3033 4 : OFTInteger64);
3034 : const OGRFieldDefn *poFieldDefn =
3035 2 : iOGRField == OGR_FID_INDEX ? &oDummyFIDFieldDefn
3036 : : const_cast<OGRParquetLayer *>(this)
3037 2 : ->GetLayerDefn()
3038 2 : ->GetFieldDefn(iOGRField);
3039 2 : if (poFieldDefn->GetType() == OFTDateTime)
3040 : {
3041 : const auto timestampType =
3042 2 : static_cast<arrow::TimestampType *>(arrowType.get());
3043 2 : if (bFoundMin)
3044 : {
3045 1 : const int64_t timestamp = sMin.Integer64;
3046 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
3047 : poFieldDefn->GetTZFlag(), &sMin);
3048 : }
3049 2 : if (bFoundMax)
3050 : {
3051 1 : const int64_t timestamp = sMax.Integer64;
3052 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
3053 : poFieldDefn->GetTZFlag(), &sMax);
3054 : }
3055 2 : eType = OFTDateTime;
3056 : }
3057 : }
3058 :
3059 258 : return bRet;
3060 : }
3061 :
3062 : /************************************************************************/
3063 : /* GetMinMaxForParquetCol() */
3064 : /************************************************************************/
3065 :
3066 867 : bool OGRParquetLayer::GetMinMaxForParquetCol(
3067 : int iRowGroup, // -1 for all
3068 : int iCol,
3069 : const std::shared_ptr<arrow::DataType> &arrowType, // potentially nullptr
3070 : bool bComputeMin, OGRField &sMin, bool &bFoundMin, bool bComputeMax,
3071 : OGRField &sMax, bool &bFoundMax, OGRFieldType &eType,
3072 : OGRFieldSubType &eSubType, std::string &osMinTmp,
3073 : std::string &osMaxTmp) const
3074 : {
3075 867 : OGR_RawField_SetNull(&sMin);
3076 867 : OGR_RawField_SetNull(&sMax);
3077 867 : eType = OFTReal;
3078 867 : eSubType = OFSTNone;
3079 867 : bFoundMin = false;
3080 867 : bFoundMax = false;
3081 :
3082 1734 : const auto metadata = GetReader()->parquet_reader()->metadata();
3083 867 : const auto numRowGroups = metadata->num_row_groups();
3084 :
3085 867 : if (numRowGroups == 0)
3086 0 : return false;
3087 :
3088 1734 : const auto rowGroup0 = metadata->RowGroup(0);
3089 867 : if (iCol < 0 || iCol >= rowGroup0->num_columns())
3090 : {
3091 0 : CPLError(CE_Failure, CPLE_AppDefined,
3092 : "GetMinMaxForParquetCol(): invalid iCol=%d", iCol);
3093 0 : return false;
3094 : }
3095 1734 : const auto rowGroup0columnChunk = rowGroup0->ColumnChunk(iCol);
3096 1734 : const auto rowGroup0Stats = rowGroup0columnChunk->statistics();
3097 867 : if (!(rowGroup0columnChunk->is_stats_set() && rowGroup0Stats))
3098 : {
3099 0 : CPLDebug("PARQUET", "Statistics not available for field %s",
3100 0 : rowGroup0columnChunk->path_in_schema()->ToDotString().c_str());
3101 0 : return false;
3102 : }
3103 :
3104 867 : const auto physicalType = rowGroup0Stats->physical_type();
3105 :
3106 867 : if (bComputeMin)
3107 : {
3108 553 : if (physicalType == parquet::Type::BOOLEAN)
3109 : {
3110 54 : eType = OFTInteger;
3111 54 : eSubType = OFSTBoolean;
3112 54 : sMin.Integer = GetStats<parquet::BoolStatistics>::min(
3113 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3114 : }
3115 499 : else if (physicalType == parquet::Type::INT32)
3116 : {
3117 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
3118 : {
3119 : // With parquet file version 2.0,
3120 : // statistics of uint32 fields are
3121 : // stored as signed int32 values...
3122 1 : eType = OFTInteger64;
3123 1 : int nVal = GetStats<parquet::Int32Statistics>::min(
3124 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3125 1 : if (bFoundMin)
3126 : {
3127 1 : sMin.Integer64 = static_cast<uint32_t>(nVal);
3128 : }
3129 : }
3130 : else
3131 : {
3132 77 : eType = OFTInteger;
3133 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
3134 1 : eSubType = OFSTInt16;
3135 77 : sMin.Integer = GetStats<parquet::Int32Statistics>::min(
3136 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3137 : }
3138 : }
3139 421 : else if (physicalType == parquet::Type::INT64)
3140 : {
3141 41 : eType = OFTInteger64;
3142 41 : sMin.Integer64 = GetStats<parquet::Int64Statistics>::min(
3143 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3144 : }
3145 380 : else if (physicalType == parquet::Type::FLOAT)
3146 : {
3147 128 : eType = OFTReal;
3148 128 : eSubType = OFSTFloat32;
3149 128 : sMin.Real = GetStats<parquet::FloatStatistics>::min(
3150 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3151 : }
3152 252 : else if (physicalType == parquet::Type::DOUBLE)
3153 : {
3154 210 : eType = OFTReal;
3155 210 : sMin.Real = GetStats<parquet::DoubleStatistics>::min(
3156 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3157 : }
3158 42 : else if (arrowType &&
3159 53 : (arrowType->id() == arrow::Type::STRING ||
3160 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
3161 : physicalType == parquet::Type::BYTE_ARRAY)
3162 : {
3163 78 : osMinTmp = GetStats<parquet::ByteArrayStatistics>::min(
3164 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
3165 39 : if (bFoundMin)
3166 : {
3167 39 : eType = OFTString;
3168 39 : sMin.String = &osMinTmp[0];
3169 : }
3170 : }
3171 : }
3172 :
3173 867 : if (bComputeMax)
3174 : {
3175 542 : if (physicalType == parquet::Type::BOOLEAN)
3176 : {
3177 54 : eType = OFTInteger;
3178 54 : eSubType = OFSTBoolean;
3179 54 : sMax.Integer = GetStats<parquet::BoolStatistics>::max(
3180 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3181 : }
3182 488 : else if (physicalType == parquet::Type::INT32)
3183 : {
3184 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
3185 : {
3186 : // With parquet file version 2.0,
3187 : // statistics of uint32 fields are
3188 : // stored as signed int32 values...
3189 1 : eType = OFTInteger64;
3190 1 : int nVal = GetStats<parquet::Int32Statistics>::max(
3191 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3192 1 : if (bFoundMax)
3193 : {
3194 1 : sMax.Integer64 = static_cast<uint32_t>(nVal);
3195 : }
3196 : }
3197 : else
3198 : {
3199 77 : eType = OFTInteger;
3200 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
3201 1 : eSubType = OFSTInt16;
3202 77 : sMax.Integer = GetStats<parquet::Int32Statistics>::max(
3203 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3204 : }
3205 : }
3206 410 : else if (physicalType == parquet::Type::INT64)
3207 : {
3208 41 : eType = OFTInteger64;
3209 41 : sMax.Integer64 = GetStats<parquet::Int64Statistics>::max(
3210 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3211 : }
3212 369 : else if (physicalType == parquet::Type::FLOAT)
3213 : {
3214 118 : eType = OFTReal;
3215 118 : eSubType = OFSTFloat32;
3216 118 : sMax.Real = GetStats<parquet::FloatStatistics>::max(
3217 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3218 : }
3219 251 : else if (physicalType == parquet::Type::DOUBLE)
3220 : {
3221 209 : eType = OFTReal;
3222 209 : sMax.Real = GetStats<parquet::DoubleStatistics>::max(
3223 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3224 : }
3225 42 : else if (arrowType &&
3226 53 : (arrowType->id() == arrow::Type::STRING ||
3227 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
3228 : physicalType == parquet::Type::BYTE_ARRAY)
3229 : {
3230 78 : osMaxTmp = GetStats<parquet::ByteArrayStatistics>::max(
3231 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
3232 39 : if (bFoundMax)
3233 : {
3234 39 : eType = OFTString;
3235 39 : sMax.String = &osMaxTmp[0];
3236 : }
3237 : }
3238 : }
3239 :
3240 867 : return bFoundMin || bFoundMax;
3241 : }
3242 :
3243 : /************************************************************************/
3244 : /* GeomColsBBOXParquet() */
3245 : /************************************************************************/
3246 :
3247 : /** Return for a given geometry column (iGeom: in [0, GetGeomFieldCount()-1] range),
3248 : * the Parquet column number of the corresponding xmin,ymin,xmax,ymax bounding
3249 : * box columns, if existing.
3250 : */
3251 1 : bool OGRParquetLayer::GeomColsBBOXParquet(int iGeom, int &iParquetXMin,
3252 : int &iParquetYMin, int &iParquetXMax,
3253 : int &iParquetYMax) const
3254 : {
3255 1 : const auto oIter = m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeom);
3256 : const bool bFound =
3257 1 : (oIter != m_oMapGeomFieldIndexToGeomColBBOXParquet.end());
3258 1 : if (bFound)
3259 : {
3260 1 : iParquetXMin = oIter->second.iParquetXMin;
3261 1 : iParquetYMin = oIter->second.iParquetYMin;
3262 1 : iParquetXMax = oIter->second.iParquetXMax;
3263 1 : iParquetYMax = oIter->second.iParquetYMax;
3264 : }
3265 1 : return bFound;
3266 : }
|