Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "gdal_pam.h"
16 : #include "ogrsf_frmts.h"
17 : #include "ogr_p.h"
18 :
19 : #include <cinttypes>
20 : #include <limits>
21 : #include <map>
22 : #include <set>
23 : #include <utility>
24 :
25 : #include "ogr_feather.h"
26 :
27 : #include "../arrow_common/ograrrowlayer.hpp"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 :
30 : /************************************************************************/
31 : /* OGRFeatherLayer() */
32 : /************************************************************************/
33 :
34 538 : OGRFeatherLayer::OGRFeatherLayer(
35 : OGRFeatherDataset *poDS, const char *pszLayerName,
36 : std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
37 538 : CSLConstList papszOpenOptions)
38 : : OGRArrowLayer(poDS, pszLayerName,
39 538 : CPLTestBool(CSLFetchNameValueDef(
40 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
41 1076 : m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
42 : {
43 538 : EstablishFeatureDefn();
44 538 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
45 : m_poFeatureDefn->GetGeomFieldCount());
46 538 : }
47 :
48 : /************************************************************************/
49 : /* OGRFeatherLayer() */
50 : /************************************************************************/
51 :
52 10 : OGRFeatherLayer::OGRFeatherLayer(
53 : OGRFeatherDataset *poDS, const char *pszLayerName,
54 : std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
55 : const arrow::ipc::IpcReadOptions &oOptions,
56 : std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
57 : &poRecordBatchStreamReader,
58 10 : CSLConstList papszOpenOptions)
59 : : OGRArrowLayer(poDS, pszLayerName,
60 10 : CPLTestBool(CSLFetchNameValueDef(
61 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
62 10 : m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
63 20 : m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
64 : {
65 10 : EstablishFeatureDefn();
66 10 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
67 : m_poFeatureDefn->GetGeomFieldCount());
68 10 : }
69 :
70 : /************************************************************************/
71 : /* GetDataset() */
72 : /************************************************************************/
73 :
74 112 : GDALDataset *OGRFeatherLayer::GetDataset()
75 : {
76 112 : return m_poDS;
77 : }
78 :
79 : /************************************************************************/
80 : /* LoadGeoMetadata() */
81 : /************************************************************************/
82 :
83 548 : void OGRFeatherLayer::LoadGeoMetadata(
84 : const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
85 : {
86 548 : if (kv_metadata && kv_metadata->Contains(key))
87 : {
88 1066 : auto geo = kv_metadata->Get(key);
89 533 : if (geo.ok())
90 : {
91 1066 : CPLJSONDocument oDoc;
92 533 : if (oDoc.LoadMemory(*geo))
93 : {
94 1066 : auto oRoot = oDoc.GetRoot();
95 1599 : const auto osVersion = oRoot.GetString("schema_version");
96 533 : if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
97 : {
98 398 : CPLDebug("FEATHER",
99 : "schema_version = %s not explicitly handled by "
100 : "the driver",
101 : osVersion.c_str());
102 : }
103 1599 : auto oColumns = oRoot.GetObj("columns");
104 533 : if (oColumns.IsValid())
105 : {
106 1066 : for (const auto &oColumn : oColumns.GetChildren())
107 : {
108 533 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
109 : }
110 : }
111 : }
112 : else
113 : {
114 0 : CPLError(CE_Warning, CPLE_AppDefined,
115 : "Cannot parse 'geo' metadata");
116 : }
117 : }
118 : }
119 548 : }
120 :
121 : /************************************************************************/
122 : /* EstablishFeatureDefn() */
123 : /************************************************************************/
124 :
125 548 : void OGRFeatherLayer::EstablishFeatureDefn()
126 : {
127 1096 : m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
128 1096 : : m_poRecordBatchReader->schema();
129 548 : const auto &kv_metadata = m_poSchema->metadata();
130 :
131 : #ifdef DEBUG
132 548 : if (kv_metadata)
133 : {
134 1079 : for (const auto &keyValue : kv_metadata->sorted_pairs())
135 : {
136 541 : CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
137 : keyValue.second.c_str());
138 : }
139 : }
140 : #endif
141 :
142 : auto poFooterMetadata = m_poRecordBatchFileReader
143 538 : ? m_poRecordBatchFileReader->metadata()
144 1634 : : nullptr;
145 674 : if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
146 126 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
147 : {
148 126 : LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
149 : }
150 : else
151 : {
152 422 : LoadGeoMetadata(kv_metadata.get(), "geo");
153 : }
154 : const auto oMapFieldNameToGDALSchemaFieldDefn =
155 1096 : LoadGDALSchema(kv_metadata.get());
156 :
157 548 : const auto &fields = m_poSchema->fields();
158 4175 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
159 : {
160 3627 : const auto &field = fields[i];
161 3627 : const auto &fieldName = field->name();
162 :
163 3627 : const auto &field_kv_metadata = field->metadata();
164 3627 : std::string osExtensionName;
165 3627 : std::string osExtensionMetadata;
166 3627 : if (field->type()->id() == arrow::Type::EXTENSION)
167 : {
168 : osExtensionName =
169 127 : cpl::down_cast<arrow::ExtensionType *>(field->type().get())
170 127 : ->extension_name();
171 : }
172 3500 : else if (field_kv_metadata)
173 : {
174 : auto extension_name =
175 1048 : field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
176 524 : if (extension_name.ok())
177 : {
178 522 : osExtensionName = *extension_name;
179 : }
180 :
181 : auto extension_metadata =
182 1048 : field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
183 524 : if (extension_metadata.ok())
184 : {
185 384 : osExtensionMetadata = *extension_metadata;
186 : }
187 : #ifdef DEBUG
188 524 : CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
189 1432 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
190 : {
191 908 : CPLDebug("FEATHER", " %s = %s", keyValue.first.c_str(),
192 : keyValue.second.c_str());
193 : }
194 : #endif
195 : }
196 :
197 3627 : if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
198 : {
199 6 : m_iFIDArrowColumn = i;
200 6 : continue;
201 : }
202 :
203 3621 : bool bRegularField = true;
204 3621 : auto oIter = m_oMapGeometryColumns.find(fieldName);
205 9670 : if (oIter != m_oMapGeometryColumns.end() ||
206 6049 : (osExtensionName != EXTENSION_NAME_ARROW_JSON &&
207 2961 : osExtensionName != EXTENSION_NAME_ARROW_TIMESTAMP_WITH_OFFSET &&
208 2959 : !osExtensionName.empty()))
209 : {
210 1072 : CPLJSONObject oJSONDef;
211 536 : if (oIter != m_oMapGeometryColumns.end())
212 533 : oJSONDef = oIter->second;
213 1608 : auto osEncoding = oJSONDef.GetString("encoding");
214 536 : if (osEncoding.empty() && !osExtensionName.empty())
215 3 : osEncoding = osExtensionName;
216 :
217 536 : OGRwkbGeometryType eGeomType = wkbUnknown;
218 536 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
219 536 : if (IsValidGeometryEncoding(field, osEncoding,
220 1072 : oIter != m_oMapGeometryColumns.end(),
221 : eGeomType, eGeomEncoding))
222 : {
223 534 : bRegularField = false;
224 1068 : OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
225 :
226 1602 : auto osCRS = oJSONDef.GetString("crs");
227 :
228 : #if ARROW_VERSION_MAJOR >= 21
229 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
230 : osExtensionMetadata.empty() &&
231 : field->type()->id() == arrow::Type::EXTENSION)
232 : {
233 : const auto arrowWkb =
234 : std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
235 : field->type());
236 : if (arrowWkb)
237 : {
238 : osExtensionMetadata = arrowWkb->Serialize();
239 : }
240 : }
241 : #endif
242 :
243 931 : if (osCRS.empty() &&
244 397 : osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
245 160 : !osExtensionMetadata.empty() &&
246 931 : osExtensionMetadata[0] == '{' &&
247 0 : osExtensionMetadata.back() == '}')
248 : {
249 0 : CPLJSONDocument oDoc;
250 0 : if (oDoc.LoadMemory(osExtensionMetadata))
251 : {
252 0 : auto jCrs = oDoc.GetRoot()["crs"];
253 0 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
254 : {
255 : osCRS =
256 0 : jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
257 : }
258 0 : else if (jCrs.GetType() == CPLJSONObject::Type::String)
259 : {
260 0 : osCRS = jCrs.ToString();
261 : }
262 0 : if (oDoc.GetRoot()["edges"].ToString() == "spherical")
263 : {
264 0 : SetMetadataItem("EDGES", "SPHERICAL");
265 : }
266 : }
267 : }
268 :
269 534 : if (osCRS.empty())
270 : {
271 : #if 0
272 : CPLError(CE_Warning, CPLE_AppDefined,
273 : "Missing required 'crs' field for geometry column %s",
274 : fieldName.c_str());
275 : #endif
276 : }
277 : else
278 : {
279 137 : OGRSpatialReference *poSRS = new OGRSpatialReference();
280 137 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
281 :
282 137 : if (poSRS->SetFromUserInput(
283 : osCRS.c_str(),
284 : OGRSpatialReference::
285 137 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
286 : OGRERR_NONE)
287 : {
288 : const char *pszAuthName =
289 137 : poSRS->GetAuthorityName(nullptr);
290 : const char *pszAuthCode =
291 137 : poSRS->GetAuthorityCode(nullptr);
292 137 : if (pszAuthName && pszAuthCode &&
293 137 : EQUAL(pszAuthName, "OGC") &&
294 0 : EQUAL(pszAuthCode, "CRS84"))
295 : {
296 0 : poSRS->importFromEPSG(4326);
297 : }
298 :
299 137 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
300 137 : if (dfCoordEpoch > 0)
301 2 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
302 :
303 137 : oField.SetSpatialRef(poSRS);
304 : }
305 137 : poSRS->Release();
306 : }
307 :
308 : // m_aeGeomEncoding be filled before calling
309 : // ComputeGeometryColumnType()
310 534 : m_aeGeomEncoding.push_back(eGeomEncoding);
311 534 : if (eGeomType == wkbUnknown)
312 : {
313 708 : auto osType = oJSONDef.GetString("geometry_type");
314 236 : if (osType.empty())
315 236 : osType = oJSONDef.GetString("gdal:geometry_type");
316 472 : if (m_bSeekable && osType.empty() &&
317 236 : CPLTestBool(CPLGetConfigOption(
318 : "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
319 : {
320 236 : eGeomType = ComputeGeometryColumnType(
321 236 : m_poFeatureDefn->GetGeomFieldCount(), i);
322 236 : if (m_poRecordBatchReader)
323 0 : ResetRecordBatchReader();
324 : }
325 : else
326 0 : eGeomType = GetGeometryTypeFromString(osType);
327 : }
328 :
329 534 : oField.SetType(eGeomType);
330 534 : oField.SetNullable(field->nullable());
331 534 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
332 534 : m_anMapGeomFieldIndexToArrowColumn.push_back(i);
333 : }
334 : }
335 :
336 3621 : if (bRegularField)
337 : {
338 3087 : CreateFieldFromSchema(field, {i},
339 : oMapFieldNameToGDALSchemaFieldDefn);
340 : }
341 : }
342 :
343 548 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
344 : m_poFeatureDefn->GetFieldCount());
345 548 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
346 : m_poFeatureDefn->GetGeomFieldCount());
347 548 : }
348 :
349 : /************************************************************************/
350 : /* ResetRecordBatchReader() */
351 : /************************************************************************/
352 :
353 12 : bool OGRFeatherLayer::ResetRecordBatchReader()
354 : {
355 12 : const auto nPos = *(m_poFile->Tell());
356 12 : CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
357 : auto result =
358 24 : arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
359 12 : if (!result.ok())
360 : {
361 0 : CPLError(CE_Failure, CPLE_AppDefined,
362 : "RecordBatchStreamReader::Open() failed with %s",
363 0 : result.status().message().c_str());
364 0 : CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
365 0 : return false;
366 : }
367 : else
368 : {
369 12 : m_poRecordBatchReader = *result;
370 12 : return true;
371 : }
372 : }
373 :
374 : /************************************************************************/
375 : /* ComputeGeometryColumnType() */
376 : /************************************************************************/
377 :
378 236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
379 : int iCol) const
380 : {
381 : // Compute type of geometry column by iterating over each geometry, and
382 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
383 :
384 236 : OGRwkbGeometryType eGeomType = wkbNone;
385 :
386 236 : if (m_poRecordBatchReader != nullptr)
387 : {
388 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
389 : while (true)
390 : {
391 0 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
392 0 : if (!status.ok())
393 : {
394 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
395 0 : status.message().c_str());
396 0 : break;
397 : }
398 0 : else if (!poBatch)
399 0 : break;
400 0 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
401 : iCol, eGeomType);
402 0 : if (eGeomType == wkbUnknown)
403 0 : break;
404 0 : }
405 : }
406 : else
407 : {
408 472 : for (int iBatch = 0;
409 472 : iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
410 : {
411 236 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
412 236 : if (!result.ok())
413 : {
414 0 : CPLError(CE_Failure, CPLE_AppDefined,
415 : "ReadRecordBatch() failed: %s",
416 0 : result.status().message().c_str());
417 0 : break;
418 : }
419 236 : eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
420 : iCol, eGeomType);
421 236 : if (eGeomType == wkbUnknown)
422 0 : break;
423 : }
424 : }
425 :
426 236 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
427 : }
428 :
429 : /************************************************************************/
430 : /* BuildDomain() */
431 : /************************************************************************/
432 :
433 : std::unique_ptr<OGRFieldDomain>
434 19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
435 : int iFieldIndex) const
436 : {
437 19 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
438 19 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
439 : arrow::Type::DICTIONARY);
440 :
441 19 : if (m_poRecordBatchReader)
442 : {
443 6 : if (m_poBatch)
444 : {
445 6 : return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
446 : }
447 : }
448 13 : else if (m_poRecordBatchFileReader)
449 : {
450 13 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
451 13 : if (!result.ok())
452 : {
453 0 : CPLError(CE_Failure, CPLE_AppDefined,
454 : "ReadRecordBatch() failed: %s",
455 0 : result.status().message().c_str());
456 : }
457 13 : auto poBatch = *result;
458 13 : if (poBatch)
459 : {
460 13 : return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
461 : }
462 : }
463 :
464 0 : return nullptr;
465 : }
466 :
467 : /************************************************************************/
468 : /* ResetReading() */
469 : /************************************************************************/
470 :
471 829 : void OGRFeatherLayer::ResetReading()
472 : {
473 829 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
474 : {
475 17 : if (m_iRecordBatch == 1 && m_poBatchIdx1)
476 : {
477 : // do nothing
478 : }
479 : else
480 : {
481 16 : m_bResetRecordBatchReaderAsked = true;
482 : }
483 : }
484 829 : OGRArrowLayer::ResetReading();
485 829 : }
486 :
487 : /************************************************************************/
488 : /* ReadNextBatch() */
489 : /************************************************************************/
490 :
491 1013 : bool OGRFeatherLayer::ReadNextBatch()
492 : {
493 1013 : if (m_poRecordBatchFileReader == nullptr)
494 : {
495 121 : return ReadNextBatchStream();
496 : }
497 : else
498 : {
499 892 : return ReadNextBatchFile();
500 : }
501 : }
502 :
503 : /************************************************************************/
504 : /* ReadNextBatchFile() */
505 : /************************************************************************/
506 :
507 892 : bool OGRFeatherLayer::ReadNextBatchFile()
508 : {
509 : while (true)
510 : {
511 892 : ++m_iRecordBatch;
512 892 : if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
513 : {
514 451 : if (m_iRecordBatch == 1)
515 448 : m_iRecordBatch = 0;
516 : else
517 3 : m_poBatch.reset();
518 451 : return false;
519 : }
520 :
521 441 : m_nIdxInBatch = 0;
522 :
523 : auto result =
524 441 : m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
525 441 : if (!result.ok())
526 : {
527 0 : CPLError(CE_Failure, CPLE_AppDefined,
528 : "ReadRecordBatch() failed: %s",
529 0 : result.status().message().c_str());
530 0 : m_poBatch.reset();
531 0 : return false;
532 : }
533 441 : if ((*result)->num_rows() != 0)
534 : {
535 441 : SetBatch(*result);
536 441 : break;
537 : }
538 0 : }
539 :
540 441 : return true;
541 : }
542 :
543 : /************************************************************************/
544 : /* ReadNextBatchStream() */
545 : /************************************************************************/
546 :
547 156 : bool OGRFeatherLayer::ReadNextBatchStream()
548 : {
549 156 : m_nIdxInBatch = 0;
550 :
551 312 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
552 0 : do
553 : {
554 156 : if (m_iRecordBatch == 0 && m_poBatchIdx0)
555 : {
556 1 : SetBatch(m_poBatchIdx0);
557 1 : m_iRecordBatch = 1;
558 103 : return true;
559 : }
560 :
561 155 : else if (m_iRecordBatch == 1 && m_poBatchIdx1)
562 : {
563 1 : SetBatch(m_poBatchIdx1);
564 1 : m_iRecordBatch = 2;
565 1 : return true;
566 : }
567 :
568 154 : else if (m_bSingleBatch)
569 : {
570 83 : CPLAssert(m_iRecordBatch == 0);
571 83 : CPLAssert(m_poBatch != nullptr);
572 83 : return false;
573 : }
574 :
575 71 : if (m_bResetRecordBatchReaderAsked)
576 : {
577 13 : if (!m_bSeekable)
578 : {
579 1 : CPLError(CE_Failure, CPLE_NotSupported,
580 : "Attempting to rewind non-seekable stream");
581 1 : return false;
582 : }
583 12 : if (!ResetRecordBatchReader())
584 0 : return false;
585 12 : m_bResetRecordBatchReaderAsked = false;
586 : }
587 :
588 70 : CPLAssert(m_poRecordBatchReader);
589 :
590 70 : ++m_iRecordBatch;
591 :
592 70 : poNextBatch.reset();
593 70 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
594 70 : if (!status.ok())
595 : {
596 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
597 0 : status.message().c_str());
598 0 : poNextBatch.reset();
599 : }
600 70 : if (poNextBatch == nullptr)
601 : {
602 17 : if (m_iRecordBatch == 1)
603 : {
604 3 : m_iRecordBatch = 0;
605 3 : m_bSingleBatch = true;
606 : }
607 : else
608 : {
609 14 : m_poBatch.reset();
610 14 : m_poBatchColumns.clear();
611 : }
612 17 : return false;
613 : }
614 53 : } while (poNextBatch->num_rows() == 0);
615 :
616 53 : SetBatch(poNextBatch);
617 :
618 53 : return true;
619 : }
620 :
621 : /************************************************************************/
622 : /* TryToCacheFirstTwoBatches() */
623 : /************************************************************************/
624 :
625 1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
626 : {
627 2 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
628 2 : !m_bSingleBatch && m_poBatchIdx0 == nullptr)
629 : {
630 1 : ResetReading();
631 1 : if (!m_poBatch)
632 : {
633 0 : CPL_IGNORE_RET_VAL(ReadNextBatchStream());
634 : }
635 1 : if (m_poBatch)
636 : {
637 2 : auto poBatchIdx0 = m_poBatch;
638 1 : if (ReadNextBatchStream())
639 : {
640 1 : CPLAssert(m_iRecordBatch == 1);
641 1 : m_poBatchIdx0 = poBatchIdx0;
642 1 : m_poBatchIdx1 = m_poBatch;
643 1 : SetBatch(poBatchIdx0);
644 1 : ResetReading();
645 : }
646 1 : ResetReading();
647 : }
648 : }
649 1 : }
650 :
651 : /************************************************************************/
652 : /* CanPostFilterArrowArray() */
653 : /************************************************************************/
654 :
655 20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
656 : const struct ArrowSchema *schema) const
657 : {
658 20 : if (m_poRecordBatchReader)
659 10 : return false;
660 10 : return OGRArrowLayer::CanPostFilterArrowArray(schema);
661 : }
662 :
663 : /************************************************************************/
664 : /* InvalidateCachedBatches() */
665 : /************************************************************************/
666 :
667 109 : void OGRFeatherLayer::InvalidateCachedBatches()
668 : {
669 109 : if (m_poRecordBatchFileReader)
670 : {
671 63 : m_iRecordBatch = -1;
672 63 : ResetReading();
673 : }
674 109 : }
675 :
676 : /************************************************************************/
677 : /* GetFeatureCount() */
678 : /************************************************************************/
679 :
680 336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
681 : {
682 628 : if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
683 292 : m_poFilterGeom == nullptr)
684 : {
685 288 : auto result = m_poRecordBatchFileReader->CountRows();
686 288 : if (result.ok())
687 288 : return *result;
688 : }
689 48 : else if (m_poRecordBatchReader != nullptr)
690 : {
691 36 : if (!m_bSeekable && !bForce)
692 : {
693 1 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
694 : {
695 1 : TryToCacheFirstTwoBatches();
696 : }
697 :
698 1 : if (!m_bSingleBatch)
699 : {
700 1 : CPLError(
701 : CE_Failure, CPLE_AppDefined,
702 : "GetFeatureCount() cannot be run in non-forced mode on "
703 : "a non-seekable file made of several batches");
704 1 : return -1;
705 : }
706 : }
707 :
708 35 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
709 : {
710 23 : GIntBig nFeatures = 0;
711 23 : ResetReading();
712 23 : if (!m_poBatch)
713 3 : ReadNextBatchStream();
714 31 : while (m_poBatch)
715 : {
716 31 : nFeatures += m_poBatch->num_rows();
717 31 : if (!ReadNextBatchStream())
718 23 : break;
719 : }
720 23 : ResetReading();
721 23 : return nFeatures;
722 : }
723 : }
724 24 : return OGRLayer::GetFeatureCount(bForce);
725 : }
726 :
727 : /************************************************************************/
728 : /* CanRunNonForcedGetExtent() */
729 : /************************************************************************/
730 :
731 0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
732 : {
733 0 : if (m_bSeekable)
734 0 : return true;
735 0 : TryToCacheFirstTwoBatches();
736 0 : if (!m_bSingleBatch)
737 : {
738 0 : CPLError(CE_Failure, CPLE_AppDefined,
739 : "GetExtent() cannot be run in non-forced mode on "
740 : "a non-seekable file made of several batches");
741 0 : return false;
742 : }
743 0 : return true;
744 : }
745 :
746 : /************************************************************************/
747 : /* TestCapability() */
748 : /************************************************************************/
749 :
750 296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
751 : {
752 296 : if (EQUAL(pszCap, OLCFastFeatureCount))
753 : {
754 28 : return m_bSeekable && m_poAttrQuery == nullptr &&
755 28 : m_poFilterGeom == nullptr;
756 : }
757 :
758 278 : if (EQUAL(pszCap, OLCMeasuredGeometries))
759 16 : return true;
760 262 : if (EQUAL(pszCap, OLCZGeometries))
761 12 : return true;
762 :
763 250 : return OGRArrowLayer::TestCapability(pszCap);
764 : }
765 :
766 : /************************************************************************/
767 : /* GetMetadataItem() */
768 : /************************************************************************/
769 :
770 259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
771 : const char *pszDomain)
772 : {
773 : // Mostly for unit test purposes
774 259 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
775 : {
776 9 : if (EQUAL(pszName, "FORMAT"))
777 : {
778 5 : return m_poRecordBatchFileReader ? "FILE" : "STREAM";
779 : }
780 4 : if (m_poRecordBatchFileReader != nullptr)
781 : {
782 4 : int iBatch = -1;
783 4 : if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
784 : {
785 1 : return CPLSPrintf(
786 5 : "%d", m_poRecordBatchFileReader->num_record_batches());
787 : }
788 6 : else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
789 3 : strstr(pszName, ".NUM_ROWS"))
790 : {
791 : auto result =
792 6 : m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
793 3 : if (!result.ok())
794 : {
795 0 : return nullptr;
796 : }
797 3 : return CPLSPrintf("%" PRId64, (*result)->num_rows());
798 : }
799 : }
800 0 : return nullptr;
801 : }
802 250 : else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
803 : {
804 : const auto kv_metadata =
805 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
806 8 : : m_poRecordBatchReader->schema())
807 10 : ->metadata();
808 5 : if (kv_metadata && kv_metadata->Contains(pszName))
809 : {
810 5 : auto metadataItem = kv_metadata->Get(pszName);
811 5 : if (metadataItem.ok())
812 : {
813 5 : return CPLSPrintf("%s", metadataItem->c_str());
814 : }
815 : }
816 0 : return nullptr;
817 : }
818 476 : else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
819 231 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
820 : {
821 2 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
822 1 : if (kv_metadata && kv_metadata->Contains(pszName))
823 : {
824 1 : auto metadataItem = kv_metadata->Get(pszName);
825 1 : if (metadataItem.ok())
826 : {
827 1 : return CPLSPrintf("%s", metadataItem->c_str());
828 : }
829 : }
830 0 : return nullptr;
831 : }
832 244 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
833 : }
834 :
835 : /************************************************************************/
836 : /* GetMetadata() */
837 : /************************************************************************/
838 :
839 144 : CSLConstList OGRFeatherLayer::GetMetadata(const char *pszDomain)
840 : {
841 : // Mostly for unit test purposes
842 144 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
843 : {
844 5 : m_aosFeatherMetadata.Clear();
845 : const auto kv_metadata =
846 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
847 8 : : m_poRecordBatchReader->schema())
848 10 : ->metadata();
849 5 : if (kv_metadata)
850 : {
851 11 : for (const auto &kv : kv_metadata->sorted_pairs())
852 : {
853 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
854 6 : kv.second.c_str());
855 : }
856 : }
857 5 : return m_aosFeatherMetadata.List();
858 : }
859 261 : if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
860 122 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
861 : {
862 2 : m_aosFeatherMetadata.Clear();
863 4 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
864 2 : if (kv_metadata)
865 : {
866 3 : for (const auto &kv : kv_metadata->sorted_pairs())
867 : {
868 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
869 1 : kv.second.c_str());
870 : }
871 : }
872 2 : return m_aosFeatherMetadata.List();
873 : }
874 137 : return OGRLayer::GetMetadata(pszDomain);
875 : }
|