Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_json.h"
14 : #include "cpl_time.h"
15 : #include "gdal_pam.h"
16 : #include "ogrsf_frmts.h"
17 : #include "ogr_p.h"
18 :
19 : #include <cinttypes>
20 : #include <limits>
21 : #include <map>
22 : #include <set>
23 : #include <utility>
24 :
25 : #include "ogr_feather.h"
26 :
27 : #include "../arrow_common/ograrrowlayer.hpp"
28 : #include "../arrow_common/ograrrowdataset.hpp"
29 :
30 : /************************************************************************/
31 : /* OGRFeatherLayer() */
32 : /************************************************************************/
33 :
34 536 : OGRFeatherLayer::OGRFeatherLayer(
35 : OGRFeatherDataset *poDS, const char *pszLayerName,
36 : std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
37 536 : CSLConstList papszOpenOptions)
38 : : OGRArrowLayer(poDS, pszLayerName,
39 536 : CPLTestBool(CSLFetchNameValueDef(
40 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
41 1072 : m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
42 : {
43 536 : EstablishFeatureDefn();
44 536 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
45 : m_poFeatureDefn->GetGeomFieldCount());
46 536 : }
47 :
48 : /************************************************************************/
49 : /* OGRFeatherLayer() */
50 : /************************************************************************/
51 :
52 10 : OGRFeatherLayer::OGRFeatherLayer(
53 : OGRFeatherDataset *poDS, const char *pszLayerName,
54 : std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
55 : const arrow::ipc::IpcReadOptions &oOptions,
56 : std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
57 : &poRecordBatchStreamReader,
58 10 : CSLConstList papszOpenOptions)
59 : : OGRArrowLayer(poDS, pszLayerName,
60 10 : CPLTestBool(CSLFetchNameValueDef(
61 : papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
62 10 : m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
63 20 : m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
64 : {
65 10 : EstablishFeatureDefn();
66 10 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
67 : m_poFeatureDefn->GetGeomFieldCount());
68 10 : }
69 :
70 : /************************************************************************/
71 : /* GetDataset() */
72 : /************************************************************************/
73 :
74 112 : GDALDataset *OGRFeatherLayer::GetDataset()
75 : {
76 112 : return m_poDS;
77 : }
78 :
79 : /************************************************************************/
80 : /* LoadGeoMetadata() */
81 : /************************************************************************/
82 :
83 546 : void OGRFeatherLayer::LoadGeoMetadata(
84 : const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
85 : {
86 546 : if (kv_metadata && kv_metadata->Contains(key))
87 : {
88 1062 : auto geo = kv_metadata->Get(key);
89 531 : if (geo.ok())
90 : {
91 1062 : CPLJSONDocument oDoc;
92 531 : if (oDoc.LoadMemory(*geo))
93 : {
94 1062 : auto oRoot = oDoc.GetRoot();
95 1593 : const auto osVersion = oRoot.GetString("schema_version");
96 531 : if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
97 : {
98 398 : CPLDebug("FEATHER",
99 : "schema_version = %s not explicitly handled by "
100 : "the driver",
101 : osVersion.c_str());
102 : }
103 1593 : auto oColumns = oRoot.GetObj("columns");
104 531 : if (oColumns.IsValid())
105 : {
106 1062 : for (const auto &oColumn : oColumns.GetChildren())
107 : {
108 531 : m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
109 : }
110 : }
111 : }
112 : else
113 : {
114 0 : CPLError(CE_Warning, CPLE_AppDefined,
115 : "Cannot parse 'geo' metadata");
116 : }
117 : }
118 : }
119 546 : }
120 :
121 : /************************************************************************/
122 : /* EstablishFeatureDefn() */
123 : /************************************************************************/
124 :
125 546 : void OGRFeatherLayer::EstablishFeatureDefn()
126 : {
127 1092 : m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
128 1092 : : m_poRecordBatchReader->schema();
129 546 : const auto &kv_metadata = m_poSchema->metadata();
130 :
131 : #ifdef DEBUG
132 546 : if (kv_metadata)
133 : {
134 1075 : for (const auto &keyValue : kv_metadata->sorted_pairs())
135 : {
136 539 : CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
137 : keyValue.second.c_str());
138 : }
139 : }
140 : #endif
141 :
142 : auto poFooterMetadata = m_poRecordBatchFileReader
143 536 : ? m_poRecordBatchFileReader->metadata()
144 1628 : : nullptr;
145 670 : if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
146 124 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
147 : {
148 124 : LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
149 : }
150 : else
151 : {
152 422 : LoadGeoMetadata(kv_metadata.get(), "geo");
153 : }
154 : const auto oMapFieldNameToGDALSchemaFieldDefn =
155 1092 : LoadGDALSchema(kv_metadata.get());
156 :
157 546 : const auto &fields = m_poSchema->fields();
158 4167 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
159 : {
160 3621 : const auto &field = fields[i];
161 3621 : const auto &fieldName = field->name();
162 :
163 3621 : const auto &field_kv_metadata = field->metadata();
164 3621 : std::string osExtensionName;
165 3621 : std::string osExtensionMetadata;
166 3621 : if (field->type()->id() == arrow::Type::EXTENSION)
167 : {
168 : osExtensionName =
169 127 : cpl::down_cast<arrow::ExtensionType *>(field->type().get())
170 127 : ->extension_name();
171 : }
172 3494 : else if (field_kv_metadata)
173 : {
174 : auto extension_name =
175 1040 : field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
176 520 : if (extension_name.ok())
177 : {
178 518 : osExtensionName = *extension_name;
179 : }
180 :
181 : auto extension_metadata =
182 1040 : field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
183 520 : if (extension_metadata.ok())
184 : {
185 384 : osExtensionMetadata = *extension_metadata;
186 : }
187 : #ifdef DEBUG
188 520 : CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
189 1424 : for (const auto &keyValue : field_kv_metadata->sorted_pairs())
190 : {
191 904 : CPLDebug("FEATHER", " %s = %s", keyValue.first.c_str(),
192 : keyValue.second.c_str());
193 : }
194 : #endif
195 : }
196 :
197 3621 : if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
198 : {
199 6 : m_iFIDArrowColumn = i;
200 6 : continue;
201 : }
202 :
203 3615 : bool bRegularField = true;
204 3615 : auto oIter = m_oMapGeometryColumns.find(fieldName);
205 3615 : if (oIter != m_oMapGeometryColumns.end() || !osExtensionName.empty())
206 : {
207 1322 : CPLJSONObject oJSONDef;
208 661 : if (oIter != m_oMapGeometryColumns.end())
209 531 : oJSONDef = oIter->second;
210 1983 : auto osEncoding = oJSONDef.GetString("encoding");
211 661 : if (osEncoding.empty() && !osExtensionName.empty())
212 130 : osEncoding = osExtensionName;
213 :
214 661 : OGRwkbGeometryType eGeomType = wkbUnknown;
215 661 : auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
216 661 : if (IsValidGeometryEncoding(field, osEncoding,
217 1322 : oIter != m_oMapGeometryColumns.end(),
218 : eGeomType, eGeomEncoding))
219 : {
220 532 : bRegularField = false;
221 1064 : OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
222 :
223 1596 : auto osCRS = oJSONDef.GetString("crs");
224 :
225 : #if ARROW_VERSION_MAJOR >= 21
226 : if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
227 : osExtensionMetadata.empty() &&
228 : field->type()->id() == arrow::Type::EXTENSION)
229 : {
230 : const auto arrowWkb =
231 : std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
232 : field->type());
233 : if (arrowWkb)
234 : {
235 : osExtensionMetadata = arrowWkb->Serialize();
236 : }
237 : }
238 : #endif
239 :
240 927 : if (osCRS.empty() &&
241 395 : osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
242 160 : !osExtensionMetadata.empty() &&
243 927 : osExtensionMetadata[0] == '{' &&
244 0 : osExtensionMetadata.back() == '}')
245 : {
246 0 : CPLJSONDocument oDoc;
247 0 : if (oDoc.LoadMemory(osExtensionMetadata))
248 : {
249 0 : auto jCrs = oDoc.GetRoot()["crs"];
250 0 : if (jCrs.GetType() == CPLJSONObject::Type::Object)
251 : {
252 : osCRS =
253 0 : jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
254 : }
255 0 : else if (jCrs.GetType() == CPLJSONObject::Type::String)
256 : {
257 0 : osCRS = jCrs.ToString();
258 : }
259 0 : if (oDoc.GetRoot()["edges"].ToString() == "spherical")
260 : {
261 0 : SetMetadataItem("EDGES", "SPHERICAL");
262 : }
263 : }
264 : }
265 :
266 532 : if (osCRS.empty())
267 : {
268 : #if 0
269 : CPLError(CE_Warning, CPLE_AppDefined,
270 : "Missing required 'crs' field for geometry column %s",
271 : fieldName.c_str());
272 : #endif
273 : }
274 : else
275 : {
276 137 : OGRSpatialReference *poSRS = new OGRSpatialReference();
277 137 : poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
278 :
279 137 : if (poSRS->SetFromUserInput(
280 : osCRS.c_str(),
281 : OGRSpatialReference::
282 137 : SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
283 : OGRERR_NONE)
284 : {
285 : const char *pszAuthName =
286 137 : poSRS->GetAuthorityName(nullptr);
287 : const char *pszAuthCode =
288 137 : poSRS->GetAuthorityCode(nullptr);
289 137 : if (pszAuthName && pszAuthCode &&
290 137 : EQUAL(pszAuthName, "OGC") &&
291 0 : EQUAL(pszAuthCode, "CRS84"))
292 : {
293 0 : poSRS->importFromEPSG(4326);
294 : }
295 :
296 137 : const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
297 137 : if (dfCoordEpoch > 0)
298 2 : poSRS->SetCoordinateEpoch(dfCoordEpoch);
299 :
300 137 : oField.SetSpatialRef(poSRS);
301 : }
302 137 : poSRS->Release();
303 : }
304 :
305 : // m_aeGeomEncoding be filled before calling
306 : // ComputeGeometryColumnType()
307 532 : m_aeGeomEncoding.push_back(eGeomEncoding);
308 532 : if (eGeomType == wkbUnknown)
309 : {
310 708 : auto osType = oJSONDef.GetString("geometry_type");
311 236 : if (osType.empty())
312 236 : osType = oJSONDef.GetString("gdal:geometry_type");
313 472 : if (m_bSeekable && osType.empty() &&
314 236 : CPLTestBool(CPLGetConfigOption(
315 : "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
316 : {
317 236 : eGeomType = ComputeGeometryColumnType(
318 236 : m_poFeatureDefn->GetGeomFieldCount(), i);
319 236 : if (m_poRecordBatchReader)
320 0 : ResetRecordBatchReader();
321 : }
322 : else
323 0 : eGeomType = GetGeometryTypeFromString(osType);
324 : }
325 :
326 532 : oField.SetType(eGeomType);
327 532 : oField.SetNullable(field->nullable());
328 532 : m_poFeatureDefn->AddGeomFieldDefn(&oField);
329 532 : m_anMapGeomFieldIndexToArrowColumn.push_back(i);
330 : }
331 : }
332 :
333 3615 : if (bRegularField)
334 : {
335 3083 : CreateFieldFromSchema(field, {i},
336 : oMapFieldNameToGDALSchemaFieldDefn);
337 : }
338 : }
339 :
340 546 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
341 : m_poFeatureDefn->GetFieldCount());
342 546 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
343 : m_poFeatureDefn->GetGeomFieldCount());
344 546 : }
345 :
346 : /************************************************************************/
347 : /* ResetRecordBatchReader() */
348 : /************************************************************************/
349 :
350 12 : bool OGRFeatherLayer::ResetRecordBatchReader()
351 : {
352 12 : const auto nPos = *(m_poFile->Tell());
353 12 : CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
354 : auto result =
355 24 : arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
356 12 : if (!result.ok())
357 : {
358 0 : CPLError(CE_Failure, CPLE_AppDefined,
359 : "RecordBatchStreamReader::Open() failed with %s",
360 0 : result.status().message().c_str());
361 0 : CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
362 0 : return false;
363 : }
364 : else
365 : {
366 12 : m_poRecordBatchReader = *result;
367 12 : return true;
368 : }
369 : }
370 :
371 : /************************************************************************/
372 : /* ComputeGeometryColumnType() */
373 : /************************************************************************/
374 :
375 236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
376 : int iCol) const
377 : {
378 : // Compute type of geometry column by iterating over each geometry, and
379 : // looking at the WKB geometry type in the first 5 bytes of each geometry.
380 :
381 236 : OGRwkbGeometryType eGeomType = wkbNone;
382 :
383 236 : if (m_poRecordBatchReader != nullptr)
384 : {
385 0 : std::shared_ptr<arrow::RecordBatch> poBatch;
386 : while (true)
387 : {
388 0 : auto status = m_poRecordBatchReader->ReadNext(&poBatch);
389 0 : if (!status.ok())
390 : {
391 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
392 0 : status.message().c_str());
393 0 : break;
394 : }
395 0 : else if (!poBatch)
396 0 : break;
397 0 : eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
398 : iCol, eGeomType);
399 0 : if (eGeomType == wkbUnknown)
400 0 : break;
401 0 : }
402 : }
403 : else
404 : {
405 472 : for (int iBatch = 0;
406 472 : iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
407 : {
408 236 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
409 236 : if (!result.ok())
410 : {
411 0 : CPLError(CE_Failure, CPLE_AppDefined,
412 : "ReadRecordBatch() failed: %s",
413 0 : result.status().message().c_str());
414 0 : break;
415 : }
416 236 : eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
417 : iCol, eGeomType);
418 236 : if (eGeomType == wkbUnknown)
419 0 : break;
420 : }
421 : }
422 :
423 236 : return eGeomType == wkbNone ? wkbUnknown : eGeomType;
424 : }
425 :
426 : /************************************************************************/
427 : /* BuildDomain() */
428 : /************************************************************************/
429 :
430 : std::unique_ptr<OGRFieldDomain>
431 19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
432 : int iFieldIndex) const
433 : {
434 19 : const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
435 19 : CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
436 : arrow::Type::DICTIONARY);
437 :
438 19 : if (m_poRecordBatchReader)
439 : {
440 6 : if (m_poBatch)
441 : {
442 6 : return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
443 : }
444 : }
445 13 : else if (m_poRecordBatchFileReader)
446 : {
447 13 : auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
448 13 : if (!result.ok())
449 : {
450 0 : CPLError(CE_Failure, CPLE_AppDefined,
451 : "ReadRecordBatch() failed: %s",
452 0 : result.status().message().c_str());
453 : }
454 13 : auto poBatch = *result;
455 13 : if (poBatch)
456 : {
457 13 : return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
458 : }
459 : }
460 :
461 0 : return nullptr;
462 : }
463 :
464 : /************************************************************************/
465 : /* ResetReading() */
466 : /************************************************************************/
467 :
468 829 : void OGRFeatherLayer::ResetReading()
469 : {
470 829 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
471 : {
472 17 : if (m_iRecordBatch == 1 && m_poBatchIdx1)
473 : {
474 : // do nothing
475 : }
476 : else
477 : {
478 16 : m_bResetRecordBatchReaderAsked = true;
479 : }
480 : }
481 829 : OGRArrowLayer::ResetReading();
482 829 : }
483 :
484 : /************************************************************************/
485 : /* ReadNextBatch() */
486 : /************************************************************************/
487 :
488 1011 : bool OGRFeatherLayer::ReadNextBatch()
489 : {
490 1011 : if (m_poRecordBatchFileReader == nullptr)
491 : {
492 121 : return ReadNextBatchStream();
493 : }
494 : else
495 : {
496 890 : return ReadNextBatchFile();
497 : }
498 : }
499 :
500 : /************************************************************************/
501 : /* ReadNextBatchFile() */
502 : /************************************************************************/
503 :
504 890 : bool OGRFeatherLayer::ReadNextBatchFile()
505 : {
506 : while (true)
507 : {
508 890 : ++m_iRecordBatch;
509 890 : if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
510 : {
511 451 : if (m_iRecordBatch == 1)
512 448 : m_iRecordBatch = 0;
513 : else
514 3 : m_poBatch.reset();
515 451 : return false;
516 : }
517 :
518 439 : m_nIdxInBatch = 0;
519 :
520 : auto result =
521 439 : m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
522 439 : if (!result.ok())
523 : {
524 0 : CPLError(CE_Failure, CPLE_AppDefined,
525 : "ReadRecordBatch() failed: %s",
526 0 : result.status().message().c_str());
527 0 : m_poBatch.reset();
528 0 : return false;
529 : }
530 439 : if ((*result)->num_rows() != 0)
531 : {
532 439 : SetBatch(*result);
533 439 : break;
534 : }
535 0 : }
536 :
537 439 : return true;
538 : }
539 :
540 : /************************************************************************/
541 : /* ReadNextBatchStream() */
542 : /************************************************************************/
543 :
544 156 : bool OGRFeatherLayer::ReadNextBatchStream()
545 : {
546 156 : m_nIdxInBatch = 0;
547 :
548 312 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
549 0 : do
550 : {
551 156 : if (m_iRecordBatch == 0 && m_poBatchIdx0)
552 : {
553 1 : SetBatch(m_poBatchIdx0);
554 1 : m_iRecordBatch = 1;
555 103 : return true;
556 : }
557 :
558 155 : else if (m_iRecordBatch == 1 && m_poBatchIdx1)
559 : {
560 1 : SetBatch(m_poBatchIdx1);
561 1 : m_iRecordBatch = 2;
562 1 : return true;
563 : }
564 :
565 154 : else if (m_bSingleBatch)
566 : {
567 83 : CPLAssert(m_iRecordBatch == 0);
568 83 : CPLAssert(m_poBatch != nullptr);
569 83 : return false;
570 : }
571 :
572 71 : if (m_bResetRecordBatchReaderAsked)
573 : {
574 13 : if (!m_bSeekable)
575 : {
576 1 : CPLError(CE_Failure, CPLE_NotSupported,
577 : "Attempting to rewind non-seekable stream");
578 1 : return false;
579 : }
580 12 : if (!ResetRecordBatchReader())
581 0 : return false;
582 12 : m_bResetRecordBatchReaderAsked = false;
583 : }
584 :
585 70 : CPLAssert(m_poRecordBatchReader);
586 :
587 70 : ++m_iRecordBatch;
588 :
589 70 : poNextBatch.reset();
590 70 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
591 70 : if (!status.ok())
592 : {
593 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
594 0 : status.message().c_str());
595 0 : poNextBatch.reset();
596 : }
597 70 : if (poNextBatch == nullptr)
598 : {
599 17 : if (m_iRecordBatch == 1)
600 : {
601 3 : m_iRecordBatch = 0;
602 3 : m_bSingleBatch = true;
603 : }
604 : else
605 : {
606 14 : m_poBatch.reset();
607 14 : m_poBatchColumns.clear();
608 : }
609 17 : return false;
610 : }
611 53 : } while (poNextBatch->num_rows() == 0);
612 :
613 53 : SetBatch(poNextBatch);
614 :
615 53 : return true;
616 : }
617 :
618 : /************************************************************************/
619 : /* TryToCacheFirstTwoBatches() */
620 : /************************************************************************/
621 :
622 1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
623 : {
624 2 : if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
625 2 : !m_bSingleBatch && m_poBatchIdx0 == nullptr)
626 : {
627 1 : ResetReading();
628 1 : if (!m_poBatch)
629 : {
630 0 : CPL_IGNORE_RET_VAL(ReadNextBatchStream());
631 : }
632 1 : if (m_poBatch)
633 : {
634 2 : auto poBatchIdx0 = m_poBatch;
635 1 : if (ReadNextBatchStream())
636 : {
637 1 : CPLAssert(m_iRecordBatch == 1);
638 1 : m_poBatchIdx0 = poBatchIdx0;
639 1 : m_poBatchIdx1 = m_poBatch;
640 1 : SetBatch(poBatchIdx0);
641 1 : ResetReading();
642 : }
643 1 : ResetReading();
644 : }
645 : }
646 1 : }
647 :
648 : /************************************************************************/
649 : /* CanPostFilterArrowArray() */
650 : /************************************************************************/
651 :
652 20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
653 : const struct ArrowSchema *schema) const
654 : {
655 20 : if (m_poRecordBatchReader)
656 10 : return false;
657 10 : return OGRArrowLayer::CanPostFilterArrowArray(schema);
658 : }
659 :
660 : /************************************************************************/
661 : /* InvalidateCachedBatches() */
662 : /************************************************************************/
663 :
664 109 : void OGRFeatherLayer::InvalidateCachedBatches()
665 : {
666 109 : if (m_poRecordBatchFileReader)
667 : {
668 63 : m_iRecordBatch = -1;
669 63 : ResetReading();
670 : }
671 109 : }
672 :
673 : /************************************************************************/
674 : /* GetFeatureCount() */
675 : /************************************************************************/
676 :
677 336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
678 : {
679 628 : if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
680 292 : m_poFilterGeom == nullptr)
681 : {
682 288 : auto result = m_poRecordBatchFileReader->CountRows();
683 288 : if (result.ok())
684 288 : return *result;
685 : }
686 48 : else if (m_poRecordBatchReader != nullptr)
687 : {
688 36 : if (!m_bSeekable && !bForce)
689 : {
690 1 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
691 : {
692 1 : TryToCacheFirstTwoBatches();
693 : }
694 :
695 1 : if (!m_bSingleBatch)
696 : {
697 1 : CPLError(
698 : CE_Failure, CPLE_AppDefined,
699 : "GetFeatureCount() cannot be run in non-forced mode on "
700 : "a non-seekable file made of several batches");
701 1 : return -1;
702 : }
703 : }
704 :
705 35 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
706 : {
707 23 : GIntBig nFeatures = 0;
708 23 : ResetReading();
709 23 : if (!m_poBatch)
710 3 : ReadNextBatchStream();
711 31 : while (m_poBatch)
712 : {
713 31 : nFeatures += m_poBatch->num_rows();
714 31 : if (!ReadNextBatchStream())
715 23 : break;
716 : }
717 23 : ResetReading();
718 23 : return nFeatures;
719 : }
720 : }
721 24 : return OGRLayer::GetFeatureCount(bForce);
722 : }
723 :
724 : /************************************************************************/
725 : /* CanRunNonForcedGetExtent() */
726 : /************************************************************************/
727 :
728 0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
729 : {
730 0 : if (m_bSeekable)
731 0 : return true;
732 0 : TryToCacheFirstTwoBatches();
733 0 : if (!m_bSingleBatch)
734 : {
735 0 : CPLError(CE_Failure, CPLE_AppDefined,
736 : "GetExtent() cannot be run in non-forced mode on "
737 : "a non-seekable file made of several batches");
738 0 : return false;
739 : }
740 0 : return true;
741 : }
742 :
743 : /************************************************************************/
744 : /* TestCapability() */
745 : /************************************************************************/
746 :
747 296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
748 : {
749 296 : if (EQUAL(pszCap, OLCFastFeatureCount))
750 : {
751 28 : return m_bSeekable && m_poAttrQuery == nullptr &&
752 28 : m_poFilterGeom == nullptr;
753 : }
754 :
755 278 : if (EQUAL(pszCap, OLCMeasuredGeometries))
756 16 : return true;
757 262 : if (EQUAL(pszCap, OLCZGeometries))
758 12 : return true;
759 :
760 250 : return OGRArrowLayer::TestCapability(pszCap);
761 : }
762 :
763 : /************************************************************************/
764 : /* GetMetadataItem() */
765 : /************************************************************************/
766 :
767 259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
768 : const char *pszDomain)
769 : {
770 : // Mostly for unit test purposes
771 259 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
772 : {
773 9 : if (EQUAL(pszName, "FORMAT"))
774 : {
775 5 : return m_poRecordBatchFileReader ? "FILE" : "STREAM";
776 : }
777 4 : if (m_poRecordBatchFileReader != nullptr)
778 : {
779 4 : int iBatch = -1;
780 4 : if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
781 : {
782 1 : return CPLSPrintf(
783 5 : "%d", m_poRecordBatchFileReader->num_record_batches());
784 : }
785 6 : else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
786 3 : strstr(pszName, ".NUM_ROWS"))
787 : {
788 : auto result =
789 6 : m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
790 3 : if (!result.ok())
791 : {
792 0 : return nullptr;
793 : }
794 3 : return CPLSPrintf("%" PRId64, (*result)->num_rows());
795 : }
796 : }
797 0 : return nullptr;
798 : }
799 250 : else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
800 : {
801 : const auto kv_metadata =
802 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
803 8 : : m_poRecordBatchReader->schema())
804 10 : ->metadata();
805 5 : if (kv_metadata && kv_metadata->Contains(pszName))
806 : {
807 5 : auto metadataItem = kv_metadata->Get(pszName);
808 5 : if (metadataItem.ok())
809 : {
810 5 : return CPLSPrintf("%s", metadataItem->c_str());
811 : }
812 : }
813 0 : return nullptr;
814 : }
815 476 : else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
816 231 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
817 : {
818 2 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
819 1 : if (kv_metadata && kv_metadata->Contains(pszName))
820 : {
821 1 : auto metadataItem = kv_metadata->Get(pszName);
822 1 : if (metadataItem.ok())
823 : {
824 1 : return CPLSPrintf("%s", metadataItem->c_str());
825 : }
826 : }
827 0 : return nullptr;
828 : }
829 244 : return OGRLayer::GetMetadataItem(pszName, pszDomain);
830 : }
831 :
832 : /************************************************************************/
833 : /* GetMetadata() */
834 : /************************************************************************/
835 :
836 144 : char **OGRFeatherLayer::GetMetadata(const char *pszDomain)
837 : {
838 : // Mostly for unit test purposes
839 144 : if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
840 : {
841 5 : m_aosFeatherMetadata.Clear();
842 : const auto kv_metadata =
843 5 : (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
844 8 : : m_poRecordBatchReader->schema())
845 10 : ->metadata();
846 5 : if (kv_metadata)
847 : {
848 11 : for (const auto &kv : kv_metadata->sorted_pairs())
849 : {
850 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
851 6 : kv.second.c_str());
852 : }
853 : }
854 5 : return m_aosFeatherMetadata.List();
855 : }
856 261 : if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
857 122 : EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
858 : {
859 2 : m_aosFeatherMetadata.Clear();
860 4 : const auto kv_metadata = m_poRecordBatchFileReader->metadata();
861 2 : if (kv_metadata)
862 : {
863 3 : for (const auto &kv : kv_metadata->sorted_pairs())
864 : {
865 : m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
866 1 : kv.second.c_str());
867 : }
868 : }
869 2 : return m_aosFeatherMetadata.List();
870 : }
871 137 : return OGRLayer::GetMetadata(pszDomain);
872 : }
|