Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 : #include <set>
23 :
24 : #include "../arrow_common/ogr_arrow.h"
25 : #include "ogr_include_parquet.h"
26 :
27 : constexpr int DEFAULT_COMPRESSION_LEVEL = -1;
28 : constexpr int OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL = 9;
29 :
30 : /************************************************************************/
31 : /* OGRParquetLayerBase */
32 : /************************************************************************/
33 :
34 : class OGRParquetDataset;
35 :
36 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
37 : {
38 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
39 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
40 :
41 : protected:
42 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
43 : CSLConstList papszOpenOptions);
44 :
45 : OGRParquetDataset *m_poDS = nullptr;
46 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
47 : CPLStringList m_aosGeomPossibleNames{};
48 : std::string m_osCRS{};
49 :
50 : #if PARQUET_VERSION_MAJOR >= 21
51 : std::set<int>
52 : m_geoStatsWithBBOXAvailable{}; // key is index of OGR geometry column
53 : #endif
54 :
55 : void LoadGeoMetadata(
56 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
57 : bool DealWithGeometryColumn(
58 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
59 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
60 : const parquet::ColumnDescriptor *parquetColumn,
61 : const parquet::FileMetaData *metadata, int iColumn);
62 :
63 : void InvalidateCachedBatches() override;
64 :
65 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
66 : std::string &osBBOXColumn,
67 : std::string &osXMin,
68 : std::string &osYMin,
69 : std::string &osXMax,
70 : std::string &osYMax);
71 :
72 : public:
73 : int TestCapability(const char *) const override;
74 :
75 : void ResetReading() override;
76 :
77 : GDALDataset *GetDataset() override;
78 :
79 : static int GetNumCPUs();
80 : };
81 :
82 : /************************************************************************/
83 : /* OGRParquetLayer */
84 : /************************************************************************/
85 :
86 : class OGRParquetLayer final : public OGRParquetLayerBase
87 :
88 : {
89 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
90 : bool m_bSingleBatch = false;
91 : int m_iFIDParquetColumn = -1;
92 : std::shared_ptr<arrow::DataType> m_poFIDType{};
93 : std::vector<std::shared_ptr<arrow::DataType>>
94 : m_apoArrowDataTypes{}; // .size() == field ocunt
95 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
96 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
97 : bool m_bHasMissingMappingToParquet = false;
98 :
99 : //! Contains pairs of (selected feature idx, total feature idx) break points.
100 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
101 : //! Iterator over m_asFeatureIdxRemapping
102 : std::vector<std::pair<int64_t, int64_t>>::iterator
103 : m_oFeatureIdxRemappingIter{};
104 : //! Feature index among the potentially restricted set of selected row groups
105 : int64_t m_nFeatureIdxSelected = 0;
106 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
107 : // m_bIgnoredFields is set
108 : CPLStringList m_aosFeatherMetadata{};
109 :
110 : //! Describe the bbox column of a geometry column
111 : struct GeomColBBOXParquet
112 : {
113 : int iParquetXMin = -1;
114 : int iParquetYMin = -1;
115 : int iParquetXMax = -1;
116 : int iParquetYMax = -1;
117 : std::vector<int> anParquetCols{};
118 : };
119 :
120 : //! Map from OGR geometry field index to GeomColBBOXParquet
121 : std::map<int, GeomColBBOXParquet>
122 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
123 :
124 : //! GDAL creation options that were used to create the file (if done by GDAL)
125 : CPLStringList m_aosCreationOptions{};
126 :
127 : void EstablishFeatureDefn();
128 : void ProcessGeometryColumnCovering(
129 : const std::shared_ptr<arrow::Field> &field,
130 : const CPLJSONObject &oJSONGeometryColumn,
131 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
132 : bool CreateRecordBatchReader(int iStartingRowGroup);
133 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
134 : bool ReadNextBatch() override;
135 :
136 : void InvalidateCachedBatches() override;
137 :
138 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
139 : int iParquetCol) const;
140 : void CreateFieldFromSchema(
141 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
142 : int &iParquetCol, const std::vector<int> &path,
143 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
144 : &oMapFieldNameToGDALSchemaFieldDefn);
145 : bool CheckMatchArrowParquetColumnNames(
146 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
147 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
148 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
149 :
150 28621 : std::string GetDriverUCName() const override
151 : {
152 28621 : return "PARQUET";
153 : }
154 :
155 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
156 :
157 : void IncrFeatureIdx() override;
158 :
159 : public:
160 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
161 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
162 : CSLConstList papszOpenOptions);
163 :
164 : void ResetReading() override;
165 : OGRFeature *GetFeature(GIntBig nFID) override;
166 : GIntBig GetFeatureCount(int bForce) override;
167 : int TestCapability(const char *pszCap) const override;
168 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
169 : const char *GetMetadataItem(const char *pszName,
170 : const char *pszDomain = "") override;
171 : char **GetMetadata(const char *pszDomain = "") override;
172 : OGRErr SetNextByIndex(GIntBig nIndex) override;
173 :
174 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
175 : CSLConstList papszOptions = nullptr) override;
176 :
177 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
178 : int iFieldIndex) const override;
179 :
180 1546 : parquet::arrow::FileReader *GetReader() const
181 : {
182 1546 : return m_poArrowReader.get();
183 : }
184 :
185 : std::vector<int>
186 : GetParquetColumnIndicesForArrowField(const std::string &field_name) const;
187 :
188 : const std::vector<std::shared_ptr<arrow::DataType>> &
189 249 : GetArrowFieldTypes() const
190 : {
191 249 : return m_apoArrowDataTypes;
192 : }
193 :
194 2 : int GetFIDParquetColumn() const
195 : {
196 2 : return m_iFIDParquetColumn;
197 : }
198 :
199 4 : const CPLStringList &GetCreationOptions() const
200 : {
201 4 : return m_aosCreationOptions;
202 : }
203 :
204 : static constexpr int OGR_FID_INDEX = -2;
205 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
206 : int iOGRField, // or OGR_FID_INDEX
207 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
208 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
209 : OGRFieldType &eType, OGRFieldSubType &eSubType,
210 : std::string &osMinTmp,
211 : std::string &osMaxTmp) const;
212 :
213 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
214 : int iCol,
215 : const std::shared_ptr<arrow::DataType>
216 : &arrowType, // potentially nullptr
217 : bool bComputeMin, OGRField &sMin,
218 : bool &bFoundMin, bool bComputeMax,
219 : OGRField &sMax, bool &bFoundMax,
220 : OGRFieldType &eType, OGRFieldSubType &eSubType,
221 : std::string &osMinTmp,
222 : std::string &osMaxTmp) const;
223 :
224 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
225 : int &iParquetXMax, int &iParquetYMax) const;
226 : };
227 :
228 : /************************************************************************/
229 : /* OGRParquetDatasetLayer */
230 : /************************************************************************/
231 :
232 : #ifdef GDAL_USE_ARROWDATASET
233 :
234 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
235 : {
236 : bool m_bIsVSI = false;
237 : bool m_bRebuildScanner = true;
238 : bool m_bSkipFilterGeometry = false;
239 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
240 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
241 : std::vector<std::string> m_aosProjectedFields{};
242 :
243 : void EstablishFeatureDefn();
244 : void
245 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
246 : const CPLJSONObject &oJSONGeometryColumn);
247 :
248 : void BuildScanner();
249 :
250 : //! Translate a OGR SQL expression into an Arrow one
251 : // bFullyTranslated should be set to true before calling this method.
252 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
253 : bool &bFullyTranslated);
254 :
255 : protected:
256 7547 : std::string GetDriverUCName() const override
257 : {
258 7547 : return "PARQUET";
259 : }
260 :
261 : bool ReadNextBatch() override;
262 :
263 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
264 :
265 : public:
266 : OGRParquetDatasetLayer(
267 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
268 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
269 : CSLConstList papszOpenOptions);
270 :
271 : OGRFeature *GetNextFeature() override;
272 :
273 : GIntBig GetFeatureCount(int bForce) override;
274 : OGRErr IGetExtent(int iGeomField, OGREnvelope *psExtent,
275 : bool bForce) override;
276 :
277 : OGRErr ISetSpatialFilter(int iGeomField,
278 : const OGRGeometry *poGeom) override;
279 :
280 : OGRErr SetAttributeFilter(const char *pszFilter) override;
281 :
282 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
283 :
284 : int TestCapability(const char *) const override;
285 :
286 : // TODO
287 : std::unique_ptr<OGRFieldDomain>
288 0 : BuildDomain(const std::string & /*osDomainName*/,
289 : int /*iFieldIndex*/) const override
290 : {
291 0 : return nullptr;
292 : }
293 : };
294 :
295 : #endif
296 :
297 : /************************************************************************/
298 : /* OGRParquetDataset */
299 : /************************************************************************/
300 :
301 : class OGRParquetDataset final : public OGRArrowDataset
302 : {
303 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
304 :
305 : public:
306 : explicit OGRParquetDataset();
307 : ~OGRParquetDataset() override;
308 :
309 : CPLErr Close(GDALProgressFunc = nullptr, void * = nullptr) override;
310 :
311 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
312 : OGRGeometry *poSpatialFilter,
313 : const char *pszDialect) override;
314 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
315 :
316 : int TestCapability(const char *) const override;
317 :
318 360 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
319 : {
320 360 : m_poFS = fs;
321 360 : }
322 :
323 : std::unique_ptr<OGRParquetLayer>
324 : CreateReaderLayer(const std::string &osFilename, VSILFILE *&fpIn,
325 : CSLConstList papszOpenOptionsIn);
326 : };
327 :
328 : /************************************************************************/
329 : /* OGRParquetWriterLayer */
330 : /************************************************************************/
331 :
332 : class OGRParquetWriterDataset;
333 :
334 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
335 : {
336 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
337 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
338 :
339 : OGRParquetWriterDataset *m_poDataset = nullptr;
340 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
341 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
342 : bool m_bForceCounterClockwiseOrientation = false;
343 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
344 :
345 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
346 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
347 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
348 : OGRLayer *m_poTmpGPKGLayer = nullptr;
349 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
350 : GIntBig m_nTmpFeatureCount = 0;
351 :
352 : //! Whether to write "geo" footer metadata;
353 : bool m_bWriteGeoMetadata = true;
354 :
355 895 : bool IsFileWriterCreated() const override
356 : {
357 895 : return m_poFileWriter != nullptr;
358 : }
359 :
360 : void CreateWriter() override;
361 : bool CloseFileWriter() override;
362 :
363 : void CreateSchema() override;
364 : void PerformStepsBeforeFinalFlushGroup() override;
365 :
366 : bool FlushGroup() override;
367 :
368 383 : std::string GetDriverUCName() const override
369 : {
370 383 : return "PARQUET";
371 : }
372 :
373 : virtual bool
374 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
375 :
376 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
377 : size_t nLen) override;
378 : void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
379 :
380 26 : bool IsSRSRequired() const override
381 : {
382 26 : return false;
383 : }
384 :
385 : std::string GetGeoMetadata() const;
386 :
387 : //! Copy temporary GeoPackage layer to final Parquet file
388 : bool CopyTmpGpkgLayerToFinalFile();
389 :
390 : public:
391 : OGRParquetWriterLayer(
392 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
393 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
394 : const char *pszLayerName);
395 :
396 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
397 :
398 : bool SetOptions(const OGRGeomFieldDefn *poSrcGeomFieldDefn,
399 : CSLConstList papszOptions);
400 :
401 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
402 : int bApproxOK = TRUE) override;
403 :
404 : int TestCapability(const char *pszCap) const override;
405 : #if PARQUET_VERSION_MAJOR <= 10
406 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
407 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
408 : CSLConstList papszOptions,
409 : std::string &osErrorMsg) const override
410 : {
411 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
412 : osErrorMsg);
413 : }
414 :
415 : bool
416 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
417 : CSLConstList papszOptions = nullptr) override
418 : {
419 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
420 : }
421 :
422 : bool WriteArrowBatch(const struct ArrowSchema *schema,
423 : struct ArrowArray *array,
424 : CSLConstList papszOptions = nullptr) override
425 : {
426 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
427 : }
428 : #else
429 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
430 : CSLConstList papszOptions,
431 : std::string &osErrorMsg) const override;
432 : bool
433 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
434 : CSLConstList papszOptions = nullptr) override;
435 : bool WriteArrowBatch(const struct ArrowSchema *schema,
436 : struct ArrowArray *array,
437 : CSLConstList papszOptions = nullptr) override;
438 : #endif
439 :
440 : GDALDataset *GetDataset() override;
441 :
442 : protected:
443 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
444 :
445 : friend class OGRParquetWriterDataset;
446 : bool Close();
447 : };
448 :
449 : /************************************************************************/
450 : /* OGRParquetWriterDataset */
451 : /************************************************************************/
452 :
453 : class OGRParquetWriterDataset final : public GDALPamDataset
454 : {
455 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
456 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
457 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
458 :
459 : public:
460 : explicit OGRParquetWriterDataset(
461 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
462 :
463 : ~OGRParquetWriterDataset() override;
464 :
465 : arrow::MemoryPool *GetMemoryPool() const
466 : {
467 : return m_poMemoryPool.get();
468 : }
469 :
470 : CPLErr Close(GDALProgressFunc = nullptr, void * = nullptr) override;
471 :
472 : int GetLayerCount() const override;
473 : const OGRLayer *GetLayer(int idx) const override;
474 : int TestCapability(const char *pszCap) const override;
475 : std::vector<std::string> GetFieldDomainNames(
476 : CSLConstList /*papszOptions*/ = nullptr) const override;
477 : const OGRFieldDomain *
478 : GetFieldDomain(const std::string &name) const override;
479 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
480 : std::string &failureReason) override;
481 :
482 354 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
483 : {
484 354 : return oMDMD;
485 : }
486 :
487 : protected:
488 : OGRLayer *ICreateLayer(const char *pszName,
489 : const OGRGeomFieldDefn *poGeomFieldDefn,
490 : CSLConstList papszOptions) override;
491 : };
492 :
493 : #endif // OGR_PARQUET_H
|