LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/parquet - ogr_parquet.h (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 20 22 90.9 %
Date: 2024-05-14 23:54:21 Functions: 10 11 90.9 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * Permission is hereby granted, free of charge, to any person obtaining a
      11             :  * copy of this software and associated documentation files (the "Software"),
      12             :  * to deal in the Software without restriction, including without limitation
      13             :  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
      14             :  * and/or sell copies of the Software, and to permit persons to whom the
      15             :  * Software is furnished to do so, subject to the following conditions:
      16             :  *
      17             :  * The above copyright notice and this permission notice shall be included
      18             :  * in all copies or substantial portions of the Software.
      19             :  *
      20             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
      21             :  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      22             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
      23             :  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      24             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
      25             :  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
      26             :  * DEALINGS IN THE SOFTWARE.
      27             :  ****************************************************************************/
      28             : 
      29             : #ifndef OGR_PARQUET_H
      30             : #define OGR_PARQUET_H
      31             : 
      32             : #include "ogrsf_frmts.h"
      33             : 
      34             : #include <functional>
      35             : #include <map>
      36             : 
      37             : #include "../arrow_common/ogr_arrow.h"
      38             : #include "ogr_include_parquet.h"
      39             : 
      40             : /************************************************************************/
      41             : /*                       OGRParquetLayerBase                            */
      42             : /************************************************************************/
      43             : 
class OGRParquetDataset;

/**
 * Common base class of the Parquet reading layers (OGRParquetLayer and, when
 * GDAL_USE_ARROWDATASET is defined, OGRParquetDatasetLayer).
 *
 * Holds the state shared by both readers: the owning dataset pointer, the
 * Arrow record batch reader, the candidate geometry column names and a CRS
 * string. Copy construction and copy assignment are disabled.
 */
class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
{
    OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
    OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;

  protected:
    /** Only constructible by derived layer classes.
     * @param poDS             dataset this layer belongs to.
     * @param pszLayerName     name of the layer.
     * @param papszOpenOptions dataset open options.
     */
    OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
                        CSLConstList papszOpenOptions);

    // NOTE: the declaration order of the data members below fixes object
    // layout and member initialization order; keep it stable.

    //! Dataset of this layer (raw pointer; presumably non-owning — confirm).
    OGRParquetDataset *m_poDS = nullptr;
    //! Arrow reader producing the record batches of this layer.
    std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
    //! Column names that may be interpreted as geometry columns.
    CPLStringList m_aosGeomPossibleNames{};
    //! CRS string associated with the layer (format not visible here).
    std::string m_osCRS{};

    /** Load geometry-related metadata from the given Arrow key/value
     * metadata (parsing details live in the implementation file). */
    void LoadGeoMetadata(
        const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);

    /** Try to register the Arrow field at index iFieldIdx as a geometry
     * column.
     * @param iFieldIdx              index of the field in the Arrow schema.
     * @param field                  the Arrow field.
     * @param computeGeometryTypeFun callback returning a geometry type;
     *        presumably only invoked when the type cannot be determined from
     *        metadata — confirm in the implementation.
     * @return presumably true if the field was handled as a geometry column
     *         — confirm in the implementation.
     */
    bool DealWithGeometryColumn(
        int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
        std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun);

  public:
    int TestCapability(const char *) override;

    void ResetReading() override;

    GDALDataset *GetDataset() override;
};
      73             : 
      74             : /************************************************************************/
      75             : /*                        OGRParquetLayer                               */
      76             : /************************************************************************/
      77             : 
      78             : class OGRParquetLayer final : public OGRParquetLayerBase
      79             : 
      80             : {
      81             :     std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
      82             :     bool m_bSingleBatch = false;
      83             :     int m_iFIDParquetColumn = -1;
      84             :     std::shared_ptr<arrow::DataType> m_poFIDType{};
      85             :     std::vector<std::shared_ptr<arrow::DataType>>
      86             :         m_apoArrowDataTypes{};  // .size() == field ocunt
      87             :     std::vector<int> m_anMapFieldIndexToParquetColumn{};
      88             :     std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
      89             :     bool m_bHasMissingMappingToParquet = false;
      90             : 
      91             :     //! Contains pairs of (selected feature idx, total feature idx) break points.
      92             :     std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
      93             :     //! Iterator over m_asFeatureIdxRemapping
      94             :     std::vector<std::pair<int64_t, int64_t>>::iterator
      95             :         m_oFeatureIdxRemappingIter{};
      96             :     //! Feature index among the potentially restricted set of selected row groups
      97             :     int64_t m_nFeatureIdxSelected = 0;
      98             :     std::vector<int> m_anRequestedParquetColumns{};  // only valid when
      99             :                                                      // m_bIgnoredFields is set
     100             : #ifdef DEBUG
     101             :     int m_nExpectedBatchColumns =
     102             :         0;  // Should be equal to m_poBatch->num_columns() (when
     103             :             // m_bIgnoredFields is set)
     104             : #endif
     105             :     CPLStringList m_aosFeatherMetadata{};
     106             : 
     107             :     //! Describe the bbox column of a geometry column
     108             :     struct GeomColBBOXParquet
     109             :     {
     110             :         int iParquetXMin = -1;
     111             :         int iParquetYMin = -1;
     112             :         int iParquetXMax = -1;
     113             :         int iParquetYMax = -1;
     114             :         std::vector<int> anParquetCols{};
     115             :     };
     116             : 
     117             :     //! Map from OGR geometry field index to GeomColBBOXParquet
     118             :     std::map<int, GeomColBBOXParquet>
     119             :         m_oMapGeomFieldIndexToGeomColBBOXParquet{};
     120             : 
     121             :     void EstablishFeatureDefn();
     122             :     void ProcessGeometryColumnCovering(
     123             :         const std::shared_ptr<arrow::Field> &field,
     124             :         const CPLJSONObject &oJSONGeometryColumn,
     125             :         const std::map<std::string, int> &oMapParquetColumnNameToIdx);
     126             :     bool CreateRecordBatchReader(int iStartingRowGroup);
     127             :     bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
     128             :     bool ReadNextBatch() override;
     129             : 
     130             :     void InvalidateCachedBatches() override;
     131             : 
     132             :     OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
     133             :                                                  int iParquetCol) const;
     134             :     void CreateFieldFromSchema(
     135             :         const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
     136             :         int &iParquetCol, const std::vector<int> &path,
     137             :         const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
     138             :             &oMapFieldNameToGDALSchemaFieldDefn);
     139             :     bool CheckMatchArrowParquetColumnNames(
     140             :         int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
     141             :     OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
     142             :     OGRFeature *GetFeatureByIndex(GIntBig nFID);
     143             : 
     144        1757 :     virtual std::string GetDriverUCName() const override
     145             :     {
     146        1757 :         return "PARQUET";
     147             :     }
     148             : 
     149             :     bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
     150             : 
     151             :     void IncrFeatureIdx() override;
     152             : 
     153             :   public:
     154             :     OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
     155             :                     std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
     156             :                     CSLConstList papszOpenOptions);
     157             : 
     158             :     void ResetReading() override;
     159             :     OGRFeature *GetFeature(GIntBig nFID) override;
     160             :     GIntBig GetFeatureCount(int bForce) override;
     161             :     int TestCapability(const char *pszCap) override;
     162             :     OGRErr SetIgnoredFields(CSLConstList papszFields) override;
     163             :     const char *GetMetadataItem(const char *pszName,
     164             :                                 const char *pszDomain = "") override;
     165             :     char **GetMetadata(const char *pszDomain = "") override;
     166             :     OGRErr SetNextByIndex(GIntBig nIndex) override;
     167             : 
     168             :     bool GetArrowStream(struct ArrowArrayStream *out_stream,
     169             :                         CSLConstList papszOptions = nullptr) override;
     170             : 
     171             :     std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
     172             :                                                 int iFieldIndex) const override;
     173             : 
     174        1249 :     parquet::arrow::FileReader *GetReader() const
     175             :     {
     176        1249 :         return m_poArrowReader.get();
     177             :     }
     178             : 
     179         286 :     const std::vector<int> &GetMapFieldIndexToParquetColumn() const
     180             :     {
     181         286 :         return m_anMapFieldIndexToParquetColumn;
     182             :     }
     183             : 
     184             :     const std::vector<std::shared_ptr<arrow::DataType>> &
     185         237 :     GetArrowFieldTypes() const
     186             :     {
     187         237 :         return m_apoArrowDataTypes;
     188             :     }
     189             : 
     190           2 :     int GetFIDParquetColumn() const
     191             :     {
     192           2 :         return m_iFIDParquetColumn;
     193             :     }
     194             : 
     195             :     static constexpr int OGR_FID_INDEX = -2;
     196             :     bool GetMinMaxForOGRField(int iRowGroup,  // -1 for all
     197             :                               int iOGRField,  // or OGR_FID_INDEX
     198             :                               bool bComputeMin, OGRField &sMin, bool &bFoundMin,
     199             :                               bool bComputeMax, OGRField &sMax, bool &bFoundMax,
     200             :                               OGRFieldType &eType, OGRFieldSubType &eSubType,
     201             :                               std::string &osMinTmp,
     202             :                               std::string &osMaxTmp) const;
     203             : 
     204             :     bool GetMinMaxForParquetCol(int iRowGroup,  // -1 for all
     205             :                                 int iCol,
     206             :                                 const std::shared_ptr<arrow::DataType>
     207             :                                     &arrowType,  // potentially nullptr
     208             :                                 bool bComputeMin, OGRField &sMin,
     209             :                                 bool &bFoundMin, bool bComputeMax,
     210             :                                 OGRField &sMax, bool &bFoundMax,
     211             :                                 OGRFieldType &eType, OGRFieldSubType &eSubType,
     212             :                                 std::string &osMinTmp,
     213             :                                 std::string &osMaxTmp) const;
     214             : 
     215             :     bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
     216             :                              int &iParquetXMax, int &iParquetYMax) const;
     217             : };
     218             : 
     219             : /************************************************************************/
     220             : /*                      OGRParquetDatasetLayer                          */
     221             : /************************************************************************/
     222             : 
     223             : #ifdef GDAL_USE_ARROWDATASET
     224             : 
     225             : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
     226             : {
     227             :     std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
     228             : 
     229             :     void EstablishFeatureDefn();
     230             : 
     231             :   protected:
     232          24 :     std::string GetDriverUCName() const override
     233             :     {
     234          24 :         return "PARQUET";
     235             :     }
     236             : 
     237             :     bool ReadNextBatch() override;
     238             : 
     239             :     void InvalidateCachedBatches() override;
     240             : 
     241             :     bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
     242             : 
     243             :   public:
     244             :     OGRParquetDatasetLayer(
     245             :         OGRParquetDataset *poDS, const char *pszLayerName,
     246             :         const std::shared_ptr<arrow::dataset::Scanner> &scanner,
     247             :         const std::shared_ptr<arrow::Schema> &schema,
     248             :         CSLConstList papszOpenOptions);
     249             : 
     250             :     GIntBig GetFeatureCount(int bForce) override;
     251             :     OGRErr GetExtent(OGREnvelope *psExtent, int bForce = TRUE) override;
     252             :     OGRErr GetExtent(int iGeomField, OGREnvelope *psExtent,
     253             :                      int bForce = TRUE) override;
     254             : 
     255             :     // TODO
     256             :     std::unique_ptr<OGRFieldDomain>
     257           0 :     BuildDomain(const std::string & /*osDomainName*/,
     258             :                 int /*iFieldIndex*/) const override
     259             :     {
     260           0 :         return nullptr;
     261             :     }
     262             : };
     263             : 
     264             : #endif
     265             : 
     266             : /************************************************************************/
     267             : /*                         OGRParquetDataset                            */
     268             : /************************************************************************/
     269             : 
     270             : class OGRParquetDataset final : public OGRArrowDataset
     271             : {
     272             :   public:
     273             :     explicit OGRParquetDataset(
     274             :         const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
     275             : 
     276             :     OGRLayer *ExecuteSQL(const char *pszSQLCommand,
     277             :                          OGRGeometry *poSpatialFilter,
     278             :                          const char *pszDialect) override;
     279             :     void ReleaseResultSet(OGRLayer *poResultsSet) override;
     280             : 
     281             :     int TestCapability(const char *) override;
     282             : };
     283             : 
     284             : /************************************************************************/
     285             : /*                        OGRParquetWriterLayer                         */
     286             : /************************************************************************/
     287             : 
     288             : class OGRParquetWriterDataset;
     289             : 
     290             : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
     291             : {
     292             :     OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
     293             :     OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
     294             : 
     295             :     OGRParquetWriterDataset *m_poDataset = nullptr;
     296             :     std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
     297             :     std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
     298             :     bool m_bForceCounterClockwiseOrientation = false;
     299             :     bool m_bEdgesSpherical = false;
     300             :     parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
     301             : 
     302             :     //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
     303             :     std::unique_ptr<GDALDataset> m_poTmpGPKG{};
     304             :     //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
     305             :     OGRLayer *m_poTmpGPKGLayer = nullptr;
     306             :     //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
     307             :     GIntBig m_nTmpFeatureCount = 0;
     308             : 
     309         524 :     virtual bool IsFileWriterCreated() const override
     310             :     {
     311         524 :         return m_poFileWriter != nullptr;
     312             :     }
     313             : 
     314             :     virtual void CreateWriter() override;
     315             :     virtual bool CloseFileWriter() override;
     316             : 
     317             :     virtual void CreateSchema() override;
     318             :     virtual void PerformStepsBeforeFinalFlushGroup() override;
     319             : 
     320             :     virtual bool FlushGroup() override;
     321             : 
     322         220 :     virtual std::string GetDriverUCName() const override
     323             :     {
     324         220 :         return "PARQUET";
     325             :     }
     326             : 
     327             :     virtual bool
     328             :     IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
     329             : 
     330             :     virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
     331             :                                                size_t nLen) override;
     332             :     virtual void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
     333             : 
     334          26 :     virtual bool IsSRSRequired() const override
     335             :     {
     336          26 :         return false;
     337             :     }
     338             : 
     339             :     std::string GetGeoMetadata() const;
     340             : 
     341             :     //! Copy temporary GeoPackage layer to final Parquet file
     342             :     bool CopyTmpGpkgLayerToFinalFile();
     343             : 
     344             :   public:
     345             :     OGRParquetWriterLayer(
     346             :         OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
     347             :         const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
     348             :         const char *pszLayerName);
     349             : 
     350             :     CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
     351             : 
     352             :     bool SetOptions(CSLConstList papszOptions,
     353             :                     const OGRSpatialReference *poSpatialRef,
     354             :                     OGRwkbGeometryType eGType);
     355             : 
     356             :     OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
     357             :                            int bApproxOK = TRUE) override;
     358             : 
     359             :     int TestCapability(const char *pszCap) override;
     360             : #if PARQUET_VERSION_MAJOR <= 10
     361             :     // Parquet <= 10 doesn't support the WriteRecordBatch() API
     362             :     bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
     363             :                                 CSLConstList papszOptions,
     364             :                                 std::string &osErrorMsg) const override
     365             :     {
     366             :         return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
     367             :                                                 osErrorMsg);
     368             :     }
     369             : 
     370             :     bool
     371             :     CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
     372             :                                CSLConstList papszOptions = nullptr) override
     373             :     {
     374             :         return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
     375             :     }
     376             : 
     377             :     bool WriteArrowBatch(const struct ArrowSchema *schema,
     378             :                          struct ArrowArray *array,
     379             :                          CSLConstList papszOptions = nullptr) override
     380             :     {
     381             :         return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
     382             :     }
     383             : #else
     384             :     bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
     385             :                                 CSLConstList papszOptions,
     386             :                                 std::string &osErrorMsg) const override;
     387             :     bool
     388             :     CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
     389             :                                CSLConstList papszOptions = nullptr) override;
     390             :     bool WriteArrowBatch(const struct ArrowSchema *schema,
     391             :                          struct ArrowArray *array,
     392             :                          CSLConstList papszOptions = nullptr) override;
     393             : #endif
     394             : 
     395             :     GDALDataset *GetDataset() override;
     396             : 
     397             :   protected:
     398             :     OGRErr ICreateFeature(OGRFeature *poFeature) override;
     399             : 
     400             :     friend class OGRParquetWriterDataset;
     401             :     bool Close();
     402             : };
     403             : 
     404             : /************************************************************************/
     405             : /*                        OGRParquetWriterDataset                       */
     406             : /************************************************************************/
     407             : 
/**
 * Write-side dataset of the Parquet driver. It owns at most one
 * OGRParquetWriterLayer (m_poLayer), the Arrow memory pool and the output
 * stream.
 */
class OGRParquetWriterDataset final : public GDALPamDataset
{
    // NOTE: members are destroyed in reverse declaration order, so
    // m_poMemoryPool outlives m_poLayer. The layer constructor takes an
    // arrow::MemoryPool*, presumably the one returned by GetMemoryPool()
    // (confirm in ICreateLayer), so keep this order.
    std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
    std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
    std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};

  public:
    explicit OGRParquetWriterDataset(
        const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);

    //! Non-owning access to the memory pool owned by this dataset.
    arrow::MemoryPool *GetMemoryPool() const
    {
        return m_poMemoryPool.get();
    }

    CPLErr Close() override;

    int GetLayerCount() override;
    OGRLayer *GetLayer(int idx) override;
    int TestCapability(const char *pszCap) override;
    std::vector<std::string> GetFieldDomainNames(
        CSLConstList /*papszOptions*/ = nullptr) const override;
    const OGRFieldDomain *
    GetFieldDomain(const std::string &name) const override;
    bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
                        std::string &failureReason) override;

    //! Mutable access to the inherited multi-domain metadata store (oMDMD).
    GDALMultiDomainMetadata &GetMultiDomainMetadata()
    {
        return oMDMD;
    }

  protected:
    OGRLayer *ICreateLayer(const char *pszName,
                           const OGRGeomFieldDefn *poGeomFieldDefn,
                           CSLConstList papszOptions) override;
};
     445             : 
     446             : #endif  // OGR_PARQUET_H

Generated by: LCOV version 1.14