LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherwriterlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 207 231 89.6 %
Date: 2025-09-10 17:48:50 Functions: 12 12 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_feather.h"
      14             : 
      15             : #include "../arrow_common/ograrrowwriterlayer.hpp"
      16             : 
      17             : /************************************************************************/
      18             : /*                      OGRFeatherWriterLayer()                         */
      19             : /************************************************************************/
      20             : 
      21         150 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
      22             :     GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
      23             :     const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
      24         150 :     const char *pszLayerName)
      25             :     : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
      26         150 :       m_poDS(poDS)
      27             : {
      28         150 :     m_bWriteFieldArrowExtensionName = true;
      29         150 : }
      30             : 
      31             : /************************************************************************/
      32             : /*                                Close()                               */
      33             : /************************************************************************/
      34             : 
      35         141 : bool OGRFeatherWriterLayer::Close()
      36             : {
      37         141 :     if (m_bInitializationOK)
      38             :     {
      39         141 :         if (!FinalizeWriting())
      40           0 :             return false;
      41             :     }
      42             : 
      43         141 :     return true;
      44             : }
      45             : 
      46             : /************************************************************************/
      47             : /*                       IsSupportedGeometryType()                      */
      48             : /************************************************************************/
      49             : 
      50         144 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
      51             :     OGRwkbGeometryType eGType) const
      52             : {
      53         144 :     if (eGType != wkbFlatten(eGType))
      54             :     {
      55             :         const auto osConfigOptionName =
      56         170 :             "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
      57          85 :         if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
      58             :         {
      59           7 :             CPLError(CE_Failure, CPLE_NotSupported,
      60             :                      "Only 2D geometry types are supported (unless the "
      61             :                      "%s configuration option is set to YES)",
      62             :                      osConfigOptionName.c_str());
      63           7 :             return false;
      64             :         }
      65             :     }
      66         137 :     return true;
      67             : }
      68             : 
      69             : /************************************************************************/
      70             : /*                           SetOptions()                               */
      71             : /************************************************************************/
      72             : 
      73         150 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
      74             :                                        CSLConstList papszOptions,
      75             :                                        const OGRSpatialReference *poSpatialRef,
      76             :                                        OGRwkbGeometryType eGType)
      77             : {
      78             :     const char *pszDefaultFormat =
      79         300 :         (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
      80         150 :          STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
      81         450 :             ? "STREAM"
      82         150 :             : "FILE";
      83         150 :     m_bStreamFormat =
      84         150 :         EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
      85             :               "STREAM");
      86             : 
      87             :     const char *pszGeomEncoding =
      88         150 :         CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
      89         150 :     m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
      90         150 :     if (pszGeomEncoding)
      91             :     {
      92         121 :         if (EQUAL(pszGeomEncoding, "WKB"))
      93          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
      94          90 :         else if (EQUAL(pszGeomEncoding, "WKT"))
      95          29 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
      96          61 :         else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
      97          30 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
      98          31 :         else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
      99           6 :                  EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
     100          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
     101             :         else
     102             :         {
     103           0 :             CPLError(CE_Failure, CPLE_NotSupported,
     104             :                      "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
     105           0 :             return false;
     106             :         }
     107             :     }
     108             : 
     109         150 :     if (eGType != wkbNone)
     110             :     {
     111         144 :         if (!IsSupportedGeometryType(eGType))
     112             :         {
     113           9 :             return false;
     114             :         }
     115             : 
     116         137 :         if (poSpatialRef == nullptr)
     117             :         {
     118           8 :             CPLError(CE_Warning, CPLE_AppDefined,
     119             :                      "Geometry column should have an associated CRS");
     120             :         }
     121             : 
     122         137 :         m_poFeatureDefn->SetGeomType(eGType);
     123         137 :         auto eGeomEncoding = m_eGeomEncoding;
     124         137 :         if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
     125         107 :             eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
     126             :         {
     127          77 :             const auto eEncodingType = eGeomEncoding;
     128          77 :             eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
     129          77 :             if (eGeomEncoding == eEncodingType)
     130           2 :                 return false;
     131             :         }
     132         135 :         m_aeGeomEncoding.push_back(eGeomEncoding);
     133         135 :         m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
     134             :             CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
     135         135 :         if (poSpatialRef)
     136             :         {
     137         129 :             auto poSRS = poSpatialRef->Clone();
     138         129 :             m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
     139         129 :             poSRS->Release();
     140             :         }
     141             :     }
     142             : 
     143         141 :     m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
     144             : 
     145         141 :     const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
     146         141 :     if (pszCompression == nullptr)
     147             :     {
     148         414 :         auto oResult = arrow::util::Codec::GetCompressionType("lz4");
     149         138 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     150             :         {
     151         138 :             pszCompression = "LZ4";
     152             :         }
     153             :         else
     154             :         {
     155           0 :             pszCompression = "NONE";
     156             :         }
     157             :     }
     158             : 
     159         141 :     if (EQUAL(pszCompression, "NONE"))
     160           0 :         pszCompression = "UNCOMPRESSED";
     161             :     auto oResult = arrow::util::Codec::GetCompressionType(
     162         282 :         CPLString(pszCompression).tolower());
     163         141 :     if (!oResult.ok())
     164             :     {
     165           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     166             :                  "Unrecognized compression method: %s", pszCompression);
     167           0 :         return false;
     168             :     }
     169         141 :     m_eCompression = *oResult;
     170         141 :     if (!arrow::util::Codec::IsAvailable(m_eCompression))
     171             :     {
     172           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     173             :                  "Compression method %s is known, but libarrow has not "
     174             :                  "been built with support for it",
     175             :                  pszCompression);
     176           0 :         return false;
     177             :     }
     178             : 
     179         141 :     const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
     180         141 :     if (pszRowGroupSize)
     181             :     {
     182           3 :         auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
     183           3 :         if (nRowGroupSize > 0)
     184             :         {
     185           3 :             if (nRowGroupSize > INT_MAX)
     186           0 :                 nRowGroupSize = INT_MAX;
     187           3 :             m_nRowGroupSize = nRowGroupSize;
     188             :         }
     189             :     }
     190             : 
     191         141 :     m_bInitializationOK = true;
     192         141 :     return true;
     193             : }
     194             : 
     195             : /************************************************************************/
     196             : /*                         CloseFileWriter()                            */
     197             : /************************************************************************/
     198             : 
     199         141 : bool OGRFeatherWriterLayer::CloseFileWriter()
     200             : {
     201         282 :     auto status = m_poFileWriter->Close();
     202         141 :     if (!status.ok())
     203             :     {
     204           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     205             :                  "FileWriter::Close() failed with %s",
     206           0 :                  status.message().c_str());
     207             :     }
     208         282 :     return status.ok();
     209             : }
     210             : 
     211             : /************************************************************************/
     212             : /*                          CreateSchema()                              */
     213             : /************************************************************************/
     214             : 
     215         141 : void OGRFeatherWriterLayer::CreateSchema()
     216             : {
     217         141 :     CreateSchemaCommon();
     218             : 
     219         276 :     if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     220         135 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
     221             :     {
     222         268 :         CPLJSONObject oRoot;
     223         134 :         oRoot.Add("schema_version", "0.1.0");
     224         134 :         oRoot.Add("primary_column",
     225         134 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     226         268 :         CPLJSONObject oColumns;
     227         134 :         oRoot.Add("columns", oColumns);
     228         268 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     229             :         {
     230         134 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     231         268 :             CPLJSONObject oColumn;
     232         134 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     233         134 :             oColumn.Add("encoding",
     234         134 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
     235             : 
     236         134 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     237         134 :             if (poSRS)
     238             :             {
     239         128 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     240             :                                                    "MULTILINE=NO", nullptr};
     241         128 :                 char *pszWKT = nullptr;
     242         128 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     243         128 :                 if (pszWKT)
     244         128 :                     oColumn.Add("crs", pszWKT);
     245         128 :                 CPLFree(pszWKT);
     246             : 
     247         128 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     248         128 :                 if (dfCoordEpoch > 0)
     249           2 :                     oColumn.Add("epoch", dfCoordEpoch);
     250             :             }
     251             : 
     252             : #if 0
     253             :             if( m_aoEnvelopes[i].IsInit() &&
     254             :                 CPLTestBool(CPLGetConfigOption(
     255             :                     "OGR_ARROW_WRITE_BBOX", "YES")) )
     256             :             {
     257             :                 CPLJSONArray oBBOX;
     258             :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     259             :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     260             :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     261             :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     262             :                 oColumn.Add("bbox", oBBOX);
     263             :             }
     264             : #endif
     265         134 :             const auto eType = poGeomFieldDefn->GetType();
     266         134 :             if (CPLTestBool(CPLGetConfigOption(
     267         268 :                     "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
     268         134 :                 eType == wkbFlatten(eType))
     269             :             {
     270             :                 // Geometry type, place under a temporary "gdal:geometry_type"
     271             :                 // property pending acceptance of proposal at
     272             :                 // https://github.com/opengeospatial/geoparquet/issues/41
     273          56 :                 const char *pszType = "mixed";
     274          56 :                 if (wkbPoint == eType)
     275          19 :                     pszType = "Point";
     276          37 :                 else if (wkbLineString == eType)
     277           7 :                     pszType = "LineString";
     278          30 :                 else if (wkbPolygon == eType)
     279           7 :                     pszType = "Polygon";
     280          23 :                 else if (wkbMultiPoint == eType)
     281           7 :                     pszType = "MultiPoint";
     282          16 :                 else if (wkbMultiLineString == eType)
     283           7 :                     pszType = "MultiLineString";
     284           9 :                 else if (wkbMultiPolygon == eType)
     285           7 :                     pszType = "MultiPolygon";
     286           2 :                 else if (wkbGeometryCollection == eType)
     287           2 :                     pszType = "GeometryCollection";
     288          56 :                 oColumn.Add("gdal:geometry_type", pszType);
     289             :             }
     290             :         }
     291             : 
     292         134 :         auto kvMetadata = m_poSchema->metadata()
     293           2 :                               ? m_poSchema->metadata()->Copy()
     294         270 :                               : std::make_shared<arrow::KeyValueMetadata>();
     295         268 :         kvMetadata->Append("geo",
     296         268 :                            oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     297         134 :         m_poSchema = m_poSchema->WithMetadata(kvMetadata);
     298         134 :         CPLAssert(m_poSchema);
     299             :     }
     300         141 : }
     301             : 
     302             : /************************************************************************/
     303             : /*                          CreateWriter()                              */
     304             : /************************************************************************/
     305             : 
     306         141 : void OGRFeatherWriterLayer::CreateWriter()
     307             : {
     308         141 :     CPLAssert(m_poFileWriter == nullptr);
     309             : 
     310         141 :     if (m_poSchema == nullptr)
     311             :     {
     312          15 :         CreateSchema();
     313             :     }
     314             :     else
     315             :     {
     316         126 :         FinalizeSchema();
     317             :     }
     318             : 
     319         282 :     auto options = arrow::ipc::IpcWriteOptions::Defaults();
     320         141 :     options.memory_pool = m_poMemoryPool;
     321             : 
     322             :     {
     323         282 :         auto result = arrow::util::Codec::Create(m_eCompression);
     324         141 :         if (!result.ok())
     325             :         {
     326           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     327             :                      "Codec::Create() failed with %s",
     328           0 :                      result.status().message().c_str());
     329             :         }
     330             :         else
     331             :         {
     332         141 :             options.codec.reset(result->release());
     333             :         }
     334             :     }
     335             : 
     336         141 :     if (m_bStreamFormat)
     337             :     {
     338             :         auto result =
     339           8 :             arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
     340           4 :         if (!result.ok())
     341             :         {
     342           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     343             :                      "arrow::ipc::MakeStreamWriter() failed with %s",
     344           0 :                      result.status().message().c_str());
     345             :         }
     346             :         else
     347             :         {
     348           4 :             m_poFileWriter = *result;
     349             :         }
     350             :     }
     351             :     else
     352             :     {
     353             :         m_poFooterKeyValueMetadata =
     354         137 :             std::make_shared<arrow::KeyValueMetadata>();
     355             :         auto result = arrow::ipc::MakeFileWriter(
     356         411 :             m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
     357         137 :         if (!result.ok())
     358             :         {
     359           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     360             :                      "arrow::ipc::MakeFileWriter() failed with %s",
     361           0 :                      result.status().message().c_str());
     362             :         }
     363             :         else
     364             :         {
     365         137 :             m_poFileWriter = *result;
     366             :         }
     367             :     }
     368         141 : }
     369             : 
     370             : /************************************************************************/
     371             : /*               PerformStepsBeforeFinalFlushGroup()                    */
     372             : /************************************************************************/
     373             : 
     374             : // Add a gdal:geo extension metadata for now, which embeds a bbox
     375         141 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
     376             : {
     377         278 :     if (m_poFooterKeyValueMetadata &&
     378         273 :         m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     379         132 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
     380             :     {
     381         258 :         CPLJSONObject oRoot;
     382         129 :         oRoot.Add("primary_column",
     383         129 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     384         129 :         CPLJSONObject oColumns;
     385         129 :         oRoot.Add("columns", oColumns);
     386         258 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     387             :         {
     388         129 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     389         258 :             CPLJSONObject oColumn;
     390         129 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     391         129 :             oColumn.Add("encoding",
     392         129 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
     393             : 
     394         129 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     395         129 :             if (poSRS)
     396             :             {
     397         123 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     398             :                                                    "MULTILINE=NO", nullptr};
     399         123 :                 char *pszWKT = nullptr;
     400         123 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     401         123 :                 if (pszWKT)
     402         123 :                     oColumn.Add("crs", pszWKT);
     403         123 :                 CPLFree(pszWKT);
     404             : 
     405         123 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     406         123 :                 if (dfCoordEpoch > 0)
     407           1 :                     oColumn.Add("epoch", dfCoordEpoch);
     408             :             }
     409             : 
     410         129 :             if (m_aoEnvelopes[i].IsInit())
     411             :             {
     412         116 :                 CPLJSONArray oBBOX;
     413         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     414         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     415         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     416         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     417         116 :                 oColumn.Add("bbox", oBBOX);
     418             :             }
     419             :         }
     420             : 
     421         258 :         m_poFooterKeyValueMetadata->Append(
     422             :             GDAL_GEO_FOOTER_KEY,
     423         258 :             oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     424             :     }
     425         141 : }
     426             : 
     427             : /************************************************************************/
     428             : /*                            FlushGroup()                              */
     429             : /************************************************************************/
     430             : 
     431          20 : bool OGRFeatherWriterLayer::FlushGroup()
     432             : {
     433          20 :     std::vector<std::shared_ptr<arrow::Array>> columns;
     434          20 :     auto ret = WriteArrays(
     435         840 :         [&columns](const std::shared_ptr<arrow::Field> &,
     436         840 :                    const std::shared_ptr<arrow::Array> &array)
     437             :         {
     438         840 :             columns.emplace_back(array);
     439         840 :             return true;
     440             :         });
     441             : 
     442          20 :     if (ret)
     443             :     {
     444             :         auto poRecordBatch = arrow::RecordBatch::Make(
     445          60 :             m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
     446          40 :         auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
     447          20 :         if (!status.ok())
     448             :         {
     449           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     450             :                      "WriteRecordBatch() failed with %s",
     451           0 :                      status.message().c_str());
     452           0 :             ret = false;
     453             :         }
     454             :     }
     455             : 
     456          20 :     ClearArrayBuilers();
     457          40 :     return ret;
     458             : }
     459             : 
     460             : /************************************************************************/
     461             : /*                          WriteArrowBatch()                           */
     462             : /************************************************************************/
     463             : 
     464             : inline bool
     465         112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
     466             :                                        struct ArrowArray *array,
     467             :                                        CSLConstList papszOptions)
     468             : {
     469         224 :     return WriteArrowBatchInternal(
     470             :         schema, array, papszOptions,
     471         112 :         [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
     472             :         {
     473         224 :             auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
     474         112 :             if (!status.ok())
     475             :             {
     476           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     477             :                          "WriteRecordBatch() failed: %s",
     478           0 :                          status.message().c_str());
     479           0 :                 return false;
     480             :             }
     481             : 
     482         112 :             return true;
     483         224 :         });
     484             : }

Generated by: LCOV version 1.14