LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherwriterlayer.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 206 230 89.6 %
Date: 2024-05-14 13:00:50 Functions: 13 13 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * Permission is hereby granted, free of charge, to any person obtaining a
      11             :  * copy of this software and associated documentation files (the "Software"),
      12             :  * to deal in the Software without restriction, including without limitation
      13             :  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
      14             :  * and/or sell copies of the Software, and to permit persons to whom the
      15             :  * Software is furnished to do so, subject to the following conditions:
      16             :  *
      17             :  * The above copyright notice and this permission notice shall be included
      18             :  * in all copies or substantial portions of the Software.
      19             :  *
      20             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
      21             :  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      22             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
      23             :  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      24             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
      25             :  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
      26             :  * DEALINGS IN THE SOFTWARE.
      27             :  ****************************************************************************/
      28             : 
      29             : #include "ogr_feather.h"
      30             : 
      31             : #include "../arrow_common/ograrrowwriterlayer.hpp"
      32             : 
      33             : /************************************************************************/
      34             : /*                      OGRFeatherWriterLayer()                         */
      35             : /************************************************************************/
      36             : 
      37         138 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
      38             :     GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
      39             :     const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
      40         138 :     const char *pszLayerName)
      41             :     : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
      42         138 :       m_poDS(poDS)
      43             : {
      44         138 :     m_bWriteFieldArrowExtensionName = true;
      45         138 : }
      46             : 
      47             : /************************************************************************/
      48             : /*                     ~OGRFeatherWriterLayer()                         */
      49             : /************************************************************************/
      50             : 
      51         276 : OGRFeatherWriterLayer::~OGRFeatherWriterLayer()
      52             : {
      53         138 :     if (m_bInitializationOK)
      54         129 :         FinalizeWriting();
      55         276 : }
      56             : 
      57             : /************************************************************************/
      58             : /*                       IsSupportedGeometryType()                      */
      59             : /************************************************************************/
      60             : 
      61         133 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
      62             :     OGRwkbGeometryType eGType) const
      63             : {
      64         133 :     if (eGType != wkbFlatten(eGType))
      65             :     {
      66             :         const auto osConfigOptionName =
      67         170 :             "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
      68          85 :         if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
      69             :         {
      70           7 :             CPLError(CE_Failure, CPLE_NotSupported,
      71             :                      "Only 2D geometry types are supported (unless the "
      72             :                      "%s configuration option is set to YES)",
      73             :                      osConfigOptionName.c_str());
      74           7 :             return false;
      75             :         }
      76             :     }
      77         126 :     return true;
      78             : }
      79             : 
      80             : /************************************************************************/
      81             : /*                           SetOptions()                               */
      82             : /************************************************************************/
      83             : 
      84         138 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
      85             :                                        CSLConstList papszOptions,
      86             :                                        const OGRSpatialReference *poSpatialRef,
      87             :                                        OGRwkbGeometryType eGType)
      88             : {
      89             :     const char *pszDefaultFormat =
      90         138 :         (EQUAL(CPLGetExtension(osFilename.c_str()), "arrows") ||
      91         138 :          STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
      92         276 :             ? "STREAM"
      93         138 :             : "FILE";
      94         138 :     m_bStreamFormat =
      95         138 :         EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
      96             :               "STREAM");
      97             : 
      98             :     const char *pszGeomEncoding =
      99         138 :         CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
     100         138 :     m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
     101         138 :     if (pszGeomEncoding)
     102             :     {
     103         110 :         if (EQUAL(pszGeomEncoding, "WKB"))
     104          32 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
     105          78 :         else if (EQUAL(pszGeomEncoding, "WKT"))
     106          29 :             m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
     107          49 :         else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
     108          24 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
     109          25 :         else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
     110           0 :                  EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
     111          25 :             m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
     112             :         else
     113             :         {
     114           0 :             CPLError(CE_Failure, CPLE_NotSupported,
     115             :                      "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
     116           0 :             return false;
     117             :         }
     118             :     }
     119             : 
     120         138 :     if (eGType != wkbNone)
     121             :     {
     122         133 :         if (!IsSupportedGeometryType(eGType))
     123             :         {
     124           9 :             return false;
     125             :         }
     126             : 
     127         126 :         if (poSpatialRef == nullptr)
     128             :         {
     129           8 :             CPLError(CE_Warning, CPLE_AppDefined,
     130             :                      "Geometry column should have an associated CRS");
     131             :         }
     132             : 
     133         126 :         m_poFeatureDefn->SetGeomType(eGType);
     134         126 :         auto eGeomEncoding = m_eGeomEncoding;
     135         126 :         if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
     136         102 :             eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
     137             :         {
     138          65 :             const auto eEncodingType = eGeomEncoding;
     139          65 :             eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
     140          65 :             if (eGeomEncoding == eEncodingType)
     141           2 :                 return false;
     142             :         }
     143         124 :         m_aeGeomEncoding.push_back(eGeomEncoding);
     144         124 :         m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
     145             :             CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
     146         124 :         if (poSpatialRef)
     147             :         {
     148         118 :             auto poSRS = poSpatialRef->Clone();
     149         118 :             m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
     150         118 :             poSRS->Release();
     151             :         }
     152             :     }
     153             : 
     154         129 :     m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
     155             : 
     156         129 :     const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
     157         129 :     if (pszCompression == nullptr)
     158             :     {
     159         378 :         auto oResult = arrow::util::Codec::GetCompressionType("lz4");
     160         126 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     161             :         {
     162         126 :             pszCompression = "LZ4";
     163             :         }
     164             :         else
     165             :         {
     166           0 :             pszCompression = "NONE";
     167             :         }
     168             :     }
     169             : 
     170         129 :     if (EQUAL(pszCompression, "NONE"))
     171           0 :         pszCompression = "UNCOMPRESSED";
     172             :     auto oResult = arrow::util::Codec::GetCompressionType(
     173         258 :         CPLString(pszCompression).tolower());
     174         129 :     if (!oResult.ok())
     175             :     {
     176           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     177             :                  "Unrecognized compression method: %s", pszCompression);
     178           0 :         return false;
     179             :     }
     180         129 :     m_eCompression = *oResult;
     181         129 :     if (!arrow::util::Codec::IsAvailable(m_eCompression))
     182             :     {
     183           0 :         CPLError(CE_Failure, CPLE_NotSupported,
     184             :                  "Compression method %s is known, but libarrow has not "
     185             :                  "been built with support for it",
     186             :                  pszCompression);
     187           0 :         return false;
     188             :     }
     189             : 
     190         129 :     const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
     191         129 :     if (pszRowGroupSize)
     192             :     {
     193           3 :         auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
     194           3 :         if (nRowGroupSize > 0)
     195             :         {
     196           3 :             if (nRowGroupSize > INT_MAX)
     197           0 :                 nRowGroupSize = INT_MAX;
     198           3 :             m_nRowGroupSize = nRowGroupSize;
     199             :         }
     200             :     }
     201             : 
     202         129 :     m_bInitializationOK = true;
     203         129 :     return true;
     204             : }
     205             : 
     206             : /************************************************************************/
     207             : /*                         CloseFileWriter()                            */
     208             : /************************************************************************/
     209             : 
     210         129 : bool OGRFeatherWriterLayer::CloseFileWriter()
     211             : {
     212         258 :     auto status = m_poFileWriter->Close();
     213         129 :     if (!status.ok())
     214             :     {
     215           0 :         CPLError(CE_Failure, CPLE_AppDefined,
     216             :                  "FileWriter::Close() failed with %s",
     217           0 :                  status.message().c_str());
     218             :     }
     219         258 :     return status.ok();
     220             : }
     221             : 
     222             : /************************************************************************/
     223             : /*                          CreateSchema()                              */
     224             : /************************************************************************/
     225             : 
     226         129 : void OGRFeatherWriterLayer::CreateSchema()
     227             : {
     228         129 :     CreateSchemaCommon();
     229             : 
     230         253 :     if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     231         124 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
     232             :     {
     233         246 :         CPLJSONObject oRoot;
     234         123 :         oRoot.Add("schema_version", "0.1.0");
     235         123 :         oRoot.Add("primary_column",
     236         123 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     237         246 :         CPLJSONObject oColumns;
     238         123 :         oRoot.Add("columns", oColumns);
     239         246 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     240             :         {
     241         123 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     242         246 :             CPLJSONObject oColumn;
     243         123 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     244         123 :             oColumn.Add("encoding",
     245         123 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
     246             : 
     247         123 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     248         123 :             if (poSRS)
     249             :             {
     250         117 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     251             :                                                    "MULTILINE=NO", nullptr};
     252         117 :                 char *pszWKT = nullptr;
     253         117 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     254         117 :                 if (pszWKT)
     255         117 :                     oColumn.Add("crs", pszWKT);
     256         117 :                 CPLFree(pszWKT);
     257             : 
     258         117 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     259         117 :                 if (dfCoordEpoch > 0)
     260           2 :                     oColumn.Add("epoch", dfCoordEpoch);
     261             :             }
     262             : 
     263             : #if 0
     264             :             if( m_aoEnvelopes[i].IsInit() &&
     265             :                 CPLTestBool(CPLGetConfigOption(
     266             :                     "OGR_ARROW_WRITE_BBOX", "YES")) )
     267             :             {
     268             :                 CPLJSONArray oBBOX;
     269             :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     270             :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     271             :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     272             :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     273             :                 oColumn.Add("bbox", oBBOX);
     274             :             }
     275             : #endif
     276         123 :             const auto eType = poGeomFieldDefn->GetType();
     277         123 :             if (CPLTestBool(CPLGetConfigOption(
     278         246 :                     "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
     279         123 :                 eType == wkbFlatten(eType))
     280             :             {
     281             :                 // Geometry type, place under a temporary "gdal:geometry_type"
     282             :                 // property pending acceptance of proposal at
     283             :                 // https://github.com/opengeospatial/geoparquet/issues/41
     284          45 :                 const char *pszType = "mixed";
     285          45 :                 if (wkbPoint == eType)
     286          18 :                     pszType = "Point";
     287          27 :                 else if (wkbLineString == eType)
     288           5 :                     pszType = "LineString";
     289          22 :                 else if (wkbPolygon == eType)
     290           5 :                     pszType = "Polygon";
     291          17 :                 else if (wkbMultiPoint == eType)
     292           5 :                     pszType = "MultiPoint";
     293          12 :                 else if (wkbMultiLineString == eType)
     294           5 :                     pszType = "MultiLineString";
     295           7 :                 else if (wkbMultiPolygon == eType)
     296           5 :                     pszType = "MultiPolygon";
     297           2 :                 else if (wkbGeometryCollection == eType)
     298           2 :                     pszType = "GeometryCollection";
     299          45 :                 oColumn.Add("gdal:geometry_type", pszType);
     300             :             }
     301             :         }
     302             : 
     303         123 :         auto kvMetadata = m_poSchema->metadata()
     304           5 :                               ? m_poSchema->metadata()->Copy()
     305         251 :                               : std::make_shared<arrow::KeyValueMetadata>();
     306         246 :         kvMetadata->Append("geo",
     307         246 :                            oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     308         123 :         m_poSchema = m_poSchema->WithMetadata(kvMetadata);
     309         123 :         CPLAssert(m_poSchema);
     310             :     }
     311         129 : }
     312             : 
     313             : /************************************************************************/
     314             : /*                          CreateWriter()                              */
     315             : /************************************************************************/
     316             : 
     317         129 : void OGRFeatherWriterLayer::CreateWriter()
     318             : {
     319         129 :     CPLAssert(m_poFileWriter == nullptr);
     320             : 
     321         129 :     if (m_poSchema == nullptr)
     322             :     {
     323           3 :         CreateSchema();
     324             :     }
     325             :     else
     326             :     {
     327         126 :         FinalizeSchema();
     328             :     }
     329             : 
     330         258 :     auto options = arrow::ipc::IpcWriteOptions::Defaults();
     331         129 :     options.memory_pool = m_poMemoryPool;
     332             : 
     333             :     {
     334         258 :         auto result = arrow::util::Codec::Create(m_eCompression);
     335         129 :         if (!result.ok())
     336             :         {
     337           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     338             :                      "Codec::Create() failed with %s",
     339           0 :                      result.status().message().c_str());
     340             :         }
     341             :         else
     342             :         {
     343         129 :             options.codec.reset(result->release());
     344             :         }
     345             :     }
     346             : 
     347         129 :     if (m_bStreamFormat)
     348             :     {
     349             :         auto result =
     350           6 :             arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
     351           3 :         if (!result.ok())
     352             :         {
     353           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     354             :                      "arrow::ipc::MakeStreamWriter() failed with %s",
     355           0 :                      result.status().message().c_str());
     356             :         }
     357             :         else
     358             :         {
     359           3 :             m_poFileWriter = *result;
     360             :         }
     361             :     }
     362             :     else
     363             :     {
     364             :         m_poFooterKeyValueMetadata =
     365         126 :             std::make_shared<arrow::KeyValueMetadata>();
     366             :         auto result = arrow::ipc::MakeFileWriter(
     367         378 :             m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
     368         126 :         if (!result.ok())
     369             :         {
     370           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     371             :                      "arrow::ipc::MakeFileWriter() failed with %s",
     372           0 :                      result.status().message().c_str());
     373             :         }
     374             :         else
     375             :         {
     376         126 :             m_poFileWriter = *result;
     377             :         }
     378             :     }
     379         129 : }
     380             : 
     381             : /************************************************************************/
     382             : /*               PerformStepsBeforeFinalFlushGroup()                    */
     383             : /************************************************************************/
     384             : 
     385             : // Add a gdal:geo extension metadata for now, which embeds a bbox
     386         129 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
     387             : {
     388         255 :     if (m_poFooterKeyValueMetadata &&
     389         250 :         m_poFeatureDefn->GetGeomFieldCount() != 0 &&
     390         121 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
     391             :     {
     392         236 :         CPLJSONObject oRoot;
     393         118 :         oRoot.Add("primary_column",
     394         118 :                   m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
     395         118 :         CPLJSONObject oColumns;
     396         118 :         oRoot.Add("columns", oColumns);
     397         236 :         for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
     398             :         {
     399         118 :             const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
     400         236 :             CPLJSONObject oColumn;
     401         118 :             oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
     402         118 :             oColumn.Add("encoding",
     403         118 :                         GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
     404             : 
     405         118 :             const auto poSRS = poGeomFieldDefn->GetSpatialRef();
     406         118 :             if (poSRS)
     407             :             {
     408         112 :                 const char *const apszOptions[] = {"FORMAT=WKT2_2019",
     409             :                                                    "MULTILINE=NO", nullptr};
     410         112 :                 char *pszWKT = nullptr;
     411         112 :                 poSRS->exportToWkt(&pszWKT, apszOptions);
     412         112 :                 if (pszWKT)
     413         112 :                     oColumn.Add("crs", pszWKT);
     414         112 :                 CPLFree(pszWKT);
     415             : 
     416         112 :                 const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
     417         112 :                 if (dfCoordEpoch > 0)
     418           1 :                     oColumn.Add("epoch", dfCoordEpoch);
     419             :             }
     420             : 
     421         118 :             if (m_aoEnvelopes[i].IsInit())
     422             :             {
     423         117 :                 CPLJSONArray oBBOX;
     424         117 :                 oBBOX.Add(m_aoEnvelopes[i].MinX);
     425         117 :                 oBBOX.Add(m_aoEnvelopes[i].MinY);
     426         117 :                 oBBOX.Add(m_aoEnvelopes[i].MaxX);
     427         117 :                 oBBOX.Add(m_aoEnvelopes[i].MaxY);
     428         117 :                 oColumn.Add("bbox", oBBOX);
     429             :             }
     430             :         }
     431             : 
     432         236 :         m_poFooterKeyValueMetadata->Append(
     433             :             GDAL_GEO_FOOTER_KEY,
     434         236 :             oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
     435             :     }
     436         129 : }
     437             : 
     438             : /************************************************************************/
     439             : /*                            FlushGroup()                              */
     440             : /************************************************************************/
     441             : 
     442          19 : bool OGRFeatherWriterLayer::FlushGroup()
     443             : {
     444          19 :     std::vector<std::shared_ptr<arrow::Array>> columns;
     445          19 :     auto ret = WriteArrays(
     446         839 :         [&columns](const std::shared_ptr<arrow::Field> &,
     447         839 :                    const std::shared_ptr<arrow::Array> &array)
     448             :         {
     449         839 :             columns.emplace_back(array);
     450         839 :             return true;
     451             :         });
     452             : 
     453          19 :     if (ret)
     454             :     {
     455             :         auto poRecordBatch = arrow::RecordBatch::Make(
     456          57 :             m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
     457          38 :         auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
     458          19 :         if (!status.ok())
     459             :         {
     460           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     461             :                      "WriteRecordBatch() failed with %s",
     462           0 :                      status.message().c_str());
     463           0 :             ret = false;
     464             :         }
     465             :     }
     466             : 
     467          19 :     ClearArrayBuilers();
     468          38 :     return ret;
     469             : }
     470             : 
     471             : /************************************************************************/
     472             : /*                          WriteArrowBatch()                           */
     473             : /************************************************************************/
     474             : 
     475             : inline bool
     476         113 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
     477             :                                        struct ArrowArray *array,
     478             :                                        CSLConstList papszOptions)
     479             : {
     480         226 :     return WriteArrowBatchInternal(
     481             :         schema, array, papszOptions,
     482         113 :         [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
     483             :         {
     484         226 :             auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
     485         113 :             if (!status.ok())
     486             :             {
     487           0 :                 CPLError(CE_Failure, CPLE_AppDefined,
     488             :                          "WriteRecordBatch() failed: %s",
     489           0 :                          status.message().c_str());
     490           0 :                 return false;
     491             :             }
     492             : 
     493         113 :             return true;
     494         226 :         });
     495             : }

Generated by: LCOV version 1.14