LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherwriterlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 208 232 89.7 %
Date: 2026-01-23 20:24:11 Functions: 12 12 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_feather.h"
      14             : 
      15             : #include "../arrow_common/ograrrowwriterlayer.hpp"
      16             : 
      17             : /************************************************************************/
      18             : /*                      OGRFeatherWriterLayer()                         */
      19             : /************************************************************************/
      20             : 
      21         152 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
      22             :     GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
      23             :     const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
      24         152 :     const char *pszLayerName)
      25             :     : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
      26         152 :       m_poDS(poDS)
      27             : {
      28         152 :     m_bWriteFieldArrowExtensionName = true;
      29         152 : }
      30             : 
      31             : /************************************************************************/
      32             : /*                                Close()                               */
      33             : /************************************************************************/
      34             : 
      35         143 : bool OGRFeatherWriterLayer::Close()
      36             : {
      37         143 :     if (m_bInitializationOK)
      38             :     {
      39         143 :         if (!FinalizeWriting())
      40           0 :             return false;
      41             :     }
      42             : 
      43         143 :     return true;
      44             : }
      45             : 
      46             : /************************************************************************/
      47             : /*                       IsSupportedGeometryType()                      */
      48             : /************************************************************************/
      49             : 
      50         146 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
      51             :     OGRwkbGeometryType eGType) const
      52             : {
      53         146 :     if (eGType != wkbFlatten(eGType))
      54             :     {
      55             :         const auto osConfigOptionName =
      56         170 :             "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
      57          85 :         if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
      58             :         {
      59           7 :             CPLError(CE_Failure, CPLE_NotSupported,
      60             :                      "Only 2D geometry types are supported (unless the "
      61             :                      "%s configuration option is set to YES)",
      62             :                      osConfigOptionName.c_str());
      63           7 :             return false;
      64             :         }
      65             :     }
      66         139 :     return true;
      67             : }
      68             : 
      69             : /************************************************************************/
      70             : /*                           SetOptions()                               */
      71             : /************************************************************************/
      72             : 
      73         152 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
      74             :                                        CSLConstList papszOptions,
      75             :                                        const OGRSpatialReference *poSpatialRef,
      76             :                                        OGRwkbGeometryType eGType)
      77             : {
      78         152 :     m_aosCreationOptions = papszOptions;
      79             : 
      80             :     const char *pszDefaultFormat =
      81         304 :         (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
      82         152 :          STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
      83         456 :             ? "STREAM"
      84         152 :             : "FILE";
      85         152 :     m_bStreamFormat =
      86         152 :         EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
      87             :               "STREAM");
      88             : 
      89             :     const char *pszGeomEncoding =
      90         152 :         CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
      91         152 :     m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
      92         152 :     if (pszGeomEncoding)
      93             :     {
      94         121 :         if (EQUAL(pszGeomEncoding, "WKB"))
      95          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
      96          90 :         else if (EQUAL(pszGeomEncoding, "WKT"))
      97          29 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
      98          61 :         else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
      99          30 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
     100          31 :         else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
     101           6 :                  EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
     102          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
     103             :         else
     104             :         {
     105           0 :             CPLError(CE_Failure, CPLE_NotSupported,
     106             :                      "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
     107           0 :             return false;
     108             :         }
     109             :     }
     110             : 
     111         152 :     if (eGType != wkbNone)
     112             :     {
     113         146 :         if (!IsSupportedGeometryType(eGType))
     114             :         {
     115           9 :             return false;
     116             :         }
     117             : 
     118         139 :         if (poSpatialRef == nullptr)
     119             :         {
     120          10 :             CPLError(CE_Warning, CPLE_AppDefined,
     121             :                      "Geometry column should have an associated CRS");
     122             :         }
     123             : 
     124         139 :         m_poFeatureDefn->SetGeomType(eGType);
     125         139 :         auto eGeomEncoding = m_eGeomEncoding;
     126         139 :         if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
     127         109 :             eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
     128             :         {
     129          79 :             const auto eEncodingType = eGeomEncoding;
     130          79 :             eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
     131          79 :             if (eGeomEncoding == eEncodingType)
     132           2 :                 return false;
     133             :         }
     134         137 :         m_aeGeomEncoding.push_back(eGeomEncoding);
     135         137 :         m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
     136             :             CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
     137         137 :         if (poSpatialRef)
     138             :         {
     139         129 :             auto poSRS = poSpatialRef->Clone();
     140         129 :             m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
     141         129 :             poSRS->Release();
     142             :         }
     143             :     }
     144             : 
     145         143 :     m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
     146             : 
     147         143 :     const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
     148         143 :     if (pszCompression == nullptr)
     149             :     {
     150         420 :         auto oResult = arrow::util::Codec::GetCompressionType("lz4");
     151         140 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     152             :         {
     153         140 :             pszCompression = "LZ4";
     154             :         }
     155             :         else
     156             :         {
     157           0 :             pszCompression = "NONE";
     158             :         }
     159             :     }
     160             : 
     161         143 :     if (EQUAL(pszCompression, "NONE"))
     162           0 :         pszCompression = "UNCOMPRESSED";
     163             :     auto oResult = arrow::util::Codec::GetCompressionType(
     164         286 :         CPLString(pszCompression).tolower());
     165         143 :     if (!oResult.ok())
     166             :     {
     167           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     168             :                  "Unrecognized compression method: %s", pszCompression);
     169           0 :         return false;
     170             :     }
     171         143 :     m_eCompression = *oResult;
     172         143 :     if (!arrow::util::Codec::IsAvailable(m_eCompression))
     173             :     {
     174           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     175             :                  "Compression method %s is known, but libarrow has not "
     176             :                  "been built with support for it",
     177             :                  pszCompression);
     178           0 :         return false;
     179             :     }
     180             : 
     181         143 :     const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
     182         143 :     if (pszRowGroupSize)
     183             :     {
     184           3 :         auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
     185           3 :         if (nRowGroupSize > 0)
     186             :         {
     187           3 :             if (nRowGroupSize > INT_MAX)
     188           0 :                 nRowGroupSize = INT_MAX;
     189           3 :             m_nRowGroupSize = nRowGroupSize;
     190             :         }
     191             :     }
     192             : 
     193         143 :     m_bInitializationOK = true;
     194         143 :     return true;
     195             : }
     196             : 
     197             : /************************************************************************/
     198             : /*                         CloseFileWriter()                            */
     199             : /************************************************************************/
     200             : 
     201         143 : bool OGRFeatherWriterLayer::CloseFileWriter()
     202             : {
     203         286 :     auto status = m_poFileWriter->Close();
     204         143 :     if (!status.ok())
     205             :     {
     206           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     207             :                  "FileWriter::Close() failed with %s",
     208           0 :                  status.message().c_str());
     209             :     }
     210         286 :     return status.ok();
     211             : }
     212             : 
     213             : /************************************************************************/
     214             : /*                          CreateSchema()                              */
     215             : /************************************************************************/
     216             : 
     217         143 : void OGRFeatherWriterLayer::CreateSchema()
     218             : {
     219         143 :     CreateSchemaCommon();
     220             : 
     221         280 :     if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     222         137 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
     223             :     {
     224         272 :         CPLJSONObject oRoot;
     225         136 :         oRoot.Add("schema_version", "0.1.0");
     226         136 :         oRoot.Add("primary_column",
     227         136 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     228         272 :         CPLJSONObject oColumns;
     229         136 :         oRoot.Add("columns", oColumns);
     230         272 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     231             :         {
     232         136 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     233         272 :             CPLJSONObject oColumn;
     234         136 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     235         136 :             oColumn.Add("encoding",
     236         136 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
     237             : 
     238         136 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     239         136 :             if (poSRS)
     240             :             {
     241         128 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     242             :                                                    "MULTILINE=NO", nullptr};
     243         128 :                 char *pszWKT = nullptr;
     244         128 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     245         128 :                 if (pszWKT)
     246         128 :                     oColumn.Add("crs", pszWKT);
     247         128 :                 CPLFree(pszWKT);
     248             : 
     249         128 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     250         128 :                 if (dfCoordEpoch > 0)
     251           2 :                     oColumn.Add("epoch", dfCoordEpoch);
     252             :             }
     253             : 
     254             : #if 0
     255             :             if( m_aoEnvelopes[i].IsInit() &&
     256             :                 CPLTestBool(CPLGetConfigOption(
     257             :                     "OGR_ARROW_WRITE_BBOX", "YES")) )
     258             :             {
     259             :                 CPLJSONArray oBBOX;
     260             :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     261             :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     262             :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     263             :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     264             :                 oColumn.Add("bbox", oBBOX);
     265             :             }
     266             : #endif
     267         136 :             const auto eType = poGeomFieldDefn->GetType();
     268         136 :             if (CPLTestBool(CPLGetConfigOption(
     269         272 :                     "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
     270         136 :                 eType == wkbFlatten(eType))
     271             :             {
     272             :                 // Geometry type, place under a temporary "gdal:geometry_type"
     273             :                 // property pending acceptance of proposal at
     274             :                 // https://github.com/opengeospatial/geoparquet/issues/41
     275          58 :                 const char *pszType = "mixed";
     276          58 :                 if (wkbPoint == eType)
     277          19 :                     pszType = "Point";
     278          39 :                 else if (wkbLineString == eType)
     279           7 :                     pszType = "LineString";
     280          32 :                 else if (wkbPolygon == eType)
     281           9 :                     pszType = "Polygon";
     282          23 :                 else if (wkbMultiPoint == eType)
     283           7 :                     pszType = "MultiPoint";
     284          16 :                 else if (wkbMultiLineString == eType)
     285           7 :                     pszType = "MultiLineString";
     286           9 :                 else if (wkbMultiPolygon == eType)
     287           7 :                     pszType = "MultiPolygon";
     288           2 :                 else if (wkbGeometryCollection == eType)
     289           2 :                     pszType = "GeometryCollection";
     290          58 :                 oColumn.Add("gdal:geometry_type", pszType);
     291             :             }
     292             :         }
     293             : 
     294         136 :         auto kvMetadata = m_poSchema->metadata()
     295           2 :                               ? m_poSchema->metadata()->Copy()
     296         274 :                               : std::make_shared<arrow::KeyValueMetadata>();
     297         272 :         kvMetadata->Append("geo",
     298         272 :                            oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     299         136 :         m_poSchema = m_poSchema->WithMetadata(kvMetadata);
     300         136 :         CPLAssert(m_poSchema);
     301             :     }
     302         143 : }
     303             : 
     304             : /************************************************************************/
     305             : /*                          CreateWriter()                              */
     306             : /************************************************************************/
     307             : 
     308         143 : void OGRFeatherWriterLayer::CreateWriter()
     309             : {
     310         143 :     CPLAssert(m_poFileWriter == nullptr);
     311             : 
     312         143 :     if (m_poSchema == nullptr)
     313             :     {
     314          15 :         CreateSchema();
     315             :     }
     316             :     else
     317             :     {
     318         128 :         FinalizeSchema();
     319             :     }
     320             : 
     321         286 :     auto options = arrow::ipc::IpcWriteOptions::Defaults();
     322         143 :     options.memory_pool = m_poMemoryPool;
     323             : 
     324             :     {
     325         286 :         auto result = arrow::util::Codec::Create(m_eCompression);
     326         143 :         if (!result.ok())
     327             :         {
     328           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     329             :                      "Codec::Create() failed with %s",
     330           0 :                      result.status().message().c_str());
     331             :         }
     332             :         else
     333             :         {
     334         143 :             options.codec.reset(result->release());
     335             :         }
     336             :     }
     337             : 
     338         143 :     if (m_bStreamFormat)
     339             :     {
     340             :         auto result =
     341           8 :             arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
     342           4 :         if (!result.ok())
     343             :         {
     344           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     345             :                      "arrow::ipc::MakeStreamWriter() failed with %s",
     346           0 :                      result.status().message().c_str());
     347             :         }
     348             :         else
     349             :         {
     350           4 :             m_poFileWriter = *result;
     351             :         }
     352             :     }
     353             :     else
     354             :     {
     355             :         m_poFooterKeyValueMetadata =
     356         139 :             std::make_shared<arrow::KeyValueMetadata>();
     357             :         auto result = arrow::ipc::MakeFileWriter(
     358         417 :             m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
     359         139 :         if (!result.ok())
     360             :         {
     361           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     362             :                      "arrow::ipc::MakeFileWriter() failed with %s",
     363           0 :                      result.status().message().c_str());
     364             :         }
     365             :         else
     366             :         {
     367         139 :             m_poFileWriter = *result;
     368             :         }
     369             :     }
     370         143 : }
     371             : 
     372             : /************************************************************************/
     373             : /*               PerformStepsBeforeFinalFlushGroup()                    */
     374             : /************************************************************************/
     375             : 
     376             : // Add a gdal:geo extension metadata for now, which embeds a bbox
     377         143 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
     378             : {
     379         282 :     if (m_poFooterKeyValueMetadata &&
     380         277 :         m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     381         134 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
     382             :     {
     383         262 :         CPLJSONObject oRoot;
     384         131 :         oRoot.Add("primary_column",
     385         131 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     386         131 :         CPLJSONObject oColumns;
     387         131 :         oRoot.Add("columns", oColumns);
     388         262 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     389             :         {
     390         131 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     391         262 :             CPLJSONObject oColumn;
     392         131 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     393         131 :             oColumn.Add("encoding",
     394         131 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
     395             : 
     396         131 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     397         131 :             if (poSRS)
     398             :             {
     399         123 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     400             :                                                    "MULTILINE=NO", nullptr};
     401         123 :                 char *pszWKT = nullptr;
     402         123 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     403         123 :                 if (pszWKT)
     404         123 :                     oColumn.Add("crs", pszWKT);
     405         123 :                 CPLFree(pszWKT);
     406             : 
     407         123 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     408         123 :                 if (dfCoordEpoch > 0)
     409           1 :                     oColumn.Add("epoch", dfCoordEpoch);
     410             :             }
     411             : 
     412         131 :             if (m_aoEnvelopes[i].IsInit())
     413             :             {
     414         116 :                 CPLJSONArray oBBOX;
     415         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     416         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     417         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     418         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     419         116 :                 oColumn.Add("bbox", oBBOX);
     420             :             }
     421             :         }
     422             : 
     423         262 :         m_poFooterKeyValueMetadata->Append(
     424             :             GDAL_GEO_FOOTER_KEY,
     425         262 :             oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     426             :     }
     427         143 : }
     428             : 
     429             : /************************************************************************/
     430             : /*                            FlushGroup()                              */
     431             : /************************************************************************/
     432             : 
     433          22 : bool OGRFeatherWriterLayer::FlushGroup()
     434             : {
     435          22 :     std::vector<std::shared_ptr<arrow::Array>> columns;
     436          22 :     auto ret = WriteArrays(
     437         846 :         [&columns](const std::shared_ptr<arrow::Field> &,
     438         846 :                    const std::shared_ptr<arrow::Array> &array)
     439             :         {
     440         846 :             columns.emplace_back(array);
     441         846 :             return true;
     442             :         });
     443             : 
     444          22 :     if (ret)
     445             :     {
     446             :         auto poRecordBatch = arrow::RecordBatch::Make(
     447          66 :             m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
     448          44 :         auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
     449          22 :         if (!status.ok())
     450             :         {
     451           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     452             :                      "WriteRecordBatch() failed with %s",
     453           0 :                      status.message().c_str());
     454           0 :             ret = false;
     455             :         }
     456             :     }
     457             : 
     458          22 :     ClearArrayBuilers();
     459          44 :     return ret;
     460             : }
     461             : 
     462             : /************************************************************************/
     463             : /*                          WriteArrowBatch()                           */
     464             : /************************************************************************/
     465             : 
     466             : inline bool
     467         112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
     468             :                                        struct ArrowArray *array,
     469             :                                        CSLConstList papszOptions)
     470             : {
     471         224 :     return WriteArrowBatchInternal(
     472             :         schema, array, papszOptions,
     473         112 :         [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
     474             :         {
     475         224 :             auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
     476         112 :             if (!status.ok())
     477             :             {
     478           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     479             :                          "WriteRecordBatch() failed: %s",
     480           0 :                          status.message().c_str());
     481           0 :                 return false;
     482             :             }
     483             : 
     484         112 :             return true;
     485         224 :         });
     486             : }

Generated by: LCOV version 1.14