LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherwriterlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 208 232 89.7 %
Date: 2026-06-11 20:55:18 Functions: 12 12 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_feather.h"
      14             : 
      15             : #include "../arrow_common/ograrrowwriterlayer.hpp"
      16             : 
      17             : /************************************************************************/
      18             : /*                       OGRFeatherWriterLayer()                        */
      19             : /************************************************************************/
      20             : 
      21         152 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
      22             :     GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
      23             :     const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
      24         152 :     const char *pszLayerName)
      25             :     : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
      26         152 :       m_poDS(poDS)
      27             : {
      28         152 :     m_bWriteFieldArrowExtensionName = true;
      29         152 : }
      30             : 
      31             : /************************************************************************/
      32             : /*                               Close()                                */
      33             : /************************************************************************/
      34             : 
      35         143 : bool OGRFeatherWriterLayer::Close()
      36             : {
      37         143 :     if (m_bInitializationOK)
      38             :     {
      39         143 :         if (!FinalizeWriting())
      40           0 :             return false;
      41             :     }
      42             : 
      43         143 :     return true;
      44             : }
      45             : 
      46             : /************************************************************************/
      47             : /*                      IsSupportedGeometryType()                       */
      48             : /************************************************************************/
      49             : 
      50         146 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
      51             :     OGRwkbGeometryType eGType) const
      52             : {
      53         146 :     if (eGType != wkbFlatten(eGType))
      54             :     {
      55             :         const auto osConfigOptionName =
      56         170 :             "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
      57          85 :         if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
      58             :         {
      59           7 :             CPLError(CE_Failure, CPLE_NotSupported,
      60             :                      "Only 2D geometry types are supported (unless the "
      61             :                      "%s configuration option is set to YES)",
      62             :                      osConfigOptionName.c_str());
      63           7 :             return false;
      64             :         }
      65             :     }
      66         139 :     return true;
      67             : }
      68             : 
      69             : /************************************************************************/
      70             : /*                             SetOptions()                             */
      71             : /************************************************************************/
      72             : 
      73         152 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
      74             :                                        CSLConstList papszOptions,
      75             :                                        const OGRSpatialReference *poSpatialRef,
      76             :                                        OGRwkbGeometryType eGType)
      77             : {
      78         152 :     m_aosCreationOptions = papszOptions;
      79             : 
      80             :     const char *pszDefaultFormat =
      81         304 :         (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
      82         152 :          STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
      83         456 :             ? "STREAM"
      84         152 :             : "FILE";
      85         152 :     m_bStreamFormat =
      86         152 :         EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
      87             :               "STREAM");
      88             : 
      89             :     const char *pszGeomEncoding =
      90         152 :         CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
      91         152 :     m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
      92         152 :     if (pszGeomEncoding)
      93             :     {
      94         121 :         if (EQUAL(pszGeomEncoding, "WKB"))
      95          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
      96          90 :         else if (EQUAL(pszGeomEncoding, "WKT"))
      97          29 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
      98          61 :         else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
      99          30 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
     100          31 :         else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
     101           6 :                  EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
     102          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
     103             :         else
     104             :         {
     105           0 :             CPLError(CE_Failure, CPLE_NotSupported,
     106             :                      "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
     107           0 :             return false;
     108             :         }
     109             :     }
     110             : 
     111         152 :     if (eGType != wkbNone)
     112             :     {
     113         146 :         if (!IsSupportedGeometryType(eGType))
     114             :         {
     115           9 :             return false;
     116             :         }
     117             : 
     118         139 :         if (poSpatialRef == nullptr)
     119             :         {
     120          10 :             CPLError(CE_Warning, CPLE_AppDefined,
     121             :                      "Geometry column should have an associated CRS");
     122             :         }
     123             : 
     124         139 :         m_poFeatureDefn->SetGeomType(eGType);
     125         139 :         auto eGeomEncoding = m_eGeomEncoding;
     126         139 :         if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
     127         109 :             eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
     128             :         {
     129          79 :             const auto eEncodingType = eGeomEncoding;
     130          79 :             eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
     131          79 :             if (eGeomEncoding == eEncodingType)
     132           2 :                 return false;
     133             :         }
     134         137 :         m_aeGeomEncoding.push_back(eGeomEncoding);
     135         137 :         m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
     136             :             CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
     137         137 :         if (poSpatialRef)
     138             :         {
     139         129 :             auto poSRS = poSpatialRef->Clone();
     140         129 :             m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
     141         129 :             poSRS->Release();
     142             :         }
     143             :     }
     144             : 
     145         143 :     m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
     146             : 
     147             :     const char *pszCompression =
     148         143 :         CSLFetchNameValue(papszOptions, GDALMD_COMPRESSION);
     149         143 :     if (pszCompression == nullptr)
     150             :     {
     151         420 :         auto oResult = arrow::util::Codec::GetCompressionType("lz4");
     152         140 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     153             :         {
     154         140 :             pszCompression = "LZ4";
     155             :         }
     156             :         else
     157             :         {
     158           0 :             pszCompression = "NONE";
     159             :         }
     160             :     }
     161             : 
     162         143 :     if (EQUAL(pszCompression, "NONE"))
     163           0 :         pszCompression = "UNCOMPRESSED";
     164             :     auto oResult = arrow::util::Codec::GetCompressionType(
     165         286 :         CPLString(pszCompression).tolower());
     166         143 :     if (!oResult.ok())
     167             :     {
     168           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     169             :                  "Unrecognized compression method: %s", pszCompression);
     170           0 :         return false;
     171             :     }
     172         143 :     m_eCompression = *oResult;
     173         143 :     if (!arrow::util::Codec::IsAvailable(m_eCompression))
     174             :     {
     175           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     176             :                  "Compression method %s is known, but libarrow has not "
     177             :                  "been built with support for it",
     178             :                  pszCompression);
     179           0 :         return false;
     180             :     }
     181             : 
     182         143 :     const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
     183         143 :     if (pszRowGroupSize)
     184             :     {
     185           3 :         auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
     186           3 :         if (nRowGroupSize > 0)
     187             :         {
     188           3 :             if (nRowGroupSize > INT_MAX)
     189           0 :                 nRowGroupSize = INT_MAX;
     190           3 :             m_nRowGroupSize = nRowGroupSize;
     191             :         }
     192             :     }
     193             : 
     194         143 :     m_bInitializationOK = true;
     195         143 :     return true;
     196             : }
     197             : 
     198             : /************************************************************************/
     199             : /*                          CloseFileWriter()                           */
     200             : /************************************************************************/
     201             : 
     202         143 : bool OGRFeatherWriterLayer::CloseFileWriter()
     203             : {
     204         286 :     auto status = m_poFileWriter->Close();
     205         143 :     if (!status.ok())
     206             :     {
     207           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     208             :                  "FileWriter::Close() failed with %s",
     209           0 :                  status.message().c_str());
     210             :     }
     211         286 :     return status.ok();
     212             : }
     213             : 
     214             : /************************************************************************/
     215             : /*                            CreateSchema()                            */
     216             : /************************************************************************/
     217             : 
     218         143 : void OGRFeatherWriterLayer::CreateSchema()
     219             : {
     220         143 :     CreateSchemaCommon();
     221             : 
     222         280 :     if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     223         137 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
     224             :     {
     225         272 :         CPLJSONObject oRoot;
     226         136 :         oRoot.Add("schema_version", "0.1.0");
     227         136 :         oRoot.Add("primary_column",
     228         136 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     229         272 :         CPLJSONObject oColumns;
     230         136 :         oRoot.Add("columns", oColumns);
     231         272 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     232             :         {
     233         136 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     234         272 :             CPLJSONObject oColumn;
     235         136 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     236         136 :             oColumn.Add("encoding",
     237         136 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
     238             : 
     239         136 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     240         136 :             if (poSRS)
     241             :             {
     242         128 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     243             :                                                    "MULTILINE=NO", nullptr};
     244         128 :                 char *pszWKT = nullptr;
     245         128 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     246         128 :                 if (pszWKT)
     247         128 :                     oColumn.Add("crs", pszWKT);
     248         128 :                 CPLFree(pszWKT);
     249             : 
     250         128 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     251         128 :                 if (dfCoordEpoch > 0)
     252           2 :                     oColumn.Add("epoch", dfCoordEpoch);
     253             :             }
     254             : 
     255             : #if 0
     256             :             if( m_aoEnvelopes[i].IsInit() &&
     257             :                 CPLTestBool(CPLGetConfigOption(
     258             :                     "OGR_ARROW_WRITE_BBOX", "YES")) )
     259             :             {
     260             :                 CPLJSONArray oBBOX;
     261             :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     262             :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     263             :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     264             :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     265             :                 oColumn.Add("bbox", oBBOX);
     266             :             }
     267             : #endif
     268         136 :             const auto eType = poGeomFieldDefn->GetType();
     269         136 :             if (CPLTestBool(CPLGetConfigOption(
     270         272 :                     "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
     271         136 :                 eType == wkbFlatten(eType))
     272             :             {
     273             :                 // Geometry type, place under a temporary "gdal:geometry_type"
     274             :                 // property pending acceptance of proposal at
     275             :                 // https://github.com/opengeospatial/geoparquet/issues/41
     276          58 :                 const char *pszType = "mixed";
     277          58 :                 if (wkbPoint == eType)
     278          19 :                     pszType = "Point";
     279          39 :                 else if (wkbLineString == eType)
     280           7 :                     pszType = "LineString";
     281          32 :                 else if (wkbPolygon == eType)
     282           9 :                     pszType = "Polygon";
     283          23 :                 else if (wkbMultiPoint == eType)
     284           7 :                     pszType = "MultiPoint";
     285          16 :                 else if (wkbMultiLineString == eType)
     286           7 :                     pszType = "MultiLineString";
     287           9 :                 else if (wkbMultiPolygon == eType)
     288           7 :                     pszType = "MultiPolygon";
     289           2 :                 else if (wkbGeometryCollection == eType)
     290           2 :                     pszType = "GeometryCollection";
     291          58 :                 oColumn.Add("gdal:geometry_type", pszType);
     292             :             }
     293             :         }
     294             : 
     295         136 :         auto kvMetadata = m_poSchema->metadata()
     296           2 :                               ? m_poSchema->metadata()->Copy()
     297         274 :                               : std::make_shared<arrow::KeyValueMetadata>();
     298         272 :         kvMetadata->Append("geo",
     299         272 :                            oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     300         136 :         m_poSchema = m_poSchema->WithMetadata(kvMetadata);
     301         136 :         CPLAssert(m_poSchema);
     302             :     }
     303         143 : }
     304             : 
     305             : /************************************************************************/
     306             : /*                            CreateWriter()                            */
     307             : /************************************************************************/
     308             : 
     309         143 : void OGRFeatherWriterLayer::CreateWriter()
     310             : {
     311         143 :     CPLAssert(m_poFileWriter == nullptr);
     312             : 
     313         143 :     if (m_poSchema == nullptr)
     314             :     {
     315          15 :         CreateSchema();
     316             :     }
     317             :     else
     318             :     {
     319         128 :         FinalizeSchema();
     320             :     }
     321             : 
     322         286 :     auto options = arrow::ipc::IpcWriteOptions::Defaults();
     323         143 :     options.memory_pool = m_poMemoryPool;
     324             : 
     325             :     {
     326         286 :         auto result = arrow::util::Codec::Create(m_eCompression);
     327         143 :         if (!result.ok())
     328             :         {
     329           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     330             :                      "Codec::Create() failed with %s",
     331           0 :                      result.status().message().c_str());
     332             :         }
     333             :         else
     334             :         {
     335         143 :             options.codec.reset(result->release());
     336             :         }
     337             :     }
     338             : 
     339         143 :     if (m_bStreamFormat)
     340             :     {
     341             :         auto result =
     342           8 :             arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
     343           4 :         if (!result.ok())
     344             :         {
     345           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     346             :                      "arrow::ipc::MakeStreamWriter() failed with %s",
     347           0 :                      result.status().message().c_str());
     348             :         }
     349             :         else
     350             :         {
     351           4 :             m_poFileWriter = *result;
     352             :         }
     353             :     }
     354             :     else
     355             :     {
     356             :         m_poFooterKeyValueMetadata =
     357         139 :             std::make_shared<arrow::KeyValueMetadata>();
     358             :         auto result = arrow::ipc::MakeFileWriter(
     359         417 :             m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
     360         139 :         if (!result.ok())
     361             :         {
     362           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     363             :                      "arrow::ipc::MakeFileWriter() failed with %s",
     364           0 :                      result.status().message().c_str());
     365             :         }
     366             :         else
     367             :         {
     368         139 :             m_poFileWriter = *result;
     369             :         }
     370             :     }
     371         143 : }
     372             : 
     373             : /************************************************************************/
     374             : /*                 PerformStepsBeforeFinalFlushGroup()                  */
     375             : /************************************************************************/
     376             : 
     377             : // Add a gdal:geo extension metadata for now, which embeds a bbox
     378         143 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
     379             : {
     380         282 :     if (m_poFooterKeyValueMetadata &&
     381         277 :         m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     382         134 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
     383             :     {
     384         262 :         CPLJSONObject oRoot;
     385         131 :         oRoot.Add("primary_column",
     386         131 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     387         131 :         CPLJSONObject oColumns;
     388         131 :         oRoot.Add("columns", oColumns);
     389         262 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     390             :         {
     391         131 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     392         262 :             CPLJSONObject oColumn;
     393         131 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     394         131 :             oColumn.Add("encoding",
     395         131 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
     396             : 
     397         131 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     398         131 :             if (poSRS)
     399             :             {
     400         123 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     401             :                                                    "MULTILINE=NO", nullptr};
     402         123 :                 char *pszWKT = nullptr;
     403         123 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     404         123 :                 if (pszWKT)
     405         123 :                     oColumn.Add("crs", pszWKT);
     406         123 :                 CPLFree(pszWKT);
     407             : 
     408         123 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     409         123 :                 if (dfCoordEpoch > 0)
     410           1 :                     oColumn.Add("epoch", dfCoordEpoch);
     411             :             }
     412             : 
     413         131 :             if (m_aoEnvelopes[i].IsInit())
     414             :             {
     415         116 :                 CPLJSONArray oBBOX;
     416         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     417         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     418         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     419         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     420         116 :                 oColumn.Add("bbox", oBBOX);
     421             :             }
     422             :         }
     423             : 
     424         262 :         m_poFooterKeyValueMetadata->Append(
     425             :             GDAL_GEO_FOOTER_KEY,
     426         262 :             oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     427             :     }
     428         143 : }
     429             : 
     430             : /************************************************************************/
     431             : /*                             FlushGroup()                             */
     432             : /************************************************************************/
     433             : 
     434          22 : bool OGRFeatherWriterLayer::FlushGroup()
     435             : {
     436          22 :     std::vector<std::shared_ptr<arrow::Array>> columns;
     437          22 :     auto ret = WriteArrays(
     438         846 :         [&columns](const std::shared_ptr<arrow::Field> &,
     439         846 :                    const std::shared_ptr<arrow::Array> &array)
     440             :         {
     441         846 :             columns.emplace_back(array);
     442         846 :             return true;
     443             :         });
     444             : 
     445          22 :     if (ret)
     446             :     {
     447             :         auto poRecordBatch = arrow::RecordBatch::Make(
     448          66 :             m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
     449          44 :         auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
     450          22 :         if (!status.ok())
     451             :         {
     452           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     453             :                      "WriteRecordBatch() failed with %s",
     454           0 :                      status.message().c_str());
     455           0 :             ret = false;
     456             :         }
     457             :     }
     458             : 
     459          22 :     ClearArrayBuilers();
     460          44 :     return ret;
     461             : }
     462             : 
     463             : /************************************************************************/
     464             : /*                          WriteArrowBatch()                           */
     465             : /************************************************************************/
     466             : 
     467             : inline bool
     468         112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
     469             :                                        struct ArrowArray *array,
     470             :                                        CSLConstList papszOptions)
     471             : {
     472         224 :     return WriteArrowBatchInternal(
     473             :         schema, array, papszOptions,
     474         112 :         [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
     475             :         {
     476         224 :             auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
     477         112 :             if (!status.ok())
     478             :             {
     479           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     480             :                          "WriteRecordBatch() failed: %s",
     481           0 :                          status.message().c_str());
     482           0 :                 return false;
     483             :             }
     484             : 
     485         112 :             return true;
     486         224 :         });
     487             : }

Generated by: LCOV version 1.14