LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow - ogrfeatherdriver.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 171 202 84.7 %
Date: 2024-05-14 13:00:50 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Feather Translator
       4             :  * Purpose:  Implements OGRFeatherDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * Permission is hereby granted, free of charge, to any person obtaining a
      11             :  * copy of this software and associated documentation files (the "Software"),
      12             :  * to deal in the Software without restriction, including without limitation
      13             :  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
      14             :  * and/or sell copies of the Software, and to permit persons to whom the
      15             :  * Software is furnished to do so, subject to the following conditions:
      16             :  *
      17             :  * The above copyright notice and this permission notice shall be included
      18             :  * in all copies or substantial portions of the Software.
      19             :  *
      20             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
      21             :  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      22             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
      23             :  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      24             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
      25             :  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
      26             :  * DEALINGS IN THE SOFTWARE.
      27             :  ****************************************************************************/
      28             : 
      29             : #include "gdal_pam.h"
      30             : #include "ogrsf_frmts.h"
      31             : 
      32             : #include <map>
      33             : 
      34             : #include "ogr_feather.h"
      35             : #include "../arrow_common/ograrrowrandomaccessfile.h"
      36             : #include "../arrow_common/ograrrowwritablefile.h"
      37             : #include "../arrow_common/ograrrowdataset.hpp"
      38             : 
      39             : #include "ogrfeatherdrivercore.h"
      40             : 
      41             : /************************************************************************/
      42             : /*                        IsArrowIPCStream()                            */
      43             : /************************************************************************/
      44             : 
      45         543 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
      46             : {
      47             :     // WARNING: if making changes in that method, reflect them in
      48             :     // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
      49             : 
      50         543 :     if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
      51           3 :         return true;
      52             : 
      53         540 :     constexpr int CONTINUATION_SIZE = 4;  // 0xFFFFFFFF
      54         540 :     constexpr int METADATA_SIZE_SIZE = 4;
      55             : 
      56             :     // See
      57             :     // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
      58         540 :     if (poOpenInfo->fpL != nullptr &&
      59         540 :         poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
      60         540 :         memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
      61             :             0)
      62             :     {
      63           8 :         const char *pszExt = CPLGetExtension(poOpenInfo->pszFilename);
      64           8 :         if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
      65           3 :             return true;
      66             : 
      67           5 :         const uint32_t nMetadataSize =
      68           5 :             CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
      69           5 :         if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
      70             :         {
      71             :             // Padding after metadata and before body is not necessarily present
      72             :             // but the body must be at least 4 bytes
      73           0 :             constexpr int PADDING_MAX_SIZE = 4;
      74             : 
      75             :             // /vsistdin/ cannot seek back beyond first MB
      76           0 :             if (nMetadataSize >
      77             :                 1024 * 1024 -
      78             :                     (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
      79             :             {
      80           0 :                 return false;
      81             :             }
      82           0 :             const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
      83           0 :                                     nMetadataSize + PADDING_MAX_SIZE;
      84           0 :             if (!poOpenInfo->TryToIngest(nSizeToRead))
      85             :             {
      86           0 :                 return false;
      87             :             }
      88             : 
      89             :             const std::string osTmpFilename(
      90           0 :                 CPLSPrintf("/vsimem/_arrow/%p", poOpenInfo));
      91             :             auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
      92             :                 osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
      93           0 :                 false));
      94             :             auto infile =
      95           0 :                 std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
      96           0 :             auto options = arrow::ipc::IpcReadOptions::Defaults();
      97             :             auto result =
      98           0 :                 arrow::ipc::RecordBatchStreamReader::Open(infile, options);
      99           0 :             CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
     100           0 :                      result.status().message().c_str());
     101           0 :             VSIUnlink(osTmpFilename.c_str());
     102           0 :             return result.ok();
     103             :         }
     104             : 
     105           5 :         VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
     106           5 :         const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
     107           5 :         VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
     108           5 :         if (nMetadataSize >
     109           5 :             nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
     110           0 :             return false;
     111             : 
     112             :         // Do not give ownership of poOpenInfo->fpL to infile
     113             :         auto infile =
     114          10 :             std::make_shared<OGRArrowRandomAccessFile>(poOpenInfo->fpL, false);
     115          10 :         auto options = arrow::ipc::IpcReadOptions::Defaults();
     116             :         auto result =
     117          10 :             arrow::ipc::RecordBatchStreamReader::Open(infile, options);
     118           5 :         VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
     119           5 :         return result.ok();
     120             :     }
     121         532 :     return false;
     122             : }
     123             : 
     124             : /************************************************************************/
     125             : /*                                Open()                                */
     126             : /************************************************************************/
     127             : 
     128         543 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
     129             : {
     130         543 :     if (poOpenInfo->eAccess == GA_Update)
     131             :     {
     132           0 :         return nullptr;
     133             :     }
     134             : 
     135         543 :     const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfo);
     136         543 :     if (!bIsStreamingFormat && !OGRFeatherDriverIsArrowFileFormat(poOpenInfo))
     137             :     {
     138           0 :         return nullptr;
     139             :     }
     140             : 
     141         543 :     std::shared_ptr<arrow::io::RandomAccessFile> infile;
     142         543 :     if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
     143             :     {
     144           3 :         const std::string osFilename(poOpenInfo->pszFilename +
     145           3 :                                      strlen("ARROW_IPC_STREAM:"));
     146             :         auto fp =
     147           3 :             VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
     148           3 :         if (fp == nullptr)
     149             :         {
     150           1 :             CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
     151             :                      osFilename.c_str());
     152           1 :             return nullptr;
     153             :         }
     154           2 :         infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
     155             :     }
     156         943 :     else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
     157         403 :              CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
     158             :     {
     159         137 :         VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
     160         137 :         poOpenInfo->fpL = nullptr;
     161         137 :         infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
     162             :     }
     163             :     else
     164             :     {
     165         806 :         auto result = arrow::io::ReadableFile::Open(poOpenInfo->pszFilename);
     166         403 :         if (!result.ok())
     167             :         {
     168           0 :             CPLError(CE_Failure, CPLE_AppDefined,
     169             :                      "ReadableFile::Open() failed with %s",
     170           0 :                      result.status().message().c_str());
     171           0 :             return nullptr;
     172             :         }
     173         403 :         infile = *result;
     174             :     }
     175             : 
     176             :     auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
     177        1084 :         arrow::MemoryPool::CreateDefault().release());
     178        1084 :     auto options = arrow::ipc::IpcReadOptions::Defaults();
     179         542 :     options.memory_pool = poMemoryPool.get();
     180             : 
     181        1084 :     auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
     182         542 :     if (bIsStreamingFormat)
     183             :     {
     184             :         auto result =
     185          10 :             arrow::ipc::RecordBatchStreamReader::Open(infile, options);
     186          10 :         if (!result.ok())
     187             :         {
     188           1 :             CPLError(CE_Failure, CPLE_AppDefined,
     189             :                      "RecordBatchStreamReader::Open() failed with %s",
     190           1 :                      result.status().message().c_str());
     191           1 :             return nullptr;
     192             :         }
     193          18 :         auto poRecordBatchStreamReader = *result;
     194           9 :         const bool bSeekable =
     195          16 :             !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
     196           7 :             strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
     197          18 :         std::string osLayername = CPLGetBasename(poOpenInfo->pszFilename);
     198           9 :         if (osLayername.empty())
     199           0 :             osLayername = "layer";
     200             :         auto poLayer = std::make_unique<OGRFeatherLayer>(
     201           9 :             poDS.get(), osLayername.c_str(), infile, bSeekable, options,
     202          18 :             poRecordBatchStreamReader);
     203           9 :         poDS->SetLayer(std::move(poLayer));
     204             : 
     205             :         // Pre-load field domains, as this depends on the first record batch
     206           9 :         auto poLayerPtr = poDS->GetLayer(0);
     207           9 :         const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
     208           9 :         bool bHasReadBatch = false;
     209         534 :         for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
     210             :         {
     211         525 :             const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
     212         525 :             const auto &osDomainName = poFieldDefn->GetDomainName();
     213         525 :             if (!osDomainName.empty())
     214             :             {
     215           6 :                 if (!bHasReadBatch)
     216             :                 {
     217           6 :                     bHasReadBatch = true;
     218           6 :                     delete poLayerPtr->GetNextFeature();
     219           6 :                     poLayerPtr->ResetReading();
     220             :                 }
     221           6 :                 poDS->GetFieldDomain(osDomainName);
     222             :             }
     223             :         }
     224             :     }
     225             :     else
     226             :     {
     227         532 :         auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
     228         532 :         if (!result.ok())
     229             :         {
     230           1 :             CPLError(CE_Failure, CPLE_AppDefined,
     231             :                      "RecordBatchFileReader::Open() failed with %s",
     232           1 :                      result.status().message().c_str());
     233           1 :             return nullptr;
     234             :         }
     235        1062 :         auto poRecordBatchReader = *result;
     236             :         auto poLayer = std::make_unique<OGRFeatherLayer>(
     237         531 :             poDS.get(), CPLGetBasename(poOpenInfo->pszFilename),
     238         531 :             poRecordBatchReader);
     239         531 :         poDS->SetLayer(std::move(poLayer));
     240             :     }
     241         540 :     return poDS.release();
     242             : }
     243             : 
     244             : /************************************************************************/
     245             : /*                               Create()                               */
     246             : /************************************************************************/
     247             : 
     248         139 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
     249             :                                            int nYSize, int nBands,
     250             :                                            GDALDataType eType,
     251             :                                            char ** /* papszOptions */)
     252             : {
     253         139 :     if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
     254           0 :         return nullptr;
     255             : 
     256         139 :     std::shared_ptr<arrow::io::OutputStream> out_file;
     257         144 :     if (STARTS_WITH(pszName, "/vsi") ||
     258           5 :         CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
     259             :     {
     260         139 :         VSILFILE *fp = VSIFOpenL(pszName, "wb");
     261         139 :         if (fp == nullptr)
     262             :         {
     263           1 :             CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
     264           1 :             return nullptr;
     265             :         }
     266         138 :         out_file = std::make_shared<OGRArrowWritableFile>(fp);
     267             :     }
     268             :     else
     269             :     {
     270           0 :         auto result = arrow::io::FileOutputStream::Open(pszName);
     271           0 :         if (!result.ok())
     272             :         {
     273           0 :             CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
     274           0 :                      result.status().message().c_str());
     275           0 :             return nullptr;
     276             :         }
     277           0 :         out_file = *result;
     278             :     }
     279             : 
     280         138 :     return new OGRFeatherWriterDataset(pszName, out_file);
     281             : }
     282             : 
     283             : /************************************************************************/
     284             : /*                         OGRFeatherDriver()                           */
     285             : /************************************************************************/
     286             : 
     287             : class OGRFeatherDriver final : public GDALDriver
     288             : {
     289             :     bool m_bMetadataInitialized = false;
     290             :     void InitMetadata();
     291             : 
     292             :   public:
     293        1347 :     const char *GetMetadataItem(const char *pszName,
     294             :                                 const char *pszDomain) override
     295             :     {
     296        1347 :         if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
     297             :         {
     298         259 :             InitMetadata();
     299             :         }
     300        1347 :         return GDALDriver::GetMetadataItem(pszName, pszDomain);
     301             :     }
     302             : 
     303         133 :     char **GetMetadata(const char *pszDomain) override
     304             :     {
     305         133 :         InitMetadata();
     306         133 :         return GDALDriver::GetMetadata(pszDomain);
     307             :     }
     308             : };
     309             : 
     310         392 : void OGRFeatherDriver::InitMetadata()
     311             : {
     312         392 :     if (m_bMetadataInitialized)
     313         385 :         return;
     314           7 :     m_bMetadataInitialized = true;
     315             : 
     316             :     CPLXMLTreeCloser oTree(
     317          14 :         CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
     318             : 
     319          14 :     std::vector<const char *> apszCompressionMethods;
     320           7 :     bool bHasLZ4 = false;
     321          21 :     for (const char *pszMethod : {"ZSTD", "LZ4"})
     322             :     {
     323             :         auto oResult = arrow::util::Codec::GetCompressionType(
     324          28 :             CPLString(pszMethod).tolower());
     325          14 :         if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
     326             :         {
     327          14 :             if (EQUAL(pszMethod, "LZ4"))
     328           7 :                 bHasLZ4 = true;
     329          14 :             apszCompressionMethods.emplace_back(pszMethod);
     330             :         }
     331             :     }
     332             : 
     333             :     {
     334           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     335           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
     336           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
     337           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     338             :                                    "File format variant");
     339          21 :         for (const char *pszEncoding : {"FILE", "STREAM"})
     340             :         {
     341          14 :             auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
     342          14 :             CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
     343             :         }
     344             :     }
     345             : 
     346             :     {
     347           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     348           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
     349           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
     350           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     351             :                                    "Compression method");
     352           7 :         CPLAddXMLAttributeAndValue(psOption, "default",
     353             :                                    bHasLZ4 ? "LZ4" : "NONE");
     354             :         {
     355           7 :             auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
     356           7 :             CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
     357           7 :             CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
     358             :         }
     359          21 :         for (const char *pszMethod : apszCompressionMethods)
     360             :         {
     361          14 :             auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
     362          14 :             CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
     363             :         }
     364             :     }
     365             : 
     366             :     {
     367           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     368           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
     369           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
     370           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     371             :                                    "Encoding of geometry columns");
     372           7 :         CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
     373          28 :         for (const char *pszEncoding :
     374          35 :              {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
     375             :         {
     376          28 :             auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
     377          28 :             CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
     378          28 :             if (EQUAL(pszEncoding, "GEOARROW"))
     379           7 :                 CPLAddXMLAttributeAndValue(poValueNode, "alias",
     380             :                                            "GEOARROW_STRUCT");
     381             :         }
     382             :     }
     383             : 
     384             :     {
     385           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     386           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
     387           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "integer");
     388           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     389             :                                    "Maximum number of rows per batch");
     390           7 :         CPLAddXMLAttributeAndValue(psOption, "default", "65536");
     391             :     }
     392             : 
     393             :     {
     394           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     395           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
     396           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "string");
     397           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     398             :                                    "Name of geometry column");
     399           7 :         CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
     400             :     }
     401             : 
     402             :     {
     403           7 :         auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
     404           7 :         CPLAddXMLAttributeAndValue(psOption, "name", "FID");
     405           7 :         CPLAddXMLAttributeAndValue(psOption, "type", "string");
     406           7 :         CPLAddXMLAttributeAndValue(psOption, "description",
     407             :                                    "Name of the FID column to create");
     408             :     }
     409             : 
     410           7 :     char *pszXML = CPLSerializeXMLTree(oTree.get());
     411           7 :     GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
     412           7 :     CPLFree(pszXML);
     413             : }
     414             : 
     415             : /************************************************************************/
     416             : /*                         RegisterOGRArrow()                           */
     417             : /************************************************************************/
     418             : 
     419          11 : void RegisterOGRArrow()
     420             : {
     421          11 :     if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
     422           0 :         return;
     423             : 
     424          22 :     auto poDriver = std::make_unique<OGRFeatherDriver>();
     425             : 
     426          11 :     OGRFeatherDriverSetCommonMetadata(poDriver.get());
     427             : 
     428          11 :     poDriver->pfnOpen = OGRFeatherDriverOpen;
     429          11 :     poDriver->pfnCreate = OGRFeatherDriverCreate;
     430             : 
     431          11 :     GetGDALDriverManager()->RegisterDriver(poDriver.release());
     432             : }

Generated by: LCOV version 1.14