Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_parquet.h"
14 : #include "ogr_mem.h"
15 : #include "ogr_swq.h"
16 :
17 : #include "../arrow_common/ograrrowdataset.hpp"
18 : #include "../arrow_common/ograrrowlayer.hpp"
19 : #include "../arrow_common/vsiarrowfilesystem.hpp"
20 :
21 : /************************************************************************/
22 : /* OGRParquetDataset() */
23 : /************************************************************************/
24 :
25 1118 : OGRParquetDataset::OGRParquetDataset(
26 1118 : const std::shared_ptr<arrow::MemoryPool> &poMemoryPool)
27 1118 : : OGRArrowDataset(poMemoryPool)
28 : {
29 1118 : }
30 :
31 : /************************************************************************/
32 : /* ~OGRParquetDataset() */
33 : /************************************************************************/
34 :
35 2236 : OGRParquetDataset::~OGRParquetDataset()
36 : {
37 : // libarrow might continue to do I/O in auxiliary threads on the underlying
38 : // files when using the arrow::dataset API even after we closed the dataset.
39 : // This is annoying as it can cause crashes when closing GDAL, in particular
40 : // the virtual file manager, as this could result in VSI files being
41 : // accessed after their VSIVirtualFileSystem has been destroyed, resulting
42 : // in crashes. The workaround is to make sure that VSIArrowFileSystem
43 : // waits for all file handles it is aware of to have been destroyed.
44 1118 : close();
45 2236 : auto poFS = std::dynamic_pointer_cast<VSIArrowFileSystem>(m_poFS);
46 1118 : if (poFS)
47 264 : poFS->AskToClose();
48 2236 : }
49 :
50 : /***********************************************************************/
51 : /* ExecuteSQL() */
52 : /***********************************************************************/
53 :
54 46 : OGRLayer *OGRParquetDataset::ExecuteSQL(const char *pszSQLCommand,
55 : OGRGeometry *poSpatialFilter,
56 : const char *pszDialect)
57 : {
58 : /* -------------------------------------------------------------------- */
59 : /* Special cases for SQL optimizations */
60 : /* -------------------------------------------------------------------- */
61 46 : if (STARTS_WITH_CI(pszSQLCommand, "SELECT ") &&
62 10 : (pszDialect == nullptr || EQUAL(pszDialect, "") ||
63 0 : EQUAL(pszDialect, "OGRSQL")))
64 : {
65 40 : swq_select oSelect;
66 40 : if (oSelect.preparse(pszSQLCommand) != CE_None)
67 0 : return nullptr;
68 :
69 : /* --------------------------------------------------------------------
70 : */
71 : /* MIN/MAX/COUNT optimization */
72 : /* --------------------------------------------------------------------
73 : */
74 40 : if (oSelect.join_count == 0 && oSelect.poOtherSelect == nullptr &&
75 40 : oSelect.table_count == 1 && oSelect.order_specs == 0 &&
76 40 : oSelect.query_mode != SWQM_DISTINCT_LIST &&
77 106 : oSelect.where_expr == nullptr &&
78 26 : CPLTestBool(
79 : CPLGetConfigOption("OGR_PARQUET_USE_STATISTICS", "YES")))
80 : {
81 1 : auto poLayer = dynamic_cast<OGRParquetLayer *>(
82 26 : GetLayerByName(oSelect.table_defs[0].table_name));
83 26 : if (poLayer)
84 : {
85 16 : OGRMemLayer *poMemLayer = nullptr;
86 16 : const auto poLayerDefn = poLayer->GetLayerDefn();
87 :
88 16 : int i = 0; // Used after for.
89 47 : for (; i < oSelect.result_columns(); i++)
90 : {
91 44 : swq_col_func col_func = oSelect.column_defs[i].col_func;
92 44 : if (!(col_func == SWQCF_MIN || col_func == SWQCF_MAX ||
93 : col_func == SWQCF_COUNT))
94 13 : break;
95 :
96 : const char *pszFieldName =
97 37 : oSelect.column_defs[i].field_name;
98 37 : if (pszFieldName == nullptr)
99 0 : break;
100 37 : if (oSelect.column_defs[i].target_type != SWQ_OTHER)
101 0 : break;
102 :
103 : const int iOGRField =
104 37 : (EQUAL(pszFieldName, poLayer->GetFIDColumn()) &&
105 2 : pszFieldName[0])
106 39 : ? OGRParquetLayer::OGR_FID_INDEX
107 35 : : poLayerDefn->GetFieldIndex(pszFieldName);
108 37 : if (iOGRField < 0 &&
109 : iOGRField != OGRParquetLayer::OGR_FID_INDEX)
110 4 : break;
111 :
112 : OGRField sField;
113 33 : OGR_RawField_SetNull(&sField);
114 33 : OGRFieldType eType = OFTReal;
115 33 : OGRFieldSubType eSubType = OFSTNone;
116 : const int iCol =
117 : iOGRField == OGRParquetLayer::OGR_FID_INDEX
118 64 : ? poLayer->GetFIDParquetColumn()
119 31 : : poLayer->GetMapFieldIndexToParquetColumn()
120 31 : [iOGRField];
121 33 : if (iCol < 0)
122 0 : break;
123 : const auto metadata =
124 33 : poLayer->GetReader()->parquet_reader()->metadata();
125 33 : const auto numRowGroups = metadata->num_row_groups();
126 33 : bool bFound = false;
127 33 : std::string sVal;
128 :
129 33 : if (numRowGroups > 0)
130 : {
131 : const auto rowGroup0columnChunk =
132 66 : metadata->RowGroup(0)->ColumnChunk(iCol);
133 : const auto rowGroup0Stats =
134 66 : rowGroup0columnChunk->statistics();
135 65 : if (rowGroup0columnChunk->is_stats_set() &&
136 32 : rowGroup0Stats)
137 : {
138 : OGRField sFieldDummy;
139 : bool bFoundDummy;
140 64 : std::string sValDummy;
141 :
142 32 : if (col_func == SWQCF_MIN)
143 : {
144 15 : CPL_IGNORE_RET_VAL(
145 15 : poLayer->GetMinMaxForOGRField(
146 : /* iRowGroup=*/-1, // -1 for all
147 : iOGRField, true, sField, bFound, false,
148 : sFieldDummy, bFoundDummy, eType,
149 : eSubType, sVal, sValDummy));
150 : }
151 17 : else if (col_func == SWQCF_MAX)
152 : {
153 15 : CPL_IGNORE_RET_VAL(
154 15 : poLayer->GetMinMaxForOGRField(
155 : /* iRowGroup=*/-1, // -1 for all
156 : iOGRField, false, sFieldDummy,
157 : bFoundDummy, true, sField, bFound,
158 : eType, eSubType, sValDummy, sVal));
159 : }
160 2 : else if (col_func == SWQCF_COUNT)
161 : {
162 2 : if (oSelect.column_defs[i].distinct_flag)
163 : {
164 1 : eType = OFTInteger64;
165 1 : sField.Integer64 = 0;
166 1 : for (int iGroup = 0; iGroup < numRowGroups;
167 : iGroup++)
168 : {
169 : const auto columnChunk =
170 1 : metadata->RowGroup(iGroup)
171 1 : ->ColumnChunk(iCol);
172 : const auto colStats =
173 1 : columnChunk->statistics();
174 2 : if (columnChunk->is_stats_set() &&
175 2 : colStats &&
176 1 : colStats->HasDistinctCount())
177 : {
178 : // Statistics generated by arrow-cpp
179 : // Parquet writer seem to be buggy,
180 : // as distinct_count() is always
181 : // zero. We can detect this: if
182 : // there are non-null values, then
183 : // distinct_count() should be > 0.
184 0 : if (colStats->distinct_count() ==
185 0 : 0 &&
186 0 : colStats->num_values() > 0)
187 : {
188 0 : bFound = false;
189 0 : break;
190 : }
191 0 : sField.Integer64 +=
192 0 : colStats->distinct_count();
193 0 : bFound = true;
194 : }
195 : else
196 : {
197 1 : bFound = false;
198 1 : break;
199 : }
200 : }
201 : }
202 : else
203 : {
204 1 : eType = OFTInteger64;
205 1 : sField.Integer64 = 0;
206 1 : bFound = true;
207 3 : for (int iGroup = 0; iGroup < numRowGroups;
208 : iGroup++)
209 : {
210 : const auto columnChunk =
211 2 : metadata->RowGroup(iGroup)
212 4 : ->ColumnChunk(iCol);
213 : const auto colStats =
214 4 : columnChunk->statistics();
215 4 : if (columnChunk->is_stats_set() &&
216 2 : colStats)
217 : {
218 2 : sField.Integer64 +=
219 2 : colStats->num_values();
220 : }
221 : else
222 : {
223 0 : bFound = false;
224 : }
225 : }
226 : }
227 : }
228 : }
229 : else
230 : {
231 1 : CPLDebug("PARQUET",
232 : "Statistics not available for field %s",
233 : pszFieldName);
234 : }
235 : }
236 33 : if (!bFound)
237 : {
238 2 : break;
239 : }
240 :
241 31 : if (poMemLayer == nullptr)
242 : {
243 3 : poMemLayer =
244 3 : new OGRMemLayer("SELECT", nullptr, wkbNone);
245 : OGRFeature *poFeature =
246 3 : new OGRFeature(poMemLayer->GetLayerDefn());
247 3 : CPL_IGNORE_RET_VAL(
248 3 : poMemLayer->CreateFeature(poFeature));
249 3 : delete poFeature;
250 : }
251 :
252 : const char *pszMinMaxFieldName =
253 47 : CPLSPrintf("%s_%s",
254 : (col_func == SWQCF_MIN) ? "MIN"
255 16 : : (col_func == SWQCF_MAX) ? "MAX"
256 : : "COUNT",
257 31 : oSelect.column_defs[i].field_name);
258 62 : OGRFieldDefn oFieldDefn(pszMinMaxFieldName, eType);
259 31 : oFieldDefn.SetSubType(eSubType);
260 31 : poMemLayer->CreateField(&oFieldDefn);
261 :
262 31 : OGRFeature *poFeature = poMemLayer->GetFeature(0);
263 31 : poFeature->SetField(oFieldDefn.GetNameRef(), &sField);
264 31 : CPL_IGNORE_RET_VAL(poMemLayer->SetFeature(poFeature));
265 31 : delete poFeature;
266 : }
267 16 : if (i != oSelect.result_columns())
268 : {
269 13 : delete poMemLayer;
270 : }
271 : else
272 : {
273 3 : CPLDebug("PARQUET",
274 : "Using optimized MIN/MAX/COUNT implementation");
275 3 : return poMemLayer;
276 : }
277 : }
278 : }
279 : }
280 :
281 43 : return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
282 : }
283 :
284 : /***********************************************************************/
285 : /* ReleaseResultSet() */
286 : /***********************************************************************/
287 :
288 37 : void OGRParquetDataset::ReleaseResultSet(OGRLayer *poResultsSet)
289 : {
290 37 : delete poResultsSet;
291 37 : }
292 :
293 : /************************************************************************/
294 : /* TestCapability() */
295 : /************************************************************************/
296 :
297 64 : int OGRParquetDataset::TestCapability(const char *pszCap)
298 :
299 : {
300 64 : if (EQUAL(pszCap, ODsCZGeometries))
301 6 : return true;
302 58 : else if (EQUAL(pszCap, ODsCMeasuredGeometries))
303 12 : return true;
304 :
305 46 : return false;
306 : }
|