LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 321 382 84.0 %
Date: 2026-01-23 20:24:11 Functions: 19 20 95.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_json.h"
      14             : #include "cpl_time.h"
      15             : #include "gdal_pam.h"
      16             : #include "ogrsf_frmts.h"
      17             : #include "ogr_p.h"
      18             : 
      19             : #include <cinttypes>
      20             : #include <limits>
      21             : #include <map>
      22             : #include <set>
      23             : #include <utility>
      24             : 
      25             : #include "ogr_feather.h"
      26             : 
      27             : #include "../arrow_common/ograrrowlayer.hpp"
      28             : #include "../arrow_common/ograrrowdataset.hpp"
      29             : 
      30             : /************************************************************************/
      31             : /*                        OGRFeatherLayer()                             */
      32             : /************************************************************************/
      33             : 
      34         538 : OGRFeatherLayer::OGRFeatherLayer(
      35             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      36             :     std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
      37         538 :     CSLConstList papszOpenOptions)
      38             :     : OGRArrowLayer(poDS, pszLayerName,
      39         538 :                     CPLTestBool(CSLFetchNameValueDef(
      40             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      41        1076 :       m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
      42             : {
      43         538 :     EstablishFeatureDefn();
      44         538 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      45             :               m_poFeatureDefn->GetGeomFieldCount());
      46         538 : }
      47             : 
      48             : /************************************************************************/
      49             : /*                        OGRFeatherLayer()                             */
      50             : /************************************************************************/
      51             : 
      52          10 : OGRFeatherLayer::OGRFeatherLayer(
      53             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      54             :     std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
      55             :     const arrow::ipc::IpcReadOptions &oOptions,
      56             :     std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
      57             :         &poRecordBatchStreamReader,
      58          10 :     CSLConstList papszOpenOptions)
      59             :     : OGRArrowLayer(poDS, pszLayerName,
      60          10 :                     CPLTestBool(CSLFetchNameValueDef(
      61             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      62          10 :       m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
      63          20 :       m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
      64             : {
      65          10 :     EstablishFeatureDefn();
      66          10 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      67             :               m_poFeatureDefn->GetGeomFieldCount());
      68          10 : }
      69             : 
      70             : /************************************************************************/
      71             : /*                           GetDataset()                               */
      72             : /************************************************************************/
      73             : 
      74         112 : GDALDataset *OGRFeatherLayer::GetDataset()
      75             : {
      76         112 :     return m_poDS;
      77             : }
      78             : 
      79             : /************************************************************************/
      80             : /*                          LoadGeoMetadata()                           */
      81             : /************************************************************************/
      82             : 
      83         548 : void OGRFeatherLayer::LoadGeoMetadata(
      84             :     const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
      85             : {
      86         548 :     if (kv_metadata && kv_metadata->Contains(key))
      87             :     {
      88        1066 :         auto geo = kv_metadata->Get(key);
      89         533 :         if (geo.ok())
      90             :         {
      91        1066 :             CPLJSONDocument oDoc;
      92         533 :             if (oDoc.LoadMemory(*geo))
      93             :             {
      94        1066 :                 auto oRoot = oDoc.GetRoot();
      95        1599 :                 const auto osVersion = oRoot.GetString("schema_version");
      96         533 :                 if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
      97             :                 {
      98         398 :                     CPLDebug("FEATHER",
      99             :                              "schema_version = %s not explicitly handled by "
     100             :                              "the driver",
     101             :                              osVersion.c_str());
     102             :                 }
     103        1599 :                 auto oColumns = oRoot.GetObj("columns");
     104         533 :                 if (oColumns.IsValid())
     105             :                 {
     106        1066 :                     for (const auto &oColumn : oColumns.GetChildren())
     107             :                     {
     108         533 :                         m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
     109             :                     }
     110             :                 }
     111             :             }
     112             :             else
     113             :             {
     114           0 :                 CPLError(CE_Warning, CPLE_AppDefined,
     115             :                          "Cannot parse 'geo' metadata");
     116             :             }
     117             :         }
     118             :     }
     119         548 : }
     120             : 
     121             : /************************************************************************/
     122             : /*                        EstablishFeatureDefn()                        */
     123             : /************************************************************************/
     124             : 
     125         548 : void OGRFeatherLayer::EstablishFeatureDefn()
     126             : {
     127        1096 :     m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     128        1096 :                                            : m_poRecordBatchReader->schema();
     129         548 :     const auto &kv_metadata = m_poSchema->metadata();
     130             : 
     131             : #ifdef DEBUG
     132         548 :     if (kv_metadata)
     133             :     {
     134        1079 :         for (const auto &keyValue : kv_metadata->sorted_pairs())
     135             :         {
     136         541 :             CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
     137             :                      keyValue.second.c_str());
     138             :         }
     139             :     }
     140             : #endif
     141             : 
     142             :     auto poFooterMetadata = m_poRecordBatchFileReader
     143         538 :                                 ? m_poRecordBatchFileReader->metadata()
     144        1634 :                                 : nullptr;
     145         674 :     if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
     146         126 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
     147             :     {
     148         126 :         LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
     149             :     }
     150             :     else
     151             :     {
     152         422 :         LoadGeoMetadata(kv_metadata.get(), "geo");
     153             :     }
     154             :     const auto oMapFieldNameToGDALSchemaFieldDefn =
     155        1096 :         LoadGDALSchema(kv_metadata.get());
     156             : 
     157         548 :     const auto &fields = m_poSchema->fields();
     158        4175 :     for (int i = 0; i < m_poSchema->num_fields(); ++i)
     159             :     {
     160        3627 :         const auto &field = fields[i];
     161        3627 :         const auto &fieldName = field->name();
     162             : 
     163        3627 :         const auto &field_kv_metadata = field->metadata();
     164        3627 :         std::string osExtensionName;
     165        3627 :         std::string osExtensionMetadata;
     166        3627 :         if (field->type()->id() == arrow::Type::EXTENSION)
     167             :         {
     168             :             osExtensionName =
     169         127 :                 cpl::down_cast<arrow::ExtensionType *>(field->type().get())
     170         127 :                     ->extension_name();
     171             :         }
     172        3500 :         else if (field_kv_metadata)
     173             :         {
     174             :             auto extension_name =
     175        1048 :                 field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
     176         524 :             if (extension_name.ok())
     177             :             {
     178         522 :                 osExtensionName = *extension_name;
     179             :             }
     180             : 
     181             :             auto extension_metadata =
     182        1048 :                 field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
     183         524 :             if (extension_metadata.ok())
     184             :             {
     185         384 :                 osExtensionMetadata = *extension_metadata;
     186             :             }
     187             : #ifdef DEBUG
     188         524 :             CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
     189        1432 :             for (const auto &keyValue : field_kv_metadata->sorted_pairs())
     190             :             {
     191         908 :                 CPLDebug("FEATHER", "  %s = %s", keyValue.first.c_str(),
     192             :                          keyValue.second.c_str());
     193             :             }
     194             : #endif
     195             :         }
     196             : 
     197        3627 :         if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
     198             :         {
     199           6 :             m_iFIDArrowColumn = i;
     200           6 :             continue;
     201             :         }
     202             : 
     203        3621 :         bool bRegularField = true;
     204        3621 :         auto oIter = m_oMapGeometryColumns.find(fieldName);
     205        9670 :         if (oIter != m_oMapGeometryColumns.end() ||
     206        6049 :             (osExtensionName != EXTENSION_NAME_ARROW_JSON &&
     207        2961 :              osExtensionName != EXTENSION_NAME_ARROW_TIMESTAMP_WITH_OFFSET &&
     208        2959 :              !osExtensionName.empty()))
     209             :         {
     210        1072 :             CPLJSONObject oJSONDef;
     211         536 :             if (oIter != m_oMapGeometryColumns.end())
     212         533 :                 oJSONDef = oIter->second;
     213        1608 :             auto osEncoding = oJSONDef.GetString("encoding");
     214         536 :             if (osEncoding.empty() && !osExtensionName.empty())
     215           3 :                 osEncoding = osExtensionName;
     216             : 
     217         536 :             OGRwkbGeometryType eGeomType = wkbUnknown;
     218         536 :             auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
     219         536 :             if (IsValidGeometryEncoding(field, osEncoding,
     220        1072 :                                         oIter != m_oMapGeometryColumns.end(),
     221             :                                         eGeomType, eGeomEncoding))
     222             :             {
     223         534 :                 bRegularField = false;
     224        1068 :                 OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
     225             : 
     226        1602 :                 auto osCRS = oJSONDef.GetString("crs");
     227             : 
     228             : #if ARROW_VERSION_MAJOR >= 21
     229             :                 if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     230             :                     osExtensionMetadata.empty() &&
     231             :                     field->type()->id() == arrow::Type::EXTENSION)
     232             :                 {
     233             :                     const auto arrowWkb =
     234             :                         std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
     235             :                             field->type());
     236             :                     if (arrowWkb)
     237             :                     {
     238             :                         osExtensionMetadata = arrowWkb->Serialize();
     239             :                     }
     240             :                 }
     241             : #endif
     242             : 
     243         931 :                 if (osCRS.empty() &&
     244         397 :                     osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     245         160 :                     !osExtensionMetadata.empty() &&
     246         931 :                     osExtensionMetadata[0] == '{' &&
     247           0 :                     osExtensionMetadata.back() == '}')
     248             :                 {
     249           0 :                     CPLJSONDocument oDoc;
     250           0 :                     if (oDoc.LoadMemory(osExtensionMetadata))
     251             :                     {
     252           0 :                         auto jCrs = oDoc.GetRoot()["crs"];
     253           0 :                         if (jCrs.GetType() == CPLJSONObject::Type::Object)
     254             :                         {
     255             :                             osCRS =
     256           0 :                                 jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
     257             :                         }
     258           0 :                         else if (jCrs.GetType() == CPLJSONObject::Type::String)
     259             :                         {
     260           0 :                             osCRS = jCrs.ToString();
     261             :                         }
     262           0 :                         if (oDoc.GetRoot()["edges"].ToString() == "spherical")
     263             :                         {
     264           0 :                             SetMetadataItem("EDGES", "SPHERICAL");
     265             :                         }
     266             :                     }
     267             :                 }
     268             : 
     269         534 :                 if (osCRS.empty())
     270             :                 {
     271             : #if 0
     272             :                     CPLError(CE_Warning, CPLE_AppDefined,
     273             :                              "Missing required 'crs' field for geometry column %s",
     274             :                              fieldName.c_str());
     275             : #endif
     276             :                 }
     277             :                 else
     278             :                 {
     279         137 :                     OGRSpatialReference *poSRS = new OGRSpatialReference();
     280         137 :                     poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
     281             : 
     282         137 :                     if (poSRS->SetFromUserInput(
     283             :                             osCRS.c_str(),
     284             :                             OGRSpatialReference::
     285         137 :                                 SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
     286             :                         OGRERR_NONE)
     287             :                     {
     288             :                         const char *pszAuthName =
     289         137 :                             poSRS->GetAuthorityName(nullptr);
     290             :                         const char *pszAuthCode =
     291         137 :                             poSRS->GetAuthorityCode(nullptr);
     292         137 :                         if (pszAuthName && pszAuthCode &&
     293         137 :                             EQUAL(pszAuthName, "OGC") &&
     294           0 :                             EQUAL(pszAuthCode, "CRS84"))
     295             :                         {
     296           0 :                             poSRS->importFromEPSG(4326);
     297             :                         }
     298             : 
     299         137 :                         const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
     300         137 :                         if (dfCoordEpoch > 0)
     301           2 :                             poSRS->SetCoordinateEpoch(dfCoordEpoch);
     302             : 
     303         137 :                         oField.SetSpatialRef(poSRS);
     304             :                     }
     305         137 :                     poSRS->Release();
     306             :                 }
     307             : 
     308             :                 // m_aeGeomEncoding be filled before calling
     309             :                 // ComputeGeometryColumnType()
     310         534 :                 m_aeGeomEncoding.push_back(eGeomEncoding);
     311         534 :                 if (eGeomType == wkbUnknown)
     312             :                 {
     313         708 :                     auto osType = oJSONDef.GetString("geometry_type");
     314         236 :                     if (osType.empty())
     315         236 :                         osType = oJSONDef.GetString("gdal:geometry_type");
     316         472 :                     if (m_bSeekable && osType.empty() &&
     317         236 :                         CPLTestBool(CPLGetConfigOption(
     318             :                             "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
     319             :                     {
     320         236 :                         eGeomType = ComputeGeometryColumnType(
     321         236 :                             m_poFeatureDefn->GetGeomFieldCount(), i);
     322         236 :                         if (m_poRecordBatchReader)
     323           0 :                             ResetRecordBatchReader();
     324             :                     }
     325             :                     else
     326           0 :                         eGeomType = GetGeometryTypeFromString(osType);
     327             :                 }
     328             : 
     329         534 :                 oField.SetType(eGeomType);
     330         534 :                 oField.SetNullable(field->nullable());
     331         534 :                 m_poFeatureDefn->AddGeomFieldDefn(&oField);
     332         534 :                 m_anMapGeomFieldIndexToArrowColumn.push_back(i);
     333             :             }
     334             :         }
     335             : 
     336        3621 :         if (bRegularField)
     337             :         {
     338        3087 :             CreateFieldFromSchema(field, {i},
     339             :                                   oMapFieldNameToGDALSchemaFieldDefn);
     340             :         }
     341             :     }
     342             : 
     343         548 :     CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
     344             :               m_poFeatureDefn->GetFieldCount());
     345         548 :     CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
     346             :               m_poFeatureDefn->GetGeomFieldCount());
     347         548 : }
     348             : 
     349             : /************************************************************************/
     350             : /*                       ResetRecordBatchReader()                       */
     351             : /************************************************************************/
     352             : 
     353          12 : bool OGRFeatherLayer::ResetRecordBatchReader()
     354             : {
     355          12 :     const auto nPos = *(m_poFile->Tell());
     356          12 :     CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
     357             :     auto result =
     358          24 :         arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
     359          12 :     if (!result.ok())
     360             :     {
     361           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     362             :                  "RecordBatchStreamReader::Open() failed with %s",
     363           0 :                  result.status().message().c_str());
     364           0 :         CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
     365           0 :         return false;
     366             :     }
     367             :     else
     368             :     {
     369          12 :         m_poRecordBatchReader = *result;
     370          12 :         return true;
     371             :     }
     372             : }
     373             : 
     374             : /************************************************************************/
     375             : /*                     ComputeGeometryColumnType()                      */
     376             : /************************************************************************/
     377             : 
     378         236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
     379             :                                                               int iCol) const
     380             : {
     381             :     // Compute type of geometry column by iterating over each geometry, and
     382             :     // looking at the WKB geometry type in the first 5 bytes of each geometry.
     383             : 
     384         236 :     OGRwkbGeometryType eGeomType = wkbNone;
     385             : 
     386         236 :     if (m_poRecordBatchReader != nullptr)
     387             :     {
     388           0 :         std::shared_ptr<arrow::RecordBatch> poBatch;
     389             :         while (true)
     390             :         {
     391           0 :             auto status = m_poRecordBatchReader->ReadNext(&poBatch);
     392           0 :             if (!status.ok())
     393             :             {
     394           0 :                 CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     395           0 :                          status.message().c_str());
     396           0 :                 break;
     397             :             }
     398           0 :             else if (!poBatch)
     399           0 :                 break;
     400           0 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
     401             :                                                               iCol, eGeomType);
     402           0 :             if (eGeomType == wkbUnknown)
     403           0 :                 break;
     404           0 :         }
     405             :     }
     406             :     else
     407             :     {
     408         472 :         for (int iBatch = 0;
     409         472 :              iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
     410             :         {
     411         236 :             auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     412         236 :             if (!result.ok())
     413             :             {
     414           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     415             :                          "ReadRecordBatch() failed: %s",
     416           0 :                          result.status().message().c_str());
     417           0 :                 break;
     418             :             }
     419         236 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
     420             :                                                               iCol, eGeomType);
     421         236 :             if (eGeomType == wkbUnknown)
     422           0 :                 break;
     423             :         }
     424             :     }
     425             : 
     426         236 :     return eGeomType == wkbNone ? wkbUnknown : eGeomType;
     427             : }
     428             : 
     429             : /************************************************************************/
     430             : /*                          BuildDomain()                               */
     431             : /************************************************************************/
     432             : 
     433             : std::unique_ptr<OGRFieldDomain>
     434          19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
     435             :                              int iFieldIndex) const
     436             : {
     437          19 :     const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
     438          19 :     CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
     439             :               arrow::Type::DICTIONARY);
     440             : 
     441          19 :     if (m_poRecordBatchReader)
     442             :     {
     443           6 :         if (m_poBatch)
     444             :         {
     445           6 :             return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
     446             :         }
     447             :     }
     448          13 :     else if (m_poRecordBatchFileReader)
     449             :     {
     450          13 :         auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
     451          13 :         if (!result.ok())
     452             :         {
     453           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     454             :                      "ReadRecordBatch() failed: %s",
     455           0 :                      result.status().message().c_str());
     456             :         }
     457          13 :         auto poBatch = *result;
     458          13 :         if (poBatch)
     459             :         {
     460          13 :             return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
     461             :         }
     462             :     }
     463             : 
     464           0 :     return nullptr;
     465             : }
     466             : 
     467             : /************************************************************************/
     468             : /*                           ResetReading()                             */
     469             : /************************************************************************/
     470             : 
     471         829 : void OGRFeatherLayer::ResetReading()
     472             : {
     473         829 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
     474             :     {
     475          17 :         if (m_iRecordBatch == 1 && m_poBatchIdx1)
     476             :         {
     477             :             // do nothing
     478             :         }
     479             :         else
     480             :         {
     481          16 :             m_bResetRecordBatchReaderAsked = true;
     482             :         }
     483             :     }
     484         829 :     OGRArrowLayer::ResetReading();
     485         829 : }
     486             : 
     487             : /************************************************************************/
     488             : /*                           ReadNextBatch()                            */
     489             : /************************************************************************/
     490             : 
     491        1013 : bool OGRFeatherLayer::ReadNextBatch()
     492             : {
     493        1013 :     if (m_poRecordBatchFileReader == nullptr)
     494             :     {
     495         121 :         return ReadNextBatchStream();
     496             :     }
     497             :     else
     498             :     {
     499         892 :         return ReadNextBatchFile();
     500             :     }
     501             : }
     502             : 
     503             : /************************************************************************/
     504             : /*                         ReadNextBatchFile()                          */
     505             : /************************************************************************/
     506             : 
     507         892 : bool OGRFeatherLayer::ReadNextBatchFile()
     508             : {
     509             :     while (true)
     510             :     {
     511         892 :         ++m_iRecordBatch;
     512         892 :         if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
     513             :         {
     514         451 :             if (m_iRecordBatch == 1)
     515         448 :                 m_iRecordBatch = 0;
     516             :             else
     517           3 :                 m_poBatch.reset();
     518         451 :             return false;
     519             :         }
     520             : 
     521         441 :         m_nIdxInBatch = 0;
     522             : 
     523             :         auto result =
     524         441 :             m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
     525         441 :         if (!result.ok())
     526             :         {
     527           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     528             :                      "ReadRecordBatch() failed: %s",
     529           0 :                      result.status().message().c_str());
     530           0 :             m_poBatch.reset();
     531           0 :             return false;
     532             :         }
     533         441 :         if ((*result)->num_rows() != 0)
     534             :         {
     535         441 :             SetBatch(*result);
     536         441 :             break;
     537             :         }
     538           0 :     }
     539             : 
     540         441 :     return true;
     541             : }
     542             : 
     543             : /************************************************************************/
     544             : /*                         ReadNextBatchStream()                        */
     545             : /************************************************************************/
     546             : 
     547         156 : bool OGRFeatherLayer::ReadNextBatchStream()
     548             : {
     549         156 :     m_nIdxInBatch = 0;
     550             : 
     551         312 :     std::shared_ptr<arrow::RecordBatch> poNextBatch;
     552           0 :     do
     553             :     {
     554         156 :         if (m_iRecordBatch == 0 && m_poBatchIdx0)
     555             :         {
     556           1 :             SetBatch(m_poBatchIdx0);
     557           1 :             m_iRecordBatch = 1;
     558         103 :             return true;
     559             :         }
     560             : 
     561         155 :         else if (m_iRecordBatch == 1 && m_poBatchIdx1)
     562             :         {
     563           1 :             SetBatch(m_poBatchIdx1);
     564           1 :             m_iRecordBatch = 2;
     565           1 :             return true;
     566             :         }
     567             : 
     568         154 :         else if (m_bSingleBatch)
     569             :         {
     570          83 :             CPLAssert(m_iRecordBatch == 0);
     571          83 :             CPLAssert(m_poBatch != nullptr);
     572          83 :             return false;
     573             :         }
     574             : 
     575          71 :         if (m_bResetRecordBatchReaderAsked)
     576             :         {
     577          13 :             if (!m_bSeekable)
     578             :             {
     579           1 :                 CPLError(CE_Failure, CPLE_NotSupported,
     580             :                          "Attempting to rewind non-seekable stream");
     581           1 :                 return false;
     582             :             }
     583          12 :             if (!ResetRecordBatchReader())
     584           0 :                 return false;
     585          12 :             m_bResetRecordBatchReaderAsked = false;
     586             :         }
     587             : 
     588          70 :         CPLAssert(m_poRecordBatchReader);
     589             : 
     590          70 :         ++m_iRecordBatch;
     591             : 
     592          70 :         poNextBatch.reset();
     593          70 :         auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
     594          70 :         if (!status.ok())
     595             :         {
     596           0 :             CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     597           0 :                      status.message().c_str());
     598           0 :             poNextBatch.reset();
     599             :         }
     600          70 :         if (poNextBatch == nullptr)
     601             :         {
     602          17 :             if (m_iRecordBatch == 1)
     603             :             {
     604           3 :                 m_iRecordBatch = 0;
     605           3 :                 m_bSingleBatch = true;
     606             :             }
     607             :             else
     608             :             {
     609          14 :                 m_poBatch.reset();
     610          14 :                 m_poBatchColumns.clear();
     611             :             }
     612          17 :             return false;
     613             :         }
     614          53 :     } while (poNextBatch->num_rows() == 0);
     615             : 
     616          53 :     SetBatch(poNextBatch);
     617             : 
     618          53 :     return true;
     619             : }
     620             : 
     621             : /************************************************************************/
     622             : /*                     TryToCacheFirstTwoBatches()                      */
     623             : /************************************************************************/
     624             : 
     625           1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
     626             : {
     627           2 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
     628           2 :         !m_bSingleBatch && m_poBatchIdx0 == nullptr)
     629             :     {
     630           1 :         ResetReading();
     631           1 :         if (!m_poBatch)
     632             :         {
     633           0 :             CPL_IGNORE_RET_VAL(ReadNextBatchStream());
     634             :         }
     635           1 :         if (m_poBatch)
     636             :         {
     637           2 :             auto poBatchIdx0 = m_poBatch;
     638           1 :             if (ReadNextBatchStream())
     639             :             {
     640           1 :                 CPLAssert(m_iRecordBatch == 1);
     641           1 :                 m_poBatchIdx0 = poBatchIdx0;
     642           1 :                 m_poBatchIdx1 = m_poBatch;
     643           1 :                 SetBatch(poBatchIdx0);
     644           1 :                 ResetReading();
     645             :             }
     646           1 :             ResetReading();
     647             :         }
     648             :     }
     649           1 : }
     650             : 
     651             : /************************************************************************/
     652             : /*                          CanPostFilterArrowArray()                   */
     653             : /************************************************************************/
     654             : 
     655          20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
     656             :     const struct ArrowSchema *schema) const
     657             : {
     658          20 :     if (m_poRecordBatchReader)
     659          10 :         return false;
     660          10 :     return OGRArrowLayer::CanPostFilterArrowArray(schema);
     661             : }
     662             : 
     663             : /************************************************************************/
     664             : /*                     InvalidateCachedBatches()                        */
     665             : /************************************************************************/
     666             : 
     667         109 : void OGRFeatherLayer::InvalidateCachedBatches()
     668             : {
     669         109 :     if (m_poRecordBatchFileReader)
     670             :     {
     671          63 :         m_iRecordBatch = -1;
     672          63 :         ResetReading();
     673             :     }
     674         109 : }
     675             : 
     676             : /************************************************************************/
     677             : /*                        GetFeatureCount()                             */
     678             : /************************************************************************/
     679             : 
     680         336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
     681             : {
     682         628 :     if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
     683         292 :         m_poFilterGeom == nullptr)
     684             :     {
     685         288 :         auto result = m_poRecordBatchFileReader->CountRows();
     686         288 :         if (result.ok())
     687         288 :             return *result;
     688             :     }
     689          48 :     else if (m_poRecordBatchReader != nullptr)
     690             :     {
     691          36 :         if (!m_bSeekable && !bForce)
     692             :         {
     693           1 :             if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     694             :             {
     695           1 :                 TryToCacheFirstTwoBatches();
     696             :             }
     697             : 
     698           1 :             if (!m_bSingleBatch)
     699             :             {
     700           1 :                 CPLError(
     701             :                     CE_Failure, CPLE_AppDefined,
     702             :                     "GetFeatureCount() cannot be run in non-forced mode on "
     703             :                     "a non-seekable file made of several batches");
     704           1 :                 return -1;
     705             :             }
     706             :         }
     707             : 
     708          35 :         if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     709             :         {
     710          23 :             GIntBig nFeatures = 0;
     711          23 :             ResetReading();
     712          23 :             if (!m_poBatch)
     713           3 :                 ReadNextBatchStream();
     714          31 :             while (m_poBatch)
     715             :             {
     716          31 :                 nFeatures += m_poBatch->num_rows();
     717          31 :                 if (!ReadNextBatchStream())
     718          23 :                     break;
     719             :             }
     720          23 :             ResetReading();
     721          23 :             return nFeatures;
     722             :         }
     723             :     }
     724          24 :     return OGRLayer::GetFeatureCount(bForce);
     725             : }
     726             : 
     727             : /************************************************************************/
     728             : /*                       CanRunNonForcedGetExtent()                     */
     729             : /************************************************************************/
     730             : 
     731           0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
     732             : {
     733           0 :     if (m_bSeekable)
     734           0 :         return true;
     735           0 :     TryToCacheFirstTwoBatches();
     736           0 :     if (!m_bSingleBatch)
     737             :     {
     738           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     739             :                  "GetExtent() cannot be run in non-forced mode on "
     740             :                  "a non-seekable file made of several batches");
     741           0 :         return false;
     742             :     }
     743           0 :     return true;
     744             : }
     745             : 
     746             : /************************************************************************/
     747             : /*                         TestCapability()                             */
     748             : /************************************************************************/
     749             : 
     750         296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
     751             : {
     752         296 :     if (EQUAL(pszCap, OLCFastFeatureCount))
     753             :     {
     754          28 :         return m_bSeekable && m_poAttrQuery == nullptr &&
     755          28 :                m_poFilterGeom == nullptr;
     756             :     }
     757             : 
     758         278 :     if (EQUAL(pszCap, OLCMeasuredGeometries))
     759          16 :         return true;
     760         262 :     if (EQUAL(pszCap, OLCZGeometries))
     761          12 :         return true;
     762             : 
     763         250 :     return OGRArrowLayer::TestCapability(pszCap);
     764             : }
     765             : 
     766             : /************************************************************************/
     767             : /*                         GetMetadataItem()                            */
     768             : /************************************************************************/
     769             : 
     770         259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
     771             :                                              const char *pszDomain)
     772             : {
     773             :     // Mostly for unit test purposes
     774         259 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
     775             :     {
     776           9 :         if (EQUAL(pszName, "FORMAT"))
     777             :         {
     778           5 :             return m_poRecordBatchFileReader ? "FILE" : "STREAM";
     779             :         }
     780           4 :         if (m_poRecordBatchFileReader != nullptr)
     781             :         {
     782           4 :             int iBatch = -1;
     783           4 :             if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
     784             :             {
     785           1 :                 return CPLSPrintf(
     786           5 :                     "%d", m_poRecordBatchFileReader->num_record_batches());
     787             :             }
     788           6 :             else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
     789           3 :                      strstr(pszName, ".NUM_ROWS"))
     790             :             {
     791             :                 auto result =
     792           6 :                     m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     793           3 :                 if (!result.ok())
     794             :                 {
     795           0 :                     return nullptr;
     796             :                 }
     797           3 :                 return CPLSPrintf("%" PRId64, (*result)->num_rows());
     798             :             }
     799             :         }
     800           0 :         return nullptr;
     801             :     }
     802         250 :     else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     803             :     {
     804             :         const auto kv_metadata =
     805           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     806           8 :                                        : m_poRecordBatchReader->schema())
     807          10 :                 ->metadata();
     808           5 :         if (kv_metadata && kv_metadata->Contains(pszName))
     809             :         {
     810           5 :             auto metadataItem = kv_metadata->Get(pszName);
     811           5 :             if (metadataItem.ok())
     812             :             {
     813           5 :                 return CPLSPrintf("%s", metadataItem->c_str());
     814             :             }
     815             :         }
     816           0 :         return nullptr;
     817             :     }
     818         476 :     else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     819         231 :              EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     820             :     {
     821           2 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     822           1 :         if (kv_metadata && kv_metadata->Contains(pszName))
     823             :         {
     824           1 :             auto metadataItem = kv_metadata->Get(pszName);
     825           1 :             if (metadataItem.ok())
     826             :             {
     827           1 :                 return CPLSPrintf("%s", metadataItem->c_str());
     828             :             }
     829             :         }
     830           0 :         return nullptr;
     831             :     }
     832         244 :     return OGRLayer::GetMetadataItem(pszName, pszDomain);
     833             : }
     834             : 
     835             : /************************************************************************/
     836             : /*                           GetMetadata()                              */
     837             : /************************************************************************/
     838             : 
     839         144 : CSLConstList OGRFeatherLayer::GetMetadata(const char *pszDomain)
     840             : {
     841             :     // Mostly for unit test purposes
     842         144 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     843             :     {
     844           5 :         m_aosFeatherMetadata.Clear();
     845             :         const auto kv_metadata =
     846           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     847           8 :                                        : m_poRecordBatchReader->schema())
     848          10 :                 ->metadata();
     849           5 :         if (kv_metadata)
     850             :         {
     851          11 :             for (const auto &kv : kv_metadata->sorted_pairs())
     852             :             {
     853             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     854           6 :                                                   kv.second.c_str());
     855             :             }
     856             :         }
     857           5 :         return m_aosFeatherMetadata.List();
     858             :     }
     859         261 :     if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     860         122 :         EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     861             :     {
     862           2 :         m_aosFeatherMetadata.Clear();
     863           4 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     864           2 :         if (kv_metadata)
     865             :         {
     866           3 :             for (const auto &kv : kv_metadata->sorted_pairs())
     867             :             {
     868             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     869           1 :                                                   kv.second.c_str());
     870             :             }
     871             :         }
     872           2 :         return m_aosFeatherMetadata.List();
     873             :     }
     874         137 :     return OGRLayer::GetMetadata(pszDomain);
     875             : }

Generated by: LCOV version 1.14