LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/parquet - ogrparquetdataset.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 155 172 90.1 %
Date: 2025-10-27 00:14:23 Functions: 8 8 100.0 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Parquet Translator
       4             :  * Purpose:  Implements OGRParquetDriver.
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "ogr_parquet.h"
      14             : #include "memdataset.h"
      15             : #include "ogr_swq.h"
      16             : 
      17             : #include "../arrow_common/ograrrowdataset.hpp"
      18             : #include "../arrow_common/ograrrowlayer.hpp"
      19             : #include "../arrow_common/vsiarrowfilesystem.hpp"
      20             : 
      21             : /************************************************************************/
      22             : /*                         OGRParquetDataset()                          */
      23             : /************************************************************************/
      24             : 
      25        1164 : OGRParquetDataset::OGRParquetDataset()
      26        1164 :     : OGRArrowDataset(arrow::MemoryPool::CreateDefault())
      27             : {
      28        1164 : }
      29             : 
      30             : /************************************************************************/
      31             : /*                        ~OGRParquetDataset()                          */
      32             : /************************************************************************/
      33             : 
      34        2322 : OGRParquetDataset::~OGRParquetDataset()
      35             : {
      36        1161 :     OGRParquetDataset::Close();
      37        2322 : }
      38             : 
      39             : /************************************************************************/
      40             : /*                                Close()                               */
      41             : /************************************************************************/
      42             : 
      43        2303 : CPLErr OGRParquetDataset::Close()
      44             : {
      45        2303 :     CPLErr eErr = CE_None;
      46        2303 :     if (nOpenFlags != OPEN_FLAGS_CLOSED)
      47             :     {
      48             :         // libarrow might continue to do I/O in auxiliary threads on the underlying
      49             :         // files when using the arrow::dataset API even after we closed the dataset.
      50             :         // This is annoying as it can cause crashes when closing GDAL, in particular
      51             :         // the virtual file manager, as this could result in VSI files being
      52             :         // accessed after their VSIVirtualFileSystem has been destroyed, resulting
      53             :         // in crashes. The workaround is to make sure that VSIArrowFileSystem
      54             :         // waits for all file handles it is aware of to have been destroyed.
      55        1161 :         eErr = OGRArrowDataset::Close();
      56             : 
      57        2322 :         auto poFS = std::dynamic_pointer_cast<VSIArrowFileSystem>(m_poFS);
      58        1161 :         if (poFS)
      59         266 :             poFS->AskToClose();
      60             :     }
      61             : 
      62        2303 :     return eErr;
      63             : }
      64             : 
      65             : /***********************************************************************/
      66             : /*                          CreateReaderLayer()                        */
      67             : /***********************************************************************/
      68             : 
      69             : std::unique_ptr<OGRParquetLayer>
      70         894 : OGRParquetDataset::CreateReaderLayer(const std::string &osFilename,
      71             :                                      VSILFILE *&fpIn,
      72             :                                      CSLConstList papszOpenOptionsIn)
      73             : {
      74             :     try
      75             :     {
      76         894 :         std::shared_ptr<arrow::io::RandomAccessFile> infile;
      77        1393 :         if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
      78         499 :             CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
      79             :         {
      80         395 :             VSIVirtualHandleUniquePtr fp(fpIn);
      81         395 :             fpIn = nullptr;
      82         395 :             if (fp == nullptr)
      83             :             {
      84           0 :                 fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
      85           0 :                 if (fp == nullptr)
      86           0 :                     return nullptr;
      87             :             }
      88         790 :             infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename,
      89         790 :                                                                 std::move(fp));
      90             :         }
      91             :         else
      92             :         {
      93         499 :             PARQUET_ASSIGN_OR_THROW(infile,
      94             :                                     arrow::io::ReadableFile::Open(osFilename));
      95             :         }
      96             : 
      97             :         // Open Parquet file reader
      98         894 :         std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
      99             : 
     100         894 :         const int nNumCPUs = OGRParquetLayerBase::GetNumCPUs();
     101             :         const char *pszUseThreads =
     102         894 :             CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
     103         894 :         if (!pszUseThreads && nNumCPUs > 1)
     104             :         {
     105         894 :             pszUseThreads = "YES";
     106             :         }
     107         894 :         const bool bUseThreads = pszUseThreads && CPLTestBool(pszUseThreads);
     108             : 
     109             :         const char *pszParquetBatchSize =
     110         894 :             CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
     111             : 
     112         894 :         auto poMemoryPool = GetMemoryPool();
     113             : #if ARROW_VERSION_MAJOR >= 21
     114             :         parquet::arrow::FileReaderBuilder fileReaderBuilder;
     115             :         {
     116             :             auto st = fileReaderBuilder.Open(std::move(infile));
     117             :             if (!st.ok())
     118             :             {
     119             :                 CPLError(CE_Failure, CPLE_AppDefined,
     120             :                          "parquet::arrow::FileReaderBuilder::Open() failed: %s",
     121             :                          st.message().c_str());
     122             :                 return nullptr;
     123             :             }
     124             :         }
     125             :         fileReaderBuilder.memory_pool(poMemoryPool);
     126             :         parquet::ArrowReaderProperties fileReaderProperties;
     127             :         fileReaderProperties.set_arrow_extensions_enabled(CPLTestBool(
     128             :             CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
     129             :         if (pszParquetBatchSize)
     130             :         {
     131             :             fileReaderProperties.set_batch_size(
     132             :                 CPLAtoGIntBig(pszParquetBatchSize));
     133             :         }
     134             :         if (bUseThreads)
     135             :         {
     136             :             fileReaderProperties.set_use_threads(true);
     137             :         }
     138             :         fileReaderBuilder.properties(fileReaderProperties);
     139             :         {
     140             :             auto res = fileReaderBuilder.Build();
     141             :             if (!res.ok())
     142             :             {
     143             :                 CPLError(
     144             :                     CE_Failure, CPLE_AppDefined,
     145             :                     "parquet::arrow::FileReaderBuilder::Build() failed: %s",
     146             :                     res.status().message().c_str());
     147             :                 return nullptr;
     148             :             }
     149             :             arrow_reader = std::move(*res);
     150             :         }
     151             : #elif ARROW_VERSION_MAJOR >= 19
     152        1789 :         PARQUET_ASSIGN_OR_THROW(
     153             :             arrow_reader,
     154             :             parquet::arrow::OpenFile(std::move(infile), poMemoryPool));
     155             : #else
     156             :         auto st = parquet::arrow::OpenFile(std::move(infile), poMemoryPool,
     157             :                                            &arrow_reader);
     158             :         if (!st.ok())
     159             :         {
     160             :             CPLError(CE_Failure, CPLE_AppDefined,
     161             :                      "parquet::arrow::OpenFile() failed: %s",
     162             :                      st.message().c_str());
     163             :             return nullptr;
     164             :         }
     165             : #endif
     166             : 
     167             : #if ARROW_VERSION_MAJOR < 21
     168         893 :         if (pszParquetBatchSize)
     169             :         {
     170           5 :             arrow_reader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
     171             :         }
     172             : 
     173         893 :         if (bUseThreads)
     174             :         {
     175         893 :             arrow_reader->set_use_threads(true);
     176             :         }
     177             : #endif
     178             : 
     179             :         return std::make_unique<OGRParquetLayer>(
     180        1786 :             this, CPLGetBasenameSafe(osFilename.c_str()).c_str(),
     181        1786 :             std::move(arrow_reader), papszOpenOptionsIn);
     182             :     }
     183           2 :     catch (const std::exception &e)
     184             :     {
     185           1 :         CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
     186           1 :                  e.what());
     187           1 :         return nullptr;
     188             :     }
     189             : }
     190             : 
     191             : /***********************************************************************/
     192             : /*                            ExecuteSQL()                             */
     193             : /***********************************************************************/
     194             : 
     195          52 : OGRLayer *OGRParquetDataset::ExecuteSQL(const char *pszSQLCommand,
     196             :                                         OGRGeometry *poSpatialFilter,
     197             :                                         const char *pszDialect)
     198             : {
     199             :     /* -------------------------------------------------------------------- */
     200             :     /*      Special cases for SQL optimizations                             */
     201             :     /* -------------------------------------------------------------------- */
     202          52 :     if (STARTS_WITH_CI(pszSQLCommand, "SELECT ") &&
     203          10 :         (pszDialect == nullptr || EQUAL(pszDialect, "") ||
     204           0 :          EQUAL(pszDialect, "OGRSQL")))
     205             :     {
     206          45 :         swq_select oSelect;
     207          45 :         if (oSelect.preparse(pszSQLCommand) != CE_None)
     208           0 :             return nullptr;
     209             : 
     210             :         /* --------------------------------------------------------------------
     211             :          */
     212             :         /*      MIN/MAX/COUNT optimization */
     213             :         /* --------------------------------------------------------------------
     214             :          */
     215          45 :         if (oSelect.join_count == 0 && oSelect.poOtherSelect == nullptr &&
     216          45 :             oSelect.table_count == 1 && oSelect.order_specs == 0 &&
     217          45 :             oSelect.query_mode != SWQM_DISTINCT_LIST &&
     218         119 :             oSelect.where_expr == nullptr &&
     219          29 :             CPLTestBool(
     220             :                 CPLGetConfigOption("OGR_PARQUET_USE_STATISTICS", "YES")))
     221             :         {
     222           1 :             auto poLayer = dynamic_cast<OGRParquetLayer *>(
     223          29 :                 GetLayerByName(oSelect.table_defs[0].table_name));
     224          29 :             if (poLayer)
     225             :             {
     226          16 :                 OGRMemLayer *poMemLayer = nullptr;
     227          16 :                 const auto poLayerDefn = poLayer->GetLayerDefn();
     228             : 
     229          16 :                 int i = 0;  // Used after for.
     230          47 :                 for (; i < oSelect.result_columns(); i++)
     231             :                 {
     232          44 :                     swq_col_func col_func = oSelect.column_defs[i].col_func;
     233          44 :                     if (!(col_func == SWQCF_MIN || col_func == SWQCF_MAX ||
     234             :                           col_func == SWQCF_COUNT))
     235          13 :                         break;
     236             : 
     237             :                     const char *pszFieldName =
     238          37 :                         oSelect.column_defs[i].field_name;
     239          37 :                     if (pszFieldName == nullptr)
     240           0 :                         break;
     241          37 :                     if (oSelect.column_defs[i].target_type != SWQ_OTHER)
     242           0 :                         break;
     243             : 
     244             :                     const int iOGRField =
     245          37 :                         (EQUAL(pszFieldName, poLayer->GetFIDColumn()) &&
     246           2 :                          pszFieldName[0])
     247          39 :                             ? OGRParquetLayer::OGR_FID_INDEX
     248          35 :                             : poLayerDefn->GetFieldIndex(pszFieldName);
     249          37 :                     if (iOGRField < 0 &&
     250             :                         iOGRField != OGRParquetLayer::OGR_FID_INDEX)
     251           4 :                         break;
     252             : 
     253             :                     OGRField sField;
     254          33 :                     OGR_RawField_SetNull(&sField);
     255          33 :                     OGRFieldType eType = OFTReal;
     256          33 :                     OGRFieldSubType eSubType = OFSTNone;
     257             :                     const int iCol =
     258             :                         iOGRField == OGRParquetLayer::OGR_FID_INDEX
     259          64 :                             ? poLayer->GetFIDParquetColumn()
     260          31 :                             : poLayer->GetMapFieldIndexToParquetColumn()
     261          31 :                                   [iOGRField];
     262          33 :                     if (iCol < 0)
     263           0 :                         break;
     264             :                     const auto metadata =
     265          33 :                         poLayer->GetReader()->parquet_reader()->metadata();
     266          33 :                     const auto numRowGroups = metadata->num_row_groups();
     267          33 :                     bool bFound = false;
     268          33 :                     std::string sVal;
     269             : 
     270          33 :                     if (numRowGroups > 0)
     271             :                     {
     272             :                         const auto rowGroup0columnChunk =
     273          66 :                             metadata->RowGroup(0)->ColumnChunk(iCol);
     274             :                         const auto rowGroup0Stats =
     275          66 :                             rowGroup0columnChunk->statistics();
     276          65 :                         if (rowGroup0columnChunk->is_stats_set() &&
     277          32 :                             rowGroup0Stats)
     278             :                         {
     279             :                             OGRField sFieldDummy;
     280             :                             bool bFoundDummy;
     281          64 :                             std::string sValDummy;
     282             : 
     283          32 :                             if (col_func == SWQCF_MIN)
     284             :                             {
     285          15 :                                 CPL_IGNORE_RET_VAL(
     286          15 :                                     poLayer->GetMinMaxForOGRField(
     287             :                                         /* iRowGroup=*/-1,  // -1 for all
     288             :                                         iOGRField, true, sField, bFound, false,
     289             :                                         sFieldDummy, bFoundDummy, eType,
     290             :                                         eSubType, sVal, sValDummy));
     291             :                             }
     292          17 :                             else if (col_func == SWQCF_MAX)
     293             :                             {
     294          15 :                                 CPL_IGNORE_RET_VAL(
     295          15 :                                     poLayer->GetMinMaxForOGRField(
     296             :                                         /* iRowGroup=*/-1,  // -1 for all
     297             :                                         iOGRField, false, sFieldDummy,
     298             :                                         bFoundDummy, true, sField, bFound,
     299             :                                         eType, eSubType, sValDummy, sVal));
     300             :                             }
     301           2 :                             else if (col_func == SWQCF_COUNT)
     302             :                             {
     303           2 :                                 if (oSelect.column_defs[i].distinct_flag)
     304             :                                 {
     305           1 :                                     eType = OFTInteger64;
     306           1 :                                     sField.Integer64 = 0;
     307           1 :                                     for (int iGroup = 0; iGroup < numRowGroups;
     308             :                                          iGroup++)
     309             :                                     {
     310             :                                         const auto columnChunk =
     311           1 :                                             metadata->RowGroup(iGroup)
     312           1 :                                                 ->ColumnChunk(iCol);
     313             :                                         const auto colStats =
     314           1 :                                             columnChunk->statistics();
     315           2 :                                         if (columnChunk->is_stats_set() &&
     316           2 :                                             colStats &&
     317           1 :                                             colStats->HasDistinctCount())
     318             :                                         {
     319             :                                             // Statistics generated by arrow-cpp
     320             :                                             // Parquet writer seem to be buggy,
     321             :                                             // as distinct_count() is always
     322             :                                             // zero. We can detect this: if
     323             :                                             // there are non-null values, then
     324             :                                             // distinct_count() should be > 0.
     325           0 :                                             if (colStats->distinct_count() ==
     326           0 :                                                     0 &&
     327           0 :                                                 colStats->num_values() > 0)
     328             :                                             {
     329           0 :                                                 bFound = false;
     330           0 :                                                 break;
     331             :                                             }
     332           0 :                                             sField.Integer64 +=
     333           0 :                                                 colStats->distinct_count();
     334           0 :                                             bFound = true;
     335             :                                         }
     336             :                                         else
     337             :                                         {
     338           1 :                                             bFound = false;
     339           1 :                                             break;
     340             :                                         }
     341             :                                     }
     342             :                                 }
     343             :                                 else
     344             :                                 {
     345           1 :                                     eType = OFTInteger64;
     346           1 :                                     sField.Integer64 = 0;
     347           1 :                                     bFound = true;
     348           3 :                                     for (int iGroup = 0; iGroup < numRowGroups;
     349             :                                          iGroup++)
     350             :                                     {
     351             :                                         const auto columnChunk =
     352           2 :                                             metadata->RowGroup(iGroup)
     353           4 :                                                 ->ColumnChunk(iCol);
     354             :                                         const auto colStats =
     355           4 :                                             columnChunk->statistics();
     356           4 :                                         if (columnChunk->is_stats_set() &&
     357           2 :                                             colStats)
     358             :                                         {
     359           2 :                                             sField.Integer64 +=
     360           2 :                                                 colStats->num_values();
     361             :                                         }
     362             :                                         else
     363             :                                         {
     364           0 :                                             bFound = false;
     365             :                                         }
     366             :                                     }
     367             :                                 }
     368             :                             }
     369             :                         }
     370             :                         else
     371             :                         {
     372           1 :                             CPLDebug("PARQUET",
     373             :                                      "Statistics not available for field %s",
     374             :                                      pszFieldName);
     375             :                         }
     376             :                     }
     377          33 :                     if (!bFound)
     378             :                     {
     379           2 :                         break;
     380             :                     }
     381             : 
     382          31 :                     if (poMemLayer == nullptr)
     383             :                     {
     384           3 :                         poMemLayer =
     385           3 :                             new OGRMemLayer("SELECT", nullptr, wkbNone);
     386             :                         OGRFeature *poFeature =
     387           3 :                             new OGRFeature(poMemLayer->GetLayerDefn());
     388           3 :                         CPL_IGNORE_RET_VAL(
     389           3 :                             poMemLayer->CreateFeature(poFeature));
     390           3 :                         delete poFeature;
     391             :                     }
     392             : 
     393             :                     const char *pszMinMaxFieldName =
     394          31 :                         oSelect.column_defs[i].field_alias
     395          31 :                             ? oSelect.column_defs[i].field_alias
     396          21 :                             : CPLSPrintf("%s_%s",
     397             :                                          (col_func == SWQCF_MIN)   ? "MIN"
     398           3 :                                          : (col_func == SWQCF_MAX) ? "MAX"
     399             :                                                                    : "COUNT",
     400          18 :                                          oSelect.column_defs[i].field_name);
     401          62 :                     OGRFieldDefn oFieldDefn(pszMinMaxFieldName, eType);
     402          31 :                     oFieldDefn.SetSubType(eSubType);
     403          31 :                     poMemLayer->CreateField(&oFieldDefn);
     404             : 
     405          31 :                     OGRFeature *poFeature = poMemLayer->GetFeature(0);
     406          31 :                     poFeature->SetField(oFieldDefn.GetNameRef(), &sField);
     407          31 :                     CPL_IGNORE_RET_VAL(poMemLayer->SetFeature(poFeature));
     408          31 :                     delete poFeature;
     409             :                 }
     410          16 :                 if (i != oSelect.result_columns())
     411             :                 {
     412          13 :                     delete poMemLayer;
     413             :                 }
     414             :                 else
     415             :                 {
     416           3 :                     CPLDebug("PARQUET",
     417             :                              "Using optimized MIN/MAX/COUNT implementation");
     418           3 :                     return poMemLayer;
     419             :                 }
     420             :             }
     421             :         }
     422             :     }
     423             : 
     424          49 :     return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
     425             : }
     426             : 
     427             : /***********************************************************************/
     428             : /*                           ReleaseResultSet()                        */
     429             : /***********************************************************************/
     430             : 
     431          42 : void OGRParquetDataset::ReleaseResultSet(OGRLayer *poResultsSet)
     432             : {
     433          42 :     delete poResultsSet;
     434          42 : }
     435             : 
     436             : /************************************************************************/
     437             : /*                           TestCapability()                           */
     438             : /************************************************************************/
     439             : 
     440          71 : int OGRParquetDataset::TestCapability(const char *pszCap) const
     441             : 
     442             : {
     443          71 :     if (EQUAL(pszCap, ODsCZGeometries))
     444           7 :         return true;
     445          64 :     else if (EQUAL(pszCap, ODsCMeasuredGeometries))
     446          14 :         return true;
     447             : 
     448          50 :     return false;
     449             : }

Generated by: LCOV version 1.14