LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 318 379 83.9 %
Date: 2025-12-05 02:43:06 Functions: 19 20 95.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_json.h"
      14             : #include "cpl_time.h"
      15             : #include "gdal_pam.h"
      16             : #include "ogrsf_frmts.h"
      17             : #include "ogr_p.h"
      18             : 
      19             : #include <cinttypes>
      20             : #include <limits>
      21             : #include <map>
      22             : #include <set>
      23             : #include <utility>
      24             : 
      25             : #include "ogr_feather.h"
      26             : 
      27             : #include "../arrow_common/ograrrowlayer.hpp"
      28             : #include "../arrow_common/ograrrowdataset.hpp"
      29             : 
      30             : /************************************************************************/
      31             : /*                        OGRFeatherLayer()                             */
      32             : /************************************************************************/
      33             : 
      34         536 : OGRFeatherLayer::OGRFeatherLayer(
      35             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      36             :     std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
      37         536 :     CSLConstList papszOpenOptions)
      38             :     : OGRArrowLayer(poDS, pszLayerName,
      39         536 :                     CPLTestBool(CSLFetchNameValueDef(
      40             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      41        1072 :       m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
      42             : {
      43         536 :     EstablishFeatureDefn();
      44         536 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      45             :               m_poFeatureDefn->GetGeomFieldCount());
      46         536 : }
      47             : 
      48             : /************************************************************************/
      49             : /*                        OGRFeatherLayer()                             */
      50             : /************************************************************************/
      51             : 
      52          10 : OGRFeatherLayer::OGRFeatherLayer(
      53             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      54             :     std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
      55             :     const arrow::ipc::IpcReadOptions &oOptions,
      56             :     std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
      57             :         &poRecordBatchStreamReader,
      58          10 :     CSLConstList papszOpenOptions)
      59             :     : OGRArrowLayer(poDS, pszLayerName,
      60          10 :                     CPLTestBool(CSLFetchNameValueDef(
      61             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      62          10 :       m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
      63          20 :       m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
      64             : {
      65          10 :     EstablishFeatureDefn();
      66          10 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      67             :               m_poFeatureDefn->GetGeomFieldCount());
      68          10 : }
      69             : 
      70             : /************************************************************************/
      71             : /*                           GetDataset()                               */
      72             : /************************************************************************/
      73             : 
      74         112 : GDALDataset *OGRFeatherLayer::GetDataset()
      75             : {
      76         112 :     return m_poDS;
      77             : }
      78             : 
      79             : /************************************************************************/
      80             : /*                          LoadGeoMetadata()                           */
      81             : /************************************************************************/
      82             : 
      83         546 : void OGRFeatherLayer::LoadGeoMetadata(
      84             :     const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
      85             : {
      86         546 :     if (kv_metadata && kv_metadata->Contains(key))
      87             :     {
      88        1062 :         auto geo = kv_metadata->Get(key);
      89         531 :         if (geo.ok())
      90             :         {
      91        1062 :             CPLJSONDocument oDoc;
      92         531 :             if (oDoc.LoadMemory(*geo))
      93             :             {
      94        1062 :                 auto oRoot = oDoc.GetRoot();
      95        1593 :                 const auto osVersion = oRoot.GetString("schema_version");
      96         531 :                 if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
      97             :                 {
      98         398 :                     CPLDebug("FEATHER",
      99             :                              "schema_version = %s not explicitly handled by "
     100             :                              "the driver",
     101             :                              osVersion.c_str());
     102             :                 }
     103        1593 :                 auto oColumns = oRoot.GetObj("columns");
     104         531 :                 if (oColumns.IsValid())
     105             :                 {
     106        1062 :                     for (const auto &oColumn : oColumns.GetChildren())
     107             :                     {
     108         531 :                         m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
     109             :                     }
     110             :                 }
     111             :             }
     112             :             else
     113             :             {
     114           0 :                 CPLError(CE_Warning, CPLE_AppDefined,
     115             :                          "Cannot parse 'geo' metadata");
     116             :             }
     117             :         }
     118             :     }
     119         546 : }
     120             : 
     121             : /************************************************************************/
     122             : /*                        EstablishFeatureDefn()                        */
     123             : /************************************************************************/
     124             : 
     125         546 : void OGRFeatherLayer::EstablishFeatureDefn()
     126             : {
     127        1092 :     m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     128        1092 :                                            : m_poRecordBatchReader->schema();
     129         546 :     const auto &kv_metadata = m_poSchema->metadata();
     130             : 
     131             : #ifdef DEBUG
     132         546 :     if (kv_metadata)
     133             :     {
     134        1075 :         for (const auto &keyValue : kv_metadata->sorted_pairs())
     135             :         {
     136         539 :             CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
     137             :                      keyValue.second.c_str());
     138             :         }
     139             :     }
     140             : #endif
     141             : 
     142             :     auto poFooterMetadata = m_poRecordBatchFileReader
     143         536 :                                 ? m_poRecordBatchFileReader->metadata()
     144        1628 :                                 : nullptr;
     145         670 :     if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
     146         124 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
     147             :     {
     148         124 :         LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
     149             :     }
     150             :     else
     151             :     {
     152         422 :         LoadGeoMetadata(kv_metadata.get(), "geo");
     153             :     }
     154             :     const auto oMapFieldNameToGDALSchemaFieldDefn =
     155        1092 :         LoadGDALSchema(kv_metadata.get());
     156             : 
     157         546 :     const auto &fields = m_poSchema->fields();
     158        4167 :     for (int i = 0; i < m_poSchema->num_fields(); ++i)
     159             :     {
     160        3621 :         const auto &field = fields[i];
     161        3621 :         const auto &fieldName = field->name();
     162             : 
     163        3621 :         const auto &field_kv_metadata = field->metadata();
     164        3621 :         std::string osExtensionName;
     165        3621 :         std::string osExtensionMetadata;
     166        3621 :         if (field->type()->id() == arrow::Type::EXTENSION)
     167             :         {
     168             :             osExtensionName =
     169         127 :                 cpl::down_cast<arrow::ExtensionType *>(field->type().get())
     170         127 :                     ->extension_name();
     171             :         }
     172        3494 :         else if (field_kv_metadata)
     173             :         {
     174             :             auto extension_name =
     175        1040 :                 field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
     176         520 :             if (extension_name.ok())
     177             :             {
     178         518 :                 osExtensionName = *extension_name;
     179             :             }
     180             : 
     181             :             auto extension_metadata =
     182        1040 :                 field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
     183         520 :             if (extension_metadata.ok())
     184             :             {
     185         384 :                 osExtensionMetadata = *extension_metadata;
     186             :             }
     187             : #ifdef DEBUG
     188         520 :             CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
     189        1424 :             for (const auto &keyValue : field_kv_metadata->sorted_pairs())
     190             :             {
     191         904 :                 CPLDebug("FEATHER", "  %s = %s", keyValue.first.c_str(),
     192             :                          keyValue.second.c_str());
     193             :             }
     194             : #endif
     195             :         }
     196             : 
     197        3621 :         if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
     198             :         {
     199           6 :             m_iFIDArrowColumn = i;
     200           6 :             continue;
     201             :         }
     202             : 
     203        3615 :         bool bRegularField = true;
     204        3615 :         auto oIter = m_oMapGeometryColumns.find(fieldName);
     205        3615 :         if (oIter != m_oMapGeometryColumns.end() || !osExtensionName.empty())
     206             :         {
     207        1322 :             CPLJSONObject oJSONDef;
     208         661 :             if (oIter != m_oMapGeometryColumns.end())
     209         531 :                 oJSONDef = oIter->second;
     210        1983 :             auto osEncoding = oJSONDef.GetString("encoding");
     211         661 :             if (osEncoding.empty() && !osExtensionName.empty())
     212         130 :                 osEncoding = osExtensionName;
     213             : 
     214         661 :             OGRwkbGeometryType eGeomType = wkbUnknown;
     215         661 :             auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
     216         661 :             if (IsValidGeometryEncoding(field, osEncoding,
     217        1322 :                                         oIter != m_oMapGeometryColumns.end(),
     218             :                                         eGeomType, eGeomEncoding))
     219             :             {
     220         532 :                 bRegularField = false;
     221        1064 :                 OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
     222             : 
     223        1596 :                 auto osCRS = oJSONDef.GetString("crs");
     224             : 
     225             : #if ARROW_VERSION_MAJOR >= 21
     226             :                 if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     227             :                     osExtensionMetadata.empty() &&
     228             :                     field->type()->id() == arrow::Type::EXTENSION)
     229             :                 {
     230             :                     const auto arrowWkb =
     231             :                         std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
     232             :                             field->type());
     233             :                     if (arrowWkb)
     234             :                     {
     235             :                         osExtensionMetadata = arrowWkb->Serialize();
     236             :                     }
     237             :                 }
     238             : #endif
     239             : 
     240         927 :                 if (osCRS.empty() &&
     241         395 :                     osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     242         160 :                     !osExtensionMetadata.empty() &&
     243         927 :                     osExtensionMetadata[0] == '{' &&
     244           0 :                     osExtensionMetadata.back() == '}')
     245             :                 {
     246           0 :                     CPLJSONDocument oDoc;
     247           0 :                     if (oDoc.LoadMemory(osExtensionMetadata))
     248             :                     {
     249           0 :                         auto jCrs = oDoc.GetRoot()["crs"];
     250           0 :                         if (jCrs.GetType() == CPLJSONObject::Type::Object)
     251             :                         {
     252             :                             osCRS =
     253           0 :                                 jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
     254             :                         }
     255           0 :                         else if (jCrs.GetType() == CPLJSONObject::Type::String)
     256             :                         {
     257           0 :                             osCRS = jCrs.ToString();
     258             :                         }
     259           0 :                         if (oDoc.GetRoot()["edges"].ToString() == "spherical")
     260             :                         {
     261           0 :                             SetMetadataItem("EDGES", "SPHERICAL");
     262             :                         }
     263             :                     }
     264             :                 }
     265             : 
     266         532 :                 if (osCRS.empty())
     267             :                 {
     268             : #if 0
     269             :                     CPLError(CE_Warning, CPLE_AppDefined,
     270             :                              "Missing required 'crs' field for geometry column %s",
     271             :                              fieldName.c_str());
     272             : #endif
     273             :                 }
     274             :                 else
     275             :                 {
     276         137 :                     OGRSpatialReference *poSRS = new OGRSpatialReference();
     277         137 :                     poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
     278             : 
     279         137 :                     if (poSRS->SetFromUserInput(
     280             :                             osCRS.c_str(),
     281             :                             OGRSpatialReference::
     282         137 :                                 SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
     283             :                         OGRERR_NONE)
     284             :                     {
     285             :                         const char *pszAuthName =
     286         137 :                             poSRS->GetAuthorityName(nullptr);
     287             :                         const char *pszAuthCode =
     288         137 :                             poSRS->GetAuthorityCode(nullptr);
     289         137 :                         if (pszAuthName && pszAuthCode &&
     290         137 :                             EQUAL(pszAuthName, "OGC") &&
     291           0 :                             EQUAL(pszAuthCode, "CRS84"))
     292             :                         {
     293           0 :                             poSRS->importFromEPSG(4326);
     294             :                         }
     295             : 
     296         137 :                         const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
     297         137 :                         if (dfCoordEpoch > 0)
     298           2 :                             poSRS->SetCoordinateEpoch(dfCoordEpoch);
     299             : 
     300         137 :                         oField.SetSpatialRef(poSRS);
     301             :                     }
     302         137 :                     poSRS->Release();
     303             :                 }
     304             : 
     305             :                 // m_aeGeomEncoding be filled before calling
     306             :                 // ComputeGeometryColumnType()
     307         532 :                 m_aeGeomEncoding.push_back(eGeomEncoding);
     308         532 :                 if (eGeomType == wkbUnknown)
     309             :                 {
     310         708 :                     auto osType = oJSONDef.GetString("geometry_type");
     311         236 :                     if (osType.empty())
     312         236 :                         osType = oJSONDef.GetString("gdal:geometry_type");
     313         472 :                     if (m_bSeekable && osType.empty() &&
     314         236 :                         CPLTestBool(CPLGetConfigOption(
     315             :                             "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
     316             :                     {
     317         236 :                         eGeomType = ComputeGeometryColumnType(
     318         236 :                             m_poFeatureDefn->GetGeomFieldCount(), i);
     319         236 :                         if (m_poRecordBatchReader)
     320           0 :                             ResetRecordBatchReader();
     321             :                     }
     322             :                     else
     323           0 :                         eGeomType = GetGeometryTypeFromString(osType);
     324             :                 }
     325             : 
     326         532 :                 oField.SetType(eGeomType);
     327         532 :                 oField.SetNullable(field->nullable());
     328         532 :                 m_poFeatureDefn->AddGeomFieldDefn(&oField);
     329         532 :                 m_anMapGeomFieldIndexToArrowColumn.push_back(i);
     330             :             }
     331             :         }
     332             : 
     333        3615 :         if (bRegularField)
     334             :         {
     335        3083 :             CreateFieldFromSchema(field, {i},
     336             :                                   oMapFieldNameToGDALSchemaFieldDefn);
     337             :         }
     338             :     }
     339             : 
     340         546 :     CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
     341             :               m_poFeatureDefn->GetFieldCount());
     342         546 :     CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
     343             :               m_poFeatureDefn->GetGeomFieldCount());
     344         546 : }
     345             : 
     346             : /************************************************************************/
     347             : /*                       ResetRecordBatchReader()                       */
     348             : /************************************************************************/
     349             : 
     350          12 : bool OGRFeatherLayer::ResetRecordBatchReader()
     351             : {
     352          12 :     const auto nPos = *(m_poFile->Tell());
     353          12 :     CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
     354             :     auto result =
     355          24 :         arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
     356          12 :     if (!result.ok())
     357             :     {
     358           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     359             :                  "RecordBatchStreamReader::Open() failed with %s",
     360           0 :                  result.status().message().c_str());
     361           0 :         CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
     362           0 :         return false;
     363             :     }
     364             :     else
     365             :     {
     366          12 :         m_poRecordBatchReader = *result;
     367          12 :         return true;
     368             :     }
     369             : }
     370             : 
     371             : /************************************************************************/
     372             : /*                     ComputeGeometryColumnType()                      */
     373             : /************************************************************************/
     374             : 
     375         236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
     376             :                                                               int iCol) const
     377             : {
     378             :     // Compute type of geometry column by iterating over each geometry, and
     379             :     // looking at the WKB geometry type in the first 5 bytes of each geometry.
     380             : 
     381         236 :     OGRwkbGeometryType eGeomType = wkbNone;
     382             : 
     383         236 :     if (m_poRecordBatchReader != nullptr)
     384             :     {
     385           0 :         std::shared_ptr<arrow::RecordBatch> poBatch;
     386             :         while (true)
     387             :         {
     388           0 :             auto status = m_poRecordBatchReader->ReadNext(&poBatch);
     389           0 :             if (!status.ok())
     390             :             {
     391           0 :                 CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     392           0 :                          status.message().c_str());
     393           0 :                 break;
     394             :             }
     395           0 :             else if (!poBatch)
     396           0 :                 break;
     397           0 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
     398             :                                                               iCol, eGeomType);
     399           0 :             if (eGeomType == wkbUnknown)
     400           0 :                 break;
     401           0 :         }
     402             :     }
     403             :     else
     404             :     {
     405         472 :         for (int iBatch = 0;
     406         472 :              iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
     407             :         {
     408         236 :             auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     409         236 :             if (!result.ok())
     410             :             {
     411           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     412             :                          "ReadRecordBatch() failed: %s",
     413           0 :                          result.status().message().c_str());
     414           0 :                 break;
     415             :             }
     416         236 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
     417             :                                                               iCol, eGeomType);
     418         236 :             if (eGeomType == wkbUnknown)
     419           0 :                 break;
     420             :         }
     421             :     }
     422             : 
     423         236 :     return eGeomType == wkbNone ? wkbUnknown : eGeomType;
     424             : }
     425             : 
     426             : /************************************************************************/
     427             : /*                          BuildDomain()                               */
     428             : /************************************************************************/
     429             : 
     430             : std::unique_ptr<OGRFieldDomain>
     431          19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
     432             :                              int iFieldIndex) const
     433             : {
     434          19 :     const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
     435          19 :     CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
     436             :               arrow::Type::DICTIONARY);
     437             : 
     438          19 :     if (m_poRecordBatchReader)
     439             :     {
     440           6 :         if (m_poBatch)
     441             :         {
     442           6 :             return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
     443             :         }
     444             :     }
     445          13 :     else if (m_poRecordBatchFileReader)
     446             :     {
     447          13 :         auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
     448          13 :         if (!result.ok())
     449             :         {
     450           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     451             :                      "ReadRecordBatch() failed: %s",
     452           0 :                      result.status().message().c_str());
     453             :         }
     454          13 :         auto poBatch = *result;
     455          13 :         if (poBatch)
     456             :         {
     457          13 :             return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
     458             :         }
     459             :     }
     460             : 
     461           0 :     return nullptr;
     462             : }
     463             : 
     464             : /************************************************************************/
     465             : /*                           ResetReading()                             */
     466             : /************************************************************************/
     467             : 
     468         829 : void OGRFeatherLayer::ResetReading()
     469             : {
     470         829 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
     471             :     {
     472          17 :         if (m_iRecordBatch == 1 && m_poBatchIdx1)
     473             :         {
     474             :             // do nothing
     475             :         }
     476             :         else
     477             :         {
     478          16 :             m_bResetRecordBatchReaderAsked = true;
     479             :         }
     480             :     }
     481         829 :     OGRArrowLayer::ResetReading();
     482         829 : }
     483             : 
     484             : /************************************************************************/
     485             : /*                           ReadNextBatch()                            */
     486             : /************************************************************************/
     487             : 
     488        1011 : bool OGRFeatherLayer::ReadNextBatch()
     489             : {
     490        1011 :     if (m_poRecordBatchFileReader == nullptr)
     491             :     {
     492         121 :         return ReadNextBatchStream();
     493             :     }
     494             :     else
     495             :     {
     496         890 :         return ReadNextBatchFile();
     497             :     }
     498             : }
     499             : 
     500             : /************************************************************************/
     501             : /*                         ReadNextBatchFile()                          */
     502             : /************************************************************************/
     503             : 
     504         890 : bool OGRFeatherLayer::ReadNextBatchFile()
     505             : {
     506             :     while (true)
     507             :     {
     508         890 :         ++m_iRecordBatch;
     509         890 :         if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
     510             :         {
     511         451 :             if (m_iRecordBatch == 1)
     512         448 :                 m_iRecordBatch = 0;
     513             :             else
     514           3 :                 m_poBatch.reset();
     515         451 :             return false;
     516             :         }
     517             : 
     518         439 :         m_nIdxInBatch = 0;
     519             : 
     520             :         auto result =
     521         439 :             m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
     522         439 :         if (!result.ok())
     523             :         {
     524           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     525             :                      "ReadRecordBatch() failed: %s",
     526           0 :                      result.status().message().c_str());
     527           0 :             m_poBatch.reset();
     528           0 :             return false;
     529             :         }
     530         439 :         if ((*result)->num_rows() != 0)
     531             :         {
     532         439 :             SetBatch(*result);
     533         439 :             break;
     534             :         }
     535           0 :     }
     536             : 
     537         439 :     return true;
     538             : }
     539             : 
     540             : /************************************************************************/
     541             : /*                         ReadNextBatchStream()                        */
     542             : /************************************************************************/
     543             : 
     544         156 : bool OGRFeatherLayer::ReadNextBatchStream()
     545             : {
     546         156 :     m_nIdxInBatch = 0;
     547             : 
     548         312 :     std::shared_ptr<arrow::RecordBatch> poNextBatch;
     549           0 :     do
     550             :     {
     551         156 :         if (m_iRecordBatch == 0 && m_poBatchIdx0)
     552             :         {
     553           1 :             SetBatch(m_poBatchIdx0);
     554           1 :             m_iRecordBatch = 1;
     555         103 :             return true;
     556             :         }
     557             : 
     558         155 :         else if (m_iRecordBatch == 1 && m_poBatchIdx1)
     559             :         {
     560           1 :             SetBatch(m_poBatchIdx1);
     561           1 :             m_iRecordBatch = 2;
     562           1 :             return true;
     563             :         }
     564             : 
     565         154 :         else if (m_bSingleBatch)
     566             :         {
     567          83 :             CPLAssert(m_iRecordBatch == 0);
     568          83 :             CPLAssert(m_poBatch != nullptr);
     569          83 :             return false;
     570             :         }
     571             : 
     572          71 :         if (m_bResetRecordBatchReaderAsked)
     573             :         {
     574          13 :             if (!m_bSeekable)
     575             :             {
     576           1 :                 CPLError(CE_Failure, CPLE_NotSupported,
     577             :                          "Attempting to rewind non-seekable stream");
     578           1 :                 return false;
     579             :             }
     580          12 :             if (!ResetRecordBatchReader())
     581           0 :                 return false;
     582          12 :             m_bResetRecordBatchReaderAsked = false;
     583             :         }
     584             : 
     585          70 :         CPLAssert(m_poRecordBatchReader);
     586             : 
     587          70 :         ++m_iRecordBatch;
     588             : 
     589          70 :         poNextBatch.reset();
     590          70 :         auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
     591          70 :         if (!status.ok())
     592             :         {
     593           0 :             CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     594           0 :                      status.message().c_str());
     595           0 :             poNextBatch.reset();
     596             :         }
     597          70 :         if (poNextBatch == nullptr)
     598             :         {
     599          17 :             if (m_iRecordBatch == 1)
     600             :             {
     601           3 :                 m_iRecordBatch = 0;
     602           3 :                 m_bSingleBatch = true;
     603             :             }
     604             :             else
     605             :             {
     606          14 :                 m_poBatch.reset();
     607          14 :                 m_poBatchColumns.clear();
     608             :             }
     609          17 :             return false;
     610             :         }
     611          53 :     } while (poNextBatch->num_rows() == 0);
     612             : 
     613          53 :     SetBatch(poNextBatch);
     614             : 
     615          53 :     return true;
     616             : }
     617             : 
     618             : /************************************************************************/
     619             : /*                     TryToCacheFirstTwoBatches()                      */
     620             : /************************************************************************/
     621             : 
     622           1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
     623             : {
     624           2 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
     625           2 :         !m_bSingleBatch && m_poBatchIdx0 == nullptr)
     626             :     {
     627           1 :         ResetReading();
     628           1 :         if (!m_poBatch)
     629             :         {
     630           0 :             CPL_IGNORE_RET_VAL(ReadNextBatchStream());
     631             :         }
     632           1 :         if (m_poBatch)
     633             :         {
     634           2 :             auto poBatchIdx0 = m_poBatch;
     635           1 :             if (ReadNextBatchStream())
     636             :             {
     637           1 :                 CPLAssert(m_iRecordBatch == 1);
     638           1 :                 m_poBatchIdx0 = poBatchIdx0;
     639           1 :                 m_poBatchIdx1 = m_poBatch;
     640           1 :                 SetBatch(poBatchIdx0);
     641           1 :                 ResetReading();
     642             :             }
     643           1 :             ResetReading();
     644             :         }
     645             :     }
     646           1 : }
     647             : 
     648             : /************************************************************************/
     649             : /*                          CanPostFilterArrowArray()                   */
     650             : /************************************************************************/
     651             : 
     652          20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
     653             :     const struct ArrowSchema *schema) const
     654             : {
     655          20 :     if (m_poRecordBatchReader)
     656          10 :         return false;
     657          10 :     return OGRArrowLayer::CanPostFilterArrowArray(schema);
     658             : }
     659             : 
     660             : /************************************************************************/
     661             : /*                     InvalidateCachedBatches()                        */
     662             : /************************************************************************/
     663             : 
     664         109 : void OGRFeatherLayer::InvalidateCachedBatches()
     665             : {
     666         109 :     if (m_poRecordBatchFileReader)
     667             :     {
     668          63 :         m_iRecordBatch = -1;
     669          63 :         ResetReading();
     670             :     }
     671         109 : }
     672             : 
     673             : /************************************************************************/
     674             : /*                        GetFeatureCount()                             */
     675             : /************************************************************************/
     676             : 
     677         336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
     678             : {
     679         628 :     if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
     680         292 :         m_poFilterGeom == nullptr)
     681             :     {
     682         288 :         auto result = m_poRecordBatchFileReader->CountRows();
     683         288 :         if (result.ok())
     684         288 :             return *result;
     685             :     }
     686          48 :     else if (m_poRecordBatchReader != nullptr)
     687             :     {
     688          36 :         if (!m_bSeekable && !bForce)
     689             :         {
     690           1 :             if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     691             :             {
     692           1 :                 TryToCacheFirstTwoBatches();
     693             :             }
     694             : 
     695           1 :             if (!m_bSingleBatch)
     696             :             {
     697           1 :                 CPLError(
     698             :                     CE_Failure, CPLE_AppDefined,
     699             :                     "GetFeatureCount() cannot be run in non-forced mode on "
     700             :                     "a non-seekable file made of several batches");
     701           1 :                 return -1;
     702             :             }
     703             :         }
     704             : 
     705          35 :         if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     706             :         {
     707          23 :             GIntBig nFeatures = 0;
     708          23 :             ResetReading();
     709          23 :             if (!m_poBatch)
     710           3 :                 ReadNextBatchStream();
     711          31 :             while (m_poBatch)
     712             :             {
     713          31 :                 nFeatures += m_poBatch->num_rows();
     714          31 :                 if (!ReadNextBatchStream())
     715          23 :                     break;
     716             :             }
     717          23 :             ResetReading();
     718          23 :             return nFeatures;
     719             :         }
     720             :     }
     721          24 :     return OGRLayer::GetFeatureCount(bForce);
     722             : }
     723             : 
     724             : /************************************************************************/
     725             : /*                       CanRunNonForcedGetExtent()                     */
     726             : /************************************************************************/
     727             : 
     728           0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
     729             : {
     730           0 :     if (m_bSeekable)
     731           0 :         return true;
     732           0 :     TryToCacheFirstTwoBatches();
     733           0 :     if (!m_bSingleBatch)
     734             :     {
     735           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     736             :                  "GetExtent() cannot be run in non-forced mode on "
     737             :                  "a non-seekable file made of several batches");
     738           0 :         return false;
     739             :     }
     740           0 :     return true;
     741             : }
     742             : 
     743             : /************************************************************************/
     744             : /*                         TestCapability()                             */
     745             : /************************************************************************/
     746             : 
     747         296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
     748             : {
     749         296 :     if (EQUAL(pszCap, OLCFastFeatureCount))
     750             :     {
     751          28 :         return m_bSeekable && m_poAttrQuery == nullptr &&
     752          28 :                m_poFilterGeom == nullptr;
     753             :     }
     754             : 
     755         278 :     if (EQUAL(pszCap, OLCMeasuredGeometries))
     756          16 :         return true;
     757         262 :     if (EQUAL(pszCap, OLCZGeometries))
     758          12 :         return true;
     759             : 
     760         250 :     return OGRArrowLayer::TestCapability(pszCap);
     761             : }
     762             : 
     763             : /************************************************************************/
     764             : /*                         GetMetadataItem()                            */
     765             : /************************************************************************/
     766             : 
     767         259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
     768             :                                              const char *pszDomain)
     769             : {
     770             :     // Mostly for unit test purposes
     771         259 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
     772             :     {
     773           9 :         if (EQUAL(pszName, "FORMAT"))
     774             :         {
     775           5 :             return m_poRecordBatchFileReader ? "FILE" : "STREAM";
     776             :         }
     777           4 :         if (m_poRecordBatchFileReader != nullptr)
     778             :         {
     779           4 :             int iBatch = -1;
     780           4 :             if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
     781             :             {
     782           1 :                 return CPLSPrintf(
     783           5 :                     "%d", m_poRecordBatchFileReader->num_record_batches());
     784             :             }
     785           6 :             else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
     786           3 :                      strstr(pszName, ".NUM_ROWS"))
     787             :             {
     788             :                 auto result =
     789           6 :                     m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     790           3 :                 if (!result.ok())
     791             :                 {
     792           0 :                     return nullptr;
     793             :                 }
     794           3 :                 return CPLSPrintf("%" PRId64, (*result)->num_rows());
     795             :             }
     796             :         }
     797           0 :         return nullptr;
     798             :     }
     799         250 :     else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     800             :     {
     801             :         const auto kv_metadata =
     802           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     803           8 :                                        : m_poRecordBatchReader->schema())
     804          10 :                 ->metadata();
     805           5 :         if (kv_metadata && kv_metadata->Contains(pszName))
     806             :         {
     807           5 :             auto metadataItem = kv_metadata->Get(pszName);
     808           5 :             if (metadataItem.ok())
     809             :             {
     810           5 :                 return CPLSPrintf("%s", metadataItem->c_str());
     811             :             }
     812             :         }
     813           0 :         return nullptr;
     814             :     }
     815         476 :     else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     816         231 :              EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     817             :     {
     818           2 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     819           1 :         if (kv_metadata && kv_metadata->Contains(pszName))
     820             :         {
     821           1 :             auto metadataItem = kv_metadata->Get(pszName);
     822           1 :             if (metadataItem.ok())
     823             :             {
     824           1 :                 return CPLSPrintf("%s", metadataItem->c_str());
     825             :             }
     826             :         }
     827           0 :         return nullptr;
     828             :     }
     829         244 :     return OGRLayer::GetMetadataItem(pszName, pszDomain);
     830             : }
     831             : 
     832             : /************************************************************************/
     833             : /*                           GetMetadata()                              */
     834             : /************************************************************************/
     835             : 
     836         144 : char **OGRFeatherLayer::GetMetadata(const char *pszDomain)
     837             : {
     838             :     // Mostly for unit test purposes
     839         144 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     840             :     {
     841           5 :         m_aosFeatherMetadata.Clear();
     842             :         const auto kv_metadata =
     843           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     844           8 :                                        : m_poRecordBatchReader->schema())
     845          10 :                 ->metadata();
     846           5 :         if (kv_metadata)
     847             :         {
     848          11 :             for (const auto &kv : kv_metadata->sorted_pairs())
     849             :             {
     850             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     851           6 :                                                   kv.second.c_str());
     852             :             }
     853             :         }
     854           5 :         return m_aosFeatherMetadata.List();
     855             :     }
     856         261 :     if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     857         122 :         EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     858             :     {
     859           2 :         m_aosFeatherMetadata.Clear();
     860           4 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     861           2 :         if (kv_metadata)
     862             :         {
     863           3 :             for (const auto &kv : kv_metadata->sorted_pairs())
     864             :             {
     865             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     866           1 :                                                   kv.second.c_str());
     867             :             }
     868             :         }
     869           2 :         return m_aosFeatherMetadata.List();
     870             :     }
     871         137 :     return OGRLayer::GetMetadata(pszDomain);
     872             : }

Generated by: LCOV version 1.14