LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherwriterlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 207 230 90.0 %
Date: 2024-11-21 22:18:42 Functions: 13 13 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_feather.h"
      14             : 
      15             : #include "../arrow_common/ograrrowwriterlayer.hpp"
      16             : 
      17             : /************************************************************************/
      18             : /*                      OGRFeatherWriterLayer()                         */
      19             : /************************************************************************/
      20             : 
      21         150 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
      22             :     GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
      23             :     const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
      24         150 :     const char *pszLayerName)
      25             :     : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
      26         150 :       m_poDS(poDS)
      27             : {
      28         150 :     m_bWriteFieldArrowExtensionName = true;
      29         150 : }
      30             : 
      31             : /************************************************************************/
      32             : /*                     ~OGRFeatherWriterLayer()                         */
      33             : /************************************************************************/
      34             : 
      35         300 : OGRFeatherWriterLayer::~OGRFeatherWriterLayer()
      36             : {
      37         150 :     if (m_bInitializationOK)
      38         141 :         FinalizeWriting();
      39         300 : }
      40             : 
      41             : /************************************************************************/
      42             : /*                       IsSupportedGeometryType()                      */
      43             : /************************************************************************/
      44             : 
      45         144 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
      46             :     OGRwkbGeometryType eGType) const
      47             : {
      48         144 :     if (eGType != wkbFlatten(eGType))
      49             :     {
      50             :         const auto osConfigOptionName =
      51         170 :             "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
      52          85 :         if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
      53             :         {
      54           7 :             CPLError(CE_Failure, CPLE_NotSupported,
      55             :                      "Only 2D geometry types are supported (unless the "
      56             :                      "%s configuration option is set to YES)",
      57             :                      osConfigOptionName.c_str());
      58           7 :             return false;
      59             :         }
      60             :     }
      61         137 :     return true;
      62             : }
      63             : 
      64             : /************************************************************************/
      65             : /*                           SetOptions()                               */
      66             : /************************************************************************/
      67             : 
      68         150 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
      69             :                                        CSLConstList papszOptions,
      70             :                                        const OGRSpatialReference *poSpatialRef,
      71             :                                        OGRwkbGeometryType eGType)
      72             : {
      73             :     const char *pszDefaultFormat =
      74         150 :         (EQUAL(CPLGetExtension(osFilename.c_str()), "arrows") ||
      75         150 :          STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
      76         300 :             ? "STREAM"
      77         150 :             : "FILE";
      78         150 :     m_bStreamFormat =
      79         150 :         EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
      80             :               "STREAM");
      81             : 
      82             :     const char *pszGeomEncoding =
      83         150 :         CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
      84         150 :     m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
      85         150 :     if (pszGeomEncoding)
      86             :     {
      87         121 :         if (EQUAL(pszGeomEncoding, "WKB"))
      88          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
      89          90 :         else if (EQUAL(pszGeomEncoding, "WKT"))
      90          29 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
      91          61 :         else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
      92          30 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
      93          31 :         else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
      94           6 :                  EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
      95          31 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
      96             :         else
      97             :         {
      98           0 :             CPLError(CE_Failure, CPLE_NotSupported,
      99             :                      "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
     100           0 :             return false;
     101             :         }
     102             :     }
     103             : 
     104         150 :     if (eGType != wkbNone)
     105             :     {
     106         144 :         if (!IsSupportedGeometryType(eGType))
     107             :         {
     108           9 :             return false;
     109             :         }
     110             : 
     111         137 :         if (poSpatialRef == nullptr)
     112             :         {
     113           8 :             CPLError(CE_Warning, CPLE_AppDefined,
     114             :                      "Geometry column should have an associated CRS");
     115             :         }
     116             : 
     117         137 :         m_poFeatureDefn->SetGeomType(eGType);
     118         137 :         auto eGeomEncoding = m_eGeomEncoding;
     119         137 :         if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
     120         107 :             eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
     121             :         {
     122          77 :             const auto eEncodingType = eGeomEncoding;
     123          77 :             eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
     124          77 :             if (eGeomEncoding == eEncodingType)
     125           2 :                 return false;
     126             :         }
     127         135 :         m_aeGeomEncoding.push_back(eGeomEncoding);
     128         135 :         m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
     129             :             CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
     130         135 :         if (poSpatialRef)
     131             :         {
     132         129 :             auto poSRS = poSpatialRef->Clone();
     133         129 :             m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
     134         129 :             poSRS->Release();
     135             :         }
     136             :     }
     137             : 
     138         141 :     m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
     139             : 
     140         141 :     const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
     141         141 :     if (pszCompression == nullptr)
     142             :     {
     143         414 :         auto oResult = arrow::util::Codec::GetCompressionType("lz4");
     144         138 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     145             :         {
     146         138 :             pszCompression = "LZ4";
     147             :         }
     148             :         else
     149             :         {
     150           0 :             pszCompression = "NONE";
     151             :         }
     152             :     }
     153             : 
     154         141 :     if (EQUAL(pszCompression, "NONE"))
     155           0 :         pszCompression = "UNCOMPRESSED";
     156             :     auto oResult = arrow::util::Codec::GetCompressionType(
     157         282 :         CPLString(pszCompression).tolower());
     158         141 :     if (!oResult.ok())
     159             :     {
     160           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     161             :                  "Unrecognized compression method: %s", pszCompression);
     162           0 :         return false;
     163             :     }
     164         141 :     m_eCompression = *oResult;
     165         141 :     if (!arrow::util::Codec::IsAvailable(m_eCompression))
     166             :     {
     167           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     168             :                  "Compression method %s is known, but libarrow has not "
     169             :                  "been built with support for it",
     170             :                  pszCompression);
     171           0 :         return false;
     172             :     }
     173             : 
     174         141 :     const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
     175         141 :     if (pszRowGroupSize)
     176             :     {
     177           3 :         auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
     178           3 :         if (nRowGroupSize > 0)
     179             :         {
     180           3 :             if (nRowGroupSize > INT_MAX)
     181           0 :                 nRowGroupSize = INT_MAX;
     182           3 :             m_nRowGroupSize = nRowGroupSize;
     183             :         }
     184             :     }
     185             : 
     186         141 :     m_bInitializationOK = true;
     187         141 :     return true;
     188             : }
     189             : 
     190             : /************************************************************************/
     191             : /*                         CloseFileWriter()                            */
     192             : /************************************************************************/
     193             : 
     194         141 : bool OGRFeatherWriterLayer::CloseFileWriter()
     195             : {
     196         282 :     auto status = m_poFileWriter->Close();
     197         141 :     if (!status.ok())
     198             :     {
     199           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     200             :                  "FileWriter::Close() failed with %s",
     201           0 :                  status.message().c_str());
     202             :     }
     203         282 :     return status.ok();
     204             : }
     205             : 
     206             : /************************************************************************/
     207             : /*                          CreateSchema()                              */
     208             : /************************************************************************/
     209             : 
     210         141 : void OGRFeatherWriterLayer::CreateSchema()
     211             : {
     212         141 :     CreateSchemaCommon();
     213             : 
     214         276 :     if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     215         135 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
     216             :     {
     217         268 :         CPLJSONObject oRoot;
     218         134 :         oRoot.Add("schema_version", "0.1.0");
     219         134 :         oRoot.Add("primary_column",
     220         134 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     221         268 :         CPLJSONObject oColumns;
     222         134 :         oRoot.Add("columns", oColumns);
     223         268 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     224             :         {
     225         134 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     226         268 :             CPLJSONObject oColumn;
     227         134 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     228         134 :             oColumn.Add("encoding",
     229         134 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
     230             : 
     231         134 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     232         134 :             if (poSRS)
     233             :             {
     234         128 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     235             :                                                    "MULTILINE=NO", nullptr};
     236         128 :                 char *pszWKT = nullptr;
     237         128 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     238         128 :                 if (pszWKT)
     239         128 :                     oColumn.Add("crs", pszWKT);
     240         128 :                 CPLFree(pszWKT);
     241             : 
     242         128 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     243         128 :                 if (dfCoordEpoch > 0)
     244           2 :                     oColumn.Add("epoch", dfCoordEpoch);
     245             :             }
     246             : 
     247             : #if 0
     248             :             if( m_aoEnvelopes[i].IsInit() &&
     249             :                 CPLTestBool(CPLGetConfigOption(
     250             :                     "OGR_ARROW_WRITE_BBOX", "YES")) )
     251             :             {
     252             :                 CPLJSONArray oBBOX;
     253             :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     254             :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     255             :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     256             :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     257             :                 oColumn.Add("bbox", oBBOX);
     258             :             }
     259             : #endif
     260         134 :             const auto eType = poGeomFieldDefn->GetType();
     261         134 :             if (CPLTestBool(CPLGetConfigOption(
     262         268 :                     "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
     263         134 :                 eType == wkbFlatten(eType))
     264             :             {
     265             :                 // Geometry type, place under a temporary "gdal:geometry_type"
     266             :                 // property pending acceptance of proposal at
     267             :                 // https://github.com/opengeospatial/geoparquet/issues/41
     268          56 :                 const char *pszType = "mixed";
     269          56 :                 if (wkbPoint == eType)
     270          19 :                     pszType = "Point";
     271          37 :                 else if (wkbLineString == eType)
     272           7 :                     pszType = "LineString";
     273          30 :                 else if (wkbPolygon == eType)
     274           7 :                     pszType = "Polygon";
     275          23 :                 else if (wkbMultiPoint == eType)
     276           7 :                     pszType = "MultiPoint";
     277          16 :                 else if (wkbMultiLineString == eType)
     278           7 :                     pszType = "MultiLineString";
     279           9 :                 else if (wkbMultiPolygon == eType)
     280           7 :                     pszType = "MultiPolygon";
     281           2 :                 else if (wkbGeometryCollection == eType)
     282           2 :                     pszType = "GeometryCollection";
     283          56 :                 oColumn.Add("gdal:geometry_type", pszType);
     284             :             }
     285             :         }
     286             : 
     287         134 :         auto kvMetadata = m_poSchema->metadata()
     288           2 :                               ? m_poSchema->metadata()->Copy()
     289         270 :                               : std::make_shared<arrow::KeyValueMetadata>();
     290         268 :         kvMetadata->Append("geo",
     291         268 :                            oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     292         134 :         m_poSchema = m_poSchema->WithMetadata(kvMetadata);
     293         134 :         CPLAssert(m_poSchema);
     294             :     }
     295         141 : }
     296             : 
     297             : /************************************************************************/
     298             : /*                          CreateWriter()                              */
     299             : /************************************************************************/
     300             : 
     301         141 : void OGRFeatherWriterLayer::CreateWriter()
     302             : {
     303         141 :     CPLAssert(m_poFileWriter == nullptr);
     304             : 
     305         141 :     if (m_poSchema == nullptr)
     306             :     {
     307          15 :         CreateSchema();
     308             :     }
     309             :     else
     310             :     {
     311         126 :         FinalizeSchema();
     312             :     }
     313             : 
     314         282 :     auto options = arrow::ipc::IpcWriteOptions::Defaults();
     315         141 :     options.memory_pool = m_poMemoryPool;
     316             : 
     317             :     {
     318         282 :         auto result = arrow::util::Codec::Create(m_eCompression);
     319         141 :         if (!result.ok())
     320             :         {
     321           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     322             :                      "Codec::Create() failed with %s",
     323           0 :                      result.status().message().c_str());
     324             :         }
     325             :         else
     326             :         {
     327         141 :             options.codec.reset(result->release());
     328             :         }
     329             :     }
     330             : 
     331         141 :     if (m_bStreamFormat)
     332             :     {
     333             :         auto result =
     334           8 :             arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
     335           4 :         if (!result.ok())
     336             :         {
     337           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     338             :                      "arrow::ipc::MakeStreamWriter() failed with %s",
     339           0 :                      result.status().message().c_str());
     340             :         }
     341             :         else
     342             :         {
     343           4 :             m_poFileWriter = *result;
     344             :         }
     345             :     }
     346             :     else
     347             :     {
     348             :         m_poFooterKeyValueMetadata =
     349         137 :             std::make_shared<arrow::KeyValueMetadata>();
     350             :         auto result = arrow::ipc::MakeFileWriter(
     351         411 :             m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
     352         137 :         if (!result.ok())
     353             :         {
     354           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     355             :                      "arrow::ipc::MakeFileWriter() failed with %s",
     356           0 :                      result.status().message().c_str());
     357             :         }
     358             :         else
     359             :         {
     360         137 :             m_poFileWriter = *result;
     361             :         }
     362             :     }
     363         141 : }
     364             : 
     365             : /************************************************************************/
     366             : /*               PerformStepsBeforeFinalFlushGroup()                    */
     367             : /************************************************************************/
     368             : 
     369             : // Add a gdal:geo extension metadata for now, which embeds a bbox
     370         141 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
     371             : {
     372         278 :     if (m_poFooterKeyValueMetadata &&
     373         273 :         m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     374         132 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
     375             :     {
     376         258 :         CPLJSONObject oRoot;
     377         129 :         oRoot.Add("primary_column",
     378         129 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     379         129 :         CPLJSONObject oColumns;
     380         129 :         oRoot.Add("columns", oColumns);
     381         258 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     382             :         {
     383         129 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     384         258 :             CPLJSONObject oColumn;
     385         129 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     386         129 :             oColumn.Add("encoding",
     387         129 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
     388             : 
     389         129 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     390         129 :             if (poSRS)
     391             :             {
     392         123 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     393             :                                                    "MULTILINE=NO", nullptr};
     394         123 :                 char *pszWKT = nullptr;
     395         123 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     396         123 :                 if (pszWKT)
     397         123 :                     oColumn.Add("crs", pszWKT);
     398         123 :                 CPLFree(pszWKT);
     399             : 
     400         123 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     401         123 :                 if (dfCoordEpoch > 0)
     402           1 :                     oColumn.Add("epoch", dfCoordEpoch);
     403             :             }
     404             : 
     405         129 :             if (m_aoEnvelopes[i].IsInit())
     406             :             {
     407         116 :                 CPLJSONArray oBBOX;
     408         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     409         116 :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     410         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     411         116 :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     412         116 :                 oColumn.Add("bbox", oBBOX);
     413             :             }
     414             :         }
     415             : 
     416         258 :         m_poFooterKeyValueMetadata->Append(
     417             :             GDAL_GEO_FOOTER_KEY,
     418         258 :             oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     419             :     }
     420         141 : }
     421             : 
     422             : /************************************************************************/
     423             : /*                            FlushGroup()                              */
     424             : /************************************************************************/
     425             : 
     426          20 : bool OGRFeatherWriterLayer::FlushGroup()
     427             : {
     428          20 :     std::vector<std::shared_ptr<arrow::Array>> columns;
     429          20 :     auto ret = WriteArrays(
     430         840 :         [&columns](const std::shared_ptr<arrow::Field> &,
     431         840 :                    const std::shared_ptr<arrow::Array> &array)
     432             :         {
     433         840 :             columns.emplace_back(array);
     434         840 :             return true;
     435             :         });
     436             : 
     437          20 :     if (ret)
     438             :     {
     439             :         auto poRecordBatch = arrow::RecordBatch::Make(
     440          60 :             m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
     441          40 :         auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
     442          20 :         if (!status.ok())
     443             :         {
     444           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     445             :                      "WriteRecordBatch() failed with %s",
     446           0 :                      status.message().c_str());
     447           0 :             ret = false;
     448             :         }
     449             :     }
     450             : 
     451          20 :     ClearArrayBuilers();
     452          40 :     return ret;
     453             : }
     454             : 
     455             : /************************************************************************/
     456             : /*                          WriteArrowBatch()                           */
     457             : /************************************************************************/
     458             : 
     459             : inline bool
     460         112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
     461             :                                        struct ArrowArray *array,
     462             :                                        CSLConstList papszOptions)
     463             : {
     464         224 :     return WriteArrowBatchInternal(
     465             :         schema, array, papszOptions,
     466         112 :         [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
     467             :         {
     468         224 :             auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
     469         112 :             if (!status.ok())
     470             :             {
     471           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     472             :                          "WriteRecordBatch() failed: %s",
     473           0 :                          status.message().c_str());
     474           0 :                 return false;
     475             :             }
     476             : 
     477         112 :             return true;
     478         224 :         });
     479             : }

Generated by: LCOV version 1.14