Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "cpl_multiproc.h"
16 : #include "gdal_pam.h"
17 : #include "ogrsf_frmts.h"
18 : #include "ogr_p.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <limits>
23 : #include <map>
24 : #include <set>
25 : #include <utility>
26 :
27 : #include "ogr_parquet.h"
28 :
29 : #include "../arrow_common/ograrrowlayer.hpp"
30 : #include "../arrow_common/ograrrowdataset.hpp"
31 :
32 : /************************************************************************/
33 : /* OGRParquetLayerBase() */
34 : /************************************************************************/
35 :
36 1118 : OGRParquetLayerBase::OGRParquetLayerBase(OGRParquetDataset *poDS,
37 : const char *pszLayerName,
38 1118 : CSLConstList papszOpenOptions)
39 : : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
40 : m_aosGeomPossibleNames(CSLTokenizeString2(
41 : CSLFetchNameValueDef(papszOpenOptions, "GEOM_POSSIBLE_NAMES",
42 : "geometry,wkb_geometry,wkt_geometry"),
43 : ",", 0)),
44 1118 : m_osCRS(CSLFetchNameValueDef(papszOpenOptions, "CRS", ""))
45 : {
46 1118 : }
47 :
48 : /************************************************************************/
49 : /* GetDataset() */
50 : /************************************************************************/
51 :
52 12 : GDALDataset *OGRParquetLayerBase::GetDataset()
53 : {
54 12 : return m_poDS;
55 : }
56 :
57 : /************************************************************************/
58 : /* ResetReading() */
59 : /************************************************************************/
60 :
61 6300 : void OGRParquetLayerBase::ResetReading()
62 : {
63 6300 : if (m_iRecordBatch != 0)
64 : {
65 5979 : m_poRecordBatchReader.reset();
66 : }
67 6300 : OGRArrowLayer::ResetReading();
68 6300 : }
69 :
70 : /************************************************************************/
71 : /* InvalidateCachedBatches() */
72 : /************************************************************************/
73 :
74 1689 : void OGRParquetLayerBase::InvalidateCachedBatches()
75 : {
76 1689 : m_iRecordBatch = -1;
77 1689 : ResetReading();
78 1689 : }
79 :
80 : /************************************************************************/
81 : /* LoadGeoMetadata() */
82 : /************************************************************************/
83 :
84 1118 : void OGRParquetLayerBase::LoadGeoMetadata(
85 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata)
86 : {
87 1118 : if (kv_metadata && kv_metadata->Contains("geo"))
88 : {
89 2154 : auto geo = kv_metadata->Get("geo");
90 1077 : if (geo.ok())
91 : {
92 1077 : CPLDebug("PARQUET", "geo = %s", geo->c_str());
93 2154 : CPLJSONDocument oDoc;
94 1077 : if (oDoc.LoadMemory(*geo))
95 : {
96 2152 : auto oRoot = oDoc.GetRoot();
97 3228 : const auto osVersion = oRoot.GetString("version");
98 2484 : if (osVersion != "0.1.0" && osVersion != "0.2.0" &&
99 2111 : osVersion != "0.3.0" && osVersion != "0.4.0" &&
100 2107 : osVersion != "1.0.0-beta.1" && osVersion != "1.0.0-rc.1" &&
101 2482 : osVersion != "1.0.0" && osVersion != "1.1.0")
102 : {
103 1 : CPLDebug(
104 : "PARQUET",
105 : "version = %s not explicitly handled by the driver",
106 : osVersion.c_str());
107 : }
108 :
109 3228 : auto oColumns = oRoot.GetObj("columns");
110 1076 : if (oColumns.IsValid())
111 : {
112 2167 : for (const auto &oColumn : oColumns.GetChildren())
113 : {
114 1092 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
115 : }
116 : }
117 : }
118 : else
119 : {
120 1 : CPLError(CE_Warning, CPLE_AppDefined,
121 : "Cannot parse 'geo' metadata");
122 : }
123 : }
124 : }
125 1118 : }
126 :
127 : /************************************************************************/
128 : /* ParseGeometryColumnCovering() */
129 : /************************************************************************/
130 :
131 : //! Parse bounding box column definition
132 : /*static */
133 2174 : bool OGRParquetLayerBase::ParseGeometryColumnCovering(
134 : const CPLJSONObject &oJSONDef, std::string &osBBOXColumn,
135 : std::string &osXMin, std::string &osYMin, std::string &osXMax,
136 : std::string &osYMax)
137 : {
138 6522 : const auto oCovering = oJSONDef["covering"];
139 3033 : if (oCovering.IsValid() &&
140 859 : oCovering.GetType() == CPLJSONObject::Type::Object)
141 : {
142 1718 : const auto oBBOX = oCovering["bbox"];
143 859 : if (oBBOX.IsValid() && oBBOX.GetType() == CPLJSONObject::Type::Object)
144 : {
145 1718 : const auto oXMin = oBBOX["xmin"];
146 1718 : const auto oYMin = oBBOX["ymin"];
147 1718 : const auto oXMax = oBBOX["xmax"];
148 1718 : const auto oYMax = oBBOX["ymax"];
149 1718 : if (oXMin.IsValid() && oYMin.IsValid() && oXMax.IsValid() &&
150 859 : oYMax.IsValid() &&
151 859 : oXMin.GetType() == CPLJSONObject::Type::Array &&
152 859 : oYMin.GetType() == CPLJSONObject::Type::Array &&
153 2577 : oXMax.GetType() == CPLJSONObject::Type::Array &&
154 859 : oYMax.GetType() == CPLJSONObject::Type::Array)
155 : {
156 859 : const auto osXMinArray = oXMin.ToArray();
157 859 : const auto osYMinArray = oYMin.ToArray();
158 859 : const auto osXMaxArray = oXMax.ToArray();
159 859 : const auto osYMaxArray = oYMax.ToArray();
160 859 : if (osXMinArray.Size() == 2 && osYMinArray.Size() == 2 &&
161 859 : osXMaxArray.Size() == 2 && osYMaxArray.Size() == 2 &&
162 1718 : osXMinArray[0].GetType() == CPLJSONObject::Type::String &&
163 1718 : osXMinArray[1].GetType() == CPLJSONObject::Type::String &&
164 1718 : osYMinArray[0].GetType() == CPLJSONObject::Type::String &&
165 1718 : osYMinArray[1].GetType() == CPLJSONObject::Type::String &&
166 1718 : osXMaxArray[0].GetType() == CPLJSONObject::Type::String &&
167 1718 : osXMaxArray[1].GetType() == CPLJSONObject::Type::String &&
168 1718 : osYMaxArray[0].GetType() == CPLJSONObject::Type::String &&
169 2577 : osYMaxArray[1].GetType() == CPLJSONObject::Type::String &&
170 2577 : osXMinArray[0].ToString() == osYMinArray[0].ToString() &&
171 3436 : osXMinArray[0].ToString() == osXMaxArray[0].ToString() &&
172 1718 : osXMinArray[0].ToString() == osYMaxArray[0].ToString())
173 : {
174 859 : osBBOXColumn = osXMinArray[0].ToString();
175 859 : osXMin = osXMinArray[1].ToString();
176 859 : osYMin = osYMinArray[1].ToString();
177 859 : osXMax = osXMaxArray[1].ToString();
178 859 : osYMax = osYMaxArray[1].ToString();
179 859 : return true;
180 : }
181 : }
182 : }
183 : }
184 1315 : return false;
185 : }
186 :
187 : /************************************************************************/
188 : /* DealWithGeometryColumn() */
189 : /************************************************************************/
190 :
191 31764 : bool OGRParquetLayerBase::DealWithGeometryColumn(
192 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
193 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun)
194 : {
195 63528 : const auto &field_kv_metadata = field->metadata();
196 31764 : std::string osExtensionName;
197 31764 : if (field_kv_metadata)
198 : {
199 84 : auto extension_name = field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
200 42 : if (extension_name.ok())
201 : {
202 2 : osExtensionName = *extension_name;
203 : }
204 : #ifdef DEBUG
205 42 : CPLDebug("PARQUET", "Metadata field %s:", field->name().c_str());
206 45 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
207 : {
208 3 : CPLDebug("PARQUET", " %s = %s", keyValue.first.c_str(),
209 : keyValue.second.c_str());
210 : }
211 : #endif
212 : }
213 :
214 31764 : bool bRegularField = true;
215 31764 : auto oIter = m_oMapGeometryColumns.find(field->name());
216 31764 : if (oIter != m_oMapGeometryColumns.end() ||
217 62436 : STARTS_WITH(osExtensionName.c_str(), "ogc.") ||
218 30672 : STARTS_WITH(osExtensionName.c_str(), "geoarrow."))
219 : {
220 2186 : CPLJSONObject oJSONDef;
221 1093 : if (oIter != m_oMapGeometryColumns.end())
222 1092 : oJSONDef = oIter->second;
223 3279 : auto osEncoding = oJSONDef.GetString("encoding");
224 1093 : if (osEncoding.empty() && !osExtensionName.empty())
225 1 : osEncoding = osExtensionName;
226 :
227 1093 : OGRwkbGeometryType eGeomType = wkbUnknown;
228 1093 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
229 1093 : if (IsValidGeometryEncoding(field, osEncoding,
230 2186 : oIter != m_oMapGeometryColumns.end(),
231 : eGeomType, eGeomEncoding))
232 : {
233 1093 : bRegularField = false;
234 2186 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
235 :
236 3279 : auto oCRS = oJSONDef["crs"];
237 1093 : OGRSpatialReference *poSRS = nullptr;
238 1093 : if (!oCRS.IsValid())
239 : {
240 39 : if (!m_oMapGeometryColumns.empty())
241 : {
242 : // WGS 84 is implied if no crs member is found.
243 38 : poSRS = new OGRSpatialReference();
244 38 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
245 38 : poSRS->importFromEPSG(4326);
246 : }
247 : }
248 1054 : else if (oCRS.GetType() == CPLJSONObject::Type::String)
249 : {
250 1113 : const auto osWKT = oCRS.ToString();
251 371 : poSRS = new OGRSpatialReference();
252 371 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
253 :
254 371 : if (poSRS->importFromWkt(osWKT.c_str()) != OGRERR_NONE)
255 : {
256 0 : poSRS->Release();
257 0 : poSRS = nullptr;
258 : }
259 : }
260 683 : else if (oCRS.GetType() == CPLJSONObject::Type::Object)
261 : {
262 : // CRS encoded as PROJJSON (extension)
263 75 : const auto oType = oCRS["type"];
264 50 : if (oType.IsValid() &&
265 25 : oType.GetType() == CPLJSONObject::Type::String)
266 : {
267 75 : const auto osType = oType.ToString();
268 25 : if (osType.find("CRS") != std::string::npos)
269 : {
270 25 : poSRS = new OGRSpatialReference();
271 25 : poSRS->SetAxisMappingStrategy(
272 : OAMS_TRADITIONAL_GIS_ORDER);
273 :
274 25 : if (poSRS->SetFromUserInput(oCRS.ToString().c_str()) !=
275 : OGRERR_NONE)
276 : {
277 0 : poSRS->Release();
278 0 : poSRS = nullptr;
279 : }
280 : }
281 : }
282 : }
283 :
284 1093 : if (poSRS)
285 : {
286 434 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
287 434 : if (dfCoordEpoch > 0)
288 4 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
289 :
290 434 : oField.SetSpatialRef(poSRS);
291 :
292 434 : poSRS->Release();
293 : }
294 :
295 1093 : if (!m_osCRS.empty())
296 : {
297 0 : poSRS = new OGRSpatialReference();
298 0 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
299 0 : if (poSRS->SetFromUserInput(m_osCRS.c_str()) == OGRERR_NONE)
300 : {
301 0 : oField.SetSpatialRef(poSRS);
302 : }
303 0 : poSRS->Release();
304 : }
305 :
306 1093 : if (oJSONDef.GetString("edges") == "spherical")
307 : {
308 2 : SetMetadataItem("EDGES", "SPHERICAL");
309 : }
310 :
311 : // m_aeGeomEncoding be filled before calling
312 : // ComputeGeometryColumnType()
313 1093 : m_aeGeomEncoding.push_back(eGeomEncoding);
314 1093 : if (eGeomType == wkbUnknown)
315 : {
316 : // geometry_types since 1.0.0-beta1. Was geometry_type
317 : // before
318 1683 : auto oType = oJSONDef.GetObj("geometry_types");
319 561 : if (!oType.IsValid())
320 375 : oType = oJSONDef.GetObj("geometry_type");
321 561 : if (oType.GetType() == CPLJSONObject::Type::String)
322 : {
323 : // string is no longer valid since 1.0.0-beta1
324 3 : const auto osType = oType.ToString();
325 1 : if (osType != "Unknown")
326 1 : eGeomType = GetGeometryTypeFromString(osType);
327 : }
328 560 : else if (oType.GetType() == CPLJSONObject::Type::Array)
329 : {
330 372 : const auto oTypeArray = oType.ToArray();
331 186 : if (oTypeArray.Size() == 1)
332 : {
333 85 : eGeomType =
334 85 : GetGeometryTypeFromString(oTypeArray[0].ToString());
335 : }
336 101 : else if (oTypeArray.Size() > 1)
337 : {
338 : const auto PromoteToCollection =
339 266 : [](OGRwkbGeometryType eType)
340 : {
341 266 : if (eType == wkbPoint)
342 41 : return wkbMultiPoint;
343 225 : if (eType == wkbLineString)
344 36 : return wkbMultiLineString;
345 189 : if (eType == wkbPolygon)
346 49 : return wkbMultiPolygon;
347 140 : return eType;
348 : };
349 50 : bool bMixed = false;
350 50 : bool bHasMulti = false;
351 50 : bool bHasZ = false;
352 50 : bool bHasM = false;
353 : const auto eFirstType =
354 50 : OGR_GT_Flatten(GetGeometryTypeFromString(
355 100 : oTypeArray[0].ToString()));
356 : const auto eFirstTypeCollection =
357 50 : PromoteToCollection(eFirstType);
358 142 : for (int i = 0; i < oTypeArray.Size(); ++i)
359 : {
360 124 : const auto eThisGeom = GetGeometryTypeFromString(
361 248 : oTypeArray[i].ToString());
362 124 : if (PromoteToCollection(OGR_GT_Flatten(
363 124 : eThisGeom)) != eFirstTypeCollection)
364 : {
365 32 : bMixed = true;
366 32 : break;
367 : }
368 92 : bHasZ |= OGR_GT_HasZ(eThisGeom) != FALSE;
369 92 : bHasM |= OGR_GT_HasM(eThisGeom) != FALSE;
370 92 : bHasMulti |=
371 92 : (PromoteToCollection(OGR_GT_Flatten(
372 92 : eThisGeom)) == OGR_GT_Flatten(eThisGeom));
373 : }
374 50 : if (!bMixed)
375 : {
376 18 : if (eFirstTypeCollection == wkbMultiPolygon ||
377 : eFirstTypeCollection == wkbMultiLineString)
378 : {
379 17 : if (bHasMulti)
380 17 : eGeomType = OGR_GT_SetModifier(
381 : eFirstTypeCollection, bHasZ, bHasM);
382 : else
383 0 : eGeomType = OGR_GT_SetModifier(
384 : eFirstType, bHasZ, bHasM);
385 : }
386 : }
387 : }
388 : }
389 374 : else if (CPLTestBool(CPLGetConfigOption(
390 : "OGR_PARQUET_COMPUTE_GEOMETRY_TYPE", "YES")))
391 : {
392 374 : eGeomType = computeGeometryTypeFun();
393 : }
394 : }
395 :
396 1093 : oField.SetType(eGeomType);
397 1093 : oField.SetNullable(field->nullable());
398 1093 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
399 1093 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
400 : }
401 : }
402 :
403 : // Try to autodetect a (WKB) geometry column from the GEOM_POSSIBLE_NAMES
404 : // open option
405 31924 : if (osExtensionName.empty() && m_oMapGeometryColumns.empty() &&
406 160 : m_aosGeomPossibleNames.FindString(field->name().c_str()) >= 0)
407 : {
408 22 : std::shared_ptr<arrow::DataType> fieldType = field->type();
409 11 : auto fieldTypeId = fieldType->id();
410 11 : if (fieldTypeId == arrow::Type::BINARY ||
411 : fieldTypeId == arrow::Type::LARGE_BINARY)
412 : {
413 5 : CPLDebug("PARQUET",
414 : "Field %s detected as likely WKB geometry field",
415 5 : field->name().c_str());
416 5 : bRegularField = false;
417 5 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKB);
418 : }
419 0 : else if ((fieldTypeId == arrow::Type::STRING ||
420 16 : fieldTypeId == arrow::Type::LARGE_STRING) &&
421 10 : (field->name().find("wkt") != std::string::npos ||
422 4 : field->name().find("WKT") != std::string::npos))
423 : {
424 2 : CPLDebug("PARQUET",
425 : "Field %s detected as likely WKT geometry field",
426 2 : field->name().c_str());
427 2 : bRegularField = false;
428 2 : m_aeGeomEncoding.push_back(OGRArrowGeomEncoding::WKT);
429 : }
430 11 : if (!bRegularField)
431 : {
432 14 : OGRGeomFieldDefn oField(field->name().c_str(), wkbUnknown);
433 7 : oField.SetNullable(field->nullable());
434 :
435 7 : if (!m_osCRS.empty())
436 : {
437 2 : auto poSRS = new OGRSpatialReference();
438 2 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
439 2 : if (poSRS->SetFromUserInput(m_osCRS.c_str()) == OGRERR_NONE)
440 : {
441 2 : oField.SetSpatialRef(poSRS);
442 : }
443 2 : poSRS->Release();
444 : }
445 :
446 7 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
447 7 : m_anMapGeomFieldIndexToArrowColumn.push_back(iFieldIdx);
448 : }
449 : }
450 :
451 63528 : return !bRegularField;
452 : }
453 :
454 : /************************************************************************/
455 : /* TestCapability() */
456 : /************************************************************************/
457 :
458 700 : int OGRParquetLayerBase::TestCapability(const char *pszCap)
459 : {
460 700 : if (EQUAL(pszCap, OLCMeasuredGeometries))
461 24 : return true;
462 :
463 676 : if (EQUAL(pszCap, OLCFastSetNextByIndex))
464 0 : return true;
465 :
466 676 : if (EQUAL(pszCap, OLCFastSpatialFilter))
467 : {
468 49 : if (m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter) !=
469 98 : m_oMapGeomFieldIndexToGeomColBBOX.end())
470 : {
471 25 : return true;
472 : }
473 24 : return false;
474 : }
475 :
476 627 : return OGRArrowLayer::TestCapability(pszCap);
477 : }
478 :
479 : /************************************************************************/
480 : /* GetNumCPUs() */
481 : /************************************************************************/
482 :
483 : /* static */
484 1541 : int OGRParquetLayerBase::GetNumCPUs()
485 : {
486 1541 : const char *pszNumThreads = CPLGetConfigOption("GDAL_NUM_THREADS", nullptr);
487 1541 : int nNumThreads = 0;
488 1541 : if (pszNumThreads == nullptr)
489 1541 : nNumThreads = std::min(4, CPLGetNumCPUs());
490 : else
491 0 : nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") ? CPLGetNumCPUs()
492 0 : : atoi(pszNumThreads);
493 1541 : if (nNumThreads > 1)
494 : {
495 1541 : CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads));
496 : }
497 1541 : return nNumThreads;
498 : }
499 :
500 : /************************************************************************/
501 : /* OGRParquetLayer() */
502 : /************************************************************************/
503 :
504 846 : OGRParquetLayer::OGRParquetLayer(
505 : OGRParquetDataset *poDS, const char *pszLayerName,
506 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
507 846 : CSLConstList papszOpenOptions)
508 : : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions),
509 846 : m_poArrowReader(std::move(arrow_reader))
510 : {
511 : const char *pszParquetBatchSize =
512 846 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
513 846 : if (pszParquetBatchSize)
514 5 : m_poArrowReader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
515 :
516 846 : const int nNumCPUs = GetNumCPUs();
517 : const char *pszUseThreads =
518 846 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
519 846 : if (!pszUseThreads && nNumCPUs > 1)
520 : {
521 846 : pszUseThreads = "YES";
522 : }
523 846 : if (pszUseThreads && CPLTestBool(pszUseThreads))
524 : {
525 846 : m_poArrowReader->set_use_threads(true);
526 : }
527 :
528 846 : EstablishFeatureDefn();
529 846 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
530 : m_poFeatureDefn->GetGeomFieldCount());
531 :
532 846 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
533 846 : }
534 :
535 : /************************************************************************/
536 : /* EstablishFeatureDefn() */
537 : /************************************************************************/
538 :
539 846 : void OGRParquetLayer::EstablishFeatureDefn()
540 : {
541 846 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
542 846 : const auto &kv_metadata = metadata->key_value_metadata();
543 :
544 846 : LoadGeoMetadata(kv_metadata);
545 : const auto oMapFieldNameToGDALSchemaFieldDefn =
546 846 : LoadGDALSchema(kv_metadata.get());
547 :
548 846 : LoadGDALMetadata(kv_metadata.get());
549 :
550 846 : if (!m_poArrowReader->GetSchema(&m_poSchema).ok())
551 : {
552 0 : return;
553 : }
554 :
555 : const bool bUseBBOX =
556 846 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES"));
557 :
558 : // Keep track of declared bounding box columns in GeoParquet JSON metadata,
559 : // in order not to expose them as regular fields.
560 1692 : std::set<std::string> oSetBBOXColumns;
561 846 : if (bUseBBOX)
562 : {
563 1675 : for (const auto &iter : m_oMapGeometryColumns)
564 : {
565 1668 : std::string osBBOXColumn;
566 1668 : std::string osXMin, osYMin, osXMax, osYMax;
567 834 : if (ParseGeometryColumnCovering(iter.second, osBBOXColumn, osXMin,
568 : osYMin, osXMax, osYMax))
569 : {
570 340 : oSetBBOXColumns.insert(osBBOXColumn);
571 : }
572 : }
573 : }
574 :
575 846 : const auto &fields = m_poSchema->fields();
576 846 : const auto poParquetSchema = metadata->schema();
577 :
578 : // Map from Parquet column name (with dot separator) to Parquet index
579 1692 : std::map<std::string, int> oMapParquetColumnNameToIdx;
580 846 : const int nParquetColumns = poParquetSchema->num_columns();
581 35123 : for (int iParquetCol = 0; iParquetCol < nParquetColumns; ++iParquetCol)
582 : {
583 34277 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
584 34277 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
585 34277 : oMapParquetColumnNameToIdx[parquetColumnName] = iParquetCol;
586 : }
587 :
588 : // Synthetize a GeoParquet bounding box column definition when detecting
589 : // a Overture Map dataset < 2024-04-16-beta.0
590 821 : if ((m_oMapGeometryColumns.empty() ||
591 : // Below is for release 2024-01-17-alpha.0
592 1667 : (m_oMapGeometryColumns.find("geometry") !=
593 1667 : m_oMapGeometryColumns.end() &&
594 2151 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
595 1675 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB")) &&
596 333 : bUseBBOX &&
597 1179 : oMapParquetColumnNameToIdx.find("geometry") !=
598 1495 : oMapParquetColumnNameToIdx.end() &&
599 1162 : oMapParquetColumnNameToIdx.find("bbox.minx") !=
600 1163 : oMapParquetColumnNameToIdx.end() &&
601 847 : oMapParquetColumnNameToIdx.find("bbox.miny") !=
602 848 : oMapParquetColumnNameToIdx.end() &&
603 847 : oMapParquetColumnNameToIdx.find("bbox.maxx") !=
604 2539 : oMapParquetColumnNameToIdx.end() &&
605 847 : oMapParquetColumnNameToIdx.find("bbox.maxy") !=
606 847 : oMapParquetColumnNameToIdx.end())
607 : {
608 2 : CPLJSONObject oDef;
609 1 : if (m_oMapGeometryColumns.find("geometry") !=
610 2 : m_oMapGeometryColumns.end())
611 : {
612 0 : oDef = m_oMapGeometryColumns["geometry"];
613 : }
614 2 : CPLJSONObject oCovering;
615 1 : oDef.Add("covering", oCovering);
616 1 : CPLJSONObject oBBOX;
617 1 : oCovering.Add("bbox", oBBOX);
618 : {
619 1 : CPLJSONArray oArray;
620 1 : oArray.Add("bbox");
621 1 : oArray.Add("minx");
622 1 : oBBOX.Add("xmin", oArray);
623 : }
624 : {
625 1 : CPLJSONArray oArray;
626 1 : oArray.Add("bbox");
627 1 : oArray.Add("miny");
628 1 : oBBOX.Add("ymin", oArray);
629 : }
630 : {
631 1 : CPLJSONArray oArray;
632 1 : oArray.Add("bbox");
633 1 : oArray.Add("maxx");
634 1 : oBBOX.Add("xmax", oArray);
635 : }
636 : {
637 1 : CPLJSONArray oArray;
638 1 : oArray.Add("bbox");
639 1 : oArray.Add("maxy");
640 1 : oBBOX.Add("ymax", oArray);
641 : }
642 1 : oSetBBOXColumns.insert("bbox");
643 1 : oDef.Add("encoding", "WKB");
644 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
645 : }
646 : // Overture Maps 2024-04-16-beta.0 almost follows GeoParquet 1.1, except
647 : // they don't declare the "covering" element in the GeoParquet JSON metadata
648 1690 : else if (m_oMapGeometryColumns.find("geometry") !=
649 1656 : m_oMapGeometryColumns.end() &&
650 1614 : bUseBBOX &&
651 2145 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
652 1647 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB" &&
653 1154 : oMapParquetColumnNameToIdx.find("geometry") !=
654 1463 : oMapParquetColumnNameToIdx.end() &&
655 1154 : oMapParquetColumnNameToIdx.find("bbox.xmin") !=
656 1155 : oMapParquetColumnNameToIdx.end() &&
657 846 : oMapParquetColumnNameToIdx.find("bbox.ymin") !=
658 847 : oMapParquetColumnNameToIdx.end() &&
659 846 : oMapParquetColumnNameToIdx.find("bbox.xmax") !=
660 3347 : oMapParquetColumnNameToIdx.end() &&
661 846 : oMapParquetColumnNameToIdx.find("bbox.ymax") !=
662 846 : oMapParquetColumnNameToIdx.end())
663 : {
664 3 : CPLJSONObject oDef = m_oMapGeometryColumns["geometry"];
665 2 : CPLJSONObject oCovering;
666 1 : oDef.Add("covering", oCovering);
667 1 : CPLJSONObject oBBOX;
668 1 : oCovering.Add("bbox", oBBOX);
669 : {
670 1 : CPLJSONArray oArray;
671 1 : oArray.Add("bbox");
672 1 : oArray.Add("xmin");
673 1 : oBBOX.Add("xmin", oArray);
674 : }
675 : {
676 1 : CPLJSONArray oArray;
677 1 : oArray.Add("bbox");
678 1 : oArray.Add("ymin");
679 1 : oBBOX.Add("ymin", oArray);
680 : }
681 : {
682 1 : CPLJSONArray oArray;
683 1 : oArray.Add("bbox");
684 1 : oArray.Add("xmax");
685 1 : oBBOX.Add("xmax", oArray);
686 : }
687 : {
688 1 : CPLJSONArray oArray;
689 1 : oArray.Add("bbox");
690 1 : oArray.Add("ymax");
691 1 : oBBOX.Add("ymax", oArray);
692 : }
693 1 : oSetBBOXColumns.insert("bbox");
694 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
695 : }
696 :
697 846 : int iParquetCol = 0;
698 26178 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
699 : {
700 25332 : const auto &field = fields[i];
701 :
702 : bool bParquetColValid =
703 25332 : CheckMatchArrowParquetColumnNames(iParquetCol, field);
704 25332 : if (!bParquetColValid)
705 0 : m_bHasMissingMappingToParquet = true;
706 :
707 25368 : if (!m_osFIDColumn.empty() && field->name() == m_osFIDColumn &&
708 36 : (field->type()->id() == arrow::Type::INT32 ||
709 18 : field->type()->id() == arrow::Type::INT64))
710 : {
711 18 : m_poFIDType = field->type();
712 18 : m_iFIDArrowColumn = i;
713 18 : if (bParquetColValid)
714 : {
715 18 : m_iFIDParquetColumn = iParquetCol;
716 18 : iParquetCol++;
717 : }
718 360 : continue;
719 : }
720 :
721 25314 : if (oSetBBOXColumns.find(field->name()) != oSetBBOXColumns.end())
722 : {
723 342 : m_oSetBBoxArrowColumns.insert(i);
724 342 : if (bParquetColValid)
725 342 : iParquetCol++;
726 342 : continue;
727 : }
728 :
729 : const auto ComputeGeometryColumnTypeLambda =
730 885 : [this, bParquetColValid, iParquetCol, &poParquetSchema]()
731 : {
732 : // only with GeoParquet < 0.2.0
733 590 : if (bParquetColValid &&
734 295 : poParquetSchema->Column(iParquetCol)->physical_type() ==
735 : parquet::Type::BYTE_ARRAY)
736 : {
737 295 : return ComputeGeometryColumnType(
738 590 : m_poFeatureDefn->GetGeomFieldCount(), iParquetCol);
739 : }
740 0 : return wkbUnknown;
741 24972 : };
742 :
743 : const bool bGeometryField =
744 24972 : DealWithGeometryColumn(i, field, ComputeGeometryColumnTypeLambda);
745 24972 : if (bGeometryField)
746 : {
747 846 : const auto oIter = m_oMapGeometryColumns.find(field->name());
748 846 : if (bUseBBOX && oIter != m_oMapGeometryColumns.end())
749 : {
750 834 : ProcessGeometryColumnCovering(field, oIter->second,
751 : oMapParquetColumnNameToIdx);
752 : }
753 :
754 2498 : if (bParquetColValid &&
755 1652 : (field->type()->id() == arrow::Type::STRUCT ||
756 806 : field->type()->id() == arrow::Type::LIST))
757 : {
758 : // GeoArrow types
759 352 : std::vector<int> anParquetCols;
760 2160 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
761 : {
762 1808 : if (STARTS_WITH(
763 : iterParquetCols.first.c_str(),
764 : std::string(field->name()).append(".").c_str()))
765 : {
766 752 : iParquetCol =
767 752 : std::max(iParquetCol, iterParquetCols.second);
768 752 : anParquetCols.push_back(iterParquetCols.second);
769 : }
770 : }
771 352 : m_anMapGeomFieldIndexToParquetColumns.push_back(anParquetCols);
772 352 : ++iParquetCol;
773 : }
774 : else
775 : {
776 494 : m_anMapGeomFieldIndexToParquetColumns.push_back(
777 494 : {bParquetColValid ? iParquetCol : -1});
778 494 : if (bParquetColValid)
779 494 : iParquetCol++;
780 : }
781 : }
782 : else
783 : {
784 24126 : CreateFieldFromSchema(field, bParquetColValid, iParquetCol, {i},
785 : oMapFieldNameToGDALSchemaFieldDefn);
786 : }
787 : }
788 :
789 846 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
790 : m_poFeatureDefn->GetFieldCount());
791 846 : CPLAssert(static_cast<int>(m_anMapFieldIndexToParquetColumn.size()) ==
792 : m_poFeatureDefn->GetFieldCount());
793 846 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
794 : m_poFeatureDefn->GetGeomFieldCount());
795 846 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToParquetColumns.size()) ==
796 : m_poFeatureDefn->GetGeomFieldCount());
797 :
798 846 : if (!fields.empty())
799 : {
800 : try
801 : {
802 1638 : auto poRowGroup = m_poArrowReader->parquet_reader()->RowGroup(0);
803 793 : if (poRowGroup)
804 : {
805 1586 : auto poColumn = poRowGroup->metadata()->ColumnChunk(0);
806 793 : CPLDebug("PARQUET", "Compression (of first column): %s",
807 : arrow::util::Codec::GetCodecAsString(
808 793 : poColumn->compression())
809 : .c_str());
810 : }
811 : }
812 52 : catch (const std::exception &)
813 : {
814 : }
815 : }
816 : }
817 :
818 : /************************************************************************/
819 : /* ProcessGeometryColumnCovering() */
820 : /************************************************************************/
821 :
822 : /** Process GeoParquet JSON geometry field object to extract information about
823 : * its bounding box column, and appropriately fill m_oMapGeomFieldIndexToGeomColBBOX
824 : * and m_oMapGeomFieldIndexToGeomColBBOXParquet members with information on that
825 : * bounding box column.
826 : */
827 834 : void OGRParquetLayer::ProcessGeometryColumnCovering(
828 : const std::shared_ptr<arrow::Field> &field,
829 : const CPLJSONObject &oJSONGeometryColumn,
830 : const std::map<std::string, int> &oMapParquetColumnNameToIdx)
831 : {
832 1668 : std::string osBBOXColumn;
833 1668 : std::string osXMin, osYMin, osXMax, osYMax;
834 834 : if (ParseGeometryColumnCovering(oJSONGeometryColumn, osBBOXColumn, osXMin,
835 : osYMin, osXMax, osYMax))
836 : {
837 342 : OGRArrowLayer::GeomColBBOX sDesc;
838 342 : sDesc.iArrowCol = m_poSchema->GetFieldIndex(osBBOXColumn);
839 684 : const auto fieldBBOX = m_poSchema->GetFieldByName(osBBOXColumn);
840 684 : if (sDesc.iArrowCol >= 0 && fieldBBOX &&
841 342 : fieldBBOX->type()->id() == arrow::Type::STRUCT)
842 : {
843 : const auto fieldBBOXStruct =
844 684 : std::static_pointer_cast<arrow::StructType>(fieldBBOX->type());
845 684 : const auto fieldXMin = fieldBBOXStruct->GetFieldByName(osXMin);
846 684 : const auto fieldYMin = fieldBBOXStruct->GetFieldByName(osYMin);
847 684 : const auto fieldXMax = fieldBBOXStruct->GetFieldByName(osXMax);
848 684 : const auto fieldYMax = fieldBBOXStruct->GetFieldByName(osYMax);
849 342 : const int nXMinIdx = fieldBBOXStruct->GetFieldIndex(osXMin);
850 342 : const int nYMinIdx = fieldBBOXStruct->GetFieldIndex(osYMin);
851 342 : const int nXMaxIdx = fieldBBOXStruct->GetFieldIndex(osXMax);
852 342 : const int nYMaxIdx = fieldBBOXStruct->GetFieldIndex(osYMax);
853 : const auto oIterParquetIdxXMin = oMapParquetColumnNameToIdx.find(
854 342 : std::string(osBBOXColumn).append(".").append(osXMin));
855 : const auto oIterParquetIdxYMin = oMapParquetColumnNameToIdx.find(
856 342 : std::string(osBBOXColumn).append(".").append(osYMin));
857 : const auto oIterParquetIdxXMax = oMapParquetColumnNameToIdx.find(
858 342 : std::string(osBBOXColumn).append(".").append(osXMax));
859 : const auto oIterParquetIdxYMax = oMapParquetColumnNameToIdx.find(
860 342 : std::string(osBBOXColumn).append(".").append(osYMax));
861 342 : if (nXMinIdx >= 0 && nYMinIdx >= 0 && nXMaxIdx >= 0 &&
862 684 : nYMaxIdx >= 0 && fieldXMin && fieldYMin && fieldXMax &&
863 684 : fieldYMax &&
864 684 : oIterParquetIdxXMin != oMapParquetColumnNameToIdx.end() &&
865 684 : oIterParquetIdxYMin != oMapParquetColumnNameToIdx.end() &&
866 684 : oIterParquetIdxXMax != oMapParquetColumnNameToIdx.end() &&
867 684 : oIterParquetIdxYMax != oMapParquetColumnNameToIdx.end() &&
868 343 : (fieldXMin->type()->id() == arrow::Type::FLOAT ||
869 1 : fieldXMin->type()->id() == arrow::Type::DOUBLE) &&
870 342 : fieldXMin->type()->id() == fieldYMin->type()->id() &&
871 1026 : fieldXMin->type()->id() == fieldXMax->type()->id() &&
872 342 : fieldXMin->type()->id() == fieldYMax->type()->id())
873 : {
874 342 : CPLDebug("PARQUET",
875 : "Bounding box column '%s' detected for "
876 : "geometry column '%s'",
877 342 : osBBOXColumn.c_str(), field->name().c_str());
878 342 : sDesc.iArrowSubfieldXMin = nXMinIdx;
879 342 : sDesc.iArrowSubfieldYMin = nYMinIdx;
880 342 : sDesc.iArrowSubfieldXMax = nXMaxIdx;
881 342 : sDesc.iArrowSubfieldYMax = nYMaxIdx;
882 342 : sDesc.bIsFloat =
883 342 : (fieldXMin->type()->id() == arrow::Type::FLOAT);
884 :
885 : m_oMapGeomFieldIndexToGeomColBBOX
886 342 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
887 342 : std::move(sDesc);
888 :
889 342 : GeomColBBOXParquet sDescParquet;
890 342 : sDescParquet.iParquetXMin = oIterParquetIdxXMin->second;
891 342 : sDescParquet.iParquetYMin = oIterParquetIdxYMin->second;
892 342 : sDescParquet.iParquetXMax = oIterParquetIdxXMax->second;
893 342 : sDescParquet.iParquetYMax = oIterParquetIdxYMax->second;
894 3730 : for (const auto &iterParquetCols : oMapParquetColumnNameToIdx)
895 : {
896 3388 : if (STARTS_WITH(
897 : iterParquetCols.first.c_str(),
898 : std::string(osBBOXColumn).append(".").c_str()))
899 : {
900 1368 : sDescParquet.anParquetCols.push_back(
901 1368 : iterParquetCols.second);
902 : }
903 : }
904 : m_oMapGeomFieldIndexToGeomColBBOXParquet
905 684 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
906 684 : std::move(sDescParquet);
907 : }
908 : }
909 : }
910 834 : }
911 :
912 : /************************************************************************/
913 : /* CheckMatchArrowParquetColumnNames() */
914 : /************************************************************************/
915 :
916 27520 : bool OGRParquetLayer::CheckMatchArrowParquetColumnNames(
917 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const
918 : {
919 55040 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
920 27520 : const auto poParquetSchema = metadata->schema();
921 27520 : const int nParquetColumns = poParquetSchema->num_columns();
922 27520 : const auto &fieldName = field->name();
923 27520 : const int iParquetColBefore = iParquetCol;
924 :
925 28209 : while (iParquetCol < nParquetColumns)
926 : {
927 28209 : const auto parquetColumn = poParquetSchema->Column(iParquetCol);
928 28209 : const auto parquetColumnName = parquetColumn->path()->ToDotString();
929 60163 : if (fieldName == parquetColumnName ||
930 15977 : (parquetColumnName.size() > fieldName.size() &&
931 15977 : STARTS_WITH(parquetColumnName.c_str(), fieldName.c_str()) &&
932 15288 : parquetColumnName[fieldName.size()] == '.'))
933 : {
934 27520 : return true;
935 : }
936 : else
937 : {
938 689 : iParquetCol++;
939 : }
940 : }
941 :
942 0 : CPLError(CE_Warning, CPLE_AppDefined,
943 : "Cannot match Arrow column name %s with a Parquet one",
944 : fieldName.c_str());
945 0 : iParquetCol = iParquetColBefore;
946 0 : return false;
947 : }
948 :
949 : /************************************************************************/
950 : /* CreateFieldFromSchema() */
951 : /************************************************************************/
952 :
953 26314 : void OGRParquetLayer::CreateFieldFromSchema(
954 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
955 : int &iParquetCol, const std::vector<int> &path,
956 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
957 : &oMapFieldNameToGDALSchemaFieldDefn)
958 : {
959 26314 : OGRFieldDefn oField(field->name().c_str(), OFTString);
960 26314 : OGRFieldType eType = OFTString;
961 26314 : OGRFieldSubType eSubType = OFSTNone;
962 26314 : bool bTypeOK = true;
963 :
964 26314 : auto type = field->type();
965 26314 : if (type->id() == arrow::Type::DICTIONARY && path.size() == 1)
966 : {
967 : const auto dictionaryType =
968 594 : std::static_pointer_cast<arrow::DictionaryType>(field->type());
969 594 : const auto indexType = dictionaryType->index_type();
970 594 : if (dictionaryType->value_type()->id() == arrow::Type::STRING &&
971 297 : IsIntegerArrowType(indexType->id()))
972 : {
973 297 : if (bParquetColValid)
974 : {
975 594 : std::string osDomainName(field->name() + "Domain");
976 297 : m_poDS->RegisterDomainName(osDomainName,
977 297 : m_poFeatureDefn->GetFieldCount());
978 297 : oField.SetDomainName(osDomainName);
979 : }
980 297 : type = indexType;
981 : }
982 : else
983 : {
984 0 : bTypeOK = false;
985 : }
986 : }
987 :
988 26314 : int nParquetColIncrement = 1;
989 26314 : switch (type->id())
990 : {
991 622 : case arrow::Type::STRUCT:
992 : {
993 1244 : const auto subfields = field->Flatten();
994 1244 : auto newpath = path;
995 622 : newpath.push_back(0);
996 2810 : for (int j = 0; j < static_cast<int>(subfields.size()); j++)
997 : {
998 2188 : const auto &subfield = subfields[j];
999 : bParquetColValid =
1000 2188 : CheckMatchArrowParquetColumnNames(iParquetCol, subfield);
1001 2188 : if (!bParquetColValid)
1002 0 : m_bHasMissingMappingToParquet = true;
1003 2188 : newpath.back() = j;
1004 2188 : CreateFieldFromSchema(subfield, bParquetColValid, iParquetCol,
1005 : newpath,
1006 : oMapFieldNameToGDALSchemaFieldDefn);
1007 : }
1008 622 : return; // return intended, not break
1009 : }
1010 :
1011 5317 : case arrow::Type::MAP:
1012 : {
1013 : // A arrow map maps to 2 Parquet columns
1014 5317 : nParquetColIncrement = 2;
1015 5317 : break;
1016 : }
1017 :
1018 20375 : default:
1019 20375 : break;
1020 : }
1021 :
1022 25692 : if (bTypeOK)
1023 : {
1024 25692 : bTypeOK = MapArrowTypeToOGR(type, field, oField, eType, eSubType, path,
1025 : oMapFieldNameToGDALSchemaFieldDefn);
1026 25692 : if (bTypeOK)
1027 : {
1028 25404 : m_apoArrowDataTypes.push_back(type);
1029 50808 : m_anMapFieldIndexToParquetColumn.push_back(
1030 25404 : bParquetColValid ? iParquetCol : -1);
1031 : }
1032 : }
1033 :
1034 25692 : if (bParquetColValid)
1035 25692 : iParquetCol += nParquetColIncrement;
1036 : }
1037 :
1038 : /************************************************************************/
1039 : /* BuildDomain() */
1040 : /************************************************************************/
1041 :
1042 : std::unique_ptr<OGRFieldDomain>
1043 15 : OGRParquetLayer::BuildDomain(const std::string &osDomainName,
1044 : int iFieldIndex) const
1045 : {
1046 : #ifdef DEBUG
1047 15 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
1048 : (void)iArrowCol;
1049 15 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
1050 : arrow::Type::DICTIONARY);
1051 : #endif
1052 15 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[iFieldIndex];
1053 15 : CPLAssert(iParquetCol >= 0);
1054 15 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1055 15 : const auto oldBatchSize = m_poArrowReader->properties().batch_size();
1056 15 : m_poArrowReader->set_batch_size(1);
1057 15 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1058 : {0}, {iParquetCol}, &poRecordBatchReader));
1059 15 : if (poRecordBatchReader != nullptr)
1060 : {
1061 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1062 15 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1063 15 : if (!status.ok())
1064 : {
1065 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1066 0 : status.message().c_str());
1067 : }
1068 15 : else if (poBatch)
1069 : {
1070 15 : m_poArrowReader->set_batch_size(oldBatchSize);
1071 15 : return BuildDomainFromBatch(osDomainName, poBatch, 0);
1072 : }
1073 : }
1074 0 : m_poArrowReader->set_batch_size(oldBatchSize);
1075 0 : return nullptr;
1076 : }
1077 :
1078 : /************************************************************************/
1079 : /* ComputeGeometryColumnType() */
1080 : /************************************************************************/
1081 :
1082 : OGRwkbGeometryType
1083 295 : OGRParquetLayer::ComputeGeometryColumnType(int iGeomCol, int iParquetCol) const
1084 : {
1085 : // Compute type of geometry column by iterating over each geometry, and
1086 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
1087 :
1088 295 : OGRwkbGeometryType eGeomType = wkbNone;
1089 295 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1090 :
1091 295 : std::vector<int> anRowGroups;
1092 295 : const int nNumGroups = m_poArrowReader->num_row_groups();
1093 295 : anRowGroups.reserve(nNumGroups);
1094 878 : for (int i = 0; i < nNumGroups; ++i)
1095 583 : anRowGroups.push_back(i);
1096 295 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1097 : anRowGroups, {iParquetCol}, &poRecordBatchReader));
1098 295 : if (poRecordBatchReader != nullptr)
1099 : {
1100 590 : std::shared_ptr<arrow::RecordBatch> poBatch;
1101 : while (true)
1102 : {
1103 592 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1104 592 : if (!status.ok())
1105 : {
1106 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1107 0 : status.message().c_str());
1108 0 : break;
1109 : }
1110 592 : else if (!poBatch)
1111 293 : break;
1112 :
1113 299 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
1114 : 0, eGeomType);
1115 299 : if (eGeomType == wkbUnknown)
1116 2 : break;
1117 297 : }
1118 : }
1119 :
1120 590 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
1121 : }
1122 :
1123 : /************************************************************************/
1124 : /* GetFeatureExplicitFID() */
1125 : /************************************************************************/
1126 :
1127 4 : OGRFeature *OGRParquetLayer::GetFeatureExplicitFID(GIntBig nFID)
1128 : {
1129 4 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1130 :
1131 8 : std::vector<int> anRowGroups;
1132 4 : const int nNumGroups = m_poArrowReader->num_row_groups();
1133 4 : anRowGroups.reserve(nNumGroups);
1134 16 : for (int i = 0; i < nNumGroups; ++i)
1135 12 : anRowGroups.push_back(i);
1136 4 : if (m_bIgnoredFields)
1137 : {
1138 4 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1139 2 : anRowGroups, m_anRequestedParquetColumns, &poRecordBatchReader));
1140 : }
1141 : else
1142 : {
1143 2 : CPL_IGNORE_RET_VAL(m_poArrowReader->GetRecordBatchReader(
1144 : anRowGroups, &poRecordBatchReader));
1145 : }
1146 4 : if (poRecordBatchReader != nullptr)
1147 : {
1148 4 : std::shared_ptr<arrow::RecordBatch> poBatch;
1149 : while (true)
1150 : {
1151 14 : auto status = poRecordBatchReader->ReadNext(&poBatch);
1152 14 : if (!status.ok())
1153 : {
1154 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1155 0 : status.message().c_str());
1156 0 : break;
1157 : }
1158 14 : else if (!poBatch)
1159 2 : break;
1160 :
1161 12 : const auto array = poBatch->column(
1162 12 : m_bIgnoredFields ? m_nRequestedFIDColumn : m_iFIDArrowColumn);
1163 12 : const auto arrayPtr = array.get();
1164 12 : const auto arrayTypeId = array->type_id();
1165 30 : for (int64_t nIdxInBatch = 0; nIdxInBatch < poBatch->num_rows();
1166 : nIdxInBatch++)
1167 : {
1168 20 : if (!array->IsNull(nIdxInBatch))
1169 : {
1170 20 : if (arrayTypeId == arrow::Type::INT64)
1171 : {
1172 20 : const auto castArray =
1173 : static_cast<const arrow::Int64Array *>(arrayPtr);
1174 20 : if (castArray->Value(nIdxInBatch) == nFID)
1175 : {
1176 2 : return ReadFeature(nIdxInBatch, poBatch->columns());
1177 : }
1178 : }
1179 0 : else if (arrayTypeId == arrow::Type::INT32)
1180 : {
1181 0 : const auto castArray =
1182 : static_cast<const arrow::Int32Array *>(arrayPtr);
1183 0 : if (castArray->Value(nIdxInBatch) == nFID)
1184 : {
1185 0 : return ReadFeature(nIdxInBatch, poBatch->columns());
1186 : }
1187 : }
1188 : }
1189 : }
1190 10 : }
1191 : }
1192 2 : return nullptr;
1193 : }
1194 :
1195 : /************************************************************************/
1196 : /* GetFeatureByIndex() */
1197 : /************************************************************************/
1198 :
1199 23 : OGRFeature *OGRParquetLayer::GetFeatureByIndex(GIntBig nFID)
1200 : {
1201 :
1202 23 : if (nFID < 0)
1203 3 : return nullptr;
1204 :
1205 40 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
1206 20 : const int nNumGroups = m_poArrowReader->num_row_groups();
1207 20 : int64_t nAccRows = 0;
1208 29 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
1209 : {
1210 : const int64_t nNextAccRows =
1211 23 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
1212 23 : if (nFID < nNextAccRows)
1213 : {
1214 14 : std::shared_ptr<arrow::RecordBatchReader> poRecordBatchReader;
1215 14 : arrow::Status status;
1216 14 : if (m_bIgnoredFields)
1217 : {
1218 0 : status = m_poArrowReader->GetRecordBatchReader(
1219 0 : {iGroup}, m_anRequestedParquetColumns,
1220 0 : &poRecordBatchReader);
1221 : }
1222 : else
1223 : {
1224 28 : status = m_poArrowReader->GetRecordBatchReader(
1225 14 : {iGroup}, &poRecordBatchReader);
1226 : }
1227 14 : if (poRecordBatchReader == nullptr)
1228 : {
1229 0 : CPLError(CE_Failure, CPLE_AppDefined,
1230 : "GetRecordBatchReader() failed: %s",
1231 0 : status.message().c_str());
1232 0 : return nullptr;
1233 : }
1234 :
1235 14 : const int64_t nExpectedIdxInGroup = nFID - nAccRows;
1236 14 : int64_t nIdxInGroup = 0;
1237 : while (true)
1238 : {
1239 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
1240 14 : status = poRecordBatchReader->ReadNext(&poBatch);
1241 14 : if (!status.ok())
1242 : {
1243 0 : CPLError(CE_Failure, CPLE_AppDefined,
1244 0 : "ReadNext() failed: %s", status.message().c_str());
1245 0 : return nullptr;
1246 : }
1247 14 : if (poBatch == nullptr)
1248 : {
1249 0 : return nullptr;
1250 : }
1251 14 : if (nExpectedIdxInGroup < nIdxInGroup + poBatch->num_rows())
1252 : {
1253 14 : const auto nIdxInBatch = nExpectedIdxInGroup - nIdxInGroup;
1254 : auto poFeature =
1255 14 : ReadFeature(nIdxInBatch, poBatch->columns());
1256 14 : poFeature->SetFID(nFID);
1257 14 : return poFeature;
1258 : }
1259 0 : nIdxInGroup += poBatch->num_rows();
1260 0 : }
1261 : }
1262 9 : nAccRows = nNextAccRows;
1263 : }
1264 6 : return nullptr;
1265 : }
1266 :
1267 : /************************************************************************/
1268 : /* GetFeature() */
1269 : /************************************************************************/
1270 :
1271 27 : OGRFeature *OGRParquetLayer::GetFeature(GIntBig nFID)
1272 : {
1273 27 : if (!m_osFIDColumn.empty())
1274 : {
1275 4 : return GetFeatureExplicitFID(nFID);
1276 : }
1277 : else
1278 : {
1279 23 : return GetFeatureByIndex(nFID);
1280 : }
1281 : }
1282 :
1283 : /************************************************************************/
1284 : /* ResetReading() */
1285 : /************************************************************************/
1286 :
1287 3712 : void OGRParquetLayer::ResetReading()
1288 : {
1289 3712 : OGRParquetLayerBase::ResetReading();
1290 3712 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
1291 3712 : m_nFeatureIdxSelected = 0;
1292 3712 : if (!m_asFeatureIdxRemapping.empty())
1293 : {
1294 1627 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1295 1627 : ++m_oFeatureIdxRemappingIter;
1296 : }
1297 3712 : }
1298 :
1299 : /************************************************************************/
1300 : /* CreateRecordBatchReader() */
1301 : /************************************************************************/
1302 :
1303 592 : bool OGRParquetLayer::CreateRecordBatchReader(int iStartingRowGroup)
1304 : {
1305 1184 : std::vector<int> anRowGroups;
1306 592 : const int nNumGroups = m_poArrowReader->num_row_groups();
1307 592 : anRowGroups.reserve(nNumGroups - iStartingRowGroup);
1308 1532 : for (int i = iStartingRowGroup; i < nNumGroups; ++i)
1309 940 : anRowGroups.push_back(i);
1310 1184 : return CreateRecordBatchReader(anRowGroups);
1311 : }
1312 :
1313 842 : bool OGRParquetLayer::CreateRecordBatchReader(
1314 : const std::vector<int> &anRowGroups)
1315 : {
1316 842 : arrow::Status status;
1317 842 : if (m_bIgnoredFields)
1318 : {
1319 410 : status = m_poArrowReader->GetRecordBatchReader(
1320 205 : anRowGroups, m_anRequestedParquetColumns, &m_poRecordBatchReader);
1321 : }
1322 : else
1323 : {
1324 1274 : status = m_poArrowReader->GetRecordBatchReader(anRowGroups,
1325 637 : &m_poRecordBatchReader);
1326 : }
1327 842 : if (m_poRecordBatchReader == nullptr)
1328 : {
1329 0 : CPLError(CE_Failure, CPLE_AppDefined,
1330 0 : "GetRecordBatchReader() failed: %s", status.message().c_str());
1331 0 : return false;
1332 : }
1333 842 : return true;
1334 : }
1335 :
1336 : /************************************************************************/
1337 : /* IsConstraintPossible() */
1338 : /************************************************************************/
1339 :
1340 : enum class IsConstraintPossibleRes
1341 : {
1342 : YES,
1343 : NO,
1344 : UNKNOWN
1345 : };
1346 :
1347 : template <class T>
1348 228 : static IsConstraintPossibleRes IsConstraintPossible(int nOperation, T v, T min,
1349 : T max)
1350 : {
1351 228 : if (nOperation == SWQ_EQ)
1352 : {
1353 150 : if (v < min || v > max)
1354 : {
1355 62 : return IsConstraintPossibleRes::NO;
1356 : }
1357 : }
1358 78 : else if (nOperation == SWQ_NE)
1359 : {
1360 38 : if (v == min && v == max)
1361 : {
1362 0 : return IsConstraintPossibleRes::NO;
1363 : }
1364 : }
1365 40 : else if (nOperation == SWQ_LE)
1366 : {
1367 10 : if (v < min)
1368 : {
1369 4 : return IsConstraintPossibleRes::NO;
1370 : }
1371 : }
1372 30 : else if (nOperation == SWQ_LT)
1373 : {
1374 10 : if (v <= min)
1375 : {
1376 4 : return IsConstraintPossibleRes::NO;
1377 : }
1378 : }
1379 20 : else if (nOperation == SWQ_GE)
1380 : {
1381 10 : if (v > max)
1382 : {
1383 4 : return IsConstraintPossibleRes::NO;
1384 : }
1385 : }
1386 10 : else if (nOperation == SWQ_GT)
1387 : {
1388 10 : if (v >= max)
1389 : {
1390 6 : return IsConstraintPossibleRes::NO;
1391 : }
1392 : }
1393 : else
1394 : {
1395 0 : CPLDebug("PARQUET",
1396 : "IsConstraintPossible: Unhandled operation type: %d",
1397 : nOperation);
1398 0 : return IsConstraintPossibleRes::UNKNOWN;
1399 : }
1400 148 : return IsConstraintPossibleRes::YES;
1401 : }
1402 :
1403 : /************************************************************************/
1404 : /* IncrFeatureIdx() */
1405 : /************************************************************************/
1406 :
1407 7468 : void OGRParquetLayer::IncrFeatureIdx()
1408 : {
1409 7468 : ++m_nFeatureIdxSelected;
1410 7468 : ++m_nFeatureIdx;
1411 8414 : if (m_iFIDArrowColumn < 0 && !m_asFeatureIdxRemapping.empty() &&
1412 8414 : m_oFeatureIdxRemappingIter != m_asFeatureIdxRemapping.end())
1413 : {
1414 140 : if (m_nFeatureIdxSelected == m_oFeatureIdxRemappingIter->first)
1415 : {
1416 48 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1417 48 : ++m_oFeatureIdxRemappingIter;
1418 : }
1419 : }
1420 7468 : }
1421 :
1422 : /************************************************************************/
1423 : /* ReadNextBatch() */
1424 : /************************************************************************/
1425 :
1426 1858 : bool OGRParquetLayer::ReadNextBatch()
1427 : {
1428 1858 : m_nIdxInBatch = 0;
1429 :
1430 1858 : const int nNumGroups = m_poArrowReader->num_row_groups();
1431 1858 : if (nNumGroups == 0)
1432 2 : return false;
1433 :
1434 1856 : if (m_bSingleBatch)
1435 : {
1436 19 : CPLAssert(m_iRecordBatch == 0);
1437 19 : CPLAssert(m_poBatch != nullptr);
1438 19 : return false;
1439 : }
1440 :
1441 1837 : CPLAssert((m_iRecordBatch == -1 && m_poRecordBatchReader == nullptr) ||
1442 : (m_iRecordBatch >= 0 && m_poRecordBatchReader != nullptr));
1443 :
1444 1837 : if (m_poRecordBatchReader == nullptr)
1445 : {
1446 849 : m_asFeatureIdxRemapping.clear();
1447 :
1448 849 : bool bIterateEverything = false;
1449 849 : std::vector<int> anSelectedGroups;
1450 : const auto oIterToGeomColBBOX =
1451 849 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(m_iGeomFieldFilter);
1452 : const bool bUSEBBOXFields =
1453 192 : (m_poFilterGeom &&
1454 192 : oIterToGeomColBBOX !=
1455 1041 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
1456 90 : CPLTestBool(CPLGetConfigOption(
1457 939 : ("OGR_" + GetDriverUCName() + "_USE_BBOX").c_str(), "YES")));
1458 : const bool bIsGeoArrowStruct =
1459 1698 : (m_iGeomFieldFilter >= 0 &&
1460 849 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
1461 842 : m_iGeomFieldFilter <
1462 : static_cast<int>(
1463 1684 : m_anMapGeomFieldIndexToParquetColumns.size()) &&
1464 842 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() >=
1465 1698 : 2 &&
1466 204 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]));
1467 :
1468 1475 : if (m_asAttributeFilterConstraints.empty() && !bUSEBBOXFields &&
1469 626 : !(bIsGeoArrowStruct && m_poFilterGeom))
1470 : {
1471 580 : bIterateEverything = true;
1472 : }
1473 : else
1474 : {
1475 : OGRField sMin;
1476 : OGRField sMax;
1477 269 : OGR_RawField_SetNull(&sMin);
1478 269 : OGR_RawField_SetNull(&sMax);
1479 269 : bool bFoundMin = false;
1480 269 : bool bFoundMax = false;
1481 269 : OGRFieldType eType = OFTMaxType;
1482 269 : OGRFieldSubType eSubType = OFSTNone;
1483 538 : std::string osMinTmp, osMaxTmp;
1484 269 : int64_t nFeatureIdxSelected = 0;
1485 269 : int64_t nFeatureIdxTotal = 0;
1486 :
1487 269 : int iXMinField = -1;
1488 269 : int iYMinField = -1;
1489 269 : int iXMaxField = -1;
1490 269 : int iYMaxField = -1;
1491 :
1492 269 : if (bIsGeoArrowStruct)
1493 : {
1494 : const auto metadata =
1495 184 : m_poArrowReader->parquet_reader()->metadata();
1496 92 : const auto poParquetSchema = metadata->schema();
1497 228 : for (int iParquetCol :
1498 548 : m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter])
1499 : {
1500 : const auto parquetColumn =
1501 228 : poParquetSchema->Column(iParquetCol);
1502 : const auto parquetColumnName =
1503 456 : parquetColumn->path()->ToDotString();
1504 456 : if (parquetColumnName.size() > 2 &&
1505 228 : parquetColumnName.find(".x") ==
1506 228 : parquetColumnName.size() - 2)
1507 : {
1508 92 : iXMinField = iParquetCol;
1509 92 : iXMaxField = iParquetCol;
1510 : }
1511 272 : else if (parquetColumnName.size() > 2 &&
1512 136 : parquetColumnName.find(".y") ==
1513 136 : parquetColumnName.size() - 2)
1514 : {
1515 92 : iYMinField = iParquetCol;
1516 92 : iYMaxField = iParquetCol;
1517 : }
1518 : }
1519 : }
1520 177 : else if (bUSEBBOXFields)
1521 : {
1522 44 : iXMinField = oIterToGeomColBBOX->second.iParquetXMin;
1523 44 : iYMinField = oIterToGeomColBBOX->second.iParquetYMin;
1524 44 : iXMaxField = oIterToGeomColBBOX->second.iParquetXMax;
1525 44 : iYMaxField = oIterToGeomColBBOX->second.iParquetYMax;
1526 : }
1527 :
1528 665 : for (int iRowGroup = 0;
1529 665 : iRowGroup < nNumGroups && !bIterateEverything; ++iRowGroup)
1530 : {
1531 396 : bool bSelectGroup = true;
1532 : auto poRowGroup =
1533 396 : GetReader()->parquet_reader()->RowGroup(iRowGroup);
1534 :
1535 396 : if (iXMinField >= 0 && iYMinField >= 0 && iXMaxField >= 0 &&
1536 : iYMaxField >= 0)
1537 : {
1538 144 : if (GetMinMaxForParquetCol(iRowGroup, iXMinField, nullptr,
1539 : true, sMin, bFoundMin, false,
1540 : sMax, bFoundMax, eType, eSubType,
1541 143 : osMinTmp, osMaxTmp) &&
1542 287 : bFoundMin && eType == OFTReal)
1543 : {
1544 143 : const double dfGroupMinX = sMin.Real;
1545 143 : if (dfGroupMinX > m_sFilterEnvelope.MaxX)
1546 : {
1547 1 : bSelectGroup = false;
1548 : }
1549 142 : else if (GetMinMaxForParquetCol(
1550 : iRowGroup, iYMinField, nullptr, true, sMin,
1551 : bFoundMin, false, sMax, bFoundMax, eType,
1552 142 : eSubType, osMinTmp, osMaxTmp) &&
1553 284 : bFoundMin && eType == OFTReal)
1554 : {
1555 142 : const double dfGroupMinY = sMin.Real;
1556 142 : if (dfGroupMinY > m_sFilterEnvelope.MaxY)
1557 : {
1558 1 : bSelectGroup = false;
1559 : }
1560 141 : else if (GetMinMaxForParquetCol(
1561 : iRowGroup, iXMaxField, nullptr, false,
1562 : sMin, bFoundMin, true, sMax, bFoundMax,
1563 141 : eType, eSubType, osMinTmp, osMaxTmp) &&
1564 282 : bFoundMax && eType == OFTReal)
1565 : {
1566 141 : const double dfGroupMaxX = sMax.Real;
1567 141 : if (dfGroupMaxX < m_sFilterEnvelope.MinX)
1568 : {
1569 1 : bSelectGroup = false;
1570 : }
1571 140 : else if (GetMinMaxForParquetCol(
1572 : iRowGroup, iYMaxField, nullptr,
1573 : false, sMin, bFoundMin, true, sMax,
1574 : bFoundMax, eType, eSubType,
1575 140 : osMinTmp, osMaxTmp) &&
1576 280 : bFoundMax && eType == OFTReal)
1577 : {
1578 140 : const double dfGroupMaxY = sMax.Real;
1579 140 : if (dfGroupMaxY < m_sFilterEnvelope.MinY)
1580 : {
1581 1 : bSelectGroup = false;
1582 : }
1583 : }
1584 : }
1585 : }
1586 : }
1587 : }
1588 :
1589 396 : if (bSelectGroup)
1590 : {
1591 556 : for (auto &constraint : m_asAttributeFilterConstraints)
1592 : {
1593 256 : int iOGRField = constraint.iField;
1594 512 : if (constraint.iField ==
1595 256 : m_poFeatureDefn->GetFieldCount() + SPF_FID)
1596 : {
1597 9 : iOGRField = OGR_FID_INDEX;
1598 : }
1599 256 : if (constraint.nOperation != SWQ_ISNULL &&
1600 247 : constraint.nOperation != SWQ_ISNOTNULL)
1601 : {
1602 234 : if (iOGRField == OGR_FID_INDEX &&
1603 9 : m_iFIDParquetColumn < 0)
1604 : {
1605 6 : sMin.Integer64 = nFeatureIdxTotal;
1606 6 : sMax.Integer64 =
1607 6 : nFeatureIdxTotal +
1608 6 : poRowGroup->metadata()->num_rows() - 1;
1609 6 : eType = OFTInteger64;
1610 : }
1611 228 : else if (!GetMinMaxForOGRField(
1612 : iRowGroup, iOGRField, true, sMin,
1613 : bFoundMin, true, sMax, bFoundMax,
1614 225 : eType, eSubType, osMinTmp, osMaxTmp) ||
1615 228 : !bFoundMin || !bFoundMax)
1616 : {
1617 3 : bIterateEverything = true;
1618 3 : break;
1619 : }
1620 : }
1621 :
1622 253 : IsConstraintPossibleRes res =
1623 : IsConstraintPossibleRes::UNKNOWN;
1624 253 : if (constraint.eType ==
1625 147 : OGRArrowLayer::Constraint::Type::Integer &&
1626 147 : eType == OFTInteger)
1627 : {
1628 : #if 0
1629 : CPLDebug("PARQUET",
1630 : "Group %d, field %s, min = %d, max = %d",
1631 : iRowGroup,
1632 : iOGRField == OGR_FID_INDEX
1633 : ? m_osFIDColumn.c_str()
1634 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
1635 : ->GetNameRef(),
1636 : sMin.Integer, sMax.Integer);
1637 : #endif
1638 125 : res = IsConstraintPossible(
1639 : constraint.nOperation,
1640 : constraint.sValue.Integer, sMin.Integer,
1641 : sMax.Integer);
1642 : }
1643 128 : else if (constraint.eType == OGRArrowLayer::Constraint::
1644 39 : Type::Integer64 &&
1645 39 : eType == OFTInteger64)
1646 : {
1647 : #if 0
1648 : CPLDebug("PARQUET",
1649 : "Group %d, field %s, min = " CPL_FRMT_GIB
1650 : ", max = " CPL_FRMT_GIB,
1651 : iRowGroup,
1652 : iOGRField == OGR_FID_INDEX
1653 : ? m_osFIDColumn.c_str()
1654 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
1655 : ->GetNameRef(),
1656 : static_cast<GIntBig>(sMin.Integer64),
1657 : static_cast<GIntBig>(sMax.Integer64));
1658 : #endif
1659 39 : res = IsConstraintPossible(
1660 : constraint.nOperation,
1661 : constraint.sValue.Integer64, sMin.Integer64,
1662 : sMax.Integer64);
1663 : }
1664 89 : else if (constraint.eType ==
1665 29 : OGRArrowLayer::Constraint::Type::Real &&
1666 29 : eType == OFTReal)
1667 : {
1668 : #if 0
1669 : CPLDebug("PARQUET",
1670 : "Group %d, field %s, min = %g, max = %g",
1671 : iRowGroup,
1672 : iOGRField == OGR_FID_INDEX
1673 : ? m_osFIDColumn.c_str()
1674 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
1675 : ->GetNameRef(),
1676 : sMin.Real, sMax.Real);
1677 : #endif
1678 26 : res = IsConstraintPossible(constraint.nOperation,
1679 : constraint.sValue.Real,
1680 : sMin.Real, sMax.Real);
1681 : }
1682 63 : else if (constraint.eType ==
1683 38 : OGRArrowLayer::Constraint::Type::String &&
1684 38 : eType == OFTString)
1685 : {
1686 : #if 0
1687 : CPLDebug("PARQUET",
1688 : "Group %d, field %s, min = %s, max = %s",
1689 : iRowGroup,
1690 : iOGRField == OGR_FID_INDEX
1691 : ? m_osFIDColumn.c_str()
1692 : : m_poFeatureDefn->GetFieldDefn(iOGRField)
1693 : ->GetNameRef(),
1694 : sMin.String, sMax.String);
1695 : #endif
1696 38 : res = IsConstraintPossible(
1697 : constraint.nOperation,
1698 76 : std::string(constraint.sValue.String),
1699 76 : std::string(sMin.String),
1700 76 : std::string(sMax.String));
1701 : }
1702 25 : else if (constraint.nOperation == SWQ_ISNULL ||
1703 16 : constraint.nOperation == SWQ_ISNOTNULL)
1704 : {
1705 : const int iCol =
1706 : iOGRField == OGR_FID_INDEX
1707 44 : ? m_iFIDParquetColumn
1708 22 : : GetMapFieldIndexToParquetColumn()
1709 22 : [iOGRField];
1710 22 : if (iCol >= 0)
1711 : {
1712 : const auto metadata =
1713 22 : m_poArrowReader->parquet_reader()
1714 44 : ->metadata();
1715 : const auto rowGroupColumnChunk =
1716 22 : metadata->RowGroup(iRowGroup)->ColumnChunk(
1717 44 : iCol);
1718 : const auto rowGroupStats =
1719 44 : rowGroupColumnChunk->statistics();
1720 44 : if (rowGroupColumnChunk->is_stats_set() &&
1721 22 : rowGroupStats)
1722 : {
1723 22 : res = IsConstraintPossibleRes::YES;
1724 31 : if (constraint.nOperation == SWQ_ISNULL &&
1725 9 : rowGroupStats->num_values() ==
1726 9 : poRowGroup->metadata()->num_rows())
1727 : {
1728 5 : res = IsConstraintPossibleRes::NO;
1729 : }
1730 34 : else if (constraint.nOperation ==
1731 30 : SWQ_ISNOTNULL &&
1732 13 : rowGroupStats->num_values() == 0)
1733 : {
1734 1 : res = IsConstraintPossibleRes::NO;
1735 : }
1736 : }
1737 22 : }
1738 : }
1739 : else
1740 : {
1741 3 : CPLDebug(
1742 : "PARQUET",
1743 : "Unhandled combination of constraint.eType "
1744 : "(%d) and eType (%d)",
1745 3 : static_cast<int>(constraint.eType), eType);
1746 : }
1747 :
1748 253 : if (res == IsConstraintPossibleRes::NO)
1749 : {
1750 86 : bSelectGroup = false;
1751 86 : break;
1752 : }
1753 167 : else if (res == IsConstraintPossibleRes::UNKNOWN)
1754 : {
1755 3 : bIterateEverything = true;
1756 3 : break;
1757 : }
1758 : }
1759 : }
1760 :
1761 396 : if (bSelectGroup)
1762 : {
1763 : // CPLDebug("PARQUET", "Selecting row group %d", iRowGroup);
1764 : m_asFeatureIdxRemapping.emplace_back(
1765 306 : std::make_pair(nFeatureIdxSelected, nFeatureIdxTotal));
1766 306 : anSelectedGroups.push_back(iRowGroup);
1767 306 : nFeatureIdxSelected += poRowGroup->metadata()->num_rows();
1768 : }
1769 :
1770 396 : nFeatureIdxTotal += poRowGroup->metadata()->num_rows();
1771 : }
1772 : }
1773 :
1774 849 : if (bIterateEverything)
1775 : {
1776 586 : m_asFeatureIdxRemapping.clear();
1777 586 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
1778 586 : if (!CreateRecordBatchReader(0))
1779 0 : return false;
1780 : }
1781 : else
1782 : {
1783 263 : m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
1784 263 : if (anSelectedGroups.empty())
1785 : {
1786 13 : return false;
1787 : }
1788 250 : CPLDebug("PARQUET", "%d/%d row groups selected",
1789 250 : int(anSelectedGroups.size()),
1790 250 : m_poArrowReader->num_row_groups());
1791 250 : m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
1792 250 : ++m_oFeatureIdxRemappingIter;
1793 250 : if (!CreateRecordBatchReader(anSelectedGroups))
1794 : {
1795 0 : return false;
1796 : }
1797 : }
1798 : }
1799 :
1800 3648 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
1801 :
1802 0 : do
1803 : {
1804 1824 : ++m_iRecordBatch;
1805 1824 : poNextBatch.reset();
1806 1824 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
1807 1824 : if (!status.ok())
1808 : {
1809 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1810 0 : status.message().c_str());
1811 0 : poNextBatch.reset();
1812 : }
1813 1824 : if (poNextBatch == nullptr)
1814 : {
1815 925 : if (m_iRecordBatch == 1 && m_poBatch && m_poAttrQuery == nullptr &&
1816 272 : m_poFilterGeom == nullptr)
1817 : {
1818 46 : m_iRecordBatch = 0;
1819 46 : m_bSingleBatch = true;
1820 : }
1821 : else
1822 607 : m_poBatch.reset();
1823 653 : return false;
1824 : }
1825 1171 : } while (poNextBatch->num_rows() == 0);
1826 :
1827 1171 : SetBatch(poNextBatch);
1828 :
1829 1171 : return true;
1830 : }
1831 :
1832 : /************************************************************************/
1833 : /* InvalidateCachedBatches() */
1834 : /************************************************************************/
1835 :
1836 775 : void OGRParquetLayer::InvalidateCachedBatches()
1837 : {
1838 775 : m_bSingleBatch = false;
1839 775 : OGRParquetLayerBase::InvalidateCachedBatches();
1840 775 : }
1841 :
1842 : /************************************************************************/
1843 : /* SetIgnoredFields() */
1844 : /************************************************************************/
1845 :
1846 219 : OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields)
1847 : {
1848 219 : m_bIgnoredFields = false;
1849 219 : m_anRequestedParquetColumns.clear();
1850 219 : m_anMapFieldIndexToArrayIndex.clear();
1851 219 : m_anMapGeomFieldIndexToArrayIndex.clear();
1852 219 : m_nRequestedFIDColumn = -1;
1853 219 : OGRErr eErr = OGRLayer::SetIgnoredFields(papszFields);
1854 219 : int nBatchColumns = 0;
1855 219 : if (!m_bHasMissingMappingToParquet && eErr == OGRERR_NONE)
1856 : {
1857 219 : m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr;
1858 219 : if (m_bIgnoredFields)
1859 : {
1860 167 : if (m_iFIDParquetColumn >= 0)
1861 : {
1862 6 : m_nRequestedFIDColumn = nBatchColumns;
1863 6 : nBatchColumns++;
1864 6 : m_anRequestedParquetColumns.push_back(m_iFIDParquetColumn);
1865 : }
1866 :
1867 5908 : for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i)
1868 : {
1869 : const auto eArrowType =
1870 5741 : m_poSchema->fields()[m_anMapFieldIndexToArrowColumn[i][0]]
1871 5741 : ->type()
1872 5741 : ->id();
1873 5741 : if (eArrowType == arrow::Type::STRUCT)
1874 : {
1875 : // For a struct, for the sake of simplicity in
1876 : // GetNextRawFeature(), as soon as one of the member if
1877 : // requested, request all Parquet columns, so that the Arrow
1878 : // type doesn't change
1879 69 : bool bFoundNotIgnored = false;
1880 296 : for (int j = i; j < m_poFeatureDefn->GetFieldCount() &&
1881 294 : m_anMapFieldIndexToArrowColumn[i][0] ==
1882 147 : m_anMapFieldIndexToArrowColumn[j][0];
1883 : ++j)
1884 : {
1885 136 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
1886 : {
1887 56 : bFoundNotIgnored = true;
1888 56 : break;
1889 : }
1890 : }
1891 69 : if (bFoundNotIgnored)
1892 : {
1893 : int j;
1894 784 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
1895 784 : m_anMapFieldIndexToArrowColumn[i][0] ==
1896 392 : m_anMapFieldIndexToArrowColumn[j][0];
1897 : ++j)
1898 : {
1899 : const int iParquetCol =
1900 336 : m_anMapFieldIndexToParquetColumn[j];
1901 336 : CPLAssert(iParquetCol >= 0);
1902 336 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
1903 : {
1904 330 : m_anMapFieldIndexToArrayIndex.push_back(
1905 : nBatchColumns);
1906 : }
1907 : else
1908 : {
1909 6 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1910 : }
1911 336 : m_anRequestedParquetColumns.push_back(iParquetCol);
1912 : }
1913 56 : i = j - 1;
1914 56 : nBatchColumns++;
1915 : }
1916 : else
1917 : {
1918 : int j;
1919 172 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
1920 170 : m_anMapFieldIndexToArrowColumn[i][0] ==
1921 85 : m_anMapFieldIndexToArrowColumn[j][0];
1922 : ++j)
1923 : {
1924 74 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1925 : }
1926 13 : i = j - 1;
1927 : }
1928 : }
1929 5672 : else if (!m_poFeatureDefn->GetFieldDefn(i)->IsIgnored())
1930 : {
1931 4179 : const int iParquetCol = m_anMapFieldIndexToParquetColumn[i];
1932 4179 : CPLAssert(iParquetCol >= 0);
1933 4179 : m_anMapFieldIndexToArrayIndex.push_back(nBatchColumns);
1934 4179 : nBatchColumns++;
1935 4179 : m_anRequestedParquetColumns.push_back(iParquetCol);
1936 4179 : if (eArrowType == arrow::Type::MAP)
1937 : {
1938 : // For a map, request both keys and items Parquet
1939 : // columns
1940 338 : m_anRequestedParquetColumns.push_back(iParquetCol + 1);
1941 : }
1942 : }
1943 : else
1944 : {
1945 1493 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1946 : }
1947 : }
1948 :
1949 167 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrayIndex.size()) ==
1950 : m_poFeatureDefn->GetFieldCount());
1951 :
1952 346 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
1953 : {
1954 179 : if (!m_poFeatureDefn->GetGeomFieldDefn(i)->IsIgnored())
1955 : {
1956 : const auto &anVals =
1957 157 : m_anMapGeomFieldIndexToParquetColumns[i];
1958 157 : CPLAssert(!anVals.empty() && anVals[0] >= 0);
1959 : m_anRequestedParquetColumns.insert(
1960 157 : m_anRequestedParquetColumns.end(), anVals.begin(),
1961 314 : anVals.end());
1962 157 : m_anMapGeomFieldIndexToArrayIndex.push_back(nBatchColumns);
1963 157 : nBatchColumns++;
1964 :
1965 157 : auto oIter = m_oMapGeomFieldIndexToGeomColBBOX.find(i);
1966 : const auto oIterParquet =
1967 157 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(i);
1968 219 : if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end() &&
1969 62 : oIterParquet !=
1970 219 : m_oMapGeomFieldIndexToGeomColBBOXParquet.end())
1971 : {
1972 62 : oIter->second.iArrayIdx = nBatchColumns++;
1973 : m_anRequestedParquetColumns.insert(
1974 62 : m_anRequestedParquetColumns.end(),
1975 62 : oIterParquet->second.anParquetCols.begin(),
1976 186 : oIterParquet->second.anParquetCols.end());
1977 : }
1978 : }
1979 : else
1980 : {
1981 22 : m_anMapGeomFieldIndexToArrayIndex.push_back(-1);
1982 : }
1983 : }
1984 :
1985 167 : CPLAssert(
1986 : static_cast<int>(m_anMapGeomFieldIndexToArrayIndex.size()) ==
1987 : m_poFeatureDefn->GetGeomFieldCount());
1988 : }
1989 : }
1990 :
1991 219 : m_nExpectedBatchColumns = m_bIgnoredFields ? nBatchColumns : -1;
1992 :
1993 219 : ComputeConstraintsArrayIdx();
1994 :
1995 : // Full invalidation
1996 219 : InvalidateCachedBatches();
1997 :
1998 219 : return eErr;
1999 : }
2000 :
2001 : /************************************************************************/
2002 : /* GetFeatureCount() */
2003 : /************************************************************************/
2004 :
2005 843 : GIntBig OGRParquetLayer::GetFeatureCount(int bForce)
2006 : {
2007 843 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
2008 : {
2009 27 : auto metadata = m_poArrowReader->parquet_reader()->metadata();
2010 27 : if (metadata)
2011 27 : return metadata->num_rows();
2012 : }
2013 816 : return OGRLayer::GetFeatureCount(bForce);
2014 : }
2015 :
2016 : /************************************************************************/
2017 : /* FastGetExtent() */
2018 : /************************************************************************/
2019 :
2020 633 : bool OGRParquetLayer::FastGetExtent(int iGeomField, OGREnvelope *psExtent) const
2021 : {
2022 633 : if (OGRParquetLayerBase::FastGetExtent(iGeomField, psExtent))
2023 618 : return true;
2024 :
2025 : const auto oIterToGeomColBBOX =
2026 15 : m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeomField);
2027 16 : if (oIterToGeomColBBOX != m_oMapGeomFieldIndexToGeomColBBOXParquet.end() &&
2028 1 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES")))
2029 : {
2030 1 : OGREnvelope sExtent;
2031 : OGRField sMin, sMax;
2032 1 : OGR_RawField_SetNull(&sMin);
2033 1 : OGR_RawField_SetNull(&sMax);
2034 : bool bFoundMin, bFoundMax;
2035 1 : OGRFieldType eType = OFTMaxType;
2036 1 : OGRFieldSubType eSubType = OFSTNone;
2037 1 : std::string osMinTmp, osMaxTmp;
2038 2 : if (GetMinMaxForParquetCol(-1, oIterToGeomColBBOX->second.iParquetXMin,
2039 : nullptr, true, sMin, bFoundMin, false, sMax,
2040 : bFoundMax, eType, eSubType, osMinTmp,
2041 3 : osMaxTmp) &&
2042 1 : eType == OFTReal)
2043 : {
2044 1 : sExtent.MinX = sMin.Real;
2045 :
2046 1 : if (GetMinMaxForParquetCol(
2047 1 : -1, oIterToGeomColBBOX->second.iParquetYMin, nullptr, true,
2048 : sMin, bFoundMin, false, sMax, bFoundMax, eType, eSubType,
2049 3 : osMinTmp, osMaxTmp) &&
2050 1 : eType == OFTReal)
2051 : {
2052 1 : sExtent.MinY = sMin.Real;
2053 :
2054 1 : if (GetMinMaxForParquetCol(
2055 1 : -1, oIterToGeomColBBOX->second.iParquetXMax, nullptr,
2056 : false, sMin, bFoundMin, true, sMax, bFoundMax, eType,
2057 3 : eSubType, osMinTmp, osMaxTmp) &&
2058 1 : eType == OFTReal)
2059 : {
2060 1 : sExtent.MaxX = sMax.Real;
2061 :
2062 1 : if (GetMinMaxForParquetCol(
2063 1 : -1, oIterToGeomColBBOX->second.iParquetYMax,
2064 : nullptr, false, sMin, bFoundMin, true, sMax,
2065 3 : bFoundMax, eType, eSubType, osMinTmp, osMaxTmp) &&
2066 1 : eType == OFTReal)
2067 : {
2068 1 : sExtent.MaxY = sMax.Real;
2069 :
2070 1 : CPLDebug("PARQUET",
2071 : "Using statistics of bbox.minx, bbox.miny, "
2072 : "bbox.maxx, bbox.maxy columns to get extent");
2073 1 : m_oMapExtents[iGeomField] = sExtent;
2074 1 : *psExtent = sExtent;
2075 1 : return true;
2076 : }
2077 : }
2078 : }
2079 : }
2080 : }
2081 :
2082 14 : return false;
2083 : }
2084 :
2085 : /************************************************************************/
2086 : /* TestCapability() */
2087 : /************************************************************************/
2088 :
2089 622 : int OGRParquetLayer::TestCapability(const char *pszCap)
2090 : {
2091 622 : if (EQUAL(pszCap, OLCFastFeatureCount))
2092 79 : return m_poAttrQuery == nullptr && m_poFilterGeom == nullptr;
2093 :
2094 543 : if (EQUAL(pszCap, OLCIgnoreFields))
2095 8 : return !m_bHasMissingMappingToParquet;
2096 :
2097 535 : if (EQUAL(pszCap, OLCFastSpatialFilter))
2098 : {
2099 168 : if (m_iGeomFieldFilter >= 0 &&
2100 112 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
2101 56 : OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter]))
2102 : {
2103 56 : return true;
2104 : }
2105 : // fallback to base method
2106 : }
2107 :
2108 479 : return OGRParquetLayerBase::TestCapability(pszCap);
2109 : }
2110 :
2111 : /************************************************************************/
2112 : /* GetMetadataItem() */
2113 : /************************************************************************/
2114 :
2115 308 : const char *OGRParquetLayer::GetMetadataItem(const char *pszName,
2116 : const char *pszDomain)
2117 : {
2118 : // Mostly for unit test purposes
2119 308 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_"))
2120 : {
2121 9 : int nRowGroupIdx = -1;
2122 9 : int nColumn = -1;
2123 9 : if (EQUAL(pszName, "NUM_ROW_GROUPS"))
2124 : {
2125 1 : return CPLSPrintf("%d", m_poArrowReader->num_row_groups());
2126 : }
2127 8 : if (EQUAL(pszName, "CREATOR"))
2128 : {
2129 4 : return CPLSPrintf("%s", m_poArrowReader->parquet_reader()
2130 4 : ->metadata()
2131 2 : ->created_by()
2132 2 : .c_str());
2133 : }
2134 12 : else if (sscanf(pszName, "ROW_GROUPS[%d]", &nRowGroupIdx) == 1 &&
2135 6 : strstr(pszName, ".NUM_ROWS"))
2136 : {
2137 : try
2138 : {
2139 : auto poRowGroup =
2140 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2141 3 : if (poRowGroup == nullptr)
2142 0 : return nullptr;
2143 3 : return CPLSPrintf("%" PRId64,
2144 3 : poRowGroup->metadata()->num_rows());
2145 : }
2146 0 : catch (const std::exception &)
2147 : {
2148 : }
2149 : }
2150 6 : else if (sscanf(pszName, "ROW_GROUPS[%d].COLUMNS[%d]", &nRowGroupIdx,
2151 6 : &nColumn) == 2 &&
2152 3 : strstr(pszName, ".COMPRESSION"))
2153 : {
2154 : try
2155 : {
2156 : auto poRowGroup =
2157 6 : m_poArrowReader->parquet_reader()->RowGroup(nRowGroupIdx);
2158 3 : if (poRowGroup == nullptr)
2159 0 : return nullptr;
2160 6 : auto poColumn = poRowGroup->metadata()->ColumnChunk(nColumn);
2161 3 : return CPLSPrintf("%s", arrow::util::Codec::GetCodecAsString(
2162 3 : poColumn->compression())
2163 3 : .c_str());
2164 : }
2165 0 : catch (const std::exception &)
2166 : {
2167 : }
2168 : }
2169 0 : return nullptr;
2170 : }
2171 299 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2172 : {
2173 404 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2174 202 : const auto &kv_metadata = metadata->key_value_metadata();
2175 202 : if (kv_metadata && kv_metadata->Contains(pszName))
2176 : {
2177 199 : auto metadataItem = kv_metadata->Get(pszName);
2178 199 : if (metadataItem.ok())
2179 : {
2180 199 : return CPLSPrintf("%s", metadataItem->c_str());
2181 : }
2182 : }
2183 3 : return nullptr;
2184 : }
2185 97 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
2186 : }
2187 :
2188 : /************************************************************************/
2189 : /* GetMetadata() */
2190 : /************************************************************************/
2191 :
2192 31 : char **OGRParquetLayer::GetMetadata(const char *pszDomain)
2193 : {
2194 : // Mostly for unit test purposes
2195 31 : if (pszDomain != nullptr && EQUAL(pszDomain, "_PARQUET_METADATA_"))
2196 : {
2197 2 : m_aosFeatherMetadata.Clear();
2198 4 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2199 2 : const auto &kv_metadata = metadata->key_value_metadata();
2200 2 : if (kv_metadata)
2201 : {
2202 7 : for (const auto &kv : kv_metadata->sorted_pairs())
2203 : {
2204 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
2205 5 : kv.second.c_str());
2206 : }
2207 : }
2208 2 : return m_aosFeatherMetadata.List();
2209 : }
2210 29 : return OGRLayer::GetMetadata(pszDomain);
2211 : }
2212 :
2213 : /************************************************************************/
2214 : /* GetArrowStream() */
2215 : /************************************************************************/
2216 :
2217 133 : bool OGRParquetLayer::GetArrowStream(struct ArrowArrayStream *out_stream,
2218 : CSLConstList papszOptions)
2219 : {
2220 : const char *pszMaxFeaturesInBatch =
2221 133 : CSLFetchNameValue(papszOptions, "MAX_FEATURES_IN_BATCH");
2222 133 : if (pszMaxFeaturesInBatch)
2223 : {
2224 14 : int nMaxBatchSize = atoi(pszMaxFeaturesInBatch);
2225 14 : if (nMaxBatchSize <= 0)
2226 0 : nMaxBatchSize = 1;
2227 14 : if (nMaxBatchSize > INT_MAX - 1)
2228 0 : nMaxBatchSize = INT_MAX - 1;
2229 14 : m_poArrowReader->set_batch_size(nMaxBatchSize);
2230 : }
2231 133 : return OGRArrowLayer::GetArrowStream(out_stream, papszOptions);
2232 : }
2233 :
2234 : /************************************************************************/
2235 : /* SetNextByIndex() */
2236 : /************************************************************************/
2237 :
2238 12 : OGRErr OGRParquetLayer::SetNextByIndex(GIntBig nIndex)
2239 : {
2240 12 : if (nIndex < 0)
2241 3 : return OGRERR_FAILURE;
2242 :
2243 18 : const auto metadata = m_poArrowReader->parquet_reader()->metadata();
2244 9 : if (nIndex >= metadata->num_rows())
2245 3 : return OGRERR_FAILURE;
2246 :
2247 6 : if (m_bSingleBatch)
2248 : {
2249 0 : ResetReading();
2250 0 : m_nIdxInBatch = nIndex;
2251 0 : m_nFeatureIdx = nIndex;
2252 0 : return OGRERR_NONE;
2253 : }
2254 :
2255 6 : const int nNumGroups = m_poArrowReader->num_row_groups();
2256 6 : int64_t nAccRows = 0;
2257 6 : const auto nBatchSize = m_poArrowReader->properties().batch_size();
2258 6 : m_iRecordBatch = -1;
2259 6 : ResetReading();
2260 6 : m_iRecordBatch = 0;
2261 7 : for (int iGroup = 0; iGroup < nNumGroups; ++iGroup)
2262 : {
2263 : const int64_t nNextAccRows =
2264 7 : nAccRows + metadata->RowGroup(iGroup)->num_rows();
2265 7 : if (nIndex < nNextAccRows)
2266 : {
2267 6 : if (!CreateRecordBatchReader(iGroup))
2268 0 : return OGRERR_FAILURE;
2269 :
2270 12 : std::shared_ptr<arrow::RecordBatch> poBatch;
2271 : while (true)
2272 : {
2273 6 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
2274 6 : if (!status.ok())
2275 : {
2276 0 : CPLError(CE_Failure, CPLE_AppDefined,
2277 0 : "ReadNext() failed: %s", status.message().c_str());
2278 0 : m_iRecordBatch = -1;
2279 0 : ResetReading();
2280 0 : return OGRERR_FAILURE;
2281 : }
2282 6 : if (poBatch == nullptr)
2283 : {
2284 0 : m_iRecordBatch = -1;
2285 0 : ResetReading();
2286 0 : return OGRERR_FAILURE;
2287 : }
2288 6 : if (nIndex < nAccRows + poBatch->num_rows())
2289 : {
2290 6 : break;
2291 : }
2292 0 : nAccRows += poBatch->num_rows();
2293 0 : m_iRecordBatch++;
2294 0 : }
2295 6 : m_nIdxInBatch = nIndex - nAccRows;
2296 6 : m_nFeatureIdx = nIndex;
2297 6 : SetBatch(poBatch);
2298 6 : return OGRERR_NONE;
2299 : }
2300 1 : nAccRows = nNextAccRows;
2301 1 : m_iRecordBatch +=
2302 1 : (metadata->RowGroup(iGroup)->num_rows() + nBatchSize - 1) /
2303 : nBatchSize;
2304 : }
2305 :
2306 0 : m_iRecordBatch = -1;
2307 0 : ResetReading();
2308 0 : return OGRERR_FAILURE;
2309 : }
2310 :
2311 : /***********************************************************************/
2312 : /* GetStats() */
2313 : /***********************************************************************/
2314 :
2315 : template <class STAT_TYPE> struct GetStats
2316 : {
2317 : using T = typename STAT_TYPE::T;
2318 :
2319 511 : static T min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2320 : const int iRowGroup, const int numRowGroups, const int iCol,
2321 : bool &bFound)
2322 : {
2323 511 : T v{};
2324 511 : bFound = false;
2325 1030 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2326 : {
2327 555 : const auto columnChunk =
2328 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2329 : ->ColumnChunk(iCol);
2330 525 : const auto colStats = columnChunk->statistics();
2331 1047 : if (columnChunk->is_stats_set() && colStats &&
2332 522 : colStats->HasMinMax())
2333 : {
2334 516 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2335 516 : const auto rowGroupVal = castStats->min();
2336 516 : if (i == 0 || rowGroupVal < v)
2337 : {
2338 504 : bFound = true;
2339 504 : v = rowGroupVal;
2340 : }
2341 : }
2342 9 : else if (columnChunk->num_values() > 0)
2343 : {
2344 6 : bFound = false;
2345 6 : break;
2346 : }
2347 : }
2348 511 : return v;
2349 : }
2350 :
2351 500 : static T max(const std::shared_ptr<parquet::FileMetaData> &metadata,
2352 : const int iRowGroup, const int numRowGroups, const int iCol,
2353 : bool &bFound)
2354 : {
2355 500 : T v{};
2356 500 : bFound = false;
2357 1014 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2358 : {
2359 544 : const auto columnChunk =
2360 30 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2361 : ->ColumnChunk(iCol);
2362 514 : const auto colStats = columnChunk->statistics();
2363 1026 : if (columnChunk->is_stats_set() && colStats &&
2364 512 : colStats->HasMinMax())
2365 : {
2366 512 : auto castStats = static_cast<STAT_TYPE *>(colStats.get());
2367 512 : const auto rowGroupVal = castStats->max();
2368 512 : if (i == 0 || rowGroupVal > v)
2369 : {
2370 510 : bFound = true;
2371 510 : v = rowGroupVal;
2372 : }
2373 : }
2374 2 : else if (columnChunk->num_values() > 0)
2375 : {
2376 0 : bFound = false;
2377 0 : break;
2378 : }
2379 : }
2380 500 : return v;
2381 : }
2382 : };
2383 :
2384 : template <> struct GetStats<parquet::ByteArrayStatistics>
2385 : {
2386 : static std::string
2387 39 : min(const std::shared_ptr<parquet::FileMetaData> &metadata,
2388 : const int iRowGroup, const int numRowGroups, const int iCol,
2389 : bool &bFound)
2390 : {
2391 39 : std::string v{};
2392 39 : bFound = false;
2393 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2394 : {
2395 : const auto columnChunk =
2396 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2397 80 : ->ColumnChunk(iCol);
2398 80 : const auto colStats = columnChunk->statistics();
2399 80 : if (columnChunk->is_stats_set() && colStats &&
2400 40 : colStats->HasMinMax())
2401 : {
2402 : auto castStats =
2403 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
2404 40 : const auto rowGroupValRaw = castStats->min();
2405 : const std::string rowGroupVal(
2406 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
2407 80 : rowGroupValRaw.len);
2408 40 : if (i == 0 || rowGroupVal < v)
2409 : {
2410 39 : bFound = true;
2411 39 : v = rowGroupVal;
2412 : }
2413 : }
2414 : }
2415 39 : return v;
2416 : }
2417 :
2418 : static std::string
2419 39 : max(const std::shared_ptr<parquet::FileMetaData> &metadata,
2420 : const int iRowGroup, const int numRowGroups, const int iCol,
2421 : bool &bFound)
2422 : {
2423 39 : std::string v{};
2424 39 : bFound = false;
2425 79 : for (int i = 0; i < (iRowGroup < 0 ? numRowGroups : 1); i++)
2426 : {
2427 : const auto columnChunk =
2428 40 : metadata->RowGroup(iRowGroup < 0 ? i : iRowGroup)
2429 40 : ->ColumnChunk(iCol);
2430 40 : const auto colStats = columnChunk->statistics();
2431 80 : if (columnChunk->is_stats_set() && colStats &&
2432 40 : colStats->HasMinMax())
2433 : {
2434 : auto castStats =
2435 40 : static_cast<parquet::ByteArrayStatistics *>(colStats.get());
2436 40 : const auto rowGroupValRaw = castStats->max();
2437 : const std::string rowGroupVal(
2438 40 : reinterpret_cast<const char *>(rowGroupValRaw.ptr),
2439 80 : rowGroupValRaw.len);
2440 40 : if (i == 0 || rowGroupVal > v)
2441 : {
2442 40 : bFound = true;
2443 40 : v = rowGroupVal;
2444 : }
2445 : }
2446 : else
2447 : {
2448 0 : bFound = false;
2449 0 : break;
2450 : }
2451 : }
2452 39 : return v;
2453 : }
2454 : };
2455 :
2456 : /************************************************************************/
2457 : /* GetMinMaxForOGRField() */
2458 : /************************************************************************/
2459 :
2460 258 : bool OGRParquetLayer::GetMinMaxForOGRField(int iRowGroup, // -1 for all
2461 : int iOGRField, bool bComputeMin,
2462 : OGRField &sMin, bool &bFoundMin,
2463 : bool bComputeMax, OGRField &sMax,
2464 : bool &bFoundMax, OGRFieldType &eType,
2465 : OGRFieldSubType &eSubType,
2466 : std::string &osMinTmp,
2467 : std::string &osMaxTmp) const
2468 : {
2469 258 : OGR_RawField_SetNull(&sMin);
2470 258 : OGR_RawField_SetNull(&sMax);
2471 258 : eType = OFTReal;
2472 258 : eSubType = OFSTNone;
2473 258 : bFoundMin = false;
2474 258 : bFoundMax = false;
2475 :
2476 : const int iCol = iOGRField == OGR_FID_INDEX
2477 511 : ? m_iFIDParquetColumn
2478 253 : : GetMapFieldIndexToParquetColumn()[iOGRField];
2479 258 : if (iCol < 0)
2480 0 : return false;
2481 : const auto &arrowType = iOGRField == OGR_FID_INDEX
2482 258 : ? m_poFIDType
2483 253 : : GetArrowFieldTypes()[iOGRField];
2484 :
2485 258 : const bool bRet = GetMinMaxForParquetCol(
2486 : iRowGroup, iCol, arrowType, bComputeMin, sMin, bFoundMin, bComputeMax,
2487 : sMax, bFoundMax, eType, eSubType, osMinTmp, osMaxTmp);
2488 :
2489 258 : if (eType == OFTInteger64 && arrowType->id() == arrow::Type::TIMESTAMP)
2490 : {
2491 : const OGRFieldDefn oDummyFIDFieldDefn(m_osFIDColumn.c_str(),
2492 4 : OFTInteger64);
2493 : const OGRFieldDefn *poFieldDefn =
2494 2 : iOGRField == OGR_FID_INDEX ? &oDummyFIDFieldDefn
2495 : : const_cast<OGRParquetLayer *>(this)
2496 2 : ->GetLayerDefn()
2497 2 : ->GetFieldDefn(iOGRField);
2498 2 : if (poFieldDefn->GetType() == OFTDateTime)
2499 : {
2500 : const auto timestampType =
2501 2 : static_cast<arrow::TimestampType *>(arrowType.get());
2502 2 : if (bFoundMin)
2503 : {
2504 1 : const int64_t timestamp = sMin.Integer64;
2505 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
2506 : poFieldDefn->GetTZFlag(), &sMin);
2507 : }
2508 2 : if (bFoundMax)
2509 : {
2510 1 : const int64_t timestamp = sMax.Integer64;
2511 1 : OGRArrowLayer::TimestampToOGR(timestamp, timestampType,
2512 : poFieldDefn->GetTZFlag(), &sMax);
2513 : }
2514 2 : eType = OFTDateTime;
2515 : }
2516 : }
2517 :
2518 258 : return bRet;
2519 : }
2520 :
2521 : /************************************************************************/
2522 : /* GetMinMaxForParquetCol() */
2523 : /************************************************************************/
2524 :
2525 867 : bool OGRParquetLayer::GetMinMaxForParquetCol(
2526 : int iRowGroup, // -1 for all
2527 : int iCol,
2528 : const std::shared_ptr<arrow::DataType> &arrowType, // potentially nullptr
2529 : bool bComputeMin, OGRField &sMin, bool &bFoundMin, bool bComputeMax,
2530 : OGRField &sMax, bool &bFoundMax, OGRFieldType &eType,
2531 : OGRFieldSubType &eSubType, std::string &osMinTmp,
2532 : std::string &osMaxTmp) const
2533 : {
2534 867 : OGR_RawField_SetNull(&sMin);
2535 867 : OGR_RawField_SetNull(&sMax);
2536 867 : eType = OFTReal;
2537 867 : eSubType = OFSTNone;
2538 867 : bFoundMin = false;
2539 867 : bFoundMax = false;
2540 :
2541 1734 : const auto metadata = GetReader()->parquet_reader()->metadata();
2542 867 : const auto numRowGroups = metadata->num_row_groups();
2543 :
2544 867 : if (numRowGroups == 0)
2545 0 : return false;
2546 :
2547 1734 : const auto rowGroup0 = metadata->RowGroup(0);
2548 867 : if (iCol < 0 || iCol >= rowGroup0->num_columns())
2549 : {
2550 0 : CPLError(CE_Failure, CPLE_AppDefined,
2551 : "GetMinMaxForParquetCol(): invalid iCol=%d", iCol);
2552 0 : return false;
2553 : }
2554 1734 : const auto rowGroup0columnChunk = rowGroup0->ColumnChunk(iCol);
2555 1734 : const auto rowGroup0Stats = rowGroup0columnChunk->statistics();
2556 867 : if (!(rowGroup0columnChunk->is_stats_set() && rowGroup0Stats))
2557 : {
2558 0 : CPLDebug("PARQUET", "Statistics not available for field %s",
2559 0 : rowGroup0columnChunk->path_in_schema()->ToDotString().c_str());
2560 0 : return false;
2561 : }
2562 :
2563 867 : const auto physicalType = rowGroup0Stats->physical_type();
2564 :
2565 867 : if (bComputeMin)
2566 : {
2567 553 : if (physicalType == parquet::Type::BOOLEAN)
2568 : {
2569 54 : eType = OFTInteger;
2570 54 : eSubType = OFSTBoolean;
2571 54 : sMin.Integer = GetStats<parquet::BoolStatistics>::min(
2572 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2573 : }
2574 499 : else if (physicalType == parquet::Type::INT32)
2575 : {
2576 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
2577 : {
2578 : // With parquet file version 2.0,
2579 : // statistics of uint32 fields are
2580 : // stored as signed int32 values...
2581 1 : eType = OFTInteger64;
2582 1 : int nVal = GetStats<parquet::Int32Statistics>::min(
2583 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2584 1 : if (bFoundMin)
2585 : {
2586 1 : sMin.Integer64 = static_cast<uint32_t>(nVal);
2587 : }
2588 : }
2589 : else
2590 : {
2591 77 : eType = OFTInteger;
2592 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
2593 1 : eSubType = OFSTInt16;
2594 77 : sMin.Integer = GetStats<parquet::Int32Statistics>::min(
2595 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2596 : }
2597 : }
2598 421 : else if (physicalType == parquet::Type::INT64)
2599 : {
2600 41 : eType = OFTInteger64;
2601 41 : sMin.Integer64 = GetStats<parquet::Int64Statistics>::min(
2602 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2603 : }
2604 380 : else if (physicalType == parquet::Type::FLOAT)
2605 : {
2606 128 : eType = OFTReal;
2607 128 : eSubType = OFSTFloat32;
2608 128 : sMin.Real = GetStats<parquet::FloatStatistics>::min(
2609 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2610 : }
2611 252 : else if (physicalType == parquet::Type::DOUBLE)
2612 : {
2613 210 : eType = OFTReal;
2614 210 : sMin.Real = GetStats<parquet::DoubleStatistics>::min(
2615 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2616 : }
2617 42 : else if (arrowType &&
2618 53 : (arrowType->id() == arrow::Type::STRING ||
2619 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
2620 : physicalType == parquet::Type::BYTE_ARRAY)
2621 : {
2622 78 : osMinTmp = GetStats<parquet::ByteArrayStatistics>::min(
2623 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMin);
2624 39 : if (bFoundMin)
2625 : {
2626 39 : eType = OFTString;
2627 39 : sMin.String = &osMinTmp[0];
2628 : }
2629 : }
2630 : }
2631 :
2632 867 : if (bComputeMax)
2633 : {
2634 542 : if (physicalType == parquet::Type::BOOLEAN)
2635 : {
2636 54 : eType = OFTInteger;
2637 54 : eSubType = OFSTBoolean;
2638 54 : sMax.Integer = GetStats<parquet::BoolStatistics>::max(
2639 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2640 : }
2641 488 : else if (physicalType == parquet::Type::INT32)
2642 : {
2643 78 : if (arrowType && arrowType->id() == arrow::Type::UINT32)
2644 : {
2645 : // With parquet file version 2.0,
2646 : // statistics of uint32 fields are
2647 : // stored as signed int32 values...
2648 1 : eType = OFTInteger64;
2649 1 : int nVal = GetStats<parquet::Int32Statistics>::max(
2650 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2651 1 : if (bFoundMax)
2652 : {
2653 1 : sMax.Integer64 = static_cast<uint32_t>(nVal);
2654 : }
2655 : }
2656 : else
2657 : {
2658 77 : eType = OFTInteger;
2659 77 : if (arrowType && arrowType->id() == arrow::Type::INT16)
2660 1 : eSubType = OFSTInt16;
2661 77 : sMax.Integer = GetStats<parquet::Int32Statistics>::max(
2662 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2663 : }
2664 : }
2665 410 : else if (physicalType == parquet::Type::INT64)
2666 : {
2667 41 : eType = OFTInteger64;
2668 41 : sMax.Integer64 = GetStats<parquet::Int64Statistics>::max(
2669 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2670 : }
2671 369 : else if (physicalType == parquet::Type::FLOAT)
2672 : {
2673 118 : eType = OFTReal;
2674 118 : eSubType = OFSTFloat32;
2675 118 : sMax.Real = GetStats<parquet::FloatStatistics>::max(
2676 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2677 : }
2678 251 : else if (physicalType == parquet::Type::DOUBLE)
2679 : {
2680 209 : eType = OFTReal;
2681 209 : sMax.Real = GetStats<parquet::DoubleStatistics>::max(
2682 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2683 : }
2684 42 : else if (arrowType &&
2685 53 : (arrowType->id() == arrow::Type::STRING ||
2686 95 : arrowType->id() == arrow::Type::LARGE_STRING) &&
2687 : physicalType == parquet::Type::BYTE_ARRAY)
2688 : {
2689 78 : osMaxTmp = GetStats<parquet::ByteArrayStatistics>::max(
2690 39 : metadata, iRowGroup, numRowGroups, iCol, bFoundMax);
2691 39 : if (bFoundMax)
2692 : {
2693 39 : eType = OFTString;
2694 39 : sMax.String = &osMaxTmp[0];
2695 : }
2696 : }
2697 : }
2698 :
2699 867 : return bFoundMin || bFoundMax;
2700 : }
2701 :
2702 : /************************************************************************/
2703 : /* GeomColsBBOXParquet() */
2704 : /************************************************************************/
2705 :
2706 : /** Return for a given geometry column (iGeom: in [0, GetGeomFieldCount()-1] range),
2707 : * the Parquet column number of the corresponding xmin,ymin,xmax,ymax bounding
2708 : * box columns, if existing.
2709 : */
2710 1 : bool OGRParquetLayer::GeomColsBBOXParquet(int iGeom, int &iParquetXMin,
2711 : int &iParquetYMin, int &iParquetXMax,
2712 : int &iParquetYMax) const
2713 : {
2714 1 : const auto oIter = m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeom);
2715 : const bool bFound =
2716 1 : (oIter != m_oMapGeomFieldIndexToGeomColBBOXParquet.end());
2717 1 : if (bFound)
2718 : {
2719 1 : iParquetXMin = oIter->second.iParquetXMin;
2720 1 : iParquetYMin = oIter->second.iParquetYMin;
2721 1 : iParquetXMax = oIter->second.iParquetXMax;
2722 1 : iParquetYMax = oIter->second.iParquetYMax;
2723 : }
2724 1 : return bFound;
2725 : }
|