Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_parquet.h"
14 : #include "memdataset.h"
15 : #include "ogr_swq.h"
16 :
17 : #include "../arrow_common/ograrrowdataset.hpp"
18 : #include "../arrow_common/ograrrowlayer.hpp"
19 : #include "../arrow_common/vsiarrowfilesystem.hpp"
20 :
21 : /************************************************************************/
22 : /* OGRParquetDataset() */
23 : /************************************************************************/
24 :
25 1164 : OGRParquetDataset::OGRParquetDataset()
26 1164 : : OGRArrowDataset(arrow::MemoryPool::CreateDefault())
27 : {
28 1164 : }
29 :
30 : /************************************************************************/
31 : /* ~OGRParquetDataset() */
32 : /************************************************************************/
33 :
34 2322 : OGRParquetDataset::~OGRParquetDataset()
35 : {
36 1161 : OGRParquetDataset::Close();
37 2322 : }
38 :
39 : /************************************************************************/
40 : /* Close() */
41 : /************************************************************************/
42 :
43 2303 : CPLErr OGRParquetDataset::Close()
44 : {
45 2303 : CPLErr eErr = CE_None;
46 2303 : if (nOpenFlags != OPEN_FLAGS_CLOSED)
47 : {
48 : // libarrow might continue to do I/O in auxiliary threads on the underlying
49 : // files when using the arrow::dataset API even after we closed the dataset.
50 : // This is annoying as it can cause crashes when closing GDAL, in particular
51 : // the virtual file manager, as this could result in VSI files being
52 : // accessed after their VSIVirtualFileSystem has been destroyed, resulting
53 : // in crashes. The workaround is to make sure that VSIArrowFileSystem
54 : // waits for all file handles it is aware of to have been destroyed.
55 1161 : eErr = OGRArrowDataset::Close();
56 :
57 2322 : auto poFS = std::dynamic_pointer_cast<VSIArrowFileSystem>(m_poFS);
58 1161 : if (poFS)
59 266 : poFS->AskToClose();
60 : }
61 :
62 2303 : return eErr;
63 : }
64 :
65 : /***********************************************************************/
66 : /* CreateReaderLayer() */
67 : /***********************************************************************/
68 :
69 : std::unique_ptr<OGRParquetLayer>
70 894 : OGRParquetDataset::CreateReaderLayer(const std::string &osFilename,
71 : VSILFILE *&fpIn,
72 : CSLConstList papszOpenOptionsIn)
73 : {
74 : try
75 : {
76 894 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
77 1393 : if (STARTS_WITH(osFilename.c_str(), "/vsi") ||
78 499 : CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_VSI", "NO")))
79 : {
80 395 : VSIVirtualHandleUniquePtr fp(fpIn);
81 395 : fpIn = nullptr;
82 395 : if (fp == nullptr)
83 : {
84 0 : fp.reset(VSIFOpenL(osFilename.c_str(), "rb"));
85 0 : if (fp == nullptr)
86 0 : return nullptr;
87 : }
88 790 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename,
89 790 : std::move(fp));
90 : }
91 : else
92 : {
93 499 : PARQUET_ASSIGN_OR_THROW(infile,
94 : arrow::io::ReadableFile::Open(osFilename));
95 : }
96 :
97 : // Open Parquet file reader
98 894 : std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
99 :
100 894 : const int nNumCPUs = OGRParquetLayerBase::GetNumCPUs();
101 : const char *pszUseThreads =
102 894 : CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr);
103 894 : if (!pszUseThreads && nNumCPUs > 1)
104 : {
105 894 : pszUseThreads = "YES";
106 : }
107 894 : const bool bUseThreads = pszUseThreads && CPLTestBool(pszUseThreads);
108 :
109 : const char *pszParquetBatchSize =
110 894 : CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr);
111 :
112 894 : auto poMemoryPool = GetMemoryPool();
113 : #if ARROW_VERSION_MAJOR >= 21
114 : parquet::arrow::FileReaderBuilder fileReaderBuilder;
115 : {
116 : auto st = fileReaderBuilder.Open(std::move(infile));
117 : if (!st.ok())
118 : {
119 : CPLError(CE_Failure, CPLE_AppDefined,
120 : "parquet::arrow::FileReaderBuilder::Open() failed: %s",
121 : st.message().c_str());
122 : return nullptr;
123 : }
124 : }
125 : fileReaderBuilder.memory_pool(poMemoryPool);
126 : parquet::ArrowReaderProperties fileReaderProperties;
127 : fileReaderProperties.set_arrow_extensions_enabled(CPLTestBool(
128 : CPLGetConfigOption("OGR_PARQUET_ENABLE_ARROW_EXTENSIONS", "YES")));
129 : if (pszParquetBatchSize)
130 : {
131 : fileReaderProperties.set_batch_size(
132 : CPLAtoGIntBig(pszParquetBatchSize));
133 : }
134 : if (bUseThreads)
135 : {
136 : fileReaderProperties.set_use_threads(true);
137 : }
138 : fileReaderBuilder.properties(fileReaderProperties);
139 : {
140 : auto res = fileReaderBuilder.Build();
141 : if (!res.ok())
142 : {
143 : CPLError(
144 : CE_Failure, CPLE_AppDefined,
145 : "parquet::arrow::FileReaderBuilder::Build() failed: %s",
146 : res.status().message().c_str());
147 : return nullptr;
148 : }
149 : arrow_reader = std::move(*res);
150 : }
151 : #elif ARROW_VERSION_MAJOR >= 19
152 1789 : PARQUET_ASSIGN_OR_THROW(
153 : arrow_reader,
154 : parquet::arrow::OpenFile(std::move(infile), poMemoryPool));
155 : #else
156 : auto st = parquet::arrow::OpenFile(std::move(infile), poMemoryPool,
157 : &arrow_reader);
158 : if (!st.ok())
159 : {
160 : CPLError(CE_Failure, CPLE_AppDefined,
161 : "parquet::arrow::OpenFile() failed: %s",
162 : st.message().c_str());
163 : return nullptr;
164 : }
165 : #endif
166 :
167 : #if ARROW_VERSION_MAJOR < 21
168 893 : if (pszParquetBatchSize)
169 : {
170 5 : arrow_reader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize));
171 : }
172 :
173 893 : if (bUseThreads)
174 : {
175 893 : arrow_reader->set_use_threads(true);
176 : }
177 : #endif
178 :
179 : return std::make_unique<OGRParquetLayer>(
180 1786 : this, CPLGetBasenameSafe(osFilename.c_str()).c_str(),
181 1786 : std::move(arrow_reader), papszOpenOptionsIn);
182 : }
183 2 : catch (const std::exception &e)
184 : {
185 1 : CPLError(CE_Failure, CPLE_AppDefined, "Parquet exception: %s",
186 1 : e.what());
187 1 : return nullptr;
188 : }
189 : }
190 :
191 : /***********************************************************************/
192 : /* ExecuteSQL() */
193 : /***********************************************************************/
194 :
195 52 : OGRLayer *OGRParquetDataset::ExecuteSQL(const char *pszSQLCommand,
196 : OGRGeometry *poSpatialFilter,
197 : const char *pszDialect)
198 : {
199 : /* -------------------------------------------------------------------- */
200 : /* Special cases for SQL optimizations */
201 : /* -------------------------------------------------------------------- */
202 52 : if (STARTS_WITH_CI(pszSQLCommand, "SELECT ") &&
203 10 : (pszDialect == nullptr || EQUAL(pszDialect, "") ||
204 0 : EQUAL(pszDialect, "OGRSQL")))
205 : {
206 45 : swq_select oSelect;
207 45 : if (oSelect.preparse(pszSQLCommand) != CE_None)
208 0 : return nullptr;
209 :
210 : /* --------------------------------------------------------------------
211 : */
212 : /* MIN/MAX/COUNT optimization */
213 : /* --------------------------------------------------------------------
214 : */
215 45 : if (oSelect.join_count == 0 && oSelect.poOtherSelect == nullptr &&
216 45 : oSelect.table_count == 1 && oSelect.order_specs == 0 &&
217 45 : oSelect.query_mode != SWQM_DISTINCT_LIST &&
218 119 : oSelect.where_expr == nullptr &&
219 29 : CPLTestBool(
220 : CPLGetConfigOption("OGR_PARQUET_USE_STATISTICS", "YES")))
221 : {
222 1 : auto poLayer = dynamic_cast<OGRParquetLayer *>(
223 29 : GetLayerByName(oSelect.table_defs[0].table_name));
224 29 : if (poLayer)
225 : {
226 16 : OGRMemLayer *poMemLayer = nullptr;
227 16 : const auto poLayerDefn = poLayer->GetLayerDefn();
228 :
229 16 : int i = 0; // Used after for.
230 47 : for (; i < oSelect.result_columns(); i++)
231 : {
232 44 : swq_col_func col_func = oSelect.column_defs[i].col_func;
233 44 : if (!(col_func == SWQCF_MIN || col_func == SWQCF_MAX ||
234 : col_func == SWQCF_COUNT))
235 13 : break;
236 :
237 : const char *pszFieldName =
238 37 : oSelect.column_defs[i].field_name;
239 37 : if (pszFieldName == nullptr)
240 0 : break;
241 37 : if (oSelect.column_defs[i].target_type != SWQ_OTHER)
242 0 : break;
243 :
244 : const int iOGRField =
245 37 : (EQUAL(pszFieldName, poLayer->GetFIDColumn()) &&
246 2 : pszFieldName[0])
247 39 : ? OGRParquetLayer::OGR_FID_INDEX
248 35 : : poLayerDefn->GetFieldIndex(pszFieldName);
249 37 : if (iOGRField < 0 &&
250 : iOGRField != OGRParquetLayer::OGR_FID_INDEX)
251 4 : break;
252 :
253 : OGRField sField;
254 33 : OGR_RawField_SetNull(&sField);
255 33 : OGRFieldType eType = OFTReal;
256 33 : OGRFieldSubType eSubType = OFSTNone;
257 : const int iCol =
258 : iOGRField == OGRParquetLayer::OGR_FID_INDEX
259 64 : ? poLayer->GetFIDParquetColumn()
260 31 : : poLayer->GetMapFieldIndexToParquetColumn()
261 31 : [iOGRField];
262 33 : if (iCol < 0)
263 0 : break;
264 : const auto metadata =
265 33 : poLayer->GetReader()->parquet_reader()->metadata();
266 33 : const auto numRowGroups = metadata->num_row_groups();
267 33 : bool bFound = false;
268 33 : std::string sVal;
269 :
270 33 : if (numRowGroups > 0)
271 : {
272 : const auto rowGroup0columnChunk =
273 66 : metadata->RowGroup(0)->ColumnChunk(iCol);
274 : const auto rowGroup0Stats =
275 66 : rowGroup0columnChunk->statistics();
276 65 : if (rowGroup0columnChunk->is_stats_set() &&
277 32 : rowGroup0Stats)
278 : {
279 : OGRField sFieldDummy;
280 : bool bFoundDummy;
281 64 : std::string sValDummy;
282 :
283 32 : if (col_func == SWQCF_MIN)
284 : {
285 15 : CPL_IGNORE_RET_VAL(
286 15 : poLayer->GetMinMaxForOGRField(
287 : /* iRowGroup=*/-1, // -1 for all
288 : iOGRField, true, sField, bFound, false,
289 : sFieldDummy, bFoundDummy, eType,
290 : eSubType, sVal, sValDummy));
291 : }
292 17 : else if (col_func == SWQCF_MAX)
293 : {
294 15 : CPL_IGNORE_RET_VAL(
295 15 : poLayer->GetMinMaxForOGRField(
296 : /* iRowGroup=*/-1, // -1 for all
297 : iOGRField, false, sFieldDummy,
298 : bFoundDummy, true, sField, bFound,
299 : eType, eSubType, sValDummy, sVal));
300 : }
301 2 : else if (col_func == SWQCF_COUNT)
302 : {
303 2 : if (oSelect.column_defs[i].distinct_flag)
304 : {
305 1 : eType = OFTInteger64;
306 1 : sField.Integer64 = 0;
307 1 : for (int iGroup = 0; iGroup < numRowGroups;
308 : iGroup++)
309 : {
310 : const auto columnChunk =
311 1 : metadata->RowGroup(iGroup)
312 1 : ->ColumnChunk(iCol);
313 : const auto colStats =
314 1 : columnChunk->statistics();
315 2 : if (columnChunk->is_stats_set() &&
316 2 : colStats &&
317 1 : colStats->HasDistinctCount())
318 : {
319 : // Statistics generated by arrow-cpp
320 : // Parquet writer seem to be buggy,
321 : // as distinct_count() is always
322 : // zero. We can detect this: if
323 : // there are non-null values, then
324 : // distinct_count() should be > 0.
325 0 : if (colStats->distinct_count() ==
326 0 : 0 &&
327 0 : colStats->num_values() > 0)
328 : {
329 0 : bFound = false;
330 0 : break;
331 : }
332 0 : sField.Integer64 +=
333 0 : colStats->distinct_count();
334 0 : bFound = true;
335 : }
336 : else
337 : {
338 1 : bFound = false;
339 1 : break;
340 : }
341 : }
342 : }
343 : else
344 : {
345 1 : eType = OFTInteger64;
346 1 : sField.Integer64 = 0;
347 1 : bFound = true;
348 3 : for (int iGroup = 0; iGroup < numRowGroups;
349 : iGroup++)
350 : {
351 : const auto columnChunk =
352 2 : metadata->RowGroup(iGroup)
353 4 : ->ColumnChunk(iCol);
354 : const auto colStats =
355 4 : columnChunk->statistics();
356 4 : if (columnChunk->is_stats_set() &&
357 2 : colStats)
358 : {
359 2 : sField.Integer64 +=
360 2 : colStats->num_values();
361 : }
362 : else
363 : {
364 0 : bFound = false;
365 : }
366 : }
367 : }
368 : }
369 : }
370 : else
371 : {
372 1 : CPLDebug("PARQUET",
373 : "Statistics not available for field %s",
374 : pszFieldName);
375 : }
376 : }
377 33 : if (!bFound)
378 : {
379 2 : break;
380 : }
381 :
382 31 : if (poMemLayer == nullptr)
383 : {
384 3 : poMemLayer =
385 3 : new OGRMemLayer("SELECT", nullptr, wkbNone);
386 : OGRFeature *poFeature =
387 3 : new OGRFeature(poMemLayer->GetLayerDefn());
388 3 : CPL_IGNORE_RET_VAL(
389 3 : poMemLayer->CreateFeature(poFeature));
390 3 : delete poFeature;
391 : }
392 :
393 : const char *pszMinMaxFieldName =
394 31 : oSelect.column_defs[i].field_alias
395 31 : ? oSelect.column_defs[i].field_alias
396 21 : : CPLSPrintf("%s_%s",
397 : (col_func == SWQCF_MIN) ? "MIN"
398 3 : : (col_func == SWQCF_MAX) ? "MAX"
399 : : "COUNT",
400 18 : oSelect.column_defs[i].field_name);
401 62 : OGRFieldDefn oFieldDefn(pszMinMaxFieldName, eType);
402 31 : oFieldDefn.SetSubType(eSubType);
403 31 : poMemLayer->CreateField(&oFieldDefn);
404 :
405 31 : OGRFeature *poFeature = poMemLayer->GetFeature(0);
406 31 : poFeature->SetField(oFieldDefn.GetNameRef(), &sField);
407 31 : CPL_IGNORE_RET_VAL(poMemLayer->SetFeature(poFeature));
408 31 : delete poFeature;
409 : }
410 16 : if (i != oSelect.result_columns())
411 : {
412 13 : delete poMemLayer;
413 : }
414 : else
415 : {
416 3 : CPLDebug("PARQUET",
417 : "Using optimized MIN/MAX/COUNT implementation");
418 3 : return poMemLayer;
419 : }
420 : }
421 : }
422 : }
423 :
424 49 : return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
425 : }
426 :
427 : /***********************************************************************/
428 : /* ReleaseResultSet() */
429 : /***********************************************************************/
430 :
431 42 : void OGRParquetDataset::ReleaseResultSet(OGRLayer *poResultsSet)
432 : {
433 42 : delete poResultsSet;
434 42 : }
435 :
436 : /************************************************************************/
437 : /* TestCapability() */
438 : /************************************************************************/
439 :
440 71 : int OGRParquetDataset::TestCapability(const char *pszCap) const
441 :
442 : {
443 71 : if (EQUAL(pszCap, ODsCZGeometries))
444 7 : return true;
445 64 : else if (EQUAL(pszCap, ODsCMeasuredGeometries))
446 14 : return true;
447 :
448 50 : return false;
449 : }
|