LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 316 377 83.8 %
Date: 2025-07-11 10:11:13 Functions: 19 20 95.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_json.h"
      14             : #include "cpl_time.h"
      15             : #include "gdal_pam.h"
      16             : #include "ogrsf_frmts.h"
      17             : #include "ogr_p.h"
      18             : 
      19             : #include <cinttypes>
      20             : #include <limits>
      21             : #include <map>
      22             : #include <set>
      23             : #include <utility>
      24             : 
      25             : #include "ogr_feather.h"
      26             : 
      27             : #include "../arrow_common/ograrrowlayer.hpp"
      28             : #include "../arrow_common/ograrrowdataset.hpp"
      29             : 
      30             : /************************************************************************/
      31             : /*                        OGRFeatherLayer()                             */
      32             : /************************************************************************/
      33             : 
      34         535 : OGRFeatherLayer::OGRFeatherLayer(
      35             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      36         535 :     std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader)
      37             :     : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
      38         535 :       m_poRecordBatchFileReader(poRecordBatchFileReader)
      39             : {
      40         535 :     EstablishFeatureDefn();
      41         535 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      42             :               m_poFeatureDefn->GetGeomFieldCount());
      43         535 : }
      44             : 
      45             : /************************************************************************/
      46             : /*                        OGRFeatherLayer()                             */
      47             : /************************************************************************/
      48             : 
      49          10 : OGRFeatherLayer::OGRFeatherLayer(
      50             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      51             :     std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
      52             :     const arrow::ipc::IpcReadOptions &oOptions,
      53             :     std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
      54          10 :         &poRecordBatchStreamReader)
      55             :     : OGRArrowLayer(poDS, pszLayerName), m_poDS(poDS),
      56          10 :       m_poFile(std::move(poFile)), m_bSeekable(bSeekable), m_oOptions(oOptions),
      57          10 :       m_poRecordBatchReader(poRecordBatchStreamReader)
      58             : {
      59          10 :     EstablishFeatureDefn();
      60          10 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      61             :               m_poFeatureDefn->GetGeomFieldCount());
      62          10 : }
      63             : 
      64             : /************************************************************************/
      65             : /*                           GetDataset()                               */
      66             : /************************************************************************/
      67             : 
      68         112 : GDALDataset *OGRFeatherLayer::GetDataset()
      69             : {
      70         112 :     return m_poDS;
      71             : }
      72             : 
      73             : /************************************************************************/
      74             : /*                          LoadGeoMetadata()                           */
      75             : /************************************************************************/
      76             : 
      77         545 : void OGRFeatherLayer::LoadGeoMetadata(
      78             :     const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
      79             : {
      80         545 :     if (kv_metadata && kv_metadata->Contains(key))
      81             :     {
      82        1060 :         auto geo = kv_metadata->Get(key);
      83         530 :         if (geo.ok())
      84             :         {
      85        1060 :             CPLJSONDocument oDoc;
      86         530 :             if (oDoc.LoadMemory(*geo))
      87             :             {
      88        1060 :                 auto oRoot = oDoc.GetRoot();
      89        1590 :                 const auto osVersion = oRoot.GetString("schema_version");
      90         530 :                 if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
      91             :                 {
      92         397 :                     CPLDebug("FEATHER",
      93             :                              "schema_version = %s not explicitly handled by "
      94             :                              "the driver",
      95             :                              osVersion.c_str());
      96             :                 }
      97        1590 :                 auto oColumns = oRoot.GetObj("columns");
      98         530 :                 if (oColumns.IsValid())
      99             :                 {
     100        1060 :                     for (const auto &oColumn : oColumns.GetChildren())
     101             :                     {
     102         530 :                         m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
     103             :                     }
     104             :                 }
     105             :             }
     106             :             else
     107             :             {
     108           0 :                 CPLError(CE_Warning, CPLE_AppDefined,
     109             :                          "Cannot parse 'geo' metadata");
     110             :             }
     111             :         }
     112             :     }
     113         545 : }
     114             : 
     115             : /************************************************************************/
     116             : /*                        EstablishFeatureDefn()                        */
     117             : /************************************************************************/
     118             : 
     119         545 : void OGRFeatherLayer::EstablishFeatureDefn()
     120             : {
     121        1090 :     m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     122        1090 :                                            : m_poRecordBatchReader->schema();
     123         545 :     const auto &kv_metadata = m_poSchema->metadata();
     124             : 
     125             : #ifdef DEBUG
     126         545 :     if (kv_metadata)
     127             :     {
     128        1073 :         for (const auto &keyValue : kv_metadata->sorted_pairs())
     129             :         {
     130         538 :             CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
     131             :                      keyValue.second.c_str());
     132             :         }
     133             :     }
     134             : #endif
     135             : 
     136             :     auto poFooterMetadata = m_poRecordBatchFileReader
     137         535 :                                 ? m_poRecordBatchFileReader->metadata()
     138        1625 :                                 : nullptr;
     139         669 :     if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
     140         124 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
     141             :     {
     142         124 :         LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
     143             :     }
     144             :     else
     145             :     {
     146         421 :         LoadGeoMetadata(kv_metadata.get(), "geo");
     147             :     }
     148             :     const auto oMapFieldNameToGDALSchemaFieldDefn =
     149        1090 :         LoadGDALSchema(kv_metadata.get());
     150             : 
     151         545 :     const auto &fields = m_poSchema->fields();
     152        4082 :     for (int i = 0; i < m_poSchema->num_fields(); ++i)
     153             :     {
     154        3537 :         const auto &field = fields[i];
     155        3537 :         const auto &fieldName = field->name();
     156             : 
     157        3537 :         const auto &field_kv_metadata = field->metadata();
     158        3537 :         std::string osExtensionName;
     159        3537 :         std::string osExtensionMetadata;
     160        3537 :         if (field->type()->id() == arrow::Type::EXTENSION)
     161             :         {
     162             :             osExtensionName =
     163         127 :                 cpl::down_cast<arrow::ExtensionType *>(field->type().get())
     164         127 :                     ->extension_name();
     165             :         }
     166        3410 :         else if (field_kv_metadata)
     167             :         {
     168             :             auto extension_name =
     169        1040 :                 field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
     170         520 :             if (extension_name.ok())
     171             :             {
     172         518 :                 osExtensionName = *extension_name;
     173             :             }
     174             : 
     175             :             auto extension_metadata =
     176        1040 :                 field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
     177         520 :             if (extension_metadata.ok())
     178             :             {
     179         384 :                 osExtensionMetadata = *extension_metadata;
     180             :             }
     181             : #ifdef DEBUG
     182         520 :             CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
     183        1424 :             for (const auto &keyValue : field_kv_metadata->sorted_pairs())
     184             :             {
     185         904 :                 CPLDebug("FEATHER", "  %s = %s", keyValue.first.c_str(),
     186             :                          keyValue.second.c_str());
     187             :             }
     188             : #endif
     189             :         }
     190             : 
     191        3537 :         if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
     192             :         {
     193           6 :             m_iFIDArrowColumn = i;
     194           6 :             continue;
     195             :         }
     196             : 
     197        3531 :         bool bRegularField = true;
     198        3531 :         auto oIter = m_oMapGeometryColumns.find(fieldName);
     199        3531 :         if (oIter != m_oMapGeometryColumns.end() || !osExtensionName.empty())
     200             :         {
     201        1320 :             CPLJSONObject oJSONDef;
     202         660 :             if (oIter != m_oMapGeometryColumns.end())
     203         530 :                 oJSONDef = oIter->second;
     204        1980 :             auto osEncoding = oJSONDef.GetString("encoding");
     205         660 :             if (osEncoding.empty() && !osExtensionName.empty())
     206         130 :                 osEncoding = osExtensionName;
     207             : 
     208         660 :             OGRwkbGeometryType eGeomType = wkbUnknown;
     209         660 :             auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
     210         660 :             if (IsValidGeometryEncoding(field, osEncoding,
     211        1320 :                                         oIter != m_oMapGeometryColumns.end(),
     212             :                                         eGeomType, eGeomEncoding))
     213             :             {
     214         531 :                 bRegularField = false;
     215        1062 :                 OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
     216             : 
     217        1593 :                 auto osCRS = oJSONDef.GetString("crs");
     218             : 
     219             : #if ARROW_VERSION_MAJOR >= 21
     220             :                 if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     221             :                     osExtensionMetadata.empty() &&
     222             :                     field->type()->id() == arrow::Type::EXTENSION)
     223             :                 {
     224             :                     const auto arrowWkb =
     225             :                         std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
     226             :                             field->type());
     227             :                     if (arrowWkb)
     228             :                     {
     229             :                         osExtensionMetadata = arrowWkb->Serialize();
     230             :                     }
     231             :                 }
     232             : #endif
     233             : 
     234         926 :                 if (osCRS.empty() &&
     235         395 :                     osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     236         160 :                     !osExtensionMetadata.empty() &&
     237         926 :                     osExtensionMetadata[0] == '{' &&
     238           0 :                     osExtensionMetadata.back() == '}')
     239             :                 {
     240           0 :                     CPLJSONDocument oDoc;
     241           0 :                     if (oDoc.LoadMemory(osExtensionMetadata))
     242             :                     {
     243           0 :                         auto jCrs = oDoc.GetRoot()["crs"];
     244           0 :                         if (jCrs.GetType() == CPLJSONObject::Type::Object)
     245             :                         {
     246             :                             osCRS =
     247           0 :                                 jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
     248             :                         }
     249           0 :                         else if (jCrs.GetType() == CPLJSONObject::Type::String)
     250             :                         {
     251           0 :                             osCRS = jCrs.ToString();
     252             :                         }
     253           0 :                         if (oDoc.GetRoot()["edges"].ToString() == "spherical")
     254             :                         {
     255           0 :                             SetMetadataItem("EDGES", "SPHERICAL");
     256             :                         }
     257             :                     }
     258             :                 }
     259             : 
     260         531 :                 if (osCRS.empty())
     261             :                 {
     262             : #if 0
     263             :                     CPLError(CE_Warning, CPLE_AppDefined,
     264             :                              "Missing required 'crs' field for geometry column %s",
     265             :                              fieldName.c_str());
     266             : #endif
     267             :                 }
     268             :                 else
     269             :                 {
     270         136 :                     OGRSpatialReference *poSRS = new OGRSpatialReference();
     271         136 :                     poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
     272             : 
     273         136 :                     if (poSRS->SetFromUserInput(
     274             :                             osCRS.c_str(),
     275             :                             OGRSpatialReference::
     276         136 :                                 SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
     277             :                         OGRERR_NONE)
     278             :                     {
     279             :                         const char *pszAuthName =
     280         136 :                             poSRS->GetAuthorityName(nullptr);
     281             :                         const char *pszAuthCode =
     282         136 :                             poSRS->GetAuthorityCode(nullptr);
     283         136 :                         if (pszAuthName && pszAuthCode &&
     284         136 :                             EQUAL(pszAuthName, "OGC") &&
     285           0 :                             EQUAL(pszAuthCode, "CRS84"))
     286             :                         {
     287           0 :                             poSRS->importFromEPSG(4326);
     288             :                         }
     289             : 
     290         136 :                         const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
     291         136 :                         if (dfCoordEpoch > 0)
     292           2 :                             poSRS->SetCoordinateEpoch(dfCoordEpoch);
     293             : 
     294         136 :                         oField.SetSpatialRef(poSRS);
     295             :                     }
     296         136 :                     poSRS->Release();
     297             :                 }
     298             : 
     299             :                 // m_aeGeomEncoding be filled before calling
     300             :                 // ComputeGeometryColumnType()
     301         531 :                 m_aeGeomEncoding.push_back(eGeomEncoding);
     302         531 :                 if (eGeomType == wkbUnknown)
     303             :                 {
     304         705 :                     auto osType = oJSONDef.GetString("geometry_type");
     305         235 :                     if (osType.empty())
     306         235 :                         osType = oJSONDef.GetString("gdal:geometry_type");
     307         470 :                     if (m_bSeekable && osType.empty() &&
     308         235 :                         CPLTestBool(CPLGetConfigOption(
     309             :                             "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
     310             :                     {
     311         235 :                         eGeomType = ComputeGeometryColumnType(
     312         235 :                             m_poFeatureDefn->GetGeomFieldCount(), i);
     313         235 :                         if (m_poRecordBatchReader)
     314           0 :                             ResetRecordBatchReader();
     315             :                     }
     316             :                     else
     317           0 :                         eGeomType = GetGeometryTypeFromString(osType);
     318             :                 }
     319             : 
     320         531 :                 oField.SetType(eGeomType);
     321         531 :                 oField.SetNullable(field->nullable());
     322         531 :                 m_poFeatureDefn->AddGeomFieldDefn(&oField);
     323         531 :                 m_anMapGeomFieldIndexToArrowColumn.push_back(i);
     324             :             }
     325             :         }
     326             : 
     327        3531 :         if (bRegularField)
     328             :         {
     329        3000 :             CreateFieldFromSchema(field, {i},
     330             :                                   oMapFieldNameToGDALSchemaFieldDefn);
     331             :         }
     332             :     }
     333             : 
     334         545 :     CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
     335             :               m_poFeatureDefn->GetFieldCount());
     336         545 :     CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
     337             :               m_poFeatureDefn->GetGeomFieldCount());
     338         545 : }
     339             : 
     340             : /************************************************************************/
     341             : /*                       ResetRecordBatchReader()                       */
     342             : /************************************************************************/
     343             : 
     344          12 : bool OGRFeatherLayer::ResetRecordBatchReader()
     345             : {
     346          12 :     const auto nPos = *(m_poFile->Tell());
     347          12 :     CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
     348             :     auto result =
     349          24 :         arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
     350          12 :     if (!result.ok())
     351             :     {
     352           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     353             :                  "RecordBatchStreamReader::Open() failed with %s",
     354           0 :                  result.status().message().c_str());
     355           0 :         CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
     356           0 :         return false;
     357             :     }
     358             :     else
     359             :     {
     360          12 :         m_poRecordBatchReader = *result;
     361          12 :         return true;
     362             :     }
     363             : }
     364             : 
     365             : /************************************************************************/
     366             : /*                     ComputeGeometryColumnType()                      */
     367             : /************************************************************************/
     368             : 
     369         235 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
     370             :                                                               int iCol) const
     371             : {
     372             :     // Compute type of geometry column by iterating over each geometry, and
     373             :     // looking at the WKB geometry type in the first 5 bytes of each geometry.
     374             : 
     375         235 :     OGRwkbGeometryType eGeomType = wkbNone;
     376             : 
     377         235 :     if (m_poRecordBatchReader != nullptr)
     378             :     {
     379           0 :         std::shared_ptr<arrow::RecordBatch> poBatch;
     380             :         while (true)
     381             :         {
     382           0 :             auto status = m_poRecordBatchReader->ReadNext(&poBatch);
     383           0 :             if (!status.ok())
     384             :             {
     385           0 :                 CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     386           0 :                          status.message().c_str());
     387           0 :                 break;
     388             :             }
     389           0 :             else if (!poBatch)
     390           0 :                 break;
     391           0 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
     392             :                                                               iCol, eGeomType);
     393           0 :             if (eGeomType == wkbUnknown)
     394           0 :                 break;
     395           0 :         }
     396             :     }
     397             :     else
     398             :     {
     399         470 :         for (int iBatch = 0;
     400         470 :              iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
     401             :         {
     402         235 :             auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     403         235 :             if (!result.ok())
     404             :             {
     405           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     406             :                          "ReadRecordBatch() failed: %s",
     407           0 :                          result.status().message().c_str());
     408           0 :                 break;
     409             :             }
     410         235 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
     411             :                                                               iCol, eGeomType);
     412         235 :             if (eGeomType == wkbUnknown)
     413           0 :                 break;
     414             :         }
     415             :     }
     416             : 
     417         235 :     return eGeomType == wkbNone ? wkbUnknown : eGeomType;
     418             : }
     419             : 
     420             : /************************************************************************/
     421             : /*                          BuildDomain()                               */
     422             : /************************************************************************/
     423             : 
     424             : std::unique_ptr<OGRFieldDomain>
     425          19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
     426             :                              int iFieldIndex) const
     427             : {
     428          19 :     const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
     429          19 :     CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
     430             :               arrow::Type::DICTIONARY);
     431             : 
     432          19 :     if (m_poRecordBatchReader)
     433             :     {
     434           6 :         if (m_poBatch)
     435             :         {
     436           6 :             return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
     437             :         }
     438             :     }
     439          13 :     else if (m_poRecordBatchFileReader)
     440             :     {
     441          13 :         auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
     442          13 :         if (!result.ok())
     443             :         {
     444           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     445             :                      "ReadRecordBatch() failed: %s",
     446           0 :                      result.status().message().c_str());
     447             :         }
     448          13 :         auto poBatch = *result;
     449          13 :         if (poBatch)
     450             :         {
     451          13 :             return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
     452             :         }
     453             :     }
     454             : 
     455           0 :     return nullptr;
     456             : }
     457             : 
     458             : /************************************************************************/
     459             : /*                           ResetReading()                             */
     460             : /************************************************************************/
     461             : 
     462         820 : void OGRFeatherLayer::ResetReading()
     463             : {
     464         820 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
     465             :     {
     466          17 :         if (m_iRecordBatch == 1 && m_poBatchIdx1)
     467             :         {
     468             :             // do nothing
     469             :         }
     470             :         else
     471             :         {
     472          16 :             m_bResetRecordBatchReaderAsked = true;
     473             :         }
     474             :     }
     475         820 :     OGRArrowLayer::ResetReading();
     476         820 : }
     477             : 
     478             : /************************************************************************/
     479             : /*                           ReadNextBatch()                            */
     480             : /************************************************************************/
     481             : 
     482        1006 : bool OGRFeatherLayer::ReadNextBatch()
     483             : {
     484        1006 :     if (m_poRecordBatchFileReader == nullptr)
     485             :     {
     486         119 :         return ReadNextBatchStream();
     487             :     }
     488             :     else
     489             :     {
     490         887 :         return ReadNextBatchFile();
     491             :     }
     492             : }
     493             : 
     494             : /************************************************************************/
     495             : /*                         ReadNextBatchFile()                          */
     496             : /************************************************************************/
     497             : 
     498         887 : bool OGRFeatherLayer::ReadNextBatchFile()
     499             : {
     500             :     while (true)
     501             :     {
     502         887 :         ++m_iRecordBatch;
     503         887 :         if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
     504             :         {
     505         449 :             if (m_iRecordBatch == 1)
     506         446 :                 m_iRecordBatch = 0;
     507             :             else
     508           3 :                 m_poBatch.reset();
     509         449 :             return false;
     510             :         }
     511             : 
     512         438 :         m_nIdxInBatch = 0;
     513             : 
     514             :         auto result =
     515         438 :             m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
     516         438 :         if (!result.ok())
     517             :         {
     518           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     519             :                      "ReadRecordBatch() failed: %s",
     520           0 :                      result.status().message().c_str());
     521           0 :             m_poBatch.reset();
     522           0 :             return false;
     523             :         }
     524         438 :         if ((*result)->num_rows() != 0)
     525             :         {
     526         438 :             SetBatch(*result);
     527         438 :             break;
     528             :         }
     529           0 :     }
     530             : 
     531         438 :     return true;
     532             : }
     533             : 
     534             : /************************************************************************/
     535             : /*                         ReadNextBatchStream()                        */
     536             : /************************************************************************/
     537             : 
     538         154 : bool OGRFeatherLayer::ReadNextBatchStream()
     539             : {
     540         154 :     m_nIdxInBatch = 0;
     541             : 
     542         308 :     std::shared_ptr<arrow::RecordBatch> poNextBatch;
     543           0 :     do
     544             :     {
     545         154 :         if (m_iRecordBatch == 0 && m_poBatchIdx0)
     546             :         {
     547           1 :             SetBatch(m_poBatchIdx0);
     548           1 :             m_iRecordBatch = 1;
     549         101 :             return true;
     550             :         }
     551             : 
     552         153 :         else if (m_iRecordBatch == 1 && m_poBatchIdx1)
     553             :         {
     554           1 :             SetBatch(m_poBatchIdx1);
     555           1 :             m_iRecordBatch = 2;
     556           1 :             return true;
     557             :         }
     558             : 
     559         152 :         else if (m_bSingleBatch)
     560             :         {
     561          81 :             CPLAssert(m_iRecordBatch == 0);
     562          81 :             CPLAssert(m_poBatch != nullptr);
     563          81 :             return false;
     564             :         }
     565             : 
     566          71 :         if (m_bResetRecordBatchReaderAsked)
     567             :         {
     568          13 :             if (!m_bSeekable)
     569             :             {
     570           1 :                 CPLError(CE_Failure, CPLE_NotSupported,
     571             :                          "Attempting to rewind non-seekable stream");
     572           1 :                 return false;
     573             :             }
     574          12 :             if (!ResetRecordBatchReader())
     575           0 :                 return false;
     576          12 :             m_bResetRecordBatchReaderAsked = false;
     577             :         }
     578             : 
     579          70 :         CPLAssert(m_poRecordBatchReader);
     580             : 
     581          70 :         ++m_iRecordBatch;
     582             : 
     583          70 :         poNextBatch.reset();
     584          70 :         auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
     585          70 :         if (!status.ok())
     586             :         {
     587           0 :             CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     588           0 :                      status.message().c_str());
     589           0 :             poNextBatch.reset();
     590             :         }
     591          70 :         if (poNextBatch == nullptr)
     592             :         {
     593          17 :             if (m_iRecordBatch == 1)
     594             :             {
     595           3 :                 m_iRecordBatch = 0;
     596           3 :                 m_bSingleBatch = true;
     597             :             }
     598             :             else
     599             :             {
     600          14 :                 m_poBatch.reset();
     601          14 :                 m_poBatchColumns.clear();
     602             :             }
     603          17 :             return false;
     604             :         }
     605          53 :     } while (poNextBatch->num_rows() == 0);
     606             : 
     607          53 :     SetBatch(poNextBatch);
     608             : 
     609          53 :     return true;
     610             : }
     611             : 
     612             : /************************************************************************/
     613             : /*                     TryToCacheFirstTwoBatches()                      */
     614             : /************************************************************************/
     615             : 
     616           1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
     617             : {
     618           2 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
     619           2 :         !m_bSingleBatch && m_poBatchIdx0 == nullptr)
     620             :     {
     621           1 :         ResetReading();
     622           1 :         if (!m_poBatch)
     623             :         {
     624           0 :             CPL_IGNORE_RET_VAL(ReadNextBatchStream());
     625             :         }
     626           1 :         if (m_poBatch)
     627             :         {
     628           2 :             auto poBatchIdx0 = m_poBatch;
     629           1 :             if (ReadNextBatchStream())
     630             :             {
     631           1 :                 CPLAssert(m_iRecordBatch == 1);
     632           1 :                 m_poBatchIdx0 = poBatchIdx0;
     633           1 :                 m_poBatchIdx1 = m_poBatch;
     634           1 :                 SetBatch(poBatchIdx0);
     635           1 :                 ResetReading();
     636             :             }
     637           1 :             ResetReading();
     638             :         }
     639             :     }
     640           1 : }
     641             : 
     642             : /************************************************************************/
     643             : /*                          CanPostFilterArrowArray()                   */
     644             : /************************************************************************/
     645             : 
     646          20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
     647             :     const struct ArrowSchema *schema) const
     648             : {
     649          20 :     if (m_poRecordBatchReader)
     650          10 :         return false;
     651          10 :     return OGRArrowLayer::CanPostFilterArrowArray(schema);
     652             : }
     653             : 
     654             : /************************************************************************/
     655             : /*                     InvalidateCachedBatches()                        */
     656             : /************************************************************************/
     657             : 
     658         109 : void OGRFeatherLayer::InvalidateCachedBatches()
     659             : {
     660         109 :     if (m_poRecordBatchFileReader)
     661             :     {
     662          63 :         m_iRecordBatch = -1;
     663          63 :         ResetReading();
     664             :     }
     665         109 : }
     666             : 
     667             : /************************************************************************/
     668             : /*                        GetFeatureCount()                             */
     669             : /************************************************************************/
     670             : 
     671         336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
     672             : {
     673         628 :     if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
     674         292 :         m_poFilterGeom == nullptr)
     675             :     {
     676         288 :         auto result = m_poRecordBatchFileReader->CountRows();
     677         288 :         if (result.ok())
     678         288 :             return *result;
     679             :     }
     680          48 :     else if (m_poRecordBatchReader != nullptr)
     681             :     {
     682          36 :         if (!m_bSeekable && !bForce)
     683             :         {
     684           1 :             if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     685             :             {
     686           1 :                 TryToCacheFirstTwoBatches();
     687             :             }
     688             : 
     689           1 :             if (!m_bSingleBatch)
     690             :             {
     691           1 :                 CPLError(
     692             :                     CE_Failure, CPLE_AppDefined,
     693             :                     "GetFeatureCount() cannot be run in non-forced mode on "
     694             :                     "a non-seekable file made of several batches");
     695           1 :                 return -1;
     696             :             }
     697             :         }
     698             : 
     699          35 :         if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     700             :         {
     701          23 :             GIntBig nFeatures = 0;
     702          23 :             ResetReading();
     703          23 :             if (!m_poBatch)
     704           3 :                 ReadNextBatchStream();
     705          31 :             while (m_poBatch)
     706             :             {
     707          31 :                 nFeatures += m_poBatch->num_rows();
     708          31 :                 if (!ReadNextBatchStream())
     709          23 :                     break;
     710             :             }
     711          23 :             ResetReading();
     712          23 :             return nFeatures;
     713             :         }
     714             :     }
     715          24 :     return OGRLayer::GetFeatureCount(bForce);
     716             : }
     717             : 
     718             : /************************************************************************/
     719             : /*                       CanRunNonForcedGetExtent()                     */
     720             : /************************************************************************/
     721             : 
     722           0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
     723             : {
     724           0 :     if (m_bSeekable)
     725           0 :         return true;
     726           0 :     TryToCacheFirstTwoBatches();
     727           0 :     if (!m_bSingleBatch)
     728             :     {
     729           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     730             :                  "GetExtent() cannot be run in non-forced mode on "
     731             :                  "a non-seekable file made of several batches");
     732           0 :         return false;
     733             :     }
     734           0 :     return true;
     735             : }
     736             : 
     737             : /************************************************************************/
     738             : /*                         TestCapability()                             */
     739             : /************************************************************************/
     740             : 
     741         292 : int OGRFeatherLayer::TestCapability(const char *pszCap)
     742             : {
     743         292 :     if (EQUAL(pszCap, OLCFastFeatureCount))
     744             :     {
     745          28 :         return m_bSeekable && m_poAttrQuery == nullptr &&
     746          28 :                m_poFilterGeom == nullptr;
     747             :     }
     748             : 
     749         274 :     if (EQUAL(pszCap, OLCMeasuredGeometries))
     750          16 :         return true;
     751         258 :     if (EQUAL(pszCap, OLCZGeometries))
     752          12 :         return true;
     753             : 
     754         246 :     return OGRArrowLayer::TestCapability(pszCap);
     755             : }
     756             : 
     757             : /************************************************************************/
     758             : /*                         GetMetadataItem()                            */
     759             : /************************************************************************/
     760             : 
     761         259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
     762             :                                              const char *pszDomain)
     763             : {
     764             :     // Mostly for unit test purposes
     765         259 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
     766             :     {
     767           9 :         if (EQUAL(pszName, "FORMAT"))
     768             :         {
     769           5 :             return m_poRecordBatchFileReader ? "FILE" : "STREAM";
     770             :         }
     771           4 :         if (m_poRecordBatchFileReader != nullptr)
     772             :         {
     773           4 :             int iBatch = -1;
     774           4 :             if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
     775             :             {
     776           1 :                 return CPLSPrintf(
     777           5 :                     "%d", m_poRecordBatchFileReader->num_record_batches());
     778             :             }
     779           6 :             else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
     780           3 :                      strstr(pszName, ".NUM_ROWS"))
     781             :             {
     782             :                 auto result =
     783           6 :                     m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     784           3 :                 if (!result.ok())
     785             :                 {
     786           0 :                     return nullptr;
     787             :                 }
     788           3 :                 return CPLSPrintf("%" PRId64, (*result)->num_rows());
     789             :             }
     790             :         }
     791           0 :         return nullptr;
     792             :     }
     793         250 :     else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     794             :     {
     795             :         const auto kv_metadata =
     796           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     797           8 :                                        : m_poRecordBatchReader->schema())
     798          10 :                 ->metadata();
     799           5 :         if (kv_metadata && kv_metadata->Contains(pszName))
     800             :         {
     801           5 :             auto metadataItem = kv_metadata->Get(pszName);
     802           5 :             if (metadataItem.ok())
     803             :             {
     804           5 :                 return CPLSPrintf("%s", metadataItem->c_str());
     805             :             }
     806             :         }
     807           0 :         return nullptr;
     808             :     }
     809         476 :     else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     810         231 :              EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     811             :     {
     812           2 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     813           1 :         if (kv_metadata && kv_metadata->Contains(pszName))
     814             :         {
     815           1 :             auto metadataItem = kv_metadata->Get(pszName);
     816           1 :             if (metadataItem.ok())
     817             :             {
     818           1 :                 return CPLSPrintf("%s", metadataItem->c_str());
     819             :             }
     820             :         }
     821           0 :         return nullptr;
     822             :     }
     823         244 :     return OGRLayer::GetMetadataItem(pszName, pszDomain);
     824             : }
     825             : 
     826             : /************************************************************************/
     827             : /*                           GetMetadata()                              */
     828             : /************************************************************************/
     829             : 
     830          34 : char **OGRFeatherLayer::GetMetadata(const char *pszDomain)
     831             : {
     832             :     // Mostly for unit test purposes
     833          34 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     834             :     {
     835           5 :         m_aosFeatherMetadata.Clear();
     836             :         const auto kv_metadata =
     837           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     838           8 :                                        : m_poRecordBatchReader->schema())
     839          10 :                 ->metadata();
     840           5 :         if (kv_metadata)
     841             :         {
     842          11 :             for (const auto &kv : kv_metadata->sorted_pairs())
     843             :             {
     844             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     845           6 :                                                   kv.second.c_str());
     846             :             }
     847             :         }
     848           5 :         return m_aosFeatherMetadata.List();
     849             :     }
     850          41 :     if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     851          12 :         EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     852             :     {
     853           2 :         m_aosFeatherMetadata.Clear();
     854           4 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     855           2 :         if (kv_metadata)
     856             :         {
     857           3 :             for (const auto &kv : kv_metadata->sorted_pairs())
     858             :             {
     859             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     860           1 :                                                   kv.second.c_str());
     861             :             }
     862             :         }
     863           2 :         return m_aosFeatherMetadata.List();
     864             :     }
     865          27 :     return OGRLayer::GetMetadata(pszDomain);
     866             : }

Generated by: LCOV version 1.14