Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "gdal_pam.h"
16 : #include "ogrsf_frmts.h"
17 : #include "ogr_p.h"
18 :
19 : #include <cinttypes>
20 : #include <limits>
21 : #include <map>
22 : #include <set>
23 : #include <utility>
24 :
25 : #include "ogr_feather.h"
26 :
27 : #include "../arrow_common/ograrrowlayer.hpp"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 :
30 : /************************************************************************/
31 : /* OGRFeatherLayer() */
32 : /************************************************************************/
33 :
34 538 : OGRFeatherLayer::OGRFeatherLayer(
35 : OGRFeatherDataset *poDS, const char *pszLayerName,
36 : std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
37 538 : CSLConstList papszOpenOptions)
38 : : OGRArrowLayer(poDS, pszLayerName,
39 538 : CPLTestBool(CSLFetchNameValueDef(
40 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
41 1076 : m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
42 : {
43 538 : EstablishFeatureDefn();
44 538 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
45 : m_poFeatureDefn->GetGeomFieldCount());
46 538 : }
47 :
48 : /************************************************************************/
49 : /* OGRFeatherLayer() */
50 : /************************************************************************/
51 :
52 10 : OGRFeatherLayer::OGRFeatherLayer(
53 : OGRFeatherDataset *poDS, const char *pszLayerName,
54 : std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
55 : const arrow::ipc::IpcReadOptions &oOptions,
56 : std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
57 : &poRecordBatchStreamReader,
58 10 : CSLConstList papszOpenOptions)
59 : : OGRArrowLayer(poDS, pszLayerName,
60 10 : CPLTestBool(CSLFetchNameValueDef(
61 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
62 10 : m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
63 20 : m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
64 : {
65 10 : EstablishFeatureDefn();
66 10 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
67 : m_poFeatureDefn->GetGeomFieldCount());
68 10 : }
69 :
70 : /************************************************************************/
71 : /* GetDataset() */
72 : /************************************************************************/
73 :
74 112 : GDALDataset *OGRFeatherLayer::GetDataset()
75 : {
76 112 : return m_poDS;
77 : }
78 :
79 : /************************************************************************/
80 : /* LoadGeoMetadata() */
81 : /************************************************************************/
82 :
83 548 : void OGRFeatherLayer::LoadGeoMetadata(
84 : const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
85 : {
86 548 : if (kv_metadata && kv_metadata->Contains(key))
87 : {
88 1066 : auto geo = kv_metadata->Get(key);
89 533 : if (geo.ok())
90 : {
91 1066 : CPLJSONDocument oDoc;
92 533 : if (oDoc.LoadMemory(*geo))
93 : {
94 1066 : auto oRoot = oDoc.GetRoot();
95 1599 : const auto osVersion = oRoot.GetString("schema_version");
96 533 : if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
97 : {
98 398 : CPLDebug("FEATHER",
99 : "schema_version = %s not explicitly handled by "
100 : "the driver",
101 : osVersion.c_str());
102 : }
103 1599 : auto oColumns = oRoot.GetObj("columns");
104 533 : if (oColumns.IsValid())
105 : {
106 1066 : for (const auto &oColumn : oColumns.GetChildren())
107 : {
108 533 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
109 : }
110 : }
111 : }
112 : else
113 : {
114 0 : CPLError(CE_Warning, CPLE_AppDefined,
115 : "Cannot parse 'geo' metadata");
116 : }
117 : }
118 : }
119 548 : }
120 :
121 : /************************************************************************/
122 : /* EstablishFeatureDefn() */
123 : /************************************************************************/
124 :
125 548 : void OGRFeatherLayer::EstablishFeatureDefn()
126 : {
127 1096 : m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
128 1096 : : m_poRecordBatchReader->schema();
129 548 : const auto &kv_metadata = m_poSchema->metadata();
130 :
131 : #ifdef DEBUG
132 548 : if (kv_metadata)
133 : {
134 1079 : for (const auto &keyValue : kv_metadata->sorted_pairs())
135 : {
136 541 : CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
137 : keyValue.second.c_str());
138 : }
139 : }
140 : #endif
141 :
142 : auto poFooterMetadata = m_poRecordBatchFileReader
143 538 : ? m_poRecordBatchFileReader->metadata()
144 1634 : : nullptr;
145 674 : if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
146 126 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
147 : {
148 126 : LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
149 : }
150 : else
151 : {
152 422 : LoadGeoMetadata(kv_metadata.get(), "geo");
153 : }
154 : const auto oMapFieldNameToGDALSchemaFieldDefn =
155 1096 : LoadGDALSchema(kv_metadata.get());
156 :
157 548 : const auto &fields = m_poSchema->fields();
158 4175 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
159 : {
160 3627 : const auto &field = fields[i];
161 3627 : const auto &fieldName = field->name();
162 :
163 3627 : const auto &field_kv_metadata = field->metadata();
164 3627 : std::string osExtensionName;
165 3627 : std::string osExtensionMetadata;
166 3627 : if (field->type()->id() == arrow::Type::EXTENSION)
167 : {
168 : osExtensionName =
169 127 : cpl::down_cast<arrow::ExtensionType *>(field->type().get())
170 127 : ->extension_name();
171 : }
172 3500 : else if (field_kv_metadata)
173 : {
174 : auto extension_name =
175 1048 : field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
176 524 : if (extension_name.ok())
177 : {
178 522 : osExtensionName = *extension_name;
179 : }
180 :
181 : auto extension_metadata =
182 1048 : field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
183 524 : if (extension_metadata.ok())
184 : {
185 384 : osExtensionMetadata = *extension_metadata;
186 : }
187 : #ifdef DEBUG
188 524 : CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
189 1432 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
190 : {
191 908 : CPLDebug("FEATHER", " %s = %s", keyValue.first.c_str(),
192 : keyValue.second.c_str());
193 : }
194 : #endif
195 : }
196 :
197 3627 : if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
198 : {
199 6 : m_iFIDArrowColumn = i;
200 6 : continue;
201 : }
202 :
203 3621 : bool bRegularField = true;
204 3621 : auto oIter = m_oMapGeometryColumns.find(fieldName);
205 9670 : if (oIter != m_oMapGeometryColumns.end() ||
206 6049 : (osExtensionName != EXTENSION_NAME_ARROW_JSON &&
207 2961 : osExtensionName != EXTENSION_NAME_ARROW_TIMESTAMP_WITH_OFFSET &&
208 2959 : !osExtensionName.empty()))
209 : {
210 1072 : CPLJSONObject oJSONDef;
211 536 : if (oIter != m_oMapGeometryColumns.end())
212 533 : oJSONDef = oIter->second;
213 1608 : auto osEncoding = oJSONDef.GetString("encoding");
214 536 : if (osEncoding.empty() && !osExtensionName.empty())
215 3 : osEncoding = osExtensionName;
216 :
217 536 : OGRwkbGeometryType eGeomType = wkbUnknown;
218 536 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
219 536 : if (IsValidGeometryEncoding(field, osEncoding,
220 1072 : oIter != m_oMapGeometryColumns.end(),
221 : eGeomType, eGeomEncoding))
222 : {
223 534 : bRegularField = false;
224 1068 : OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
225 :
226 1602 : auto osCRS = oJSONDef.GetString("crs");
227 :
228 : #if ARROW_VERSION_MAJOR >= 21
229 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
230 : osExtensionMetadata.empty() &&
231 : field->type()->id() == arrow::Type::EXTENSION)
232 : {
233 : const auto arrowWkb =
234 : std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
235 : field->type());
236 : if (arrowWkb)
237 : {
238 : osExtensionMetadata = arrowWkb->Serialize();
239 : }
240 : }
241 : #endif
242 :
243 931 : if (osCRS.empty() &&
244 397 : osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
245 160 : !osExtensionMetadata.empty() &&
246 931 : osExtensionMetadata[0] == '{' &&
247 0 : osExtensionMetadata.back() == '}')
248 : {
249 0 : CPLJSONDocument oDoc;
250 0 : if (oDoc.LoadMemory(osExtensionMetadata))
251 : {
252 0 : auto jCrs = oDoc.GetRoot()["crs"];
253 0 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
254 : {
255 : osCRS =
256 0 : jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
257 : }
258 0 : else if (jCrs.GetType() == CPLJSONObject::Type::String)
259 : {
260 0 : osCRS = jCrs.ToString();
261 : }
262 0 : if (oDoc.GetRoot()["edges"].ToString() == "spherical")
263 : {
264 0 : SetMetadataItem("EDGES", "SPHERICAL");
265 : }
266 : }
267 : }
268 :
269 534 : if (osCRS.empty())
270 : {
271 : #if 0
272 : CPLError(CE_Warning, CPLE_AppDefined,
273 : "Missing required 'crs' field for geometry column %s",
274 : fieldName.c_str());
275 : #endif
276 : }
277 : else
278 : {
279 137 : OGRSpatialReference *poSRS = new OGRSpatialReference();
280 137 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
281 :
282 137 : if (poSRS->SetFromUserInput(
283 : osCRS.c_str(),
284 : OGRSpatialReference::
285 137 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
286 : OGRERR_NONE)
287 : {
288 137 : const char *pszAuthName = poSRS->GetAuthorityName();
289 137 : const char *pszAuthCode = poSRS->GetAuthorityCode();
290 137 : if (pszAuthName && pszAuthCode &&
291 137 : EQUAL(pszAuthName, "OGC") &&
292 0 : EQUAL(pszAuthCode, "CRS84"))
293 : {
294 0 : poSRS->importFromEPSG(4326);
295 : }
296 :
297 137 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
298 137 : if (dfCoordEpoch > 0)
299 2 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
300 :
301 137 : oField.SetSpatialRef(poSRS);
302 : }
303 137 : poSRS->Release();
304 : }
305 :
306 : // m_aeGeomEncoding be filled before calling
307 : // ComputeGeometryColumnType()
308 534 : m_aeGeomEncoding.push_back(eGeomEncoding);
309 534 : if (eGeomType == wkbUnknown)
310 : {
311 708 : auto osType = oJSONDef.GetString("geometry_type");
312 236 : if (osType.empty())
313 236 : osType = oJSONDef.GetString("gdal:geometry_type");
314 472 : if (m_bSeekable && osType.empty() &&
315 236 : CPLTestBool(CPLGetConfigOption(
316 : "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
317 : {
318 236 : eGeomType = ComputeGeometryColumnType(
319 236 : m_poFeatureDefn->GetGeomFieldCount(), i);
320 236 : if (m_poRecordBatchReader)
321 0 : ResetRecordBatchReader();
322 : }
323 : else
324 0 : eGeomType = GetGeometryTypeFromString(osType);
325 : }
326 :
327 534 : oField.SetType(eGeomType);
328 534 : oField.SetNullable(field->nullable());
329 534 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
330 534 : m_anMapGeomFieldIndexToArrowColumn.push_back(i);
331 : }
332 : }
333 :
334 3621 : if (bRegularField)
335 : {
336 3087 : CreateFieldFromSchema(field, {i},
337 : oMapFieldNameToGDALSchemaFieldDefn);
338 : }
339 : }
340 :
341 548 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
342 : m_poFeatureDefn->GetFieldCount());
343 548 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
344 : m_poFeatureDefn->GetGeomFieldCount());
345 548 : }
346 :
347 : /************************************************************************/
348 : /* ResetRecordBatchReader() */
349 : /************************************************************************/
350 :
351 12 : bool OGRFeatherLayer::ResetRecordBatchReader()
352 : {
353 12 : const auto nPos = *(m_poFile->Tell());
354 12 : CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
355 : auto result =
356 24 : arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
357 12 : if (!result.ok())
358 : {
359 0 : CPLError(CE_Failure, CPLE_AppDefined,
360 : "RecordBatchStreamReader::Open() failed with %s",
361 0 : result.status().message().c_str());
362 0 : CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
363 0 : return false;
364 : }
365 : else
366 : {
367 12 : m_poRecordBatchReader = *result;
368 12 : return true;
369 : }
370 : }
371 :
372 : /************************************************************************/
373 : /* ComputeGeometryColumnType() */
374 : /************************************************************************/
375 :
376 236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
377 : int iCol) const
378 : {
379 : // Compute type of geometry column by iterating over each geometry, and
380 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
381 :
382 236 : OGRwkbGeometryType eGeomType = wkbNone;
383 :
384 236 : if (m_poRecordBatchReader != nullptr)
385 : {
386 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
387 : while (true)
388 : {
389 0 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
390 0 : if (!status.ok())
391 : {
392 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
393 0 : status.message().c_str());
394 0 : break;
395 : }
396 0 : else if (!poBatch)
397 0 : break;
398 0 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
399 : iCol, eGeomType);
400 0 : if (eGeomType == wkbUnknown)
401 0 : break;
402 0 : }
403 : }
404 : else
405 : {
406 472 : for (int iBatch = 0;
407 472 : iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
408 : {
409 236 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
410 236 : if (!result.ok())
411 : {
412 0 : CPLError(CE_Failure, CPLE_AppDefined,
413 : "ReadRecordBatch() failed: %s",
414 0 : result.status().message().c_str());
415 0 : break;
416 : }
417 236 : eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
418 : iCol, eGeomType);
419 236 : if (eGeomType == wkbUnknown)
420 0 : break;
421 : }
422 : }
423 :
424 236 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
425 : }
426 :
427 : /************************************************************************/
428 : /* BuildDomain() */
429 : /************************************************************************/
430 :
431 : std::unique_ptr<OGRFieldDomain>
432 19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
433 : int iFieldIndex) const
434 : {
435 19 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
436 19 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
437 : arrow::Type::DICTIONARY);
438 :
439 19 : if (m_poRecordBatchReader)
440 : {
441 6 : if (m_poBatch)
442 : {
443 6 : return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
444 : }
445 : }
446 13 : else if (m_poRecordBatchFileReader)
447 : {
448 13 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
449 13 : if (!result.ok())
450 : {
451 0 : CPLError(CE_Failure, CPLE_AppDefined,
452 : "ReadRecordBatch() failed: %s",
453 0 : result.status().message().c_str());
454 : }
455 13 : auto poBatch = *result;
456 13 : if (poBatch)
457 : {
458 13 : return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
459 : }
460 : }
461 :
462 0 : return nullptr;
463 : }
464 :
465 : /************************************************************************/
466 : /* ResetReading() */
467 : /************************************************************************/
468 :
469 829 : void OGRFeatherLayer::ResetReading()
470 : {
471 829 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
472 : {
473 17 : if (m_iRecordBatch == 1 && m_poBatchIdx1)
474 : {
475 : // do nothing
476 : }
477 : else
478 : {
479 16 : m_bResetRecordBatchReaderAsked = true;
480 : }
481 : }
482 829 : OGRArrowLayer::ResetReading();
483 829 : }
484 :
485 : /************************************************************************/
486 : /* ReadNextBatch() */
487 : /************************************************************************/
488 :
489 1013 : bool OGRFeatherLayer::ReadNextBatch()
490 : {
491 1013 : if (m_poRecordBatchFileReader == nullptr)
492 : {
493 121 : return ReadNextBatchStream();
494 : }
495 : else
496 : {
497 892 : return ReadNextBatchFile();
498 : }
499 : }
500 :
501 : /************************************************************************/
502 : /* ReadNextBatchFile() */
503 : /************************************************************************/
504 :
505 892 : bool OGRFeatherLayer::ReadNextBatchFile()
506 : {
507 : while (true)
508 : {
509 892 : ++m_iRecordBatch;
510 892 : if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
511 : {
512 451 : if (m_iRecordBatch == 1)
513 448 : m_iRecordBatch = 0;
514 : else
515 3 : m_poBatch.reset();
516 451 : return false;
517 : }
518 :
519 441 : m_nIdxInBatch = 0;
520 :
521 : auto result =
522 441 : m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
523 441 : if (!result.ok())
524 : {
525 0 : CPLError(CE_Failure, CPLE_AppDefined,
526 : "ReadRecordBatch() failed: %s",
527 0 : result.status().message().c_str());
528 0 : m_poBatch.reset();
529 0 : return false;
530 : }
531 441 : if ((*result)->num_rows() != 0)
532 : {
533 441 : SetBatch(*result);
534 441 : break;
535 : }
536 0 : }
537 :
538 441 : return true;
539 : }
540 :
541 : /************************************************************************/
542 : /* ReadNextBatchStream() */
543 : /************************************************************************/
544 :
545 156 : bool OGRFeatherLayer::ReadNextBatchStream()
546 : {
547 156 : m_nIdxInBatch = 0;
548 :
549 312 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
550 0 : do
551 : {
552 156 : if (m_iRecordBatch == 0 && m_poBatchIdx0)
553 : {
554 1 : SetBatch(m_poBatchIdx0);
555 1 : m_iRecordBatch = 1;
556 103 : return true;
557 : }
558 :
559 155 : else if (m_iRecordBatch == 1 && m_poBatchIdx1)
560 : {
561 1 : SetBatch(m_poBatchIdx1);
562 1 : m_iRecordBatch = 2;
563 1 : return true;
564 : }
565 :
566 154 : else if (m_bSingleBatch)
567 : {
568 83 : CPLAssert(m_iRecordBatch == 0);
569 83 : CPLAssert(m_poBatch != nullptr);
570 83 : return false;
571 : }
572 :
573 71 : if (m_bResetRecordBatchReaderAsked)
574 : {
575 13 : if (!m_bSeekable)
576 : {
577 1 : CPLError(CE_Failure, CPLE_NotSupported,
578 : "Attempting to rewind non-seekable stream");
579 1 : return false;
580 : }
581 12 : if (!ResetRecordBatchReader())
582 0 : return false;
583 12 : m_bResetRecordBatchReaderAsked = false;
584 : }
585 :
586 70 : CPLAssert(m_poRecordBatchReader);
587 :
588 70 : ++m_iRecordBatch;
589 :
590 70 : poNextBatch.reset();
591 70 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
592 70 : if (!status.ok())
593 : {
594 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
595 0 : status.message().c_str());
596 0 : poNextBatch.reset();
597 : }
598 70 : if (poNextBatch == nullptr)
599 : {
600 17 : if (m_iRecordBatch == 1)
601 : {
602 3 : m_iRecordBatch = 0;
603 3 : m_bSingleBatch = true;
604 : }
605 : else
606 : {
607 14 : m_poBatch.reset();
608 14 : m_poBatchColumns.clear();
609 : }
610 17 : return false;
611 : }
612 53 : } while (poNextBatch->num_rows() == 0);
613 :
614 53 : SetBatch(poNextBatch);
615 :
616 53 : return true;
617 : }
618 :
619 : /************************************************************************/
620 : /* TryToCacheFirstTwoBatches() */
621 : /************************************************************************/
622 :
623 1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
624 : {
625 2 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
626 2 : !m_bSingleBatch && m_poBatchIdx0 == nullptr)
627 : {
628 1 : ResetReading();
629 1 : if (!m_poBatch)
630 : {
631 0 : CPL_IGNORE_RET_VAL(ReadNextBatchStream());
632 : }
633 1 : if (m_poBatch)
634 : {
635 2 : auto poBatchIdx0 = m_poBatch;
636 1 : if (ReadNextBatchStream())
637 : {
638 1 : CPLAssert(m_iRecordBatch == 1);
639 1 : m_poBatchIdx0 = poBatchIdx0;
640 1 : m_poBatchIdx1 = m_poBatch;
641 1 : SetBatch(poBatchIdx0);
642 1 : ResetReading();
643 : }
644 1 : ResetReading();
645 : }
646 : }
647 1 : }
648 :
649 : /************************************************************************/
650 : /* CanPostFilterArrowArray() */
651 : /************************************************************************/
652 :
653 20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
654 : const struct ArrowSchema *schema) const
655 : {
656 20 : if (m_poRecordBatchReader)
657 10 : return false;
658 10 : return OGRArrowLayer::CanPostFilterArrowArray(schema);
659 : }
660 :
661 : /************************************************************************/
662 : /* InvalidateCachedBatches() */
663 : /************************************************************************/
664 :
665 109 : void OGRFeatherLayer::InvalidateCachedBatches()
666 : {
667 109 : if (m_poRecordBatchFileReader)
668 : {
669 63 : m_iRecordBatch = -1;
670 63 : ResetReading();
671 : }
672 109 : }
673 :
674 : /************************************************************************/
675 : /* GetFeatureCount() */
676 : /************************************************************************/
677 :
678 336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
679 : {
680 628 : if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
681 292 : m_poFilterGeom == nullptr)
682 : {
683 288 : auto result = m_poRecordBatchFileReader->CountRows();
684 288 : if (result.ok())
685 288 : return *result;
686 : }
687 48 : else if (m_poRecordBatchReader != nullptr)
688 : {
689 36 : if (!m_bSeekable && !bForce)
690 : {
691 1 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
692 : {
693 1 : TryToCacheFirstTwoBatches();
694 : }
695 :
696 1 : if (!m_bSingleBatch)
697 : {
698 1 : CPLError(
699 : CE_Failure, CPLE_AppDefined,
700 : "GetFeatureCount() cannot be run in non-forced mode on "
701 : "a non-seekable file made of several batches");
702 1 : return -1;
703 : }
704 : }
705 :
706 35 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
707 : {
708 23 : GIntBig nFeatures = 0;
709 23 : ResetReading();
710 23 : if (!m_poBatch)
711 3 : ReadNextBatchStream();
712 31 : while (m_poBatch)
713 : {
714 31 : nFeatures += m_poBatch->num_rows();
715 31 : if (!ReadNextBatchStream())
716 23 : break;
717 : }
718 23 : ResetReading();
719 23 : return nFeatures;
720 : }
721 : }
722 24 : return OGRLayer::GetFeatureCount(bForce);
723 : }
724 :
725 : /************************************************************************/
726 : /* CanRunNonForcedGetExtent() */
727 : /************************************************************************/
728 :
729 0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
730 : {
731 0 : if (m_bSeekable)
732 0 : return true;
733 0 : TryToCacheFirstTwoBatches();
734 0 : if (!m_bSingleBatch)
735 : {
736 0 : CPLError(CE_Failure, CPLE_AppDefined,
737 : "GetExtent() cannot be run in non-forced mode on "
738 : "a non-seekable file made of several batches");
739 0 : return false;
740 : }
741 0 : return true;
742 : }
743 :
744 : /************************************************************************/
745 : /* TestCapability() */
746 : /************************************************************************/
747 :
748 296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
749 : {
750 296 : if (EQUAL(pszCap, OLCFastFeatureCount))
751 : {
752 28 : return m_bSeekable && m_poAttrQuery == nullptr &&
753 28 : m_poFilterGeom == nullptr;
754 : }
755 :
756 278 : if (EQUAL(pszCap, OLCMeasuredGeometries))
757 16 : return true;
758 262 : if (EQUAL(pszCap, OLCZGeometries))
759 12 : return true;
760 :
761 250 : return OGRArrowLayer::TestCapability(pszCap);
762 : }
763 :
764 : /************************************************************************/
765 : /* GetMetadataItem() */
766 : /************************************************************************/
767 :
768 259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
769 : const char *pszDomain)
770 : {
771 : // Mostly for unit test purposes
772 259 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
773 : {
774 9 : if (EQUAL(pszName, "FORMAT"))
775 : {
776 5 : return m_poRecordBatchFileReader ? "FILE" : "STREAM";
777 : }
778 4 : if (m_poRecordBatchFileReader != nullptr)
779 : {
780 4 : int iBatch = -1;
781 4 : if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
782 : {
783 1 : return CPLSPrintf(
784 5 : "%d", m_poRecordBatchFileReader->num_record_batches());
785 : }
786 6 : else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
787 3 : strstr(pszName, ".NUM_ROWS"))
788 : {
789 : auto result =
790 6 : m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
791 3 : if (!result.ok())
792 : {
793 0 : return nullptr;
794 : }
795 3 : return CPLSPrintf("%" PRId64, (*result)->num_rows());
796 : }
797 : }
798 0 : return nullptr;
799 : }
800 250 : else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
801 : {
802 : const auto kv_metadata =
803 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
804 8 : : m_poRecordBatchReader->schema())
805 10 : ->metadata();
806 5 : if (kv_metadata && kv_metadata->Contains(pszName))
807 : {
808 5 : auto metadataItem = kv_metadata->Get(pszName);
809 5 : if (metadataItem.ok())
810 : {
811 5 : return CPLSPrintf("%s", metadataItem->c_str());
812 : }
813 : }
814 0 : return nullptr;
815 : }
816 476 : else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
817 231 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
818 : {
819 2 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
820 1 : if (kv_metadata && kv_metadata->Contains(pszName))
821 : {
822 1 : auto metadataItem = kv_metadata->Get(pszName);
823 1 : if (metadataItem.ok())
824 : {
825 1 : return CPLSPrintf("%s", metadataItem->c_str());
826 : }
827 : }
828 0 : return nullptr;
829 : }
830 244 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
831 : }
832 :
833 : /************************************************************************/
834 : /* GetMetadata() */
835 : /************************************************************************/
836 :
837 144 : CSLConstList OGRFeatherLayer::GetMetadata(const char *pszDomain)
838 : {
839 : // Mostly for unit test purposes
840 144 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
841 : {
842 5 : m_aosFeatherMetadata.Clear();
843 : const auto kv_metadata =
844 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
845 8 : : m_poRecordBatchReader->schema())
846 10 : ->metadata();
847 5 : if (kv_metadata)
848 : {
849 11 : for (const auto &kv : kv_metadata->sorted_pairs())
850 : {
851 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
852 6 : kv.second.c_str());
853 : }
854 : }
855 5 : return m_aosFeatherMetadata.List();
856 : }
857 261 : if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
858 122 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
859 : {
860 2 : m_aosFeatherMetadata.Clear();
861 4 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
862 2 : if (kv_metadata)
863 : {
864 3 : for (const auto &kv : kv_metadata->sorted_pairs())
865 : {
866 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
867 1 : kv.second.c_str());
868 : }
869 : }
870 2 : return m_aosFeatherMetadata.List();
871 : }
872 137 : return OGRLayer::GetMetadata(pszDomain);
873 : }
|