LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 321 382 84.0 %
Date: 2026-04-24 03:46:56 Functions: 19 20 95.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_json.h"
      14             : #include "cpl_time.h"
      15             : #include "gdal_pam.h"
      16             : #include "ogrsf_frmts.h"
      17             : #include "ogr_p.h"
      18             : 
      19             : #include <cinttypes>
      20             : #include <limits>
      21             : #include <map>
      22             : #include <set>
      23             : #include <utility>
      24             : 
      25             : #include "ogr_feather.h"
      26             : 
      27             : #include "../arrow_common/ograrrowlayer.hpp"
      28             : #include "../arrow_common/ograrrowdataset.hpp"
      29             : 
      30             : /************************************************************************/
      31             : /*                          OGRFeatherLayer()                           */
      32             : /************************************************************************/
      33             : 
      34         538 : OGRFeatherLayer::OGRFeatherLayer(
      35             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      36             :     std::shared_ptr<arrow::ipc::RecordBatchFileReader> &poRecordBatchFileReader,
      37         538 :     CSLConstList papszOpenOptions)
      38             :     : OGRArrowLayer(poDS, pszLayerName,
      39         538 :                     CPLTestBool(CSLFetchNameValueDef(
      40             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      41        1076 :       m_poDS(poDS), m_poRecordBatchFileReader(poRecordBatchFileReader)
      42             : {
      43         538 :     EstablishFeatureDefn();
      44         538 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      45             :               m_poFeatureDefn->GetGeomFieldCount());
      46         538 : }
      47             : 
      48             : /************************************************************************/
      49             : /*                          OGRFeatherLayer()                           */
      50             : /************************************************************************/
      51             : 
      52          10 : OGRFeatherLayer::OGRFeatherLayer(
      53             :     OGRFeatherDataset *poDS, const char *pszLayerName,
      54             :     std::shared_ptr<arrow::io::RandomAccessFile> poFile, bool bSeekable,
      55             :     const arrow::ipc::IpcReadOptions &oOptions,
      56             :     std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
      57             :         &poRecordBatchStreamReader,
      58          10 :     CSLConstList papszOpenOptions)
      59             :     : OGRArrowLayer(poDS, pszLayerName,
      60          10 :                     CPLTestBool(CSLFetchNameValueDef(
      61             :                         papszOpenOptions, "LISTS_AS_STRING_JSON", "NO"))),
      62          10 :       m_poDS(poDS), m_poFile(std::move(poFile)), m_bSeekable(bSeekable),
      63          20 :       m_oOptions(oOptions), m_poRecordBatchReader(poRecordBatchStreamReader)
      64             : {
      65          10 :     EstablishFeatureDefn();
      66          10 :     CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
      67             :               m_poFeatureDefn->GetGeomFieldCount());
      68          10 : }
      69             : 
      70             : /************************************************************************/
      71             : /*                             GetDataset()                             */
      72             : /************************************************************************/
      73             : 
      74         112 : GDALDataset *OGRFeatherLayer::GetDataset()
      75             : {
      76         112 :     return m_poDS;
      77             : }
      78             : 
      79             : /************************************************************************/
      80             : /*                          LoadGeoMetadata()                           */
      81             : /************************************************************************/
      82             : 
      83         548 : void OGRFeatherLayer::LoadGeoMetadata(
      84             :     const arrow::KeyValueMetadata *kv_metadata, const std::string &key)
      85             : {
      86         548 :     if (kv_metadata && kv_metadata->Contains(key))
      87             :     {
      88        1066 :         auto geo = kv_metadata->Get(key);
      89         533 :         if (geo.ok())
      90             :         {
      91        1066 :             CPLJSONDocument oDoc;
      92         533 :             if (oDoc.LoadMemory(*geo))
      93             :             {
      94        1066 :                 auto oRoot = oDoc.GetRoot();
      95        1599 :                 const auto osVersion = oRoot.GetString("schema_version");
      96         533 :                 if (key != GDAL_GEO_FOOTER_KEY && osVersion != "0.1.0")
      97             :                 {
      98         398 :                     CPLDebug("FEATHER",
      99             :                              "schema_version = %s not explicitly handled by "
     100             :                              "the driver",
     101             :                              osVersion.c_str());
     102             :                 }
     103        1599 :                 auto oColumns = oRoot.GetObj("columns");
     104         533 :                 if (oColumns.IsValid())
     105             :                 {
     106        1066 :                     for (const auto &oColumn : oColumns.GetChildren())
     107             :                     {
     108         533 :                         m_oMapGeometryColumns[oColumn.GetName()] = oColumn;
     109             :                     }
     110             :                 }
     111             :             }
     112             :             else
     113             :             {
     114           0 :                 CPLError(CE_Warning, CPLE_AppDefined,
     115             :                          "Cannot parse 'geo' metadata");
     116             :             }
     117             :         }
     118             :     }
     119         548 : }
     120             : 
     121             : /************************************************************************/
     122             : /*                        EstablishFeatureDefn()                        */
     123             : /************************************************************************/
     124             : 
     125         548 : void OGRFeatherLayer::EstablishFeatureDefn()
     126             : {
     127        1096 :     m_poSchema = m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     128        1096 :                                            : m_poRecordBatchReader->schema();
     129         548 :     const auto &kv_metadata = m_poSchema->metadata();
     130             : 
     131             : #ifdef DEBUG
     132         548 :     if (kv_metadata)
     133             :     {
     134        1079 :         for (const auto &keyValue : kv_metadata->sorted_pairs())
     135             :         {
     136         541 :             CPLDebug("FEATHER", "%s = %s", keyValue.first.c_str(),
     137             :                      keyValue.second.c_str());
     138             :         }
     139             :     }
     140             : #endif
     141             : 
     142             :     auto poFooterMetadata = m_poRecordBatchFileReader
     143         538 :                                 ? m_poRecordBatchFileReader->metadata()
     144        1634 :                                 : nullptr;
     145         674 :     if (poFooterMetadata && poFooterMetadata->Contains(GDAL_GEO_FOOTER_KEY) &&
     146         126 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_READ_GDAL_FOOTER", "YES")))
     147             :     {
     148         126 :         LoadGeoMetadata(poFooterMetadata.get(), GDAL_GEO_FOOTER_KEY);
     149             :     }
     150             :     else
     151             :     {
     152         422 :         LoadGeoMetadata(kv_metadata.get(), "geo");
     153             :     }
     154             :     const auto oMapFieldNameToGDALSchemaFieldDefn =
     155        1096 :         LoadGDALSchema(kv_metadata.get());
     156             : 
     157         548 :     const auto &fields = m_poSchema->fields();
     158        4175 :     for (int i = 0; i < m_poSchema->num_fields(); ++i)
     159             :     {
     160        3627 :         const auto &field = fields[i];
     161        3627 :         const auto &fieldName = field->name();
     162             : 
     163        3627 :         const auto &field_kv_metadata = field->metadata();
     164        3627 :         std::string osExtensionName;
     165        3627 :         std::string osExtensionMetadata;
     166        3627 :         if (field->type()->id() == arrow::Type::EXTENSION)
     167             :         {
     168             :             osExtensionName =
     169         127 :                 cpl::down_cast<arrow::ExtensionType *>(field->type().get())
     170         127 :                     ->extension_name();
     171             :         }
     172        3500 :         else if (field_kv_metadata)
     173             :         {
     174             :             auto extension_name =
     175        1048 :                 field_kv_metadata->Get(ARROW_EXTENSION_NAME_KEY);
     176         524 :             if (extension_name.ok())
     177             :             {
     178         522 :                 osExtensionName = *extension_name;
     179             :             }
     180             : 
     181             :             auto extension_metadata =
     182        1048 :                 field_kv_metadata->Get(ARROW_EXTENSION_METADATA_KEY);
     183         524 :             if (extension_metadata.ok())
     184             :             {
     185         384 :                 osExtensionMetadata = *extension_metadata;
     186             :             }
     187             : #ifdef DEBUG
     188         524 :             CPLDebug("FEATHER", "Metadata field %s:", fieldName.c_str());
     189        1432 :             for (const auto &keyValue : field_kv_metadata->sorted_pairs())
     190             :             {
     191         908 :                 CPLDebug("FEATHER", "  %s = %s", keyValue.first.c_str(),
     192             :                          keyValue.second.c_str());
     193             :             }
     194             : #endif
     195             :         }
     196             : 
     197        3627 :         if (!m_osFIDColumn.empty() && fieldName == m_osFIDColumn)
     198             :         {
     199           6 :             m_iFIDArrowColumn = i;
     200           6 :             continue;
     201             :         }
     202             : 
     203        3621 :         bool bRegularField = true;
     204        3621 :         auto oIter = m_oMapGeometryColumns.find(fieldName);
     205        9670 :         if (oIter != m_oMapGeometryColumns.end() ||
     206        6049 :             (osExtensionName != EXTENSION_NAME_ARROW_JSON &&
     207        2961 :              osExtensionName != EXTENSION_NAME_ARROW_TIMESTAMP_WITH_OFFSET &&
     208        2959 :              !osExtensionName.empty()))
     209             :         {
     210        1072 :             CPLJSONObject oJSONDef;
     211         536 :             if (oIter != m_oMapGeometryColumns.end())
     212         533 :                 oJSONDef = oIter->second;
     213        1608 :             auto osEncoding = oJSONDef.GetString("encoding");
     214         536 :             if (osEncoding.empty() && !osExtensionName.empty())
     215           3 :                 osEncoding = osExtensionName;
     216             : 
     217         536 :             OGRwkbGeometryType eGeomType = wkbUnknown;
     218         536 :             auto eGeomEncoding = OGRArrowGeomEncoding::WKB;
     219         536 :             if (IsValidGeometryEncoding(field, osEncoding,
     220        1072 :                                         oIter != m_oMapGeometryColumns.end(),
     221             :                                         eGeomType, eGeomEncoding))
     222             :             {
     223         534 :                 bRegularField = false;
     224        1068 :                 OGRGeomFieldDefn oField(fieldName.c_str(), wkbUnknown);
     225             : 
     226        1602 :                 auto osCRS = oJSONDef.GetString("crs");
     227             : 
     228             : #if ARROW_VERSION_MAJOR >= 21
     229             :                 if (osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     230             :                     osExtensionMetadata.empty() &&
     231             :                     field->type()->id() == arrow::Type::EXTENSION)
     232             :                 {
     233             :                     const auto arrowWkb =
     234             :                         std::dynamic_pointer_cast<OGRGeoArrowWkbExtensionType>(
     235             :                             field->type());
     236             :                     if (arrowWkb)
     237             :                     {
     238             :                         osExtensionMetadata = arrowWkb->Serialize();
     239             :                     }
     240             :                 }
     241             : #endif
     242             : 
     243         931 :                 if (osCRS.empty() &&
     244         397 :                     osExtensionName == EXTENSION_NAME_GEOARROW_WKB &&
     245         160 :                     !osExtensionMetadata.empty() &&
     246         931 :                     osExtensionMetadata[0] == '{' &&
     247           0 :                     osExtensionMetadata.back() == '}')
     248             :                 {
     249           0 :                     CPLJSONDocument oDoc;
     250           0 :                     if (oDoc.LoadMemory(osExtensionMetadata))
     251             :                     {
     252           0 :                         auto jCrs = oDoc.GetRoot()["crs"];
     253           0 :                         if (jCrs.GetType() == CPLJSONObject::Type::Object)
     254             :                         {
     255             :                             osCRS =
     256           0 :                                 jCrs.Format(CPLJSONObject::PrettyFormat::Plain);
     257             :                         }
     258           0 :                         else if (jCrs.GetType() == CPLJSONObject::Type::String)
     259             :                         {
     260           0 :                             osCRS = jCrs.ToString();
     261             :                         }
     262           0 :                         if (oDoc.GetRoot()["edges"].ToString() == "spherical")
     263             :                         {
     264           0 :                             SetMetadataItem("EDGES", "SPHERICAL");
     265             :                         }
     266             :                     }
     267             :                 }
     268             : 
     269         534 :                 if (osCRS.empty())
     270             :                 {
     271             : #if 0
     272             :                     CPLError(CE_Warning, CPLE_AppDefined,
     273             :                              "Missing required 'crs' field for geometry column %s",
     274             :                              fieldName.c_str());
     275             : #endif
     276             :                 }
     277             :                 else
     278             :                 {
     279         137 :                     OGRSpatialReference *poSRS = new OGRSpatialReference();
     280         137 :                     poSRS->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
     281             : 
     282         137 :                     if (poSRS->SetFromUserInput(
     283             :                             osCRS.c_str(),
     284             :                             OGRSpatialReference::
     285         137 :                                 SET_FROM_USER_INPUT_LIMITATIONS_get()) ==
     286             :                         OGRERR_NONE)
     287             :                     {
     288         137 :                         const char *pszAuthName = poSRS->GetAuthorityName();
     289         137 :                         const char *pszAuthCode = poSRS->GetAuthorityCode();
     290         137 :                         if (pszAuthName && pszAuthCode &&
     291         137 :                             EQUAL(pszAuthName, "OGC") &&
     292           0 :                             EQUAL(pszAuthCode, "CRS84"))
     293             :                         {
     294           0 :                             poSRS->importFromEPSG(4326);
     295             :                         }
     296             : 
     297         137 :                         const double dfCoordEpoch = oJSONDef.GetDouble("epoch");
     298         137 :                         if (dfCoordEpoch > 0)
     299           2 :                             poSRS->SetCoordinateEpoch(dfCoordEpoch);
     300             : 
     301         137 :                         oField.SetSpatialRef(poSRS);
     302             :                     }
     303         137 :                     poSRS->Release();
     304             :                 }
     305             : 
     306             :                 // m_aeGeomEncoding be filled before calling
     307             :                 // ComputeGeometryColumnType()
     308         534 :                 m_aeGeomEncoding.push_back(eGeomEncoding);
     309         534 :                 if (eGeomType == wkbUnknown)
     310             :                 {
     311         708 :                     auto osType = oJSONDef.GetString("geometry_type");
     312         236 :                     if (osType.empty())
     313         236 :                         osType = oJSONDef.GetString("gdal:geometry_type");
     314         472 :                     if (m_bSeekable && osType.empty() &&
     315         236 :                         CPLTestBool(CPLGetConfigOption(
     316             :                             "OGR_ARROW_COMPUTE_GEOMETRY_TYPE", "YES")))
     317             :                     {
     318         236 :                         eGeomType = ComputeGeometryColumnType(
     319         236 :                             m_poFeatureDefn->GetGeomFieldCount(), i);
     320         236 :                         if (m_poRecordBatchReader)
     321           0 :                             ResetRecordBatchReader();
     322             :                     }
     323             :                     else
     324           0 :                         eGeomType = GetGeometryTypeFromString(osType);
     325             :                 }
     326             : 
     327         534 :                 oField.SetType(eGeomType);
     328         534 :                 oField.SetNullable(field->nullable());
     329         534 :                 m_poFeatureDefn->AddGeomFieldDefn(&oField);
     330         534 :                 m_anMapGeomFieldIndexToArrowColumn.push_back(i);
     331             :             }
     332             :         }
     333             : 
     334        3621 :         if (bRegularField)
     335             :         {
     336        3087 :             CreateFieldFromSchema(field, {i},
     337             :                                   oMapFieldNameToGDALSchemaFieldDefn);
     338             :         }
     339             :     }
     340             : 
     341         548 :     CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
     342             :               m_poFeatureDefn->GetFieldCount());
     343         548 :     CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
     344             :               m_poFeatureDefn->GetGeomFieldCount());
     345         548 : }
     346             : 
     347             : /************************************************************************/
     348             : /*                       ResetRecordBatchReader()                       */
     349             : /************************************************************************/
     350             : 
     351          12 : bool OGRFeatherLayer::ResetRecordBatchReader()
     352             : {
     353          12 :     const auto nPos = *(m_poFile->Tell());
     354          12 :     CPL_IGNORE_RET_VAL(m_poFile->Seek(0));
     355             :     auto result =
     356          24 :         arrow::ipc::RecordBatchStreamReader::Open(m_poFile, m_oOptions);
     357          12 :     if (!result.ok())
     358             :     {
     359           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     360             :                  "RecordBatchStreamReader::Open() failed with %s",
     361           0 :                  result.status().message().c_str());
     362           0 :         CPL_IGNORE_RET_VAL(m_poFile->Seek(nPos));
     363           0 :         return false;
     364             :     }
     365             :     else
     366             :     {
     367          12 :         m_poRecordBatchReader = *result;
     368          12 :         return true;
     369             :     }
     370             : }
     371             : 
     372             : /************************************************************************/
     373             : /*                     ComputeGeometryColumnType()                      */
     374             : /************************************************************************/
     375             : 
     376         236 : OGRwkbGeometryType OGRFeatherLayer::ComputeGeometryColumnType(int iGeomCol,
     377             :                                                               int iCol) const
     378             : {
     379             :     // Compute type of geometry column by iterating over each geometry, and
     380             :     // looking at the WKB geometry type in the first 5 bytes of each geometry.
     381             : 
     382         236 :     OGRwkbGeometryType eGeomType = wkbNone;
     383             : 
     384         236 :     if (m_poRecordBatchReader != nullptr)
     385             :     {
     386           0 :         std::shared_ptr<arrow::RecordBatch> poBatch;
     387             :         while (true)
     388             :         {
     389           0 :             auto status = m_poRecordBatchReader->ReadNext(&poBatch);
     390           0 :             if (!status.ok())
     391             :             {
     392           0 :                 CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     393           0 :                          status.message().c_str());
     394           0 :                 break;
     395             :             }
     396           0 :             else if (!poBatch)
     397           0 :                 break;
     398           0 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(poBatch, iGeomCol,
     399             :                                                               iCol, eGeomType);
     400           0 :             if (eGeomType == wkbUnknown)
     401           0 :                 break;
     402           0 :         }
     403             :     }
     404             :     else
     405             :     {
     406         472 :         for (int iBatch = 0;
     407         472 :              iBatch < m_poRecordBatchFileReader->num_record_batches(); ++iBatch)
     408             :         {
     409         236 :             auto result = m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     410         236 :             if (!result.ok())
     411             :             {
     412           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     413             :                          "ReadRecordBatch() failed: %s",
     414           0 :                          result.status().message().c_str());
     415           0 :                 break;
     416             :             }
     417         236 :             eGeomType = ComputeGeometryColumnTypeProcessBatch(*result, iGeomCol,
     418             :                                                               iCol, eGeomType);
     419         236 :             if (eGeomType == wkbUnknown)
     420           0 :                 break;
     421             :         }
     422             :     }
     423             : 
     424         236 :     return eGeomType == wkbNone ? wkbUnknown : eGeomType;
     425             : }
     426             : 
     427             : /************************************************************************/
     428             : /*                            BuildDomain()                             */
     429             : /************************************************************************/
     430             : 
     431             : std::unique_ptr<OGRFieldDomain>
     432          19 : OGRFeatherLayer::BuildDomain(const std::string &osDomainName,
     433             :                              int iFieldIndex) const
     434             : {
     435          19 :     const int iArrowCol = m_anMapFieldIndexToArrowColumn[iFieldIndex][0];
     436          19 :     CPLAssert(m_poSchema->fields()[iArrowCol]->type()->id() ==
     437             :               arrow::Type::DICTIONARY);
     438             : 
     439          19 :     if (m_poRecordBatchReader)
     440             :     {
     441           6 :         if (m_poBatch)
     442             :         {
     443           6 :             return BuildDomainFromBatch(osDomainName, m_poBatch, iArrowCol);
     444             :         }
     445             :     }
     446          13 :     else if (m_poRecordBatchFileReader)
     447             :     {
     448          13 :         auto result = m_poRecordBatchFileReader->ReadRecordBatch(0);
     449          13 :         if (!result.ok())
     450             :         {
     451           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     452             :                      "ReadRecordBatch() failed: %s",
     453           0 :                      result.status().message().c_str());
     454             :         }
     455          13 :         auto poBatch = *result;
     456          13 :         if (poBatch)
     457             :         {
     458          13 :             return BuildDomainFromBatch(osDomainName, poBatch, iArrowCol);
     459             :         }
     460             :     }
     461             : 
     462           0 :     return nullptr;
     463             : }
     464             : 
     465             : /************************************************************************/
     466             : /*                            ResetReading()                            */
     467             : /************************************************************************/
     468             : 
     469         829 : void OGRFeatherLayer::ResetReading()
     470             : {
     471         829 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch > 0)
     472             :     {
     473          17 :         if (m_iRecordBatch == 1 && m_poBatchIdx1)
     474             :         {
     475             :             // do nothing
     476             :         }
     477             :         else
     478             :         {
     479          16 :             m_bResetRecordBatchReaderAsked = true;
     480             :         }
     481             :     }
     482         829 :     OGRArrowLayer::ResetReading();
     483         829 : }
     484             : 
     485             : /************************************************************************/
     486             : /*                           ReadNextBatch()                            */
     487             : /************************************************************************/
     488             : 
     489        1013 : bool OGRFeatherLayer::ReadNextBatch()
     490             : {
     491        1013 :     if (m_poRecordBatchFileReader == nullptr)
     492             :     {
     493         121 :         return ReadNextBatchStream();
     494             :     }
     495             :     else
     496             :     {
     497         892 :         return ReadNextBatchFile();
     498             :     }
     499             : }
     500             : 
     501             : /************************************************************************/
     502             : /*                         ReadNextBatchFile()                          */
     503             : /************************************************************************/
     504             : 
     505         892 : bool OGRFeatherLayer::ReadNextBatchFile()
     506             : {
     507             :     while (true)
     508             :     {
     509         892 :         ++m_iRecordBatch;
     510         892 :         if (m_iRecordBatch == m_poRecordBatchFileReader->num_record_batches())
     511             :         {
     512         451 :             if (m_iRecordBatch == 1)
     513         448 :                 m_iRecordBatch = 0;
     514             :             else
     515           3 :                 m_poBatch.reset();
     516         451 :             return false;
     517             :         }
     518             : 
     519         441 :         m_nIdxInBatch = 0;
     520             : 
     521             :         auto result =
     522         441 :             m_poRecordBatchFileReader->ReadRecordBatch(m_iRecordBatch);
     523         441 :         if (!result.ok())
     524             :         {
     525           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     526             :                      "ReadRecordBatch() failed: %s",
     527           0 :                      result.status().message().c_str());
     528           0 :             m_poBatch.reset();
     529           0 :             return false;
     530             :         }
     531         441 :         if ((*result)->num_rows() != 0)
     532             :         {
     533         441 :             SetBatch(*result);
     534         441 :             break;
     535             :         }
     536           0 :     }
     537             : 
     538         441 :     return true;
     539             : }
     540             : 
     541             : /************************************************************************/
     542             : /*                        ReadNextBatchStream()                         */
     543             : /************************************************************************/
     544             : 
     545         156 : bool OGRFeatherLayer::ReadNextBatchStream()
     546             : {
     547         156 :     m_nIdxInBatch = 0;
     548             : 
     549         312 :     std::shared_ptr<arrow::RecordBatch> poNextBatch;
     550           0 :     do
     551             :     {
     552         156 :         if (m_iRecordBatch == 0 && m_poBatchIdx0)
     553             :         {
     554           1 :             SetBatch(m_poBatchIdx0);
     555           1 :             m_iRecordBatch = 1;
     556         103 :             return true;
     557             :         }
     558             : 
     559         155 :         else if (m_iRecordBatch == 1 && m_poBatchIdx1)
     560             :         {
     561           1 :             SetBatch(m_poBatchIdx1);
     562           1 :             m_iRecordBatch = 2;
     563           1 :             return true;
     564             :         }
     565             : 
     566         154 :         else if (m_bSingleBatch)
     567             :         {
     568          83 :             CPLAssert(m_iRecordBatch == 0);
     569          83 :             CPLAssert(m_poBatch != nullptr);
     570          83 :             return false;
     571             :         }
     572             : 
     573          71 :         if (m_bResetRecordBatchReaderAsked)
     574             :         {
     575          13 :             if (!m_bSeekable)
     576             :             {
     577           1 :                 CPLError(CE_Failure, CPLE_NotSupported,
     578             :                          "Attempting to rewind non-seekable stream");
     579           1 :                 return false;
     580             :             }
     581          12 :             if (!ResetRecordBatchReader())
     582           0 :                 return false;
     583          12 :             m_bResetRecordBatchReaderAsked = false;
     584             :         }
     585             : 
     586          70 :         CPLAssert(m_poRecordBatchReader);
     587             : 
     588          70 :         ++m_iRecordBatch;
     589             : 
     590          70 :         poNextBatch.reset();
     591          70 :         auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
     592          70 :         if (!status.ok())
     593             :         {
     594           0 :             CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
     595           0 :                      status.message().c_str());
     596           0 :             poNextBatch.reset();
     597             :         }
     598          70 :         if (poNextBatch == nullptr)
     599             :         {
     600          17 :             if (m_iRecordBatch == 1)
     601             :             {
     602           3 :                 m_iRecordBatch = 0;
     603           3 :                 m_bSingleBatch = true;
     604             :             }
     605             :             else
     606             :             {
     607          14 :                 m_poBatch.reset();
     608          14 :                 m_poBatchColumns.clear();
     609             :             }
     610          17 :             return false;
     611             :         }
     612          53 :     } while (poNextBatch->num_rows() == 0);
     613             : 
     614          53 :     SetBatch(poNextBatch);
     615             : 
     616          53 :     return true;
     617             : }
     618             : 
     619             : /************************************************************************/
     620             : /*                     TryToCacheFirstTwoBatches()                      */
     621             : /************************************************************************/
     622             : 
     623           1 : void OGRFeatherLayer::TryToCacheFirstTwoBatches()
     624             : {
     625           2 :     if (m_poRecordBatchReader != nullptr && m_iRecordBatch <= 0 &&
     626           2 :         !m_bSingleBatch && m_poBatchIdx0 == nullptr)
     627             :     {
     628           1 :         ResetReading();
     629           1 :         if (!m_poBatch)
     630             :         {
     631           0 :             CPL_IGNORE_RET_VAL(ReadNextBatchStream());
     632             :         }
     633           1 :         if (m_poBatch)
     634             :         {
     635           2 :             auto poBatchIdx0 = m_poBatch;
     636           1 :             if (ReadNextBatchStream())
     637             :             {
     638           1 :                 CPLAssert(m_iRecordBatch == 1);
     639           1 :                 m_poBatchIdx0 = poBatchIdx0;
     640           1 :                 m_poBatchIdx1 = m_poBatch;
     641           1 :                 SetBatch(poBatchIdx0);
     642           1 :                 ResetReading();
     643             :             }
     644           1 :             ResetReading();
     645             :         }
     646             :     }
     647           1 : }
     648             : 
     649             : /************************************************************************/
     650             : /*                      CanPostFilterArrowArray()                       */
     651             : /************************************************************************/
     652             : 
     653          20 : bool OGRFeatherLayer::CanPostFilterArrowArray(
     654             :     const struct ArrowSchema *schema) const
     655             : {
     656          20 :     if (m_poRecordBatchReader)
     657          10 :         return false;
     658          10 :     return OGRArrowLayer::CanPostFilterArrowArray(schema);
     659             : }
     660             : 
     661             : /************************************************************************/
     662             : /*                      InvalidateCachedBatches()                       */
     663             : /************************************************************************/
     664             : 
     665         109 : void OGRFeatherLayer::InvalidateCachedBatches()
     666             : {
     667         109 :     if (m_poRecordBatchFileReader)
     668             :     {
     669          63 :         m_iRecordBatch = -1;
     670          63 :         ResetReading();
     671             :     }
     672         109 : }
     673             : 
     674             : /************************************************************************/
     675             : /*                          GetFeatureCount()                           */
     676             : /************************************************************************/
     677             : 
     678         336 : GIntBig OGRFeatherLayer::GetFeatureCount(int bForce)
     679             : {
     680         628 :     if (m_poRecordBatchFileReader != nullptr && m_poAttrQuery == nullptr &&
     681         292 :         m_poFilterGeom == nullptr)
     682             :     {
     683         288 :         auto result = m_poRecordBatchFileReader->CountRows();
     684         288 :         if (result.ok())
     685         288 :             return *result;
     686             :     }
     687          48 :     else if (m_poRecordBatchReader != nullptr)
     688             :     {
     689          36 :         if (!m_bSeekable && !bForce)
     690             :         {
     691           1 :             if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     692             :             {
     693           1 :                 TryToCacheFirstTwoBatches();
     694             :             }
     695             : 
     696           1 :             if (!m_bSingleBatch)
     697             :             {
     698           1 :                 CPLError(
     699             :                     CE_Failure, CPLE_AppDefined,
     700             :                     "GetFeatureCount() cannot be run in non-forced mode on "
     701             :                     "a non-seekable file made of several batches");
     702           1 :                 return -1;
     703             :             }
     704             :         }
     705             : 
     706          35 :         if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
     707             :         {
     708          23 :             GIntBig nFeatures = 0;
     709          23 :             ResetReading();
     710          23 :             if (!m_poBatch)
     711           3 :                 ReadNextBatchStream();
     712          31 :             while (m_poBatch)
     713             :             {
     714          31 :                 nFeatures += m_poBatch->num_rows();
     715          31 :                 if (!ReadNextBatchStream())
     716          23 :                     break;
     717             :             }
     718          23 :             ResetReading();
     719          23 :             return nFeatures;
     720             :         }
     721             :     }
     722          24 :     return OGRLayer::GetFeatureCount(bForce);
     723             : }
     724             : 
     725             : /************************************************************************/
     726             : /*                      CanRunNonForcedGetExtent()                      */
     727             : /************************************************************************/
     728             : 
     729           0 : bool OGRFeatherLayer::CanRunNonForcedGetExtent()
     730             : {
     731           0 :     if (m_bSeekable)
     732           0 :         return true;
     733           0 :     TryToCacheFirstTwoBatches();
     734           0 :     if (!m_bSingleBatch)
     735             :     {
     736           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     737             :                  "GetExtent() cannot be run in non-forced mode on "
     738             :                  "a non-seekable file made of several batches");
     739           0 :         return false;
     740             :     }
     741           0 :     return true;
     742             : }
     743             : 
     744             : /************************************************************************/
     745             : /*                           TestCapability()                           */
     746             : /************************************************************************/
     747             : 
     748         296 : int OGRFeatherLayer::TestCapability(const char *pszCap) const
     749             : {
     750         296 :     if (EQUAL(pszCap, OLCFastFeatureCount))
     751             :     {
     752          28 :         return m_bSeekable && m_poAttrQuery == nullptr &&
     753          28 :                m_poFilterGeom == nullptr;
     754             :     }
     755             : 
     756         278 :     if (EQUAL(pszCap, OLCMeasuredGeometries))
     757          16 :         return true;
     758         262 :     if (EQUAL(pszCap, OLCZGeometries))
     759          12 :         return true;
     760             : 
     761         250 :     return OGRArrowLayer::TestCapability(pszCap);
     762             : }
     763             : 
     764             : /************************************************************************/
     765             : /*                          GetMetadataItem()                           */
     766             : /************************************************************************/
     767             : 
     768         259 : const char *OGRFeatherLayer::GetMetadataItem(const char *pszName,
     769             :                                              const char *pszDomain)
     770             : {
     771             :     // Mostly for unit test purposes
     772         259 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_"))
     773             :     {
     774           9 :         if (EQUAL(pszName, "FORMAT"))
     775             :         {
     776           5 :             return m_poRecordBatchFileReader ? "FILE" : "STREAM";
     777             :         }
     778           4 :         if (m_poRecordBatchFileReader != nullptr)
     779             :         {
     780           4 :             int iBatch = -1;
     781           4 :             if (EQUAL(pszName, "NUM_RECORD_BATCHES"))
     782             :             {
     783           1 :                 return CPLSPrintf(
     784           5 :                     "%d", m_poRecordBatchFileReader->num_record_batches());
     785             :             }
     786           6 :             else if (sscanf(pszName, "RECORD_BATCHES[%d]", &iBatch) == 1 &&
     787           3 :                      strstr(pszName, ".NUM_ROWS"))
     788             :             {
     789             :                 auto result =
     790           6 :                     m_poRecordBatchFileReader->ReadRecordBatch(iBatch);
     791           3 :                 if (!result.ok())
     792             :                 {
     793           0 :                     return nullptr;
     794             :                 }
     795           3 :                 return CPLSPrintf("%" PRId64, (*result)->num_rows());
     796             :             }
     797             :         }
     798           0 :         return nullptr;
     799             :     }
     800         250 :     else if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     801             :     {
     802             :         const auto kv_metadata =
     803           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     804           8 :                                        : m_poRecordBatchReader->schema())
     805          10 :                 ->metadata();
     806           5 :         if (kv_metadata && kv_metadata->Contains(pszName))
     807             :         {
     808           5 :             auto metadataItem = kv_metadata->Get(pszName);
     809           5 :             if (metadataItem.ok())
     810             :             {
     811           5 :                 return CPLSPrintf("%s", metadataItem->c_str());
     812             :             }
     813             :         }
     814           0 :         return nullptr;
     815             :     }
     816         476 :     else if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     817         231 :              EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     818             :     {
     819           2 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     820           1 :         if (kv_metadata && kv_metadata->Contains(pszName))
     821             :         {
     822           1 :             auto metadataItem = kv_metadata->Get(pszName);
     823           1 :             if (metadataItem.ok())
     824             :             {
     825           1 :                 return CPLSPrintf("%s", metadataItem->c_str());
     826             :             }
     827             :         }
     828           0 :         return nullptr;
     829             :     }
     830         244 :     return OGRLayer::GetMetadataItem(pszName, pszDomain);
     831             : }
     832             : 
     833             : /************************************************************************/
     834             : /*                            GetMetadata()                             */
     835             : /************************************************************************/
     836             : 
     837         144 : CSLConstList OGRFeatherLayer::GetMetadata(const char *pszDomain)
     838             : {
     839             :     // Mostly for unit test purposes
     840         144 :     if (pszDomain != nullptr && EQUAL(pszDomain, "_ARROW_METADATA_"))
     841             :     {
     842           5 :         m_aosFeatherMetadata.Clear();
     843             :         const auto kv_metadata =
     844           5 :             (m_poRecordBatchFileReader ? m_poRecordBatchFileReader->schema()
     845           8 :                                        : m_poRecordBatchReader->schema())
     846          10 :                 ->metadata();
     847           5 :         if (kv_metadata)
     848             :         {
     849          11 :             for (const auto &kv : kv_metadata->sorted_pairs())
     850             :             {
     851             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     852           6 :                                                   kv.second.c_str());
     853             :             }
     854             :         }
     855           5 :         return m_aosFeatherMetadata.List();
     856             :     }
     857         261 :     if (m_poRecordBatchFileReader != nullptr && pszDomain != nullptr &&
     858         122 :         EQUAL(pszDomain, "_ARROW_FOOTER_METADATA_"))
     859             :     {
     860           2 :         m_aosFeatherMetadata.Clear();
     861           4 :         const auto kv_metadata = m_poRecordBatchFileReader->metadata();
     862           2 :         if (kv_metadata)
     863             :         {
     864           3 :             for (const auto &kv : kv_metadata->sorted_pairs())
     865             :             {
     866             :                 m_aosFeatherMetadata.SetNameValue(kv.first.c_str(),
     867           1 :                                                   kv.second.c_str());
     868             :             }
     869             :         }
     870           2 :         return m_aosFeatherMetadata.List();
     871             :     }
     872         137 :     return OGRLayer::GetMetadata(pszDomain);
     873             : }

Generated by: LCOV version 1.14