LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/parquet - ogrparquetdataset.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 155 172 90.1 %
Date: 2025-11-12 21:50:40 Functions: 8 8 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_parquet.h"
      14             : #include "memdataset.h"
      15             : #include "ogr_swq.h"
      16             : 
      17             : #include "../arrow_common/ograrrowdataset.hpp"
      18             : #include "../arrow_common/ograrrowlayer.hpp"
      19             : #include "../arrow_common/vsiarrowfilesystem.hpp"
      20             : 
      21             : /************************************************************************/
      22             : /*                         OGRParquetDataset()                          */
      23             : /************************************************************************/
      24             : 
      25        1166 : OGRParquetDataset::OGRParquetDataset()
      26        1166 :     : OGRArrowDataset(arrow::MemoryPool::CreateDefault())
      27             : {
      28        1166 : }
      29             : 
      30             : /************************************************************************/
      31             : /*                        ~OGRParquetDataset()                          */
      32             : /************************************************************************/
      33             : 
      34        2326 : OGRParquetDataset::~OGRParquetDataset()
      35             : {
      36        1163 :     OGRParquetDataset::Close();
      37        2326 : }
      38             : 
      39             : /************************************************************************/
      40             : /*                                Close()                               */
      41             : /************************************************************************/
      42             : 
      43        2307 : CPLErr OGRParquetDataset::Close()
      44             : {
      45        2307 :     CPLErr eErr = CE_None;
      46        2307 :     if (nOpenFlags != OPEN_FLAGS_CLOSED)
      47             :     {
      48             :         // libarrow might continue to do I/O in auxiliary threads on the underlying
      49             :         // files when using the arrow::dataset API even after we closed the dataset.
      50             :         // This is annoying as it can cause crashes when closing GDAL, in particular
      51             :         // the virtual file manager, as this could result in VSI files being
      52             :         // accessed after their VSIVirtualFileSystem has been destroyed, resulting
      53             :         // in crashes. The workaround is to make sure that VSIArrowFileSystem
      54             :         // waits for all file handles it is aware of to have been destroyed.
      55        1163 :         eErr = OGRArrowDataset::Close();
      56             : 
      57        2326 :         auto poFS = std::dynamic_pointer_cast<VSIArrowFileSystem>(m_poFS);
      58        1163 :         if (poFS)
      59         266 :             poFS->AskToClose();
      60             :     }
      61             : 
      62        2307 :     return eErr;
      63             : }
      64             : 
      65             : /***********************************************************************/
      66             : /*                          CreateReaderLayer()                        */
      67             : /***********************************************************************/
      68             : 
      69             : std::unique_ptr<OGRParquetLayer>
      70         896 : OGRParquetDataset::CreateReaderLayer(const std::string &osFilename,
      71             :                                      VSILFILE *&fpIn,
      72             :                                      CSLConstList papszOpenOptionsIn)
      73             : {
      74             :     try
      75             :     {
      76         896 :         std::shared_ptr<arrow::io::RandomAccessFile> infile;
      77        1396 :         if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
      78         500 :             CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
      79             :         {
      80         396 :             VSIVirtualHandleUniquePtr fp(fpIn);
      81         396 :             fpIn = nullptr;
      82         396 :             if (fp == nullptr)
      83             :             {
      84           0 :                 fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
      85           0 :                 if (fp == nullptr)
      86           0 :                     return nullptr;
      87             :             }
      88         792 :             infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename,
      89         792 :                                                                 std::move(fp));
      90             :         }
      91             :         else
      92             :         {
      93         500 :             PARQUET_ASSIGN_OR_THROW(infile,
      94             :                                     arrow::io::ReadableFile::Open(osFilename));
      95             :         }
      96             : 
      97             :         // Open Parquet file reader
      98         896 :         std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
      99             : 
     100         896 :         const int nNumCPUs = OGRParquetLayerBase::GetNumCPUs();
     101             :         const char *pszUseThreads =
     102         896 :             CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
     103         896 :         if (!pszUseThreads && nNumCPUs > 1)
     104             :         {
     105         896 :             pszUseThreads = "YES";
     106             :         }
     107         896 :         const bool bUseThreads = pszUseThreads && CPLTestBool(pszUseThreads);
     108             : 
     109             :         const char *pszParquetBatchSize =
     110         896 :             CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
     111             : 
     112         896 :         auto poMemoryPool = GetMemoryPool();
     113             : #if ARROW_VERSION_MAJOR >= 21
     114             :         parquet::arrow::FileReaderBuilder fileReaderBuilder;
     115             :         {
     116             :             auto st = fileReaderBuilder.Open(std::move(infile));
     117             :             if (!st.ok())
     118             :             {
     119             :                 CPLError(CE_Failure, CPLE_AppDefined,
     120             :                          "parquet::arrow::FileReaderBuilder::Open() failed: %s",
     121             :                          st.message().c_str());
     122             :                 return nullptr;
     123             :             }
     124             :         }
     125             :         fileReaderBuilder.memory_pool(poMemoryPool);
     126             :         parquet::ArrowReaderProperties fileReaderProperties;
     127             :         fileReaderProperties.set_arrow_extensions_enabled(CPLTestBool(
     128             :             CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
     129             :         if (pszParquetBatchSize)
     130             :         {
     131             :             fileReaderProperties.set_batch_size(
     132             :                 CPLAtoGIntBig(pszParquetBatchSize));
     133             :         }
     134             :         if (bUseThreads)
     135             :         {
     136             :             fileReaderProperties.set_use_threads(true);
     137             :         }
     138             :         fileReaderBuilder.properties(fileReaderProperties);
     139             :         {
     140             :             auto res = fileReaderBuilder.Build();
     141             :             if (!res.ok())
     142             :             {
     143             :                 CPLError(
     144             :                     CE_Failure, CPLE_AppDefined,
     145             :                     "parquet::arrow::FileReaderBuilder::Build() failed: %s",
     146             :                     res.status().message().c_str());
     147             :                 return nullptr;
     148             :             }
     149             :             arrow_reader = std::move(*res);
     150             :         }
     151             : #elif ARROW_VERSION_MAJOR >= 19
     152        1793 :         PARQUET_ASSIGN_OR_THROW(
     153             :             arrow_reader,
     154             :             parquet::arrow::OpenFile(std::move(infile), poMemoryPool));
     155             : #else
     156             :         auto st = parquet::arrow::OpenFile(std::move(infile), poMemoryPool,
     157             :                                            &arrow_reader);
     158             :         if (!st.ok())
     159             :         {
     160             :             CPLError(CE_Failure, CPLE_AppDefined,
     161             :                      "parquet::arrow::OpenFile() failed: %s",
     162             :                      st.message().c_str());
     163             :             return nullptr;
     164             :         }
     165             : #endif
     166             : 
     167             : #if ARROW_VERSION_MAJOR < 21
     168         895 :         if (pszParquetBatchSize)
     169             :         {
     170           5 :             arrow_reader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
     171             :         }
     172             : 
     173         895 :         if (bUseThreads)
     174             :         {
     175         895 :             arrow_reader->set_use_threads(true);
     176             :         }
     177             : #endif
     178             : 
     179             :         return std::make_unique<OGRParquetLayer>(
     180        1790 :             this, CPLGetBasenameSafe(osFilename.c_str()).c_str(),
     181        1790 :             std::move(arrow_reader), papszOpenOptionsIn);
     182             :     }
     183           2 :     catch (const std::exception &e)
     184             :     {
     185           1 :         CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
     186           1 :                  e.what());
     187           1 :         return nullptr;
     188             :     }
     189             : }
     190             : 
     191             : /***********************************************************************/
     192             : /*                            ExecuteSQL()                             */
     193             : /***********************************************************************/
     194             : 
     195          52 : OGRLayer *OGRParquetDataset::ExecuteSQL(const char *pszSQLCommand,
     196             :                                         OGRGeometry *poSpatialFilter,
     197             :                                         const char *pszDialect)
     198             : {
     199             :     /* -------------------------------------------------------------------- */
     200             :     /*      Special cases for SQL optimizations                             */
     201             :     /* -------------------------------------------------------------------- */
     202          52 :     if (STARTS_WITH_CI(pszSQLCommand, "SELECT ") &&
     203          10 :         (pszDialect == nullptr || EQUAL(pszDialect, "") ||
     204           0 :          EQUAL(pszDialect, "OGRSQL")))
     205             :     {
     206          45 :         swq_select oSelect;
     207          45 :         if (oSelect.preparse(pszSQLCommand) != CE_None)
     208           0 :             return nullptr;
     209             : 
     210             :         /* --------------------------------------------------------------------
     211             :          */
     212             :         /*      MIN/MAX/COUNT optimization */
     213             :         /* --------------------------------------------------------------------
     214             :          */
     215          45 :         if (oSelect.join_count == 0 && oSelect.poOtherSelect == nullptr &&
     216          45 :             oSelect.table_count == 1 && oSelect.order_specs == 0 &&
     217          45 :             oSelect.query_mode != SWQM_DISTINCT_LIST &&
     218         119 :             oSelect.where_expr == nullptr &&
     219          29 :             CPLTestBool(
     220             :                 CPLGetConfigOption("OGR_PARQUET_USE_STATISTICS", "YES")))
     221             :         {
     222           1 :             auto poLayer = dynamic_cast<OGRParquetLayer *>(
     223          29 :                 GetLayerByName(oSelect.table_defs[0].table_name));
     224          29 :             if (poLayer)
     225             :             {
     226          16 :                 OGRMemLayer *poMemLayer = nullptr;
     227          16 :                 const auto poLayerDefn = poLayer->GetLayerDefn();
     228             : 
     229          16 :                 int i = 0;  // Used after for.
     230          47 :                 for (; i < oSelect.result_columns(); i++)
     231             :                 {
     232          44 :                     swq_col_func col_func = oSelect.column_defs[i].col_func;
     233          44 :                     if (!(col_func == SWQCF_MIN || col_func == SWQCF_MAX ||
     234             :                           col_func == SWQCF_COUNT))
     235          13 :                         break;
     236             : 
     237             :                     const char *pszFieldName =
     238          37 :                         oSelect.column_defs[i].field_name;
     239          37 :                     if (pszFieldName == nullptr)
     240           0 :                         break;
     241          37 :                     if (oSelect.column_defs[i].target_type != SWQ_OTHER)
     242           0 :                         break;
     243             : 
     244             :                     const int iOGRField =
     245          37 :                         (EQUAL(pszFieldName, poLayer->GetFIDColumn()) &&
     246           2 :                          pszFieldName[0])
     247          39 :                             ? OGRParquetLayer::OGR_FID_INDEX
     248          35 :                             : poLayerDefn->GetFieldIndex(pszFieldName);
     249          37 :                     if (iOGRField < 0 &&
     250             :                         iOGRField != OGRParquetLayer::OGR_FID_INDEX)
     251           4 :                         break;
     252             : 
     253             :                     OGRField sField;
     254          33 :                     OGR_RawField_SetNull(&sField);
     255          33 :                     OGRFieldType eType = OFTReal;
     256          33 :                     OGRFieldSubType eSubType = OFSTNone;
     257             :                     const std::vector<int> anCols =
     258             :                         iOGRField == OGRParquetLayer::OGR_FID_INDEX
     259           2 :                             ? std::vector<int>{poLayer->GetFIDParquetColumn()}
     260             :                             : poLayer->GetParquetColumnIndicesForArrowField(
     261          99 :                                   pszFieldName);
     262          33 :                     if (anCols.size() != 1 || anCols[0] < 0)
     263           0 :                         break;
     264          33 :                     const int iCol = anCols[0];
     265             :                     const auto metadata =
     266          33 :                         poLayer->GetReader()->parquet_reader()->metadata();
     267          33 :                     const auto numRowGroups = metadata->num_row_groups();
     268          33 :                     bool bFound = false;
     269          33 :                     std::string sVal;
     270             : 
     271          33 :                     if (numRowGroups > 0)
     272             :                     {
     273             :                         const auto rowGroup0columnChunk =
     274          66 :                             metadata->RowGroup(0)->ColumnChunk(iCol);
     275             :                         const auto rowGroup0Stats =
     276          66 :                             rowGroup0columnChunk->statistics();
     277          65 :                         if (rowGroup0columnChunk->is_stats_set() &&
     278          32 :                             rowGroup0Stats)
     279             :                         {
     280             :                             OGRField sFieldDummy;
     281             :                             bool bFoundDummy;
     282          64 :                             std::string sValDummy;
     283             : 
     284          32 :                             if (col_func == SWQCF_MIN)
     285             :                             {
     286          15 :                                 CPL_IGNORE_RET_VAL(
     287          15 :                                     poLayer->GetMinMaxForOGRField(
     288             :                                         /* iRowGroup=*/-1,  // -1 for all
     289             :                                         iOGRField, true, sField, bFound, false,
     290             :                                         sFieldDummy, bFoundDummy, eType,
     291             :                                         eSubType, sVal, sValDummy));
     292             :                             }
     293          17 :                             else if (col_func == SWQCF_MAX)
     294             :                             {
     295          15 :                                 CPL_IGNORE_RET_VAL(
     296          15 :                                     poLayer->GetMinMaxForOGRField(
     297             :                                         /* iRowGroup=*/-1,  // -1 for all
     298             :                                         iOGRField, false, sFieldDummy,
     299             :                                         bFoundDummy, true, sField, bFound,
     300             :                                         eType, eSubType, sValDummy, sVal));
     301             :                             }
     302           2 :                             else if (col_func == SWQCF_COUNT)
     303             :                             {
     304           2 :                                 if (oSelect.column_defs[i].distinct_flag)
     305             :                                 {
     306           1 :                                     eType = OFTInteger64;
     307           1 :                                     sField.Integer64 = 0;
     308           1 :                                     for (int iGroup = 0; iGroup < numRowGroups;
     309             :                                          iGroup++)
     310             :                                     {
     311             :                                         const auto columnChunk =
     312           1 :                                             metadata->RowGroup(iGroup)
     313           1 :                                                 ->ColumnChunk(iCol);
     314             :                                         const auto colStats =
     315           1 :                                             columnChunk->statistics();
     316           2 :                                         if (columnChunk->is_stats_set() &&
     317           2 :                                             colStats &&
     318           1 :                                             colStats->HasDistinctCount())
     319             :                                         {
     320             :                                             // Statistics generated by arrow-cpp
     321             :                                             // Parquet writer seem to be buggy,
     322             :                                             // as distinct_count() is always
     323             :                                             // zero. We can detect this: if
     324             :                                             // there are non-null values, then
     325             :                                             // distinct_count() should be > 0.
     326           0 :                                             if (colStats->distinct_count() ==
     327           0 :                                                     0 &&
     328           0 :                                                 colStats->num_values() > 0)
     329             :                                             {
     330           0 :                                                 bFound = false;
     331           0 :                                                 break;
     332             :                                             }
     333           0 :                                             sField.Integer64 +=
     334           0 :                                                 colStats->distinct_count();
     335           0 :                                             bFound = true;
     336             :                                         }
     337             :                                         else
     338             :                                         {
     339           1 :                                             bFound = false;
     340           1 :                                             break;
     341             :                                         }
     342             :                                     }
     343             :                                 }
     344             :                                 else
     345             :                                 {
     346           1 :                                     eType = OFTInteger64;
     347           1 :                                     sField.Integer64 = 0;
     348           1 :                                     bFound = true;
     349           3 :                                     for (int iGroup = 0; iGroup < numRowGroups;
     350             :                                          iGroup++)
     351             :                                     {
     352             :                                         const auto columnChunk =
     353           2 :                                             metadata->RowGroup(iGroup)
     354           4 :                                                 ->ColumnChunk(iCol);
     355             :                                         const auto colStats =
     356           4 :                                             columnChunk->statistics();
     357           4 :                                         if (columnChunk->is_stats_set() &&
     358           2 :                                             colStats)
     359             :                                         {
     360           2 :                                             sField.Integer64 +=
     361           2 :                                                 colStats->num_values();
     362             :                                         }
     363             :                                         else
     364             :                                         {
     365           0 :                                             bFound = false;
     366             :                                         }
     367             :                                     }
     368             :                                 }
     369             :                             }
     370             :                         }
     371             :                         else
     372             :                         {
     373           1 :                             CPLDebug("PARQUET",
     374             :                                      "Statistics not available for field %s",
     375             :                                      pszFieldName);
     376             :                         }
     377             :                     }
     378          33 :                     if (!bFound)
     379             :                     {
     380           2 :                         break;
     381             :                     }
     382             : 
     383          31 :                     if (poMemLayer == nullptr)
     384             :                     {
     385           3 :                         poMemLayer =
     386           3 :                             new OGRMemLayer("SELECT", nullptr, wkbNone);
     387             :                         OGRFeature *poFeature =
     388           3 :                             new OGRFeature(poMemLayer->GetLayerDefn());
     389           3 :                         CPL_IGNORE_RET_VAL(
     390           3 :                             poMemLayer->CreateFeature(poFeature));
     391           3 :                         delete poFeature;
     392             :                     }
     393             : 
     394             :                     const char *pszMinMaxFieldName =
     395          31 :                         oSelect.column_defs[i].field_alias
     396          31 :                             ? oSelect.column_defs[i].field_alias
     397          21 :                             : CPLSPrintf("%s_%s",
     398             :                                          (col_func == SWQCF_MIN)   ? "MIN"
     399           3 :                                          : (col_func == SWQCF_MAX) ? "MAX"
     400             :                                                                    : "COUNT",
     401          18 :                                          oSelect.column_defs[i].field_name);
     402          62 :                     OGRFieldDefn oFieldDefn(pszMinMaxFieldName, eType);
     403          31 :                     oFieldDefn.SetSubType(eSubType);
     404          31 :                     poMemLayer->CreateField(&oFieldDefn);
     405             : 
     406          31 :                     OGRFeature *poFeature = poMemLayer->GetFeature(0);
     407          31 :                     poFeature->SetField(oFieldDefn.GetNameRef(), &sField);
     408          31 :                     CPL_IGNORE_RET_VAL(poMemLayer->SetFeature(poFeature));
     409          31 :                     delete poFeature;
     410             :                 }
     411          16 :                 if (i != oSelect.result_columns())
     412             :                 {
     413          13 :                     delete poMemLayer;
     414             :                 }
     415             :                 else
     416             :                 {
     417           3 :                     CPLDebug("PARQUET",
     418             :                              "Using optimized MIN/MAX/COUNT implementation");
     419           3 :                     return poMemLayer;
     420             :                 }
     421             :             }
     422             :         }
     423             :     }
     424             : 
     425          49 :     return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
     426             : }
     427             : 
     428             : /***********************************************************************/
     429             : /*                           ReleaseResultSet()                        */
     430             : /***********************************************************************/
     431             : 
     432          42 : void OGRParquetDataset::ReleaseResultSet(OGRLayer *poResultsSet)
     433             : {
     434          42 :     delete poResultsSet;
     435          42 : }
     436             : 
     437             : /************************************************************************/
     438             : /*                           TestCapability()                           */
     439             : /************************************************************************/
     440             : 
     441          71 : int OGRParquetDataset::TestCapability(const char *pszCap) const
     442             : 
     443             : {
     444          71 :     if (EQUAL(pszCap, ODsCZGeometries))
     445           7 :         return true;
     446          64 :     else if (EQUAL(pszCap, ODsCMeasuredGeometries))
     447          14 :         return true;
     448             : 
     449          50 :     return false;
     450             : }

Generated by: LCOV version 1.14