Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 : #include <set>
23 :
24 : #include "../arrow_common/ogr_arrow.h"
25 : #include "ogr_include_parquet.h"
26 :
27 : constexpr int DEFAULT_COMPRESSION_LEVEL = -1;
28 : constexpr int OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL = 9;
29 :
30 : /************************************************************************/
31 : /* OGRParquetLayerBase */
32 : /************************************************************************/
33 :
34 : class OGRParquetDataset;
35 :
36 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
37 : {
38 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
39 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
40 :
41 : protected:
42 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
43 : CSLConstList papszOpenOptions);
44 :
45 : OGRParquetDataset *m_poDS = nullptr;
46 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
47 : CPLStringList m_aosGeomPossibleNames{};
48 : std::string m_osCRS{};
49 :
50 : #if PARQUET_VERSION_MAJOR >= 21
51 : std::set<int>
52 : m_geoStatsWithBBOXAvailable{}; // key is index of OGR geometry column
53 : #endif
54 :
55 : void LoadGeoMetadata(
56 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
57 : bool DealWithGeometryColumn(
58 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
59 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
60 : const parquet::ColumnDescriptor *parquetColumn,
61 : const parquet::FileMetaData *metadata, int iColumn);
62 :
63 : void InvalidateCachedBatches() override;
64 :
65 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
66 : std::string &osBBOXColumn,
67 : std::string &osXMin,
68 : std::string &osYMin,
69 : std::string &osXMax,
70 : std::string &osYMax);
71 :
72 : public:
73 : int TestCapability(const char *) const override;
74 :
75 : void ResetReading() override;
76 :
77 : GDALDataset *GetDataset() override;
78 :
79 : static int GetNumCPUs();
80 : };
81 :
82 : /************************************************************************/
83 : /* OGRParquetLayer */
84 : /************************************************************************/
85 :
86 : class OGRParquetLayer final : public OGRParquetLayerBase
87 :
88 : {
89 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
90 : bool m_bSingleBatch = false;
91 : int m_iFIDParquetColumn = -1;
92 : std::shared_ptr<arrow::DataType> m_poFIDType{};
93 : std::vector<std::shared_ptr<arrow::DataType>>
94 : m_apoArrowDataTypes{}; // .size() == field ocunt
95 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
96 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
97 : bool m_bHasMissingMappingToParquet = false;
98 :
99 : //! Contains pairs of (selected feature idx, total feature idx) break points.
100 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
101 : //! Iterator over m_asFeatureIdxRemapping
102 : std::vector<std::pair<int64_t, int64_t>>::iterator
103 : m_oFeatureIdxRemappingIter{};
104 : //! Feature index among the potentially restricted set of selected row groups
105 : int64_t m_nFeatureIdxSelected = 0;
106 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
107 : // m_bIgnoredFields is set
108 : CPLStringList m_aosFeatherMetadata{};
109 :
110 : //! Describe the bbox column of a geometry column
111 : struct GeomColBBOXParquet
112 : {
113 : int iParquetXMin = -1;
114 : int iParquetYMin = -1;
115 : int iParquetXMax = -1;
116 : int iParquetYMax = -1;
117 : std::vector<int> anParquetCols{};
118 : };
119 :
120 : //! Map from OGR geometry field index to GeomColBBOXParquet
121 : std::map<int, GeomColBBOXParquet>
122 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
123 :
124 : void EstablishFeatureDefn();
125 : void ProcessGeometryColumnCovering(
126 : const std::shared_ptr<arrow::Field> &field,
127 : const CPLJSONObject &oJSONGeometryColumn,
128 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
129 : bool CreateRecordBatchReader(int iStartingRowGroup);
130 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
131 : bool ReadNextBatch() override;
132 :
133 : void InvalidateCachedBatches() override;
134 :
135 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
136 : int iParquetCol) const;
137 : void CreateFieldFromSchema(
138 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
139 : int &iParquetCol, const std::vector<int> &path,
140 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
141 : &oMapFieldNameToGDALSchemaFieldDefn);
142 : bool CheckMatchArrowParquetColumnNames(
143 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
144 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
145 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
146 :
147 1353 : std::string GetDriverUCName() const override
148 : {
149 1353 : return "PARQUET";
150 : }
151 :
152 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
153 :
154 : void IncrFeatureIdx() override;
155 :
156 : public:
157 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
158 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
159 : CSLConstList papszOpenOptions);
160 :
161 : void ResetReading() override;
162 : OGRFeature *GetFeature(GIntBig nFID) override;
163 : GIntBig GetFeatureCount(int bForce) override;
164 : int TestCapability(const char *pszCap) const override;
165 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
166 : const char *GetMetadataItem(const char *pszName,
167 : const char *pszDomain = "") override;
168 : char **GetMetadata(const char *pszDomain = "") override;
169 : OGRErr SetNextByIndex(GIntBig nIndex) override;
170 :
171 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
172 : CSLConstList papszOptions = nullptr) override;
173 :
174 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
175 : int iFieldIndex) const override;
176 :
177 1297 : parquet::arrow::FileReader *GetReader() const
178 : {
179 1297 : return m_poArrowReader.get();
180 : }
181 :
182 306 : const std::vector<int> &GetMapFieldIndexToParquetColumn() const
183 : {
184 306 : return m_anMapFieldIndexToParquetColumn;
185 : }
186 :
187 : const std::vector<std::shared_ptr<arrow::DataType>> &
188 253 : GetArrowFieldTypes() const
189 : {
190 253 : return m_apoArrowDataTypes;
191 : }
192 :
193 2 : int GetFIDParquetColumn() const
194 : {
195 2 : return m_iFIDParquetColumn;
196 : }
197 :
198 : static constexpr int OGR_FID_INDEX = -2;
199 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
200 : int iOGRField, // or OGR_FID_INDEX
201 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
202 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
203 : OGRFieldType &eType, OGRFieldSubType &eSubType,
204 : std::string &osMinTmp,
205 : std::string &osMaxTmp) const;
206 :
207 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
208 : int iCol,
209 : const std::shared_ptr<arrow::DataType>
210 : &arrowType, // potentially nullptr
211 : bool bComputeMin, OGRField &sMin,
212 : bool &bFoundMin, bool bComputeMax,
213 : OGRField &sMax, bool &bFoundMax,
214 : OGRFieldType &eType, OGRFieldSubType &eSubType,
215 : std::string &osMinTmp,
216 : std::string &osMaxTmp) const;
217 :
218 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
219 : int &iParquetXMax, int &iParquetYMax) const;
220 : };
221 :
222 : /************************************************************************/
223 : /* OGRParquetDatasetLayer */
224 : /************************************************************************/
225 :
226 : #ifdef GDAL_USE_ARROWDATASET
227 :
228 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
229 : {
230 : bool m_bIsVSI = false;
231 : bool m_bRebuildScanner = true;
232 : bool m_bSkipFilterGeometry = false;
233 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
234 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
235 : std::vector<std::string> m_aosProjectedFields{};
236 :
237 : void EstablishFeatureDefn();
238 : void
239 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
240 : const CPLJSONObject &oJSONGeometryColumn);
241 :
242 : void BuildScanner();
243 :
244 : //! Translate a OGR SQL expression into an Arrow one
245 : // bFullyTranslated should be set to true before calling this method.
246 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
247 : bool &bFullyTranslated);
248 :
249 : protected:
250 311 : std::string GetDriverUCName() const override
251 : {
252 311 : return "PARQUET";
253 : }
254 :
255 : bool ReadNextBatch() override;
256 :
257 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
258 :
259 : public:
260 : OGRParquetDatasetLayer(
261 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
262 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
263 : CSLConstList papszOpenOptions);
264 :
265 : OGRFeature *GetNextFeature() override;
266 :
267 : GIntBig GetFeatureCount(int bForce) override;
268 : OGRErr IGetExtent(int iGeomField, OGREnvelope *psExtent,
269 : bool bForce) override;
270 :
271 : OGRErr ISetSpatialFilter(int iGeomField,
272 : const OGRGeometry *poGeom) override;
273 :
274 : OGRErr SetAttributeFilter(const char *pszFilter) override;
275 :
276 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
277 :
278 : int TestCapability(const char *) const override;
279 :
280 : // TODO
281 : std::unique_ptr<OGRFieldDomain>
282 0 : BuildDomain(const std::string & /*osDomainName*/,
283 : int /*iFieldIndex*/) const override
284 : {
285 0 : return nullptr;
286 : }
287 : };
288 :
289 : #endif
290 :
291 : /************************************************************************/
292 : /* OGRParquetDataset */
293 : /************************************************************************/
294 :
295 : class OGRParquetDataset final : public OGRArrowDataset
296 : {
297 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
298 :
299 : public:
300 : explicit OGRParquetDataset(
301 : const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
302 : ~OGRParquetDataset() override;
303 :
304 : CPLErr Close() override;
305 :
306 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
307 : OGRGeometry *poSpatialFilter,
308 : const char *pszDialect) override;
309 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
310 :
311 : int TestCapability(const char *) const override;
312 :
313 274 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
314 : {
315 274 : m_poFS = fs;
316 274 : }
317 : };
318 :
319 : /************************************************************************/
320 : /* OGRParquetWriterLayer */
321 : /************************************************************************/
322 :
323 : class OGRParquetWriterDataset;
324 :
325 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
326 : {
327 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
328 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
329 :
330 : OGRParquetWriterDataset *m_poDataset = nullptr;
331 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
332 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
333 : bool m_bForceCounterClockwiseOrientation = false;
334 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
335 :
336 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
337 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
338 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
339 : OGRLayer *m_poTmpGPKGLayer = nullptr;
340 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
341 : GIntBig m_nTmpFeatureCount = 0;
342 :
343 : //! Whether to write "geo" footer metadata;
344 : bool m_bWriteGeoMetadata = true;
345 :
346 702 : bool IsFileWriterCreated() const override
347 : {
348 702 : return m_poFileWriter != nullptr;
349 : }
350 :
351 : void CreateWriter() override;
352 : bool CloseFileWriter() override;
353 :
354 : void CreateSchema() override;
355 : void PerformStepsBeforeFinalFlushGroup() override;
356 :
357 : bool FlushGroup() override;
358 :
359 301 : std::string GetDriverUCName() const override
360 : {
361 301 : return "PARQUET";
362 : }
363 :
364 : virtual bool
365 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
366 :
367 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
368 : size_t nLen) override;
369 : void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
370 :
371 26 : bool IsSRSRequired() const override
372 : {
373 26 : return false;
374 : }
375 :
376 : std::string GetGeoMetadata() const;
377 :
378 : //! Copy temporary GeoPackage layer to final Parquet file
379 : bool CopyTmpGpkgLayerToFinalFile();
380 :
381 : public:
382 : OGRParquetWriterLayer(
383 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
384 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
385 : const char *pszLayerName);
386 :
387 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
388 :
389 : bool SetOptions(CSLConstList papszOptions,
390 : const OGRSpatialReference *poSpatialRef,
391 : OGRwkbGeometryType eGType);
392 :
393 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
394 : int bApproxOK = TRUE) override;
395 :
396 : int TestCapability(const char *pszCap) const override;
397 : #if PARQUET_VERSION_MAJOR <= 10
398 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
399 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
400 : CSLConstList papszOptions,
401 : std::string &osErrorMsg) const override
402 : {
403 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
404 : osErrorMsg);
405 : }
406 :
407 : bool
408 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
409 : CSLConstList papszOptions = nullptr) override
410 : {
411 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
412 : }
413 :
414 : bool WriteArrowBatch(const struct ArrowSchema *schema,
415 : struct ArrowArray *array,
416 : CSLConstList papszOptions = nullptr) override
417 : {
418 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
419 : }
420 : #else
421 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
422 : CSLConstList papszOptions,
423 : std::string &osErrorMsg) const override;
424 : bool
425 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
426 : CSLConstList papszOptions = nullptr) override;
427 : bool WriteArrowBatch(const struct ArrowSchema *schema,
428 : struct ArrowArray *array,
429 : CSLConstList papszOptions = nullptr) override;
430 : #endif
431 :
432 : GDALDataset *GetDataset() override;
433 :
434 : protected:
435 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
436 :
437 : friend class OGRParquetWriterDataset;
438 : bool Close();
439 : };
440 :
441 : /************************************************************************/
442 : /* OGRParquetWriterDataset */
443 : /************************************************************************/
444 :
445 : class OGRParquetWriterDataset final : public GDALPamDataset
446 : {
447 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
448 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
449 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
450 :
451 : public:
452 : explicit OGRParquetWriterDataset(
453 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
454 :
455 : ~OGRParquetWriterDataset() override;
456 :
457 : arrow::MemoryPool *GetMemoryPool() const
458 : {
459 : return m_poMemoryPool.get();
460 : }
461 :
462 : CPLErr Close() override;
463 :
464 : int GetLayerCount() const override;
465 : const OGRLayer *GetLayer(int idx) const override;
466 : int TestCapability(const char *pszCap) const override;
467 : std::vector<std::string> GetFieldDomainNames(
468 : CSLConstList /*papszOptions*/ = nullptr) const override;
469 : const OGRFieldDomain *
470 : GetFieldDomain(const std::string &name) const override;
471 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
472 : std::string &failureReason) override;
473 :
474 275 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
475 : {
476 275 : return oMDMD;
477 : }
478 :
479 : protected:
480 : OGRLayer *ICreateLayer(const char *pszName,
481 : const OGRGeomFieldDefn *poGeomFieldDefn,
482 : CSLConstList papszOptions) override;
483 : };
484 :
485 : #endif // OGR_PARQUET_H
|