Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022-2024, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogrsf_frmts.h"
14 :
15 : #include <algorithm>
16 : #include <cassert>
17 : #include <map>
18 : #include <set>
19 : #include <utility>
20 :
21 : #include "cpl_time.h"
22 : #include "ogr_api.h"
23 :
24 : #include "ogr_parquet.h"
25 :
26 : #include "../arrow_common/ograrrowlayer.hpp"
27 : #include "../arrow_common/ograrrowdataset.hpp"
28 :
29 : #if PARQUET_VERSION_MAJOR >= 13
30 : // Using field indices for FieldRef is only supported since
31 : // https://github.com/apache/arrow/commit/10eedbe63c71f4cf8f0621f3a2304ab3168a2ae5
32 : #define SUPPORTS_INDICES_IN_FIELD_REF
33 : #endif
34 :
35 : namespace cp = ::arrow::compute;
36 :
37 : /************************************************************************/
38 : /* OGRParquetLayer() */
39 : /************************************************************************/
40 :
41 273 : OGRParquetDatasetLayer::OGRParquetDatasetLayer(
42 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
43 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
44 273 : CSLConstList papszOpenOptions)
45 : : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions),
46 273 : m_bIsVSI(bIsVSI), m_poDataset(dataset)
47 : {
48 273 : m_poSchema = m_poDataset->schema();
49 273 : EstablishFeatureDefn();
50 273 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
51 : m_poFeatureDefn->GetGeomFieldCount());
52 273 : }
53 :
54 : /************************************************************************/
55 : /* ProcessGeometryColumnCovering() */
56 : /************************************************************************/
57 :
58 : /** Process GeoParquet JSON geometry field object to extract information about
59 : * its bounding box column, and appropriately fill m_oMapGeomFieldIndexToGeomColBBOX
60 : * member with information on that bounding box column.
61 : */
62 254 : void OGRParquetDatasetLayer::ProcessGeometryColumnCovering(
63 : const std::shared_ptr<arrow::Field> &field,
64 : const CPLJSONObject &oJSONGeometryColumn)
65 : {
66 508 : std::string osBBOXColumn;
67 508 : std::string osXMin, osYMin, osXMax, osYMax;
68 254 : if (ParseGeometryColumnCovering(oJSONGeometryColumn, osBBOXColumn, osXMin,
69 : osYMin, osXMax, osYMax))
70 : {
71 90 : OGRArrowLayer::GeomColBBOX sDesc;
72 90 : sDesc.iArrowCol = m_poSchema->GetFieldIndex(osBBOXColumn);
73 180 : const auto fieldBBOX = m_poSchema->GetFieldByName(osBBOXColumn);
74 180 : if (sDesc.iArrowCol >= 0 && fieldBBOX &&
75 90 : fieldBBOX->type()->id() == arrow::Type::STRUCT)
76 : {
77 : const auto fieldBBOXStruct =
78 180 : std::static_pointer_cast<arrow::StructType>(fieldBBOX->type());
79 180 : const auto fieldXMin = fieldBBOXStruct->GetFieldByName(osXMin);
80 180 : const auto fieldYMin = fieldBBOXStruct->GetFieldByName(osYMin);
81 180 : const auto fieldXMax = fieldBBOXStruct->GetFieldByName(osXMax);
82 180 : const auto fieldYMax = fieldBBOXStruct->GetFieldByName(osYMax);
83 90 : const int nXMinIdx = fieldBBOXStruct->GetFieldIndex(osXMin);
84 90 : const int nYMinIdx = fieldBBOXStruct->GetFieldIndex(osYMin);
85 90 : const int nXMaxIdx = fieldBBOXStruct->GetFieldIndex(osXMax);
86 90 : const int nYMaxIdx = fieldBBOXStruct->GetFieldIndex(osYMax);
87 90 : if (nXMinIdx >= 0 && nYMinIdx >= 0 && nXMaxIdx >= 0 &&
88 180 : nYMaxIdx >= 0 && fieldXMin && fieldYMin && fieldXMax &&
89 180 : fieldYMax &&
90 90 : (fieldXMin->type()->id() == arrow::Type::FLOAT ||
91 0 : fieldXMin->type()->id() == arrow::Type::DOUBLE) &&
92 90 : fieldXMin->type()->id() == fieldYMin->type()->id() &&
93 270 : fieldXMin->type()->id() == fieldXMax->type()->id() &&
94 90 : fieldXMin->type()->id() == fieldYMax->type()->id())
95 : {
96 90 : CPLDebug("PARQUET",
97 : "Bounding box column '%s' detected for "
98 : "geometry column '%s'",
99 90 : osBBOXColumn.c_str(), field->name().c_str());
100 90 : sDesc.iArrowSubfieldXMin = nXMinIdx;
101 90 : sDesc.iArrowSubfieldYMin = nYMinIdx;
102 90 : sDesc.iArrowSubfieldXMax = nXMaxIdx;
103 90 : sDesc.iArrowSubfieldYMax = nYMaxIdx;
104 90 : sDesc.bIsFloat =
105 90 : (fieldXMin->type()->id() == arrow::Type::FLOAT);
106 :
107 : m_oMapGeomFieldIndexToGeomColBBOX
108 90 : [m_poFeatureDefn->GetGeomFieldCount() - 1] =
109 90 : std::move(sDesc);
110 : }
111 : }
112 : }
113 254 : }
114 :
115 : /************************************************************************/
116 : /* EstablishFeatureDefn() */
117 : /************************************************************************/
118 :
119 273 : void OGRParquetDatasetLayer::EstablishFeatureDefn()
120 : {
121 273 : const auto &kv_metadata = m_poSchema->metadata();
122 :
123 273 : LoadGeoMetadata(kv_metadata);
124 : const auto oMapFieldNameToGDALSchemaFieldDefn =
125 546 : LoadGDALSchema(kv_metadata.get());
126 :
127 273 : LoadGDALMetadata(kv_metadata.get());
128 :
129 : const bool bUseBBOX =
130 273 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES"));
131 :
132 : // Keep track of declared bounding box columns in GeoParquet JSON metadata,
133 : // in order not to expose them as regular fields.
134 546 : std::set<std::string> oSetBBOXColumns;
135 273 : if (bUseBBOX)
136 : {
137 526 : for (const auto &iter : m_oMapGeometryColumns)
138 : {
139 508 : std::string osBBOXColumn;
140 508 : std::string osXMin, osYMin, osXMax, osYMax;
141 254 : if (ParseGeometryColumnCovering(iter.second, osBBOXColumn, osXMin,
142 : osYMin, osXMax, osYMax))
143 : {
144 89 : oSetBBOXColumns.insert(osBBOXColumn);
145 : }
146 : }
147 : }
148 :
149 273 : const auto &fields = m_poSchema->fields();
150 :
151 : // Overture Maps 2024-04-16-beta.0 almost follows GeoParquet 1.1, except
152 : // they don't declare the "covering" element in the GeoParquet JSON metadata
153 801 : if (m_oMapGeometryColumns.find("geometry") != m_oMapGeometryColumns.end() &&
154 508 : bUseBBOX &&
155 1239 : !m_oMapGeometryColumns["geometry"].GetObj("covering").IsValid() &&
156 438 : m_oMapGeometryColumns["geometry"].GetString("encoding") == "WKB")
157 : {
158 6327 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
159 : {
160 6247 : const auto &field = fields[i];
161 6248 : if (field->name() == "bbox" &&
162 1 : field->type()->id() == arrow::Type::STRUCT)
163 : {
164 1 : bool bXMin = false;
165 1 : bool bXMax = false;
166 1 : bool bYMin = false;
167 1 : bool bYMax = false;
168 2 : const auto subfields = field->Flatten();
169 1 : if (subfields.size() == 4)
170 : {
171 5 : for (int j = 0; j < static_cast<int>(subfields.size()); j++)
172 : {
173 4 : const auto &subfield = subfields[j];
174 4 : if (subfield->name() == "bbox.xmin")
175 1 : bXMin = true;
176 3 : else if (subfield->name() == "bbox.xmax")
177 1 : bXMax = true;
178 2 : else if (subfield->name() == "bbox.ymin")
179 1 : bYMin = true;
180 1 : else if (subfield->name() == "bbox.ymax")
181 1 : bYMax = true;
182 : }
183 : }
184 1 : if (bXMin && bXMax && bYMin && bYMax)
185 : {
186 3 : CPLJSONObject oDef = m_oMapGeometryColumns["geometry"];
187 2 : CPLJSONObject oCovering;
188 1 : oDef.Add("covering", oCovering);
189 1 : CPLJSONObject oBBOX;
190 1 : oCovering.Add("bbox", oBBOX);
191 : {
192 1 : CPLJSONArray oArray;
193 1 : oArray.Add("bbox");
194 1 : oArray.Add("xmin");
195 1 : oBBOX.Add("xmin", oArray);
196 : }
197 : {
198 1 : CPLJSONArray oArray;
199 1 : oArray.Add("bbox");
200 1 : oArray.Add("ymin");
201 1 : oBBOX.Add("ymin", oArray);
202 : }
203 : {
204 1 : CPLJSONArray oArray;
205 1 : oArray.Add("bbox");
206 1 : oArray.Add("xmax");
207 1 : oBBOX.Add("xmax", oArray);
208 : }
209 : {
210 1 : CPLJSONArray oArray;
211 1 : oArray.Add("bbox");
212 1 : oArray.Add("ymax");
213 1 : oBBOX.Add("ymax", oArray);
214 : }
215 1 : oSetBBOXColumns.insert("bbox");
216 1 : m_oMapGeometryColumns["geometry"] = std::move(oDef);
217 : }
218 1 : break;
219 : }
220 : }
221 : }
222 :
223 7178 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
224 : {
225 6905 : const auto &field = fields[i];
226 :
227 6905 : if (!m_osFIDColumn.empty() && field->name() == m_osFIDColumn)
228 : {
229 2 : m_iFIDArrowColumn = i;
230 2 : continue;
231 : }
232 :
233 6903 : if (oSetBBOXColumns.find(field->name()) != oSetBBOXColumns.end())
234 : {
235 90 : m_oSetBBoxArrowColumns.insert(i);
236 90 : continue;
237 : }
238 :
239 : const bool bGeometryField =
240 6892 : DealWithGeometryColumn(i, field, []() { return wkbUnknown; });
241 6813 : if (bGeometryField)
242 : {
243 255 : const auto oIter = m_oMapGeometryColumns.find(field->name());
244 255 : if (bUseBBOX && oIter != m_oMapGeometryColumns.end())
245 : {
246 254 : ProcessGeometryColumnCovering(field, oIter->second);
247 : }
248 : }
249 : else
250 : {
251 6558 : CreateFieldFromSchema(field, {i},
252 : oMapFieldNameToGDALSchemaFieldDefn);
253 : }
254 : }
255 :
256 273 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
257 : m_poFeatureDefn->GetFieldCount());
258 273 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
259 : m_poFeatureDefn->GetGeomFieldCount());
260 273 : }
261 :
262 : namespace
263 : {
264 :
265 : /************************************************************************/
266 : /* WKBGeometryOptionsType */
267 : /************************************************************************/
268 :
269 : class WKBGeometryOptions;
270 :
271 : class WKBGeometryOptionsType : public cp::FunctionOptionsType
272 : {
273 : WKBGeometryOptionsType() = default;
274 :
275 : static const WKBGeometryOptions &Cast(const cp::FunctionOptions &opts);
276 :
277 : public:
278 23 : const char *type_name() const override
279 : {
280 23 : return "WKBGeometryOptionsType";
281 : }
282 :
283 : std::string Stringify(const cp::FunctionOptions &) const override;
284 : bool Compare(const cp::FunctionOptions &,
285 : const cp::FunctionOptions &) const override;
286 : std::unique_ptr<cp::FunctionOptions>
287 : Copy(const cp::FunctionOptions &) const override;
288 :
289 28 : static WKBGeometryOptionsType *GetSingleton()
290 : {
291 28 : static WKBGeometryOptionsType instance;
292 28 : return &instance;
293 : }
294 : };
295 :
296 : /************************************************************************/
297 : /* WKBGeometryOptions */
298 : /************************************************************************/
299 :
300 : class WKBGeometryOptions : public cp::FunctionOptions
301 : {
302 :
303 : public:
304 22 : explicit WKBGeometryOptions(
305 : const std::vector<GByte> &abyFilterGeomWkbIn = std::vector<GByte>())
306 22 : : cp::FunctionOptions(WKBGeometryOptionsType::GetSingleton()),
307 22 : abyFilterGeomWkb(abyFilterGeomWkbIn)
308 : {
309 22 : }
310 :
311 3 : bool operator==(const WKBGeometryOptions &other) const
312 : {
313 3 : return abyFilterGeomWkb == other.abyFilterGeomWkb;
314 : }
315 :
316 : std::vector<GByte> abyFilterGeomWkb;
317 : };
318 :
319 : const WKBGeometryOptions &
320 32 : WKBGeometryOptionsType::Cast(const cp::FunctionOptions &opts)
321 : {
322 32 : return *cpl::down_cast<const WKBGeometryOptions *>(&opts);
323 : }
324 :
325 3 : bool WKBGeometryOptionsType::Compare(const cp::FunctionOptions &optsA,
326 : const cp::FunctionOptions &optsB) const
327 : {
328 3 : return Cast(optsA) == Cast(optsB);
329 : }
330 :
331 : std::string
332 23 : WKBGeometryOptionsType::Stringify(const cp::FunctionOptions &opts) const
333 : {
334 23 : const auto &bboxOptions = Cast(opts);
335 23 : std::string osRet(type_name());
336 23 : osRet += '-';
337 2162 : for (GByte byVal : bboxOptions.abyFilterGeomWkb)
338 2139 : osRet += CPLSPrintf("%02X", byVal);
339 23 : return osRet;
340 : }
341 :
342 : std::unique_ptr<cp::FunctionOptions>
343 3 : WKBGeometryOptionsType::Copy(const cp::FunctionOptions &opts) const
344 : {
345 3 : return std::make_unique<WKBGeometryOptions>(Cast(opts));
346 : }
347 :
348 : /************************************************************************/
349 : /* OptionsWrapper */
350 : /************************************************************************/
351 :
352 : /// KernelState adapter for the common case of kernels whose only
353 : /// state is an instance of a subclass of FunctionOptions.
354 : template <typename OptionsType> struct OptionsWrapper : public cp::KernelState
355 : {
356 19 : explicit OptionsWrapper(OptionsType optionsIn)
357 19 : : options(std::move(optionsIn))
358 : {
359 19 : }
360 :
361 : static arrow::Result<std::unique_ptr<cp::KernelState>>
362 19 : Init(cp::KernelContext *, const cp::KernelInitArgs &args)
363 : {
364 19 : auto options = cpl::down_cast<const OptionsType *>(args.options);
365 19 : CPLAssert(options);
366 19 : return std::make_unique<OptionsWrapper>(*options);
367 : }
368 :
369 32 : static const OptionsType &Get(cp::KernelContext *ctx)
370 : {
371 32 : return cpl::down_cast<const OptionsWrapper *>(ctx->state())->options;
372 : }
373 :
374 : OptionsType options;
375 : };
376 : } // namespace
377 :
378 : /************************************************************************/
379 : /* ExecOGRWKBIntersects() */
380 : /************************************************************************/
381 :
382 32 : static arrow::Status ExecOGRWKBIntersects(cp::KernelContext *ctx,
383 : const cp::ExecSpan &batch,
384 : cp::ExecResult *out)
385 : {
386 : // Get filter geometry
387 32 : const auto &opts = OptionsWrapper<WKBGeometryOptions>::Get(ctx);
388 32 : OGRGeometry *poGeomTmp = nullptr;
389 64 : OGRErr eErr = OGRGeometryFactory::createFromWkb(
390 32 : opts.abyFilterGeomWkb.data(), nullptr, &poGeomTmp,
391 32 : opts.abyFilterGeomWkb.size());
392 32 : CPL_IGNORE_RET_VAL(eErr);
393 32 : CPLAssert(eErr == OGRERR_NONE);
394 32 : CPLAssert(poGeomTmp != nullptr);
395 64 : std::unique_ptr<OGRGeometry> poFilterGeom(poGeomTmp);
396 32 : OGREnvelope sFilterEnvelope;
397 32 : poFilterGeom->getEnvelope(&sFilterEnvelope);
398 32 : const bool bFilterIsEnvelope = poFilterGeom->IsRectangle();
399 :
400 : // Deal with input array
401 32 : CPLAssert(batch.num_values() == 1);
402 32 : const arrow::ArraySpan &input = batch[0].array;
403 32 : CPLAssert(input.type->id() == arrow::Type::BINARY);
404 : // Packed array of bits
405 32 : const auto pabyInputValidity = input.buffers[0].data;
406 32 : const auto nInputOffsets = input.offset;
407 32 : const auto panWkbOffsets = input.GetValues<int32_t>(1);
408 32 : const auto pabyWkbArray = input.buffers[2].data;
409 :
410 : // Deal with output array
411 32 : CPLAssert(out->type()->id() == arrow::Type::BOOL);
412 32 : auto out_span = out->array_span();
413 : // Below array holds 8 bits per uint8_t
414 32 : uint8_t *pabitsOutValues = out_span->buffers[1].data;
415 32 : const auto nOutOffset = out_span->offset;
416 :
417 : // Iterate over WKB geometries
418 32 : OGRPreparedGeometry *pPreparedFilterGeom = nullptr;
419 32 : OGREnvelope sEnvelope;
420 147 : for (int64_t i = 0; i < batch.length; ++i)
421 : {
422 : const bool bInputIsNull =
423 151 : (pabyInputValidity &&
424 36 : arrow::bit_util::GetBit(pabyInputValidity, i + nInputOffsets) ==
425 115 : 0);
426 115 : bool bOutputVal = false;
427 115 : if (!bInputIsNull)
428 : {
429 104 : const GByte *pabyWkb = pabyWkbArray + panWkbOffsets[i];
430 104 : const size_t nWkbSize = panWkbOffsets[i + 1] - panWkbOffsets[i];
431 104 : bOutputVal = OGRLayer::FilterWKBGeometry(
432 : pabyWkb, nWkbSize,
433 : /* bEnvelopeAlreadySet = */ false, sEnvelope,
434 104 : poFilterGeom.get(), bFilterIsEnvelope, sFilterEnvelope,
435 : pPreparedFilterGeom);
436 : }
437 115 : if (bOutputVal)
438 56 : arrow::bit_util::SetBit(pabitsOutValues, i + nOutOffset);
439 : else
440 59 : arrow::bit_util::ClearBit(pabitsOutValues, i + nOutOffset);
441 : }
442 :
443 : // Cleanup
444 32 : if (pPreparedFilterGeom)
445 0 : OGRDestroyPreparedGeometry(pPreparedFilterGeom);
446 :
447 64 : return arrow::Status::OK();
448 : }
449 :
450 : /************************************************************************/
451 : /* RegisterOGRWKBIntersectsIfNeeded() */
452 : /************************************************************************/
453 :
454 19 : static bool RegisterOGRWKBIntersectsIfNeeded()
455 : {
456 19 : auto registry = cp::GetFunctionRegistry();
457 : bool bRet =
458 19 : registry->GetFunction("OGRWKBIntersects").ValueOr(nullptr) != nullptr;
459 19 : if (!bRet)
460 : {
461 3 : static const WKBGeometryOptions defaultOpts;
462 :
463 : // Below assert is completely useless but helps improve test coverage
464 3 : CPLAssert(WKBGeometryOptionsType::GetSingleton()->Compare(
465 : defaultOpts, *(WKBGeometryOptionsType::GetSingleton()
466 : ->Copy(defaultOpts)
467 : .get())));
468 :
469 : auto func = std::make_shared<cp::ScalarFunction>(
470 6 : "OGRWKBIntersects", cp::Arity::Unary(), cp::FunctionDoc(),
471 9 : &defaultOpts);
472 : cp::ScalarKernel kernel({arrow::binary()}, arrow::boolean(),
473 : ExecOGRWKBIntersects,
474 12 : OptionsWrapper<WKBGeometryOptions>::Init);
475 3 : kernel.null_handling = cp::NullHandling::OUTPUT_NOT_NULL;
476 9 : bRet = func->AddKernel(std::move(kernel)).ok() &&
477 6 : registry->AddFunction(std::move(func)).ok();
478 : }
479 19 : return bRet;
480 : }
481 :
482 : /************************************************************************/
483 : /* BuildScanner() */
484 : /************************************************************************/
485 :
486 696 : void OGRParquetDatasetLayer::BuildScanner()
487 : {
488 696 : m_bRebuildScanner = false;
489 696 : m_bSkipFilterGeometry = false;
490 696 : m_bBaseArrowIgnoreSpatialFilterRect = false;
491 696 : m_bBaseArrowIgnoreSpatialFilter = false;
492 696 : m_bBaseArrowIgnoreAttributeFilter = false;
493 :
494 : try
495 : {
496 696 : std::shared_ptr<arrow::dataset::ScannerBuilder> scannerBuilder;
497 1392 : PARQUET_ASSIGN_OR_THROW(scannerBuilder, m_poDataset->NewScan());
498 696 : assert(scannerBuilder);
499 :
500 : // We cannot use the shared memory pool. Otherwise we get random
501 : // crashes in multi-threaded arrow code (apparently some cleanup code),
502 : // that may used the memory pool after it has been destroyed.
503 : // At least this was true with some older libarrow version
504 : // PARQUET_THROW_NOT_OK(scannerBuilder->Pool(m_poMemoryPool));
505 :
506 696 : if (m_bIsVSI)
507 : {
508 254 : const int nFragmentReadAhead = atoi(
509 : CPLGetConfigOption("OGR_PARQUET_FRAGMENT_READ_AHEAD", "2"));
510 508 : PARQUET_THROW_NOT_OK(
511 : scannerBuilder->FragmentReadahead(nFragmentReadAhead));
512 : }
513 :
514 : const char *pszBatchSize =
515 696 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
516 696 : if (pszBatchSize)
517 : {
518 0 : PARQUET_THROW_NOT_OK(
519 : scannerBuilder->BatchSize(CPLAtoGIntBig(pszBatchSize)));
520 : }
521 :
522 696 : const int nNumCPUs = GetNumCPUs();
523 : const char *pszUseThreads =
524 696 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
525 696 : if (!pszUseThreads && nNumCPUs > 1)
526 : {
527 696 : pszUseThreads = "YES";
528 : }
529 696 : if (pszUseThreads && CPLTestBool(pszUseThreads))
530 : {
531 1392 : PARQUET_THROW_NOT_OK(scannerBuilder->UseThreads(true));
532 : }
533 :
534 : #if PARQUET_VERSION_MAJOR >= 10
535 : const char *pszBatchReadAhead =
536 696 : CPLGetConfigOption("OGR_PARQUET_BATCH_READ_AHEAD", nullptr);
537 696 : if (pszBatchReadAhead)
538 : {
539 0 : PARQUET_THROW_NOT_OK(
540 : scannerBuilder->BatchReadahead(atoi(pszBatchReadAhead)));
541 : }
542 : #endif
543 :
544 696 : cp::Expression expression;
545 1095 : if (m_poFilterGeom && !m_poFilterGeom->IsEmpty() &&
546 399 : CPLTestBool(CPLGetConfigOption(
547 : "OGR_PARQUET_OPTIMIZED_SPATIAL_FILTER", "YES")))
548 : {
549 : const auto oIter =
550 399 : m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter);
551 399 : if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end())
552 : {
553 : // This actually requires Arrow >= 15 (https://github.com/apache/arrow/issues/39064)
554 : // to be more efficient.
555 : #ifdef SUPPORTS_INDICES_IN_FIELD_REF
556 194 : const auto &oBBOXDef = oIter->second;
557 1746 : expression = cp::and_(
558 : {cp::less_equal(
559 388 : cp::field_ref(arrow::FieldRef(
560 194 : oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldXMin)),
561 388 : cp::literal(m_sFilterEnvelope.MaxX)),
562 : cp::less_equal(
563 388 : cp::field_ref(arrow::FieldRef(
564 194 : oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldYMin)),
565 388 : cp::literal(m_sFilterEnvelope.MaxY)),
566 : cp::greater_equal(
567 388 : cp::field_ref(arrow::FieldRef(
568 194 : oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldXMax)),
569 388 : cp::literal(m_sFilterEnvelope.MinX)),
570 : cp::greater_equal(
571 388 : cp::field_ref(arrow::FieldRef(
572 194 : oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldYMax)),
573 1358 : cp::literal(m_sFilterEnvelope.MinY))});
574 : #else
575 : const auto oIter2 = m_oMapGeometryColumns.find(
576 : m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter)
577 : ->GetNameRef());
578 : std::string osBBOXColumn;
579 : std::string osXMin, osYMin, osXMax, osYMax;
580 : if (ParseGeometryColumnCovering(oIter2->second, osBBOXColumn,
581 : osXMin, osYMin, osXMax, osYMax))
582 : {
583 : expression = cp::and_(
584 : {cp::less_equal(cp::field_ref(arrow::FieldRef(
585 : osBBOXColumn, osXMin)),
586 : cp::literal(m_sFilterEnvelope.MaxX)),
587 : cp::less_equal(cp::field_ref(arrow::FieldRef(
588 : osBBOXColumn, osYMin)),
589 : cp::literal(m_sFilterEnvelope.MaxY)),
590 : cp::greater_equal(cp::field_ref(arrow::FieldRef(
591 : osBBOXColumn, osXMax)),
592 : cp::literal(m_sFilterEnvelope.MinX)),
593 : cp::greater_equal(
594 : cp::field_ref(
595 : arrow::FieldRef(osBBOXColumn, osYMax)),
596 : cp::literal(m_sFilterEnvelope.MinY))});
597 : }
598 : #endif
599 : }
600 615 : else if (m_iGeomFieldFilter >= 0 &&
601 205 : m_iGeomFieldFilter <
602 410 : static_cast<int>(m_aeGeomEncoding.size()) &&
603 205 : m_aeGeomEncoding[m_iGeomFieldFilter] ==
604 : OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT)
605 : {
606 : const int iCol =
607 28 : m_anMapGeomFieldIndexToArrowColumn[m_iGeomFieldFilter];
608 28 : const auto &field = m_poSchema->fields()[iCol];
609 56 : auto type = field->type();
610 56 : std::vector<arrow::FieldRef> fieldRefs;
611 : #ifdef SUPPORTS_INDICES_IN_FIELD_REF
612 28 : fieldRefs.emplace_back(iCol);
613 : #else
614 : fieldRefs.emplace_back(field->name());
615 : #endif
616 28 : if (type->id() == arrow::Type::STRUCT)
617 : {
618 : const auto fieldStruct =
619 56 : std::static_pointer_cast<arrow::StructType>(type);
620 84 : const auto fieldX = fieldStruct->GetFieldByName("x");
621 84 : const auto fieldY = fieldStruct->GetFieldByName("y");
622 28 : if (fieldX && fieldY)
623 : {
624 56 : auto fieldRefX(fieldRefs);
625 28 : fieldRefX.emplace_back("x");
626 28 : auto fieldRefY(std::move(fieldRefs));
627 28 : fieldRefY.emplace_back("y");
628 252 : expression = cp::and_(
629 : {cp::less_equal(
630 56 : cp::field_ref(arrow::FieldRef(fieldRefX)),
631 56 : cp::literal(m_sFilterEnvelope.MaxX)),
632 : cp::less_equal(
633 56 : cp::field_ref(arrow::FieldRef(fieldRefY)),
634 56 : cp::literal(m_sFilterEnvelope.MaxY)),
635 : cp::greater_equal(
636 56 : cp::field_ref(arrow::FieldRef(fieldRefX)),
637 56 : cp::literal(m_sFilterEnvelope.MinX)),
638 : cp::greater_equal(
639 56 : cp::field_ref(arrow::FieldRef(fieldRefY)),
640 196 : cp::literal(m_sFilterEnvelope.MinY))});
641 : }
642 : }
643 : }
644 531 : else if (m_iGeomFieldFilter >= 0 &&
645 177 : m_iGeomFieldFilter <
646 354 : static_cast<int>(m_aeGeomEncoding.size()) &&
647 177 : m_aeGeomEncoding[m_iGeomFieldFilter] ==
648 : OGRArrowGeomEncoding::WKB)
649 : {
650 : const int iCol =
651 19 : m_anMapGeomFieldIndexToArrowColumn[m_iGeomFieldFilter];
652 19 : const auto &field = m_poSchema->fields()[iCol];
653 38 : if (field->type()->id() == arrow::Type::BINARY &&
654 19 : RegisterOGRWKBIntersectsIfNeeded())
655 : {
656 : #ifdef SUPPORTS_INDICES_IN_FIELD_REF
657 38 : auto oFieldRef = arrow::FieldRef(iCol);
658 : #else
659 : auto oFieldRef = arrow::FieldRef(field->name());
660 : #endif
661 38 : std::vector<GByte> abyFilterGeomWkb;
662 19 : abyFilterGeomWkb.resize(m_poFilterGeom->WkbSize());
663 19 : m_poFilterGeom->exportToWkb(wkbNDR, abyFilterGeomWkb.data(),
664 : wkbVariantIso);
665 : // Silence 'Using uninitialized value oFieldRef. Field oFieldRef.impl_._M_u is uninitialized when calling FieldRef.'
666 : // coverity[uninit_use_in_call]
667 57 : expression = cp::call("OGRWKBIntersects",
668 19 : {cp::field_ref(std::move(oFieldRef))},
669 76 : WKBGeometryOptions(abyFilterGeomWkb));
670 :
671 19 : if (expression.is_valid())
672 : {
673 19 : m_bBaseArrowIgnoreSpatialFilterRect = true;
674 19 : m_bBaseArrowIgnoreSpatialFilter = true;
675 19 : m_bSkipFilterGeometry = true;
676 : }
677 : }
678 : }
679 :
680 399 : if (expression.is_valid() && !m_bSkipFilterGeometry)
681 : {
682 222 : m_bBaseArrowIgnoreSpatialFilterRect = true;
683 :
684 : const bool bIsPoint =
685 222 : wkbFlatten(
686 : m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter)
687 222 : ->GetType()) == wkbPoint;
688 222 : m_bBaseArrowIgnoreSpatialFilter =
689 222 : m_bFilterIsEnvelope && bIsPoint;
690 :
691 222 : m_bSkipFilterGeometry =
692 333 : m_bFilterIsEnvelope &&
693 111 : (bIsPoint ||
694 111 : m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter)
695 111 : ->IsIgnored());
696 : }
697 : }
698 :
699 791 : if (m_poAttrQuery &&
700 95 : CPLTestBool(CPLGetConfigOption(
701 : "OGR_PARQUET_OPTIMIZED_ATTRIBUTE_FILTER", "YES")))
702 : {
703 : const swq_expr_node *poNode =
704 95 : static_cast<swq_expr_node *>(m_poAttrQuery->GetSWQExpr());
705 95 : bool bFullyTranslated = true;
706 190 : auto expressionFilter = BuildArrowFilter(poNode, bFullyTranslated);
707 95 : if (expressionFilter.is_valid())
708 : {
709 90 : if (bFullyTranslated)
710 : {
711 88 : CPLDebugOnly("PARQUET",
712 : "Attribute filter fully translated to Arrow");
713 88 : m_asAttributeFilterConstraints.clear();
714 88 : m_bBaseArrowIgnoreAttributeFilter = true;
715 : }
716 :
717 90 : if (expression.is_valid())
718 : expression =
719 7 : cp::and_(expression, std::move(expressionFilter));
720 : else
721 83 : expression = std::move(expressionFilter);
722 : }
723 : }
724 :
725 696 : if (expression.is_valid())
726 : {
727 648 : PARQUET_THROW_NOT_OK(scannerBuilder->Filter(expression));
728 : }
729 :
730 696 : if (m_bIgnoredFields)
731 : {
732 : #ifdef DEBUG
733 250 : std::string osFields;
734 943 : for (const std::string &osField : m_aosProjectedFields)
735 : {
736 818 : if (!osFields.empty())
737 694 : osFields += ',';
738 818 : osFields += osField;
739 : }
740 125 : CPLDebug("PARQUET", "Projected fields: %s", osFields.c_str());
741 : #endif
742 250 : PARQUET_THROW_NOT_OK(scannerBuilder->Project(m_aosProjectedFields));
743 : }
744 :
745 696 : PARQUET_ASSIGN_OR_THROW(m_poScanner, scannerBuilder->Finish());
746 : }
747 0 : catch (const std::exception &e)
748 : {
749 0 : CPLError(CE_Failure, CPLE_AppDefined, "Arrow/Parquet exception: %s",
750 0 : e.what());
751 : }
752 696 : }
753 :
754 : /************************************************************************/
755 : /* BuildArrowFilter() */
756 : /************************************************************************/
757 :
758 : cp::Expression
759 338 : OGRParquetDatasetLayer::BuildArrowFilter(const swq_expr_node *poNode,
760 : bool &bFullyTranslated)
761 : {
762 338 : if (poNode->eNodeType == SNT_OPERATION && poNode->nOperation == SWQ_AND &&
763 9 : poNode->nSubExprCount == 2)
764 : {
765 : const auto sLeft =
766 9 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
767 : const auto sRight =
768 9 : BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated);
769 9 : if (sLeft.is_valid() && sRight.is_valid())
770 6 : return cp::and_(sLeft, sRight);
771 3 : if (sLeft.is_valid())
772 1 : return sLeft;
773 2 : if (sRight.is_valid())
774 2 : return sRight;
775 : }
776 :
777 329 : else if (poNode->eNodeType == SNT_OPERATION &&
778 125 : poNode->nOperation == SWQ_OR && poNode->nSubExprCount == 2)
779 : {
780 : const auto sLeft =
781 5 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
782 : const auto sRight =
783 5 : BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated);
784 5 : if (sLeft.is_valid() && sRight.is_valid())
785 5 : return cp::or_(sLeft, sRight);
786 : }
787 :
788 324 : else if (poNode->eNodeType == SNT_OPERATION &&
789 120 : poNode->nOperation == SWQ_NOT && poNode->nSubExprCount == 1)
790 : {
791 : const auto expr =
792 11 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
793 11 : if (expr.is_valid())
794 11 : return cp::not_(expr);
795 : }
796 :
797 313 : else if (poNode->eNodeType == SNT_COLUMN)
798 : {
799 190 : if (poNode->field_index >= 0 &&
800 95 : poNode->field_index < m_poFeatureDefn->GetFieldCount())
801 : {
802 172 : std::vector<arrow::FieldRef> fieldRefs;
803 : #ifdef SUPPORTS_INDICES_IN_FIELD_REF
804 174 : for (int idx : m_anMapFieldIndexToArrowColumn[poNode->field_index])
805 88 : fieldRefs.emplace_back(idx);
806 : #else
807 : std::shared_ptr<arrow::Field> field;
808 : for (int idx : m_anMapFieldIndexToArrowColumn[poNode->field_index])
809 : {
810 : if (!field)
811 : {
812 : field = m_poSchema->fields()[idx];
813 : }
814 : else
815 : {
816 : CPLAssert(field->type()->id() == arrow::Type::STRUCT);
817 : const auto fieldStruct =
818 : std::static_pointer_cast<arrow::StructType>(
819 : field->type());
820 : field = fieldStruct->fields()[idx];
821 : }
822 : fieldRefs.emplace_back(field->name());
823 : }
824 : #endif
825 258 : auto expr = cp::field_ref(arrow::FieldRef(std::move(fieldRefs)));
826 :
827 : // Comparing a boolean column to 0 or 1 fails without explicit cast
828 172 : if (m_poFeatureDefn->GetFieldDefn(poNode->field_index)
829 86 : ->GetSubType() == OFSTBoolean)
830 : {
831 48 : expr = cp::call("cast", {expr},
832 64 : cp::CastOptions::Safe(arrow::uint8()));
833 : }
834 86 : return expr;
835 : }
836 18 : else if (poNode->field_index ==
837 12 : m_poFeatureDefn->GetFieldCount() + SPF_FID &&
838 3 : m_iFIDArrowColumn >= 0)
839 : {
840 : #ifdef SUPPORTS_INDICES_IN_FIELD_REF
841 2 : return cp::field_ref(arrow::FieldRef(m_iFIDArrowColumn));
842 : #else
843 : return cp::field_ref(arrow::FieldRef(
844 : m_poSchema->fields()[m_iFIDArrowColumn]->name()));
845 : #endif
846 : }
847 : }
848 :
849 218 : else if (poNode->eNodeType == SNT_CONSTANT)
850 : {
851 109 : switch (poNode->field_type)
852 : {
853 88 : case SWQ_INTEGER:
854 : case SWQ_INTEGER64:
855 176 : return cp::literal(static_cast<int64_t>(poNode->int_value));
856 :
857 11 : case SWQ_FLOAT:
858 11 : return cp::literal(poNode->float_value);
859 :
860 6 : case SWQ_STRING:
861 6 : return cp::literal(poNode->string_value);
862 :
863 4 : case SWQ_TIMESTAMP:
864 : {
865 : OGRField sField;
866 4 : if (OGRParseDate(poNode->string_value, &sField, 0))
867 : {
868 : struct tm brokenDown;
869 4 : brokenDown.tm_year = sField.Date.Year - 1900;
870 4 : brokenDown.tm_mon = sField.Date.Month - 1;
871 4 : brokenDown.tm_mday = sField.Date.Day;
872 4 : brokenDown.tm_hour = sField.Date.Hour;
873 4 : brokenDown.tm_min = sField.Date.Minute;
874 4 : brokenDown.tm_sec = static_cast<int>(sField.Date.Second);
875 : int64_t nVal =
876 4 : CPLYMDHMSToUnixTime(&brokenDown) * 1000 +
877 4 : (static_cast<int>(sField.Date.Second * 1000 + 0.5) %
878 4 : 1000);
879 4 : if (sField.Date.TZFlag > OGR_TZFLAG_MIXED_TZ)
880 : {
881 : // Convert for sField.Date.TZFlag to UTC
882 2 : const int TZOffset =
883 2 : (sField.Date.TZFlag - OGR_TZFLAG_UTC) * 15;
884 2 : const int TZOffsetMS = TZOffset * 60 * 1000;
885 2 : nVal -= TZOffsetMS;
886 4 : return cp::literal(arrow::TimestampScalar(
887 2 : nVal, arrow::TimeUnit::MILLI, "UTC"));
888 : }
889 : else
890 : {
891 4 : return cp::literal(arrow::TimestampScalar(
892 2 : nVal, arrow::TimeUnit::MILLI));
893 : }
894 : }
895 : }
896 :
897 : default:
898 0 : break;
899 : }
900 : }
901 :
902 208 : else if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2 &&
903 99 : IsComparisonOp(poNode->nOperation))
904 : {
905 : const auto sLeft =
906 95 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
907 : const auto sRight =
908 95 : BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated);
909 95 : if (sLeft.is_valid() && sRight.is_valid())
910 : {
911 93 : if (poNode->nOperation == SWQ_EQ)
912 61 : return cp::equal(sLeft, sRight);
913 32 : if (poNode->nOperation == SWQ_LT)
914 7 : return cp::less(sLeft, sRight);
915 25 : if (poNode->nOperation == SWQ_LE)
916 5 : return cp::less_equal(sLeft, sRight);
917 20 : if (poNode->nOperation == SWQ_GT)
918 5 : return cp::greater(sLeft, sRight);
919 15 : if (poNode->nOperation == SWQ_GE)
920 5 : return cp::greater_equal(sLeft, sRight);
921 10 : if (poNode->nOperation == SWQ_NE)
922 10 : return cp::not_equal(sLeft, sRight);
923 : }
924 : }
925 :
926 14 : else if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2 &&
927 4 : (poNode->nOperation == SWQ_LIKE ||
928 1 : poNode->nOperation == SWQ_ILIKE) &&
929 4 : poNode->papoSubExpr[1]->eNodeType == SNT_CONSTANT &&
930 4 : poNode->papoSubExpr[1]->field_type == SWQ_STRING)
931 : {
932 : const auto sLeft =
933 4 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
934 4 : if (sLeft.is_valid())
935 : {
936 4 : if (cp::GetFunctionRegistry()
937 8 : ->GetFunction("match_like")
938 4 : .ValueOr(nullptr))
939 : {
940 : // match_like is only available is Arrow built against RE2.
941 : return cp::call(
942 : "match_like", {sLeft},
943 8 : cp::MatchSubstringOptions(
944 4 : poNode->papoSubExpr[1]->string_value,
945 12 : /* ignore_case=*/poNode->nOperation == SWQ_ILIKE));
946 : }
947 0 : }
948 : }
949 :
950 10 : else if (poNode->eNodeType == SNT_OPERATION &&
951 10 : poNode->nOperation == SWQ_ISNULL && poNode->nSubExprCount == 1)
952 : {
953 : const auto expr =
954 10 : BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated);
955 10 : if (expr.is_valid())
956 4 : return cp::is_null(expr);
957 : }
958 :
959 25 : bFullyTranslated = false;
960 25 : return {};
961 : }
962 :
963 : /************************************************************************/
964 : /* ReadNextBatch() */
965 : /************************************************************************/
966 :
967 1224 : bool OGRParquetDatasetLayer::ReadNextBatch()
968 : {
969 1224 : if (m_bRebuildScanner)
970 663 : BuildScanner();
971 :
972 1224 : m_nIdxInBatch = 0;
973 :
974 1224 : if (m_poRecordBatchReader == nullptr)
975 : {
976 721 : if (!m_poScanner)
977 0 : return false;
978 721 : auto result = m_poScanner->ToRecordBatchReader();
979 721 : if (!result.ok())
980 : {
981 0 : CPLError(CE_Failure, CPLE_AppDefined,
982 : "ToRecordBatchReader() failed: %s",
983 0 : result.status().message().c_str());
984 0 : return false;
985 : }
986 721 : m_poRecordBatchReader = *result;
987 721 : if (m_poRecordBatchReader == nullptr)
988 0 : return false;
989 : }
990 :
991 2448 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
992 175 : do
993 : {
994 1399 : ++m_iRecordBatch;
995 :
996 1399 : poNextBatch.reset();
997 1399 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
998 1399 : if (!status.ok())
999 : {
1000 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
1001 0 : status.message().c_str());
1002 0 : poNextBatch.reset();
1003 : }
1004 1399 : if (poNextBatch == nullptr)
1005 : {
1006 557 : m_poBatch.reset();
1007 557 : return false;
1008 : }
1009 842 : } while (poNextBatch->num_rows() == 0);
1010 :
1011 : // CPLDebug("PARQUET", "Current batch has %d rows", int(poNextBatch->num_rows()));
1012 :
1013 667 : SetBatch(poNextBatch);
1014 :
1015 667 : return true;
1016 : }
1017 :
1018 : /************************************************************************/
1019 : /* GetNextFeature() */
1020 : /************************************************************************/
1021 :
1022 2120 : OGRFeature *OGRParquetDatasetLayer::GetNextFeature()
1023 : {
1024 : while (true)
1025 : {
1026 2120 : OGRFeature *poFeature = GetNextRawFeature();
1027 2120 : if (poFeature == nullptr)
1028 554 : return nullptr;
1029 :
1030 420 : if ((m_poFilterGeom == nullptr || m_bSkipFilterGeometry ||
1031 3477 : FilterGeometry(poFeature->GetGeometryRef())) &&
1032 1564 : (m_poAttrQuery == nullptr || m_bBaseArrowIgnoreAttributeFilter ||
1033 23 : m_poAttrQuery->Evaluate(poFeature)))
1034 : {
1035 1561 : return poFeature;
1036 : }
1037 : else
1038 5 : delete poFeature;
1039 5 : }
1040 : }
1041 :
1042 : /************************************************************************/
1043 : /* GetFeatureCount() */
1044 : /************************************************************************/
1045 :
1046 502 : GIntBig OGRParquetDatasetLayer::GetFeatureCount(int bForce)
1047 : {
1048 502 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
1049 : {
1050 39 : if (m_bRebuildScanner)
1051 33 : BuildScanner();
1052 39 : if (!m_poScanner)
1053 39 : return -1;
1054 39 : auto status = m_poScanner->CountRows();
1055 39 : if (status.ok())
1056 39 : return *status;
1057 : }
1058 463 : return OGRLayer::GetFeatureCount(bForce);
1059 : }
1060 :
1061 : /************************************************************************/
1062 : /* GetExtent() */
1063 : /************************************************************************/
1064 :
1065 3 : OGRErr OGRParquetDatasetLayer::GetExtent(OGREnvelope *psExtent, int bForce)
1066 : {
1067 3 : return GetExtent(0, psExtent, bForce);
1068 : }
1069 :
1070 : /************************************************************************/
1071 : /* FastGetExtent() */
1072 : /************************************************************************/
1073 :
1074 423 : bool OGRParquetDatasetLayer::FastGetExtent(int iGeomField,
1075 : OGREnvelope *psExtent) const
1076 : {
1077 423 : const auto oIter = m_oMapExtents.find(iGeomField);
1078 423 : if (oIter != m_oMapExtents.end())
1079 : {
1080 24 : *psExtent = oIter->second;
1081 24 : return true;
1082 : }
1083 :
1084 399 : return false;
1085 : }
1086 :
1087 : /************************************************************************/
1088 : /* GetExtent() */
1089 : /************************************************************************/
1090 :
1091 24 : OGRErr OGRParquetDatasetLayer::GetExtent(int iGeomField, OGREnvelope *psExtent,
1092 : int bForce)
1093 : {
1094 24 : if (iGeomField < 0 || iGeomField >= m_poFeatureDefn->GetGeomFieldCount())
1095 : {
1096 10 : if (iGeomField != 0)
1097 : {
1098 9 : CPLError(CE_Failure, CPLE_AppDefined,
1099 : "Invalid geometry field index : %d", iGeomField);
1100 : }
1101 10 : return OGRERR_FAILURE;
1102 : }
1103 :
1104 14 : if (FastGetExtent(iGeomField, psExtent))
1105 : {
1106 6 : return OGRERR_NONE;
1107 : }
1108 :
1109 : // bbox in general m_oMapGeometryColumns can not be trusted (at least at
1110 : // time of writing), so we have to iterate over each fragment.
1111 : const char *pszGeomFieldName =
1112 8 : m_poFeatureDefn->GetGeomFieldDefn(iGeomField)->GetNameRef();
1113 8 : auto oIter = m_oMapGeometryColumns.find(pszGeomFieldName);
1114 8 : if (oIter != m_oMapGeometryColumns.end())
1115 : {
1116 8 : auto statusFragments = m_poDataset->GetFragments();
1117 8 : if (statusFragments.ok())
1118 : {
1119 8 : *psExtent = OGREnvelope();
1120 8 : int nFragmentCount = 0;
1121 8 : int nBBoxFragmentCount = 0;
1122 18 : for (const auto &oFragmentStatus : *statusFragments)
1123 : {
1124 10 : if (oFragmentStatus.ok())
1125 : {
1126 : auto statusSchema =
1127 10 : (*oFragmentStatus)->ReadPhysicalSchema();
1128 10 : if (statusSchema.ok())
1129 : {
1130 10 : nFragmentCount++;
1131 10 : const auto &kv_metadata = (*statusSchema)->metadata();
1132 10 : if (kv_metadata && kv_metadata->Contains("geo"))
1133 : {
1134 20 : auto geo = kv_metadata->Get("geo");
1135 20 : CPLJSONDocument oDoc;
1136 10 : if (geo.ok() && oDoc.LoadMemory(*geo))
1137 : {
1138 20 : auto oRoot = oDoc.GetRoot();
1139 30 : auto oColumns = oRoot.GetObj("columns");
1140 30 : auto oCol = oColumns.GetObj(pszGeomFieldName);
1141 10 : OGREnvelope3D sFragmentExtent;
1142 20 : if (oCol.IsValid() &&
1143 10 : GetExtentFromMetadata(
1144 : oCol, &sFragmentExtent) == OGRERR_NONE)
1145 : {
1146 8 : nBBoxFragmentCount++;
1147 8 : psExtent->Merge(sFragmentExtent);
1148 : }
1149 : }
1150 : }
1151 10 : if (nFragmentCount != nBBoxFragmentCount)
1152 2 : break;
1153 : }
1154 : }
1155 : }
1156 8 : if (nFragmentCount == nBBoxFragmentCount)
1157 : {
1158 6 : m_oMapExtents[iGeomField] = *psExtent;
1159 6 : return OGRERR_NONE;
1160 : }
1161 : }
1162 : }
1163 :
1164 2 : return OGRParquetLayerBase::GetExtent(iGeomField, psExtent, bForce);
1165 : }
1166 :
1167 : /************************************************************************/
1168 : /* SetSpatialFilter() */
1169 : /************************************************************************/
1170 :
1171 492 : void OGRParquetDatasetLayer::SetSpatialFilter(int iGeomField,
1172 : OGRGeometry *poGeomIn)
1173 :
1174 : {
1175 492 : OGRParquetLayerBase::SetSpatialFilter(iGeomField, poGeomIn);
1176 492 : m_bRebuildScanner = true;
1177 :
1178 : // Full invalidation
1179 492 : InvalidateCachedBatches();
1180 492 : }
1181 :
1182 : /************************************************************************/
1183 : /* SetIgnoredFields() */
1184 : /************************************************************************/
1185 :
1186 103 : OGRErr OGRParquetDatasetLayer::SetIgnoredFields(CSLConstList papszFields)
1187 : {
1188 103 : m_bRebuildScanner = true;
1189 103 : m_aosProjectedFields.clear();
1190 103 : m_bIgnoredFields = false;
1191 103 : m_anMapFieldIndexToArrayIndex.clear();
1192 103 : m_anMapGeomFieldIndexToArrayIndex.clear();
1193 103 : m_nRequestedFIDColumn = -1;
1194 103 : OGRErr eErr = OGRParquetLayerBase::SetIgnoredFields(papszFields);
1195 103 : if (eErr == OGRERR_NONE)
1196 : {
1197 103 : m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr;
1198 103 : if (m_bIgnoredFields)
1199 : {
1200 68 : if (m_iFIDArrowColumn >= 0)
1201 : {
1202 1 : m_nRequestedFIDColumn =
1203 1 : static_cast<int>(m_aosProjectedFields.size());
1204 1 : m_aosProjectedFields.emplace_back(GetFIDColumn());
1205 : }
1206 :
1207 68 : const auto &fields = m_poSchema->fields();
1208 828 : for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i)
1209 : {
1210 : const auto &field =
1211 760 : fields[m_anMapFieldIndexToArrowColumn[i][0]];
1212 760 : const auto eArrowType = field->type()->id();
1213 760 : if (eArrowType == arrow::Type::STRUCT)
1214 : {
1215 : // For a struct, for the sake of simplicity in
1216 : // GetNextRawFeature(), as soon as one of the member if
1217 : // requested, request the struct field, so that the Arrow
1218 : // type doesn't change
1219 9 : bool bFoundNotIgnored = false;
1220 46 : for (int j = i; j < m_poFeatureDefn->GetFieldCount() &&
1221 46 : m_anMapFieldIndexToArrowColumn[i][0] ==
1222 23 : m_anMapFieldIndexToArrowColumn[j][0];
1223 : ++j)
1224 : {
1225 21 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
1226 : {
1227 7 : bFoundNotIgnored = true;
1228 7 : break;
1229 : }
1230 : }
1231 9 : if (bFoundNotIgnored)
1232 : {
1233 : int j;
1234 98 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
1235 98 : m_anMapFieldIndexToArrowColumn[i][0] ==
1236 49 : m_anMapFieldIndexToArrowColumn[j][0];
1237 : ++j)
1238 : {
1239 42 : if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored())
1240 : {
1241 80 : m_anMapFieldIndexToArrayIndex.push_back(
1242 40 : static_cast<int>(
1243 40 : m_aosProjectedFields.size()));
1244 : }
1245 : else
1246 : {
1247 2 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1248 : }
1249 : }
1250 7 : i = j - 1;
1251 :
1252 7 : m_aosProjectedFields.emplace_back(field->name());
1253 : }
1254 : else
1255 : {
1256 : int j;
1257 28 : for (j = i; j < m_poFeatureDefn->GetFieldCount() &&
1258 28 : m_anMapFieldIndexToArrowColumn[i][0] ==
1259 14 : m_anMapFieldIndexToArrowColumn[j][0];
1260 : ++j)
1261 : {
1262 12 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1263 : }
1264 2 : i = j - 1;
1265 : }
1266 : }
1267 751 : else if (!m_poFeatureDefn->GetFieldDefn(i)->IsIgnored())
1268 : {
1269 1382 : m_anMapFieldIndexToArrayIndex.push_back(
1270 691 : static_cast<int>(m_aosProjectedFields.size()));
1271 691 : m_aosProjectedFields.emplace_back(field->name());
1272 : }
1273 : else
1274 : {
1275 60 : m_anMapFieldIndexToArrayIndex.push_back(-1);
1276 : }
1277 : }
1278 :
1279 135 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
1280 : {
1281 : const auto &field =
1282 67 : fields[m_anMapGeomFieldIndexToArrowColumn[i]];
1283 67 : if (!m_poFeatureDefn->GetGeomFieldDefn(i)->IsIgnored())
1284 : {
1285 124 : m_anMapGeomFieldIndexToArrayIndex.push_back(
1286 62 : static_cast<int>(m_aosProjectedFields.size()));
1287 62 : m_aosProjectedFields.emplace_back(field->name());
1288 : }
1289 : else
1290 : {
1291 5 : m_anMapGeomFieldIndexToArrayIndex.push_back(-1);
1292 : }
1293 : }
1294 : }
1295 : }
1296 :
1297 103 : m_nExpectedBatchColumns =
1298 103 : m_bIgnoredFields ? static_cast<int>(m_aosProjectedFields.size()) : -1;
1299 :
1300 : // Full invalidation
1301 103 : InvalidateCachedBatches();
1302 :
1303 103 : return eErr;
1304 : }
1305 :
1306 : /************************************************************************/
1307 : /* TestCapability() */
1308 : /************************************************************************/
1309 :
1310 234 : int OGRParquetDatasetLayer::TestCapability(const char *pszCap)
1311 : {
1312 234 : if (EQUAL(pszCap, OLCIgnoreFields))
1313 5 : return true;
1314 :
1315 229 : if (EQUAL(pszCap, OLCFastSpatialFilter))
1316 : {
1317 171 : if (m_iGeomFieldFilter >= 0 &&
1318 114 : m_iGeomFieldFilter < static_cast<int>(m_aeGeomEncoding.size()) &&
1319 57 : m_aeGeomEncoding[m_iGeomFieldFilter] ==
1320 : OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT)
1321 : {
1322 8 : return true;
1323 : }
1324 : // fallback to base method
1325 : }
1326 :
1327 221 : return OGRParquetLayerBase::TestCapability(pszCap);
1328 : }
1329 :
1330 : /***********************************************************************/
1331 : /* SetAttributeFilter() */
1332 : /***********************************************************************/
1333 :
1334 191 : OGRErr OGRParquetDatasetLayer::SetAttributeFilter(const char *pszFilter)
1335 : {
1336 191 : m_bRebuildScanner = true;
1337 191 : return OGRParquetLayerBase::SetAttributeFilter(pszFilter);
1338 : }
|