Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 : #include <set>
23 :
24 : #include "../arrow_common/ogr_arrow.h"
25 : #include "ogr_include_parquet.h"
26 :
27 : constexpr int DEFAULT_COMPRESSION_LEVEL = -1;
28 : constexpr int OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL = 9;
29 :
30 : /************************************************************************/
31 : /* OGRParquetLayerBase */
32 : /************************************************************************/
33 :
34 : class OGRParquetDataset;
35 :
36 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
37 : {
38 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
39 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
40 :
41 : protected:
42 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
43 : CSLConstList papszOpenOptions);
44 :
45 : OGRParquetDataset *m_poDS = nullptr;
46 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
47 : CPLStringList m_aosGeomPossibleNames{};
48 : std::string m_osCRS{};
49 :
50 : #if PARQUET_VERSION_MAJOR >= 21
51 : std::set<int>
52 : m_geoStatsWithBBOXAvailable{}; // key is index of OGR geometry column
53 : #endif
54 :
55 : void LoadGeoMetadata(
56 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
57 : bool DealWithGeometryColumn(
58 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
59 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
60 : const parquet::ColumnDescriptor *parquetColumn,
61 : const parquet::FileMetaData *fileMetadata, int iColumn);
62 :
63 : #if PARQUET_VERSION_MAJOR >= 21
64 : bool DealWithArrow21GeometryGeographyNativeTypes(
65 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
66 : const parquet::ColumnDescriptor *parquetColumn,
67 : const parquet::FileMetaData *fileMetadata, int iColumn);
68 : #endif
69 :
70 : void InvalidateCachedBatches() override;
71 :
72 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
73 : std::string &osBBOXColumn,
74 : std::string &osXMin,
75 : std::string &osYMin,
76 : std::string &osXMax,
77 : std::string &osYMax);
78 :
79 : public:
80 : int TestCapability(const char *) const override;
81 :
82 : void ResetReading() override;
83 :
84 : GDALDataset *GetDataset() override;
85 :
86 : static int GetNumCPUs();
87 : };
88 :
89 : /************************************************************************/
90 : /* OGRParquetLayer */
91 : /************************************************************************/
92 :
93 : class OGRParquetLayer final : public OGRParquetLayerBase
94 :
95 : {
96 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
97 : bool m_bSingleBatch = false;
98 : int m_iFIDParquetColumn = -1;
99 : std::shared_ptr<arrow::DataType> m_poFIDType{};
100 : std::vector<std::shared_ptr<arrow::DataType>>
101 : m_apoArrowDataTypes{}; // .size() == field ocunt
102 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
103 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
104 : bool m_bHasMissingMappingToParquet = false;
105 :
106 : //! Contains pairs of (selected feature idx, total feature idx) break points.
107 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
108 : //! Iterator over m_asFeatureIdxRemapping
109 : std::vector<std::pair<int64_t, int64_t>>::iterator
110 : m_oFeatureIdxRemappingIter{};
111 : //! Feature index among the potentially restricted set of selected row groups
112 : int64_t m_nFeatureIdxSelected = 0;
113 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
114 : // m_bIgnoredFields is set
115 : CPLStringList m_aosFeatherMetadata{};
116 :
117 : //! Describe the bbox column of a geometry column
118 : struct GeomColBBOXParquet
119 : {
120 : int iParquetXMin = -1;
121 : int iParquetYMin = -1;
122 : int iParquetXMax = -1;
123 : int iParquetYMax = -1;
124 : std::vector<int> anParquetCols{};
125 : };
126 :
127 : //! Map from OGR geometry field index to GeomColBBOXParquet
128 : std::map<int, GeomColBBOXParquet>
129 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
130 :
131 : //! GDAL creation options that were used to create the file (if done by GDAL)
132 : CPLStringList m_aosCreationOptions{};
133 :
134 : void EstablishFeatureDefn();
135 : void ProcessGeometryColumnCovering(
136 : const std::shared_ptr<arrow::Field> &field,
137 : const CPLJSONObject &oJSONGeometryColumn,
138 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
139 : bool CreateRecordBatchReader(int iStartingRowGroup);
140 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
141 : bool ReadNextBatch() override;
142 :
143 : void InvalidateCachedBatches() override;
144 :
145 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
146 : int iParquetCol) const;
147 : void CreateFieldFromSchema(
148 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
149 : int &iParquetCol, const std::vector<int> &path,
150 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
151 : &oMapFieldNameToGDALSchemaFieldDefn);
152 : bool CheckMatchArrowParquetColumnNames(
153 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
154 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
155 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
156 :
157 28756 : std::string GetDriverUCName() const override
158 : {
159 28756 : return "PARQUET";
160 : }
161 :
162 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
163 :
164 : void IncrFeatureIdx() override;
165 :
166 : public:
167 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
168 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
169 : CSLConstList papszOpenOptions);
170 :
171 : void ResetReading() override;
172 : OGRFeature *GetFeature(GIntBig nFID) override;
173 : GIntBig GetFeatureCount(int bForce) override;
174 : int TestCapability(const char *pszCap) const override;
175 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
176 : const char *GetMetadataItem(const char *pszName,
177 : const char *pszDomain = "") override;
178 : CSLConstList GetMetadata(const char *pszDomain = "") override;
179 : OGRErr SetNextByIndex(GIntBig nIndex) override;
180 :
181 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
182 : CSLConstList papszOptions = nullptr) override;
183 :
184 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
185 : int iFieldIndex) const override;
186 :
187 1546 : parquet::arrow::FileReader *GetReader() const
188 : {
189 1546 : return m_poArrowReader.get();
190 : }
191 :
192 : std::vector<int>
193 : GetParquetColumnIndicesForArrowField(const std::string &field_name) const;
194 :
195 : const std::vector<std::shared_ptr<arrow::DataType>> &
196 249 : GetArrowFieldTypes() const
197 : {
198 249 : return m_apoArrowDataTypes;
199 : }
200 :
201 2 : int GetFIDParquetColumn() const
202 : {
203 2 : return m_iFIDParquetColumn;
204 : }
205 :
206 4 : const CPLStringList &GetCreationOptions() const
207 : {
208 4 : return m_aosCreationOptions;
209 : }
210 :
211 : static constexpr int OGR_FID_INDEX = -2;
212 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
213 : int iOGRField, // or OGR_FID_INDEX
214 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
215 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
216 : OGRFieldType &eType, OGRFieldSubType &eSubType,
217 : std::string &osMinTmp,
218 : std::string &osMaxTmp) const;
219 :
220 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
221 : int iCol,
222 : const std::shared_ptr<arrow::DataType>
223 : &arrowType, // potentially nullptr
224 : bool bComputeMin, OGRField &sMin,
225 : bool &bFoundMin, bool bComputeMax,
226 : OGRField &sMax, bool &bFoundMax,
227 : OGRFieldType &eType, OGRFieldSubType &eSubType,
228 : std::string &osMinTmp,
229 : std::string &osMaxTmp) const;
230 :
231 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
232 : int &iParquetXMax, int &iParquetYMax) const;
233 : };
234 :
235 : /************************************************************************/
236 : /* OGRParquetDatasetLayer */
237 : /************************************************************************/
238 :
239 : #ifdef GDAL_USE_ARROWDATASET
240 :
241 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
242 : {
243 : bool m_bIsVSI = false;
244 : bool m_bRebuildScanner = true;
245 : bool m_bSkipFilterGeometry = false;
246 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
247 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
248 : std::vector<std::string> m_aosProjectedFields{};
249 :
250 : void EstablishFeatureDefn();
251 : void
252 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
253 : const CPLJSONObject &oJSONGeometryColumn);
254 :
255 : void BuildScanner();
256 :
257 : //! Translate a OGR SQL expression into an Arrow one
258 : // bFullyTranslated should be set to true before calling this method.
259 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
260 : bool &bFullyTranslated);
261 :
262 : protected:
263 7547 : std::string GetDriverUCName() const override
264 : {
265 7547 : return "PARQUET";
266 : }
267 :
268 : bool ReadNextBatch() override;
269 :
270 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
271 :
272 : public:
273 : OGRParquetDatasetLayer(
274 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
275 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
276 : CSLConstList papszOpenOptions);
277 :
278 : OGRFeature *GetNextFeature() override;
279 :
280 : GIntBig GetFeatureCount(int bForce) override;
281 : OGRErr IGetExtent(int iGeomField, OGREnvelope *psExtent,
282 : bool bForce) override;
283 :
284 : OGRErr ISetSpatialFilter(int iGeomField,
285 : const OGRGeometry *poGeom) override;
286 :
287 : OGRErr SetAttributeFilter(const char *pszFilter) override;
288 :
289 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
290 :
291 : int TestCapability(const char *) const override;
292 :
293 : // TODO
294 : std::unique_ptr<OGRFieldDomain>
295 0 : BuildDomain(const std::string & /*osDomainName*/,
296 : int /*iFieldIndex*/) const override
297 : {
298 0 : return nullptr;
299 : }
300 : };
301 :
302 : #endif
303 :
304 : /************************************************************************/
305 : /* OGRParquetDataset */
306 : /************************************************************************/
307 :
308 : class OGRParquetDataset final : public OGRArrowDataset
309 : {
310 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
311 :
312 : public:
313 : explicit OGRParquetDataset();
314 : ~OGRParquetDataset() override;
315 :
316 : CPLErr Close(GDALProgressFunc = nullptr, void * = nullptr) override;
317 :
318 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
319 : OGRGeometry *poSpatialFilter,
320 : const char *pszDialect) override;
321 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
322 :
323 : int TestCapability(const char *) const override;
324 :
325 360 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
326 : {
327 360 : m_poFS = fs;
328 360 : }
329 :
330 : std::unique_ptr<OGRParquetLayer>
331 : CreateReaderLayer(const std::string &osFilename, VSILFILE *&fpIn,
332 : CSLConstList papszOpenOptionsIn);
333 : };
334 :
335 : /************************************************************************/
336 : /* OGRParquetWriterLayer */
337 : /************************************************************************/
338 :
339 : class OGRParquetWriterDataset;
340 :
341 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
342 : {
343 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
344 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
345 :
346 : OGRParquetWriterDataset *m_poDataset = nullptr;
347 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
348 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
349 : bool m_bForceCounterClockwiseOrientation = false;
350 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
351 :
352 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
353 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
354 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
355 : OGRLayer *m_poTmpGPKGLayer = nullptr;
356 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
357 : GIntBig m_nTmpFeatureCount = 0;
358 :
359 : //! Whether to write "geo" footer metadata;
360 : bool m_bWriteGeoMetadata = true;
361 :
362 895 : bool IsFileWriterCreated() const override
363 : {
364 895 : return m_poFileWriter != nullptr;
365 : }
366 :
367 : void CreateWriter() override;
368 : bool CloseFileWriter() override;
369 :
370 : void CreateSchema() override;
371 : void PerformStepsBeforeFinalFlushGroup() override;
372 :
373 : bool FlushGroup() override;
374 :
375 383 : std::string GetDriverUCName() const override
376 : {
377 383 : return "PARQUET";
378 : }
379 :
380 : virtual bool
381 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
382 :
383 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
384 : size_t nLen) override;
385 : void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
386 :
387 26 : bool IsSRSRequired() const override
388 : {
389 26 : return false;
390 : }
391 :
392 : std::string GetGeoMetadata() const;
393 :
394 : //! Copy temporary GeoPackage layer to final Parquet file
395 : bool CopyTmpGpkgLayerToFinalFile();
396 :
397 : public:
398 : OGRParquetWriterLayer(
399 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
400 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
401 : const char *pszLayerName);
402 :
403 : CPLErr SetMetadata(CSLConstList papszMetadata,
404 : const char *pszDomain) override;
405 :
406 : bool SetOptions(const OGRGeomFieldDefn *poSrcGeomFieldDefn,
407 : CSLConstList papszOptions);
408 :
409 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
410 : int bApproxOK = TRUE) override;
411 :
412 : int TestCapability(const char *pszCap) const override;
413 : #if PARQUET_VERSION_MAJOR <= 10
414 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
415 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
416 : CSLConstList papszOptions,
417 : std::string &osErrorMsg) const override
418 : {
419 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
420 : osErrorMsg);
421 : }
422 :
423 : bool
424 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
425 : CSLConstList papszOptions = nullptr) override
426 : {
427 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
428 : }
429 :
430 : bool WriteArrowBatch(const struct ArrowSchema *schema,
431 : struct ArrowArray *array,
432 : CSLConstList papszOptions = nullptr) override
433 : {
434 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
435 : }
436 : #else
437 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
438 : CSLConstList papszOptions,
439 : std::string &osErrorMsg) const override;
440 : bool
441 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
442 : CSLConstList papszOptions = nullptr) override;
443 : bool WriteArrowBatch(const struct ArrowSchema *schema,
444 : struct ArrowArray *array,
445 : CSLConstList papszOptions = nullptr) override;
446 : #endif
447 :
448 : GDALDataset *GetDataset() override;
449 :
450 : protected:
451 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
452 :
453 : friend class OGRParquetWriterDataset;
454 : bool Close();
455 : };
456 :
457 : /************************************************************************/
458 : /* OGRParquetWriterDataset */
459 : /************************************************************************/
460 :
461 : class OGRParquetWriterDataset final : public GDALPamDataset
462 : {
463 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
464 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
465 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
466 :
467 : public:
468 : explicit OGRParquetWriterDataset(
469 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
470 :
471 : ~OGRParquetWriterDataset() override;
472 :
473 : arrow::MemoryPool *GetMemoryPool() const
474 : {
475 : return m_poMemoryPool.get();
476 : }
477 :
478 : CPLErr Close(GDALProgressFunc = nullptr, void * = nullptr) override;
479 :
480 : int GetLayerCount() const override;
481 : const OGRLayer *GetLayer(int idx) const override;
482 : int TestCapability(const char *pszCap) const override;
483 : std::vector<std::string> GetFieldDomainNames(
484 : CSLConstList /*papszOptions*/ = nullptr) const override;
485 : const OGRFieldDomain *
486 : GetFieldDomain(const std::string &name) const override;
487 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
488 : std::string &failureReason) override;
489 :
490 353 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
491 : {
492 353 : return oMDMD;
493 : }
494 :
495 : protected:
496 : OGRLayer *ICreateLayer(const char *pszName,
497 : const OGRGeomFieldDefn *poGeomFieldDefn,
498 : CSLConstList papszOptions) override;
499 : };
500 :
501 : #endif // OGR_PARQUET_H
|