Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "gdal_pam.h"
16 : #include "ogrsf_frmts.h"
17 : #include "ogr_p.h"
18 :
19 : #include <cinttypes>
20 : #include <limits>
21 : #include <map>
22 : #include <set>
23 : #include <utility>
24 :
25 : #include "ogr_feather.h"
26 :
27 : #include "../arrow_common/ograrrowlayer.hpp"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 :
30 : /************************************************************************/
31 : /* OGRFeatherLayer() */
32 : /************************************************************************/
33 :
34 535 : OGRFeatherLayer::OGRFeatherLayer(
35 : OGRFeatherDataset *poDS, const char *pszLayerName,
36 535 : std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader)
37 : : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
38 535 : m_poRecordBatchFileReader(poRecordBatchFileReader)
39 : {
40 535 : EstablishFeatureDefn();
41 535 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
42 : m_poFeatureDefn->GetGeomFieldCount());
43 535 : }
44 :
45 : /************************************************************************/
46 : /* OGRFeatherLayer() */
47 : /************************************************************************/
48 :
49 10 : OGRFeatherLayer::OGRFeatherLayer(
50 : OGRFeatherDataset *poDS, const char *pszLayerName,
51 : std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
52 : const arrow::ipc::IpcReadOptions &oOptions,
53 : std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
54 10 : &poRecordBatchStreamReader)
55 : : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
56 10 : m_poFile(std::move(poFile)), m_bSeekable(bSeekable), m_oOptions(oOptions),
57 10 : m_poRecordBatchReader(poRecordBatchStreamReader)
58 : {
59 10 : EstablishFeatureDefn();
60 10 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
61 : m_poFeatureDefn->GetGeomFieldCount());
62 10 : }
63 :
64 : /************************************************************************/
65 : /* GetDataset() */
66 : /************************************************************************/
67 :
68 112 : GDALDataset *OGRFeatherLayer::GetDataset()
69 : {
70 112 : return m_poDS;
71 : }
72 :
73 : /************************************************************************/
74 : /* LoadGeoMetadata() */
75 : /************************************************************************/
76 :
77 545 : void OGRFeatherLayer::LoadGeoMetadata(
78 : const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
79 : {
80 545 : if (kv_metadata && kv_metadata->Contains(key))
81 : {
82 1060 : auto geo = kv_metadata->Get(key);
83 530 : if (geo.ok())
84 : {
85 1060 : CPLJSONDocument oDoc;
86 530 : if (oDoc.LoadMemory(*geo))
87 : {
88 1060 : auto oRoot = oDoc.GetRoot();
89 1590 : const auto osVersion = oRoot.GetString("schema_version");
90 530 : if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
91 : {
92 397 : CPLDebug("FEATHER",
93 : "schema_version = %s not explicitly handled by "
94 : "the driver",
95 : osVersion.c_str());
96 : }
97 1590 : auto oColumns = oRoot.GetObj("columns");
98 530 : if (oColumns.IsValid())
99 : {
100 1060 : for (const auto &oColumn : oColumns.GetChildren())
101 : {
102 530 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
103 : }
104 : }
105 : }
106 : else
107 : {
108 0 : CPLError(CE_Warning, CPLE_AppDefined,
109 : "Cannot parse 'geo' metadata");
110 : }
111 : }
112 : }
113 545 : }
114 :
115 : /************************************************************************/
116 : /* EstablishFeatureDefn() */
117 : /************************************************************************/
118 :
119 545 : void OGRFeatherLayer::EstablishFeatureDefn()
120 : {
121 1090 : m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
122 1090 : : m_poRecordBatchReader->schema();
123 545 : const auto &kv_metadata = m_poSchema->metadata();
124 :
125 : #ifdef DEBUG
126 545 : if (kv_metadata)
127 : {
128 1073 : for (const auto &keyValue : kv_metadata->sorted_pairs())
129 : {
130 538 : CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
131 : keyValue.second.c_str());
132 : }
133 : }
134 : #endif
135 :
136 : auto poFooterMetadata = m_poRecordBatchFileReader
137 535 : ? m_poRecordBatchFileReader->metadata()
138 1625 : : nullptr;
139 669 : if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
140 124 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
141 : {
142 124 : LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
143 : }
144 : else
145 : {
146 421 : LoadGeoMetadata(kv_metadata.get(), "geo");
147 : }
148 : const auto oMapFieldNameToGDALSchemaFieldDefn =
149 1090 : LoadGDALSchema(kv_metadata.get());
150 :
151 545 : const auto &fields = m_poSchema->fields();
152 4082 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
153 : {
154 3537 : const auto &field = fields[i];
155 3537 : const auto &fieldName = field->name();
156 :
157 3537 : const auto &field_kv_metadata = field->metadata();
158 3537 : std::string osExtensionName;
159 3537 : std::string osExtensionMetadata;
160 3537 : if (field->type()->id() == arrow::Type::EXTENSION)
161 : {
162 : osExtensionName =
163 127 : cpl::down_cast<arrow::ExtensionType *>(field->type().get())
164 127 : ->extension_name();
165 : }
166 3410 : else if (field_kv_metadata)
167 : {
168 : auto extension_name =
169 1040 : field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
170 520 : if (extension_name.ok())
171 : {
172 518 : osExtensionName = *extension_name;
173 : }
174 :
175 : auto extension_metadata =
176 1040 : field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
177 520 : if (extension_metadata.ok())
178 : {
179 384 : osExtensionMetadata = *extension_metadata;
180 : }
181 : #ifdef DEBUG
182 520 : CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
183 1424 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
184 : {
185 904 : CPLDebug("FEATHER", " %s = %s", keyValue.first.c_str(),
186 : keyValue.second.c_str());
187 : }
188 : #endif
189 : }
190 :
191 3537 : if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
192 : {
193 6 : m_iFIDArrowColumn = i;
194 6 : continue;
195 : }
196 :
197 3531 : bool bRegularField = true;
198 3531 : auto oIter = m_oMapGeometryColumns.find(fieldName);
199 3531 : if (oIter != m_oMapGeometryColumns.end() || !osExtensionName.empty())
200 : {
201 1320 : CPLJSONObject oJSONDef;
202 660 : if (oIter != m_oMapGeometryColumns.end())
203 530 : oJSONDef = oIter->second;
204 1980 : auto osEncoding = oJSONDef.GetString("encoding");
205 660 : if (osEncoding.empty() && !osExtensionName.empty())
206 130 : osEncoding = osExtensionName;
207 :
208 660 : OGRwkbGeometryType eGeomType = wkbUnknown;
209 660 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
210 660 : if (IsValidGeometryEncoding(field, osEncoding,
211 1320 : oIter != m_oMapGeometryColumns.end(),
212 : eGeomType, eGeomEncoding))
213 : {
214 531 : bRegularField = false;
215 1062 : OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
216 :
217 1593 : auto osCRS = oJSONDef.GetString("crs");
218 :
219 : #if ARROW_VERSION_MAJOR >= 21
220 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
221 : osExtensionMetadata.empty() &&
222 : field->type()->id() == arrow::Type::EXTENSION)
223 : {
224 : const auto arrowWkb =
225 : std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
226 : field->type());
227 : if (arrowWkb)
228 : {
229 : osExtensionMetadata = arrowWkb->Serialize();
230 : }
231 : }
232 : #endif
233 :
234 926 : if (osCRS.empty() &&
235 395 : osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
236 160 : !osExtensionMetadata.empty() &&
237 926 : osExtensionMetadata[0] == '{' &&
238 0 : osExtensionMetadata.back() == '}')
239 : {
240 0 : CPLJSONDocument oDoc;
241 0 : if (oDoc.LoadMemory(osExtensionMetadata))
242 : {
243 0 : auto jCrs = oDoc.GetRoot()["crs"];
244 0 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
245 : {
246 : osCRS =
247 0 : jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
248 : }
249 0 : else if (jCrs.GetType() == CPLJSONObject::Type::String)
250 : {
251 0 : osCRS = jCrs.ToString();
252 : }
253 0 : if (oDoc.GetRoot()["edges"].ToString() == "spherical")
254 : {
255 0 : SetMetadataItem("EDGES", "SPHERICAL");
256 : }
257 : }
258 : }
259 :
260 531 : if (osCRS.empty())
261 : {
262 : #if 0
263 : CPLError(CE_Warning, CPLE_AppDefined,
264 : "Missing required 'crs' field for geometry column %s",
265 : fieldName.c_str());
266 : #endif
267 : }
268 : else
269 : {
270 136 : OGRSpatialReference *poSRS = new OGRSpatialReference();
271 136 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
272 :
273 136 : if (poSRS->SetFromUserInput(
274 : osCRS.c_str(),
275 : OGRSpatialReference::
276 136 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
277 : OGRERR_NONE)
278 : {
279 : const char *pszAuthName =
280 136 : poSRS->GetAuthorityName(nullptr);
281 : const char *pszAuthCode =
282 136 : poSRS->GetAuthorityCode(nullptr);
283 136 : if (pszAuthName && pszAuthCode &&
284 136 : EQUAL(pszAuthName, "OGC") &&
285 0 : EQUAL(pszAuthCode, "CRS84"))
286 : {
287 0 : poSRS->importFromEPSG(4326);
288 : }
289 :
290 136 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
291 136 : if (dfCoordEpoch > 0)
292 2 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
293 :
294 136 : oField.SetSpatialRef(poSRS);
295 : }
296 136 : poSRS->Release();
297 : }
298 :
299 : // m_aeGeomEncoding be filled before calling
300 : // ComputeGeometryColumnType()
301 531 : m_aeGeomEncoding.push_back(eGeomEncoding);
302 531 : if (eGeomType == wkbUnknown)
303 : {
304 705 : auto osType = oJSONDef.GetString("geometry_type");
305 235 : if (osType.empty())
306 235 : osType = oJSONDef.GetString("gdal:geometry_type");
307 470 : if (m_bSeekable && osType.empty() &&
308 235 : CPLTestBool(CPLGetConfigOption(
309 : "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
310 : {
311 235 : eGeomType = ComputeGeometryColumnType(
312 235 : m_poFeatureDefn->GetGeomFieldCount(), i);
313 235 : if (m_poRecordBatchReader)
314 0 : ResetRecordBatchReader();
315 : }
316 : else
317 0 : eGeomType = GetGeometryTypeFromString(osType);
318 : }
319 :
320 531 : oField.SetType(eGeomType);
321 531 : oField.SetNullable(field->nullable());
322 531 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
323 531 : m_anMapGeomFieldIndexToArrowColumn.push_back(i);
324 : }
325 : }
326 :
327 3531 : if (bRegularField)
328 : {
329 3000 : CreateFieldFromSchema(field, {i},
330 : oMapFieldNameToGDALSchemaFieldDefn);
331 : }
332 : }
333 :
334 545 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
335 : m_poFeatureDefn->GetFieldCount());
336 545 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
337 : m_poFeatureDefn->GetGeomFieldCount());
338 545 : }
339 :
340 : /************************************************************************/
341 : /* ResetRecordBatchReader() */
342 : /************************************************************************/
343 :
344 12 : bool OGRFeatherLayer::ResetRecordBatchReader()
345 : {
346 12 : const auto nPos = *(m_poFile->Tell());
347 12 : CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
348 : auto result =
349 24 : arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
350 12 : if (!result.ok())
351 : {
352 0 : CPLError(CE_Failure, CPLE_AppDefined,
353 : "RecordBatchStreamReader::Open() failed with %s",
354 0 : result.status().message().c_str());
355 0 : CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
356 0 : return false;
357 : }
358 : else
359 : {
360 12 : m_poRecordBatchReader = *result;
361 12 : return true;
362 : }
363 : }
364 :
365 : /************************************************************************/
366 : /* ComputeGeometryColumnType() */
367 : /************************************************************************/
368 :
369 235 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
370 : int iCol) const
371 : {
372 : // Compute type of geometry column by iterating over each geometry, and
373 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
374 :
375 235 : OGRwkbGeometryType eGeomType = wkbNone;
376 :
377 235 : if (m_poRecordBatchReader != nullptr)
378 : {
379 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
380 : while (true)
381 : {
382 0 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
383 0 : if (!status.ok())
384 : {
385 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
386 0 : status.message().c_str());
387 0 : break;
388 : }
389 0 : else if (!poBatch)
390 0 : break;
391 0 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
392 : iCol, eGeomType);
393 0 : if (eGeomType == wkbUnknown)
394 0 : break;
395 0 : }
396 : }
397 : else
398 : {
399 470 : for (int iBatch = 0;
400 470 : iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
401 : {
402 235 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
403 235 : if (!result.ok())
404 : {
405 0 : CPLError(CE_Failure, CPLE_AppDefined,
406 : "ReadRecordBatch() failed: %s",
407 0 : result.status().message().c_str());
408 0 : break;
409 : }
410 235 : eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
411 : iCol, eGeomType);
412 235 : if (eGeomType == wkbUnknown)
413 0 : break;
414 : }
415 : }
416 :
417 235 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
418 : }
419 :
420 : /************************************************************************/
421 : /* BuildDomain() */
422 : /************************************************************************/
423 :
424 : std::unique_ptr<OGRFieldDomain>
425 19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
426 : int iFieldIndex) const
427 : {
428 19 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
429 19 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
430 : arrow::Type::DICTIONARY);
431 :
432 19 : if (m_poRecordBatchReader)
433 : {
434 6 : if (m_poBatch)
435 : {
436 6 : return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
437 : }
438 : }
439 13 : else if (m_poRecordBatchFileReader)
440 : {
441 13 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
442 13 : if (!result.ok())
443 : {
444 0 : CPLError(CE_Failure, CPLE_AppDefined,
445 : "ReadRecordBatch() failed: %s",
446 0 : result.status().message().c_str());
447 : }
448 13 : auto poBatch = *result;
449 13 : if (poBatch)
450 : {
451 13 : return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
452 : }
453 : }
454 :
455 0 : return nullptr;
456 : }
457 :
458 : /************************************************************************/
459 : /* ResetReading() */
460 : /************************************************************************/
461 :
462 820 : void OGRFeatherLayer::ResetReading()
463 : {
464 820 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
465 : {
466 17 : if (m_iRecordBatch == 1 && m_poBatchIdx1)
467 : {
468 : // do nothing
469 : }
470 : else
471 : {
472 16 : m_bResetRecordBatchReaderAsked = true;
473 : }
474 : }
475 820 : OGRArrowLayer::ResetReading();
476 820 : }
477 :
478 : /************************************************************************/
479 : /* ReadNextBatch() */
480 : /************************************************************************/
481 :
482 1006 : bool OGRFeatherLayer::ReadNextBatch()
483 : {
484 1006 : if (m_poRecordBatchFileReader == nullptr)
485 : {
486 119 : return ReadNextBatchStream();
487 : }
488 : else
489 : {
490 887 : return ReadNextBatchFile();
491 : }
492 : }
493 :
494 : /************************************************************************/
495 : /* ReadNextBatchFile() */
496 : /************************************************************************/
497 :
498 887 : bool OGRFeatherLayer::ReadNextBatchFile()
499 : {
500 : while (true)
501 : {
502 887 : ++m_iRecordBatch;
503 887 : if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
504 : {
505 449 : if (m_iRecordBatch == 1)
506 446 : m_iRecordBatch = 0;
507 : else
508 3 : m_poBatch.reset();
509 449 : return false;
510 : }
511 :
512 438 : m_nIdxInBatch = 0;
513 :
514 : auto result =
515 438 : m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
516 438 : if (!result.ok())
517 : {
518 0 : CPLError(CE_Failure, CPLE_AppDefined,
519 : "ReadRecordBatch() failed: %s",
520 0 : result.status().message().c_str());
521 0 : m_poBatch.reset();
522 0 : return false;
523 : }
524 438 : if ((*result)->num_rows() != 0)
525 : {
526 438 : SetBatch(*result);
527 438 : break;
528 : }
529 0 : }
530 :
531 438 : return true;
532 : }
533 :
534 : /************************************************************************/
535 : /* ReadNextBatchStream() */
536 : /************************************************************************/
537 :
538 154 : bool OGRFeatherLayer::ReadNextBatchStream()
539 : {
540 154 : m_nIdxInBatch = 0;
541 :
542 308 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
543 0 : do
544 : {
545 154 : if (m_iRecordBatch == 0 && m_poBatchIdx0)
546 : {
547 1 : SetBatch(m_poBatchIdx0);
548 1 : m_iRecordBatch = 1;
549 101 : return true;
550 : }
551 :
552 153 : else if (m_iRecordBatch == 1 && m_poBatchIdx1)
553 : {
554 1 : SetBatch(m_poBatchIdx1);
555 1 : m_iRecordBatch = 2;
556 1 : return true;
557 : }
558 :
559 152 : else if (m_bSingleBatch)
560 : {
561 81 : CPLAssert(m_iRecordBatch == 0);
562 81 : CPLAssert(m_poBatch != nullptr);
563 81 : return false;
564 : }
565 :
566 71 : if (m_bResetRecordBatchReaderAsked)
567 : {
568 13 : if (!m_bSeekable)
569 : {
570 1 : CPLError(CE_Failure, CPLE_NotSupported,
571 : "Attempting to rewind non-seekable stream");
572 1 : return false;
573 : }
574 12 : if (!ResetRecordBatchReader())
575 0 : return false;
576 12 : m_bResetRecordBatchReaderAsked = false;
577 : }
578 :
579 70 : CPLAssert(m_poRecordBatchReader);
580 :
581 70 : ++m_iRecordBatch;
582 :
583 70 : poNextBatch.reset();
584 70 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
585 70 : if (!status.ok())
586 : {
587 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
588 0 : status.message().c_str());
589 0 : poNextBatch.reset();
590 : }
591 70 : if (poNextBatch == nullptr)
592 : {
593 17 : if (m_iRecordBatch == 1)
594 : {
595 3 : m_iRecordBatch = 0;
596 3 : m_bSingleBatch = true;
597 : }
598 : else
599 : {
600 14 : m_poBatch.reset();
601 14 : m_poBatchColumns.clear();
602 : }
603 17 : return false;
604 : }
605 53 : } while (poNextBatch->num_rows() == 0);
606 :
607 53 : SetBatch(poNextBatch);
608 :
609 53 : return true;
610 : }
611 :
612 : /************************************************************************/
613 : /* TryToCacheFirstTwoBatches() */
614 : /************************************************************************/
615 :
616 1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
617 : {
618 2 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
619 2 : !m_bSingleBatch && m_poBatchIdx0 == nullptr)
620 : {
621 1 : ResetReading();
622 1 : if (!m_poBatch)
623 : {
624 0 : CPL_IGNORE_RET_VAL(ReadNextBatchStream());
625 : }
626 1 : if (m_poBatch)
627 : {
628 2 : auto poBatchIdx0 = m_poBatch;
629 1 : if (ReadNextBatchStream())
630 : {
631 1 : CPLAssert(m_iRecordBatch == 1);
632 1 : m_poBatchIdx0 = poBatchIdx0;
633 1 : m_poBatchIdx1 = m_poBatch;
634 1 : SetBatch(poBatchIdx0);
635 1 : ResetReading();
636 : }
637 1 : ResetReading();
638 : }
639 : }
640 1 : }
641 :
642 : /************************************************************************/
643 : /* CanPostFilterArrowArray() */
644 : /************************************************************************/
645 :
646 20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
647 : const struct ArrowSchema *schema) const
648 : {
649 20 : if (m_poRecordBatchReader)
650 10 : return false;
651 10 : return OGRArrowLayer::CanPostFilterArrowArray(schema);
652 : }
653 :
654 : /************************************************************************/
655 : /* InvalidateCachedBatches() */
656 : /************************************************************************/
657 :
658 109 : void OGRFeatherLayer::InvalidateCachedBatches()
659 : {
660 109 : if (m_poRecordBatchFileReader)
661 : {
662 63 : m_iRecordBatch = -1;
663 63 : ResetReading();
664 : }
665 109 : }
666 :
667 : /************************************************************************/
668 : /* GetFeatureCount() */
669 : /************************************************************************/
670 :
671 336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
672 : {
673 628 : if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
674 292 : m_poFilterGeom == nullptr)
675 : {
676 288 : auto result = m_poRecordBatchFileReader->CountRows();
677 288 : if (result.ok())
678 288 : return *result;
679 : }
680 48 : else if (m_poRecordBatchReader != nullptr)
681 : {
682 36 : if (!m_bSeekable && !bForce)
683 : {
684 1 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
685 : {
686 1 : TryToCacheFirstTwoBatches();
687 : }
688 :
689 1 : if (!m_bSingleBatch)
690 : {
691 1 : CPLError(
692 : CE_Failure, CPLE_AppDefined,
693 : "GetFeatureCount() cannot be run in non-forced mode on "
694 : "a non-seekable file made of several batches");
695 1 : return -1;
696 : }
697 : }
698 :
699 35 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
700 : {
701 23 : GIntBig nFeatures = 0;
702 23 : ResetReading();
703 23 : if (!m_poBatch)
704 3 : ReadNextBatchStream();
705 31 : while (m_poBatch)
706 : {
707 31 : nFeatures += m_poBatch->num_rows();
708 31 : if (!ReadNextBatchStream())
709 23 : break;
710 : }
711 23 : ResetReading();
712 23 : return nFeatures;
713 : }
714 : }
715 24 : return OGRLayer::GetFeatureCount(bForce);
716 : }
717 :
718 : /************************************************************************/
719 : /* CanRunNonForcedGetExtent() */
720 : /************************************************************************/
721 :
722 0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
723 : {
724 0 : if (m_bSeekable)
725 0 : return true;
726 0 : TryToCacheFirstTwoBatches();
727 0 : if (!m_bSingleBatch)
728 : {
729 0 : CPLError(CE_Failure, CPLE_AppDefined,
730 : "GetExtent() cannot be run in non-forced mode on "
731 : "a non-seekable file made of several batches");
732 0 : return false;
733 : }
734 0 : return true;
735 : }
736 :
737 : /************************************************************************/
738 : /* TestCapability() */
739 : /************************************************************************/
740 :
741 292 : int OGRFeatherLayer::TestCapability(const char *pszCap)
742 : {
743 292 : if (EQUAL(pszCap, OLCFastFeatureCount))
744 : {
745 28 : return m_bSeekable && m_poAttrQuery == nullptr &&
746 28 : m_poFilterGeom == nullptr;
747 : }
748 :
749 274 : if (EQUAL(pszCap, OLCMeasuredGeometries))
750 16 : return true;
751 258 : if (EQUAL(pszCap, OLCZGeometries))
752 12 : return true;
753 :
754 246 : return OGRArrowLayer::TestCapability(pszCap);
755 : }
756 :
757 : /************************************************************************/
758 : /* GetMetadataItem() */
759 : /************************************************************************/
760 :
761 259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
762 : const char *pszDomain)
763 : {
764 : // Mostly for unit test purposes
765 259 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
766 : {
767 9 : if (EQUAL(pszName, "FORMAT"))
768 : {
769 5 : return m_poRecordBatchFileReader ? "FILE" : "STREAM";
770 : }
771 4 : if (m_poRecordBatchFileReader != nullptr)
772 : {
773 4 : int iBatch = -1;
774 4 : if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
775 : {
776 1 : return CPLSPrintf(
777 5 : "%d", m_poRecordBatchFileReader->num_record_batches());
778 : }
779 6 : else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
780 3 : strstr(pszName, ".NUM_ROWS"))
781 : {
782 : auto result =
783 6 : m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
784 3 : if (!result.ok())
785 : {
786 0 : return nullptr;
787 : }
788 3 : return CPLSPrintf("%" PRId64, (*result)->num_rows());
789 : }
790 : }
791 0 : return nullptr;
792 : }
793 250 : else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
794 : {
795 : const auto kv_metadata =
796 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
797 8 : : m_poRecordBatchReader->schema())
798 10 : ->metadata();
799 5 : if (kv_metadata && kv_metadata->Contains(pszName))
800 : {
801 5 : auto metadataItem = kv_metadata->Get(pszName);
802 5 : if (metadataItem.ok())
803 : {
804 5 : return CPLSPrintf("%s", metadataItem->c_str());
805 : }
806 : }
807 0 : return nullptr;
808 : }
809 476 : else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
810 231 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
811 : {
812 2 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
813 1 : if (kv_metadata && kv_metadata->Contains(pszName))
814 : {
815 1 : auto metadataItem = kv_metadata->Get(pszName);
816 1 : if (metadataItem.ok())
817 : {
818 1 : return CPLSPrintf("%s", metadataItem->c_str());
819 : }
820 : }
821 0 : return nullptr;
822 : }
823 244 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
824 : }
825 :
826 : /************************************************************************/
827 : /* GetMetadata() */
828 : /************************************************************************/
829 :
830 34 : char **OGRFeatherLayer::GetMetadata(const char *pszDomain)
831 : {
832 : // Mostly for unit test purposes
833 34 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
834 : {
835 5 : m_aosFeatherMetadata.Clear();
836 : const auto kv_metadata =
837 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
838 8 : : m_poRecordBatchReader->schema())
839 10 : ->metadata();
840 5 : if (kv_metadata)
841 : {
842 11 : for (const auto &kv : kv_metadata->sorted_pairs())
843 : {
844 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
845 6 : kv.second.c_str());
846 : }
847 : }
848 5 : return m_aosFeatherMetadata.List();
849 : }
850 41 : if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
851 12 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
852 : {
853 2 : m_aosFeatherMetadata.Clear();
854 4 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
855 2 : if (kv_metadata)
856 : {
857 3 : for (const auto &kv : kv_metadata->sorted_pairs())
858 : {
859 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
860 1 : kv.second.c_str());
861 : }
862 : }
863 2 : return m_aosFeatherMetadata.List();
864 : }
865 27 : return OGRLayer::GetMetadata(pszDomain);
866 : }
|