Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 : #include <set>
23 :
24 : #include "../arrow_common/ogr_arrow.h"
25 : #include "ogr_include_parquet.h"
26 :
27 : constexpr int DEFAULT_COMPRESSION_LEVEL = -1;
28 : constexpr int OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL = 9;
29 :
30 : /************************************************************************/
31 : /* OGRParquetLayerBase */
32 : /************************************************************************/
33 :
34 : class OGRParquetDataset;
35 :
36 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
37 : {
38 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
39 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
40 :
41 : protected:
42 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
43 : CSLConstList papszOpenOptions);
44 :
45 : OGRParquetDataset *m_poDS = nullptr;
46 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
47 : CPLStringList m_aosGeomPossibleNames{};
48 : std::string m_osCRS{};
49 :
50 : #if PARQUET_VERSION_MAJOR >= 21
51 : std::set<int>
52 : m_geoStatsWithBBOXAvailable{}; // key is index of OGR geometry column
53 : #endif
54 :
55 : void LoadGeoMetadata(
56 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
57 : bool DealWithGeometryColumn(
58 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
59 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
60 : const parquet::ColumnDescriptor *parquetColumn,
61 : const parquet::FileMetaData *metadata, int iColumn);
62 :
63 : void InvalidateCachedBatches() override;
64 :
65 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
66 : std::string &osBBOXColumn,
67 : std::string &osXMin,
68 : std::string &osYMin,
69 : std::string &osXMax,
70 : std::string &osYMax);
71 :
72 : public:
73 : int TestCapability(const char *) const override;
74 :
75 : void ResetReading() override;
76 :
77 : GDALDataset *GetDataset() override;
78 :
79 : static int GetNumCPUs();
80 : };
81 :
82 : /************************************************************************/
83 : /* OGRParquetLayer */
84 : /************************************************************************/
85 :
86 : class OGRParquetLayer final : public OGRParquetLayerBase
87 :
88 : {
89 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
90 : bool m_bSingleBatch = false;
91 : int m_iFIDParquetColumn = -1;
92 : std::shared_ptr<arrow::DataType> m_poFIDType{};
93 : std::vector<std::shared_ptr<arrow::DataType>>
94 : m_apoArrowDataTypes{}; // .size() == field ocunt
95 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
96 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
97 : bool m_bHasMissingMappingToParquet = false;
98 :
99 : //! Contains pairs of (selected feature idx, total feature idx) break points.
100 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
101 : //! Iterator over m_asFeatureIdxRemapping
102 : std::vector<std::pair<int64_t, int64_t>>::iterator
103 : m_oFeatureIdxRemappingIter{};
104 : //! Feature index among the potentially restricted set of selected row groups
105 : int64_t m_nFeatureIdxSelected = 0;
106 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
107 : // m_bIgnoredFields is set
108 : CPLStringList m_aosFeatherMetadata{};
109 :
110 : //! Describe the bbox column of a geometry column
111 : struct GeomColBBOXParquet
112 : {
113 : int iParquetXMin = -1;
114 : int iParquetYMin = -1;
115 : int iParquetXMax = -1;
116 : int iParquetYMax = -1;
117 : std::vector<int> anParquetCols{};
118 : };
119 :
120 : //! Map from OGR geometry field index to GeomColBBOXParquet
121 : std::map<int, GeomColBBOXParquet>
122 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
123 :
124 : //! GDAL creation options that were used to create the file (if done by GDAL)
125 : CPLStringList m_aosCreationOptions{};
126 :
127 : void EstablishFeatureDefn();
128 : void ProcessGeometryColumnCovering(
129 : const std::shared_ptr<arrow::Field> &field,
130 : const CPLJSONObject &oJSONGeometryColumn,
131 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
132 : bool CreateRecordBatchReader(int iStartingRowGroup);
133 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
134 : bool ReadNextBatch() override;
135 :
136 : void InvalidateCachedBatches() override;
137 :
138 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
139 : int iParquetCol) const;
140 : void CreateFieldFromSchema(
141 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
142 : int &iParquetCol, const std::vector<int> &path,
143 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
144 : &oMapFieldNameToGDALSchemaFieldDefn);
145 : bool CheckMatchArrowParquetColumnNames(
146 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
147 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
148 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
149 :
150 1383 : std::string GetDriverUCName() const override
151 : {
152 1383 : return "PARQUET";
153 : }
154 :
155 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
156 :
157 : void IncrFeatureIdx() override;
158 :
159 : public:
160 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
161 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
162 : CSLConstList papszOpenOptions);
163 :
164 : void ResetReading() override;
165 : OGRFeature *GetFeature(GIntBig nFID) override;
166 : GIntBig GetFeatureCount(int bForce) override;
167 : int TestCapability(const char *pszCap) const override;
168 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
169 : const char *GetMetadataItem(const char *pszName,
170 : const char *pszDomain = "") override;
171 : char **GetMetadata(const char *pszDomain = "") override;
172 : OGRErr SetNextByIndex(GIntBig nIndex) override;
173 :
174 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
175 : CSLConstList papszOptions = nullptr) override;
176 :
177 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
178 : int iFieldIndex) const override;
179 :
180 1322 : parquet::arrow::FileReader *GetReader() const
181 : {
182 1322 : return m_poArrowReader.get();
183 : }
184 :
185 306 : const std::vector<int> &GetMapFieldIndexToParquetColumn() const
186 : {
187 306 : return m_anMapFieldIndexToParquetColumn;
188 : }
189 :
190 : const std::vector<std::shared_ptr<arrow::DataType>> &
191 253 : GetArrowFieldTypes() const
192 : {
193 253 : return m_apoArrowDataTypes;
194 : }
195 :
196 2 : int GetFIDParquetColumn() const
197 : {
198 2 : return m_iFIDParquetColumn;
199 : }
200 :
201 4 : const CPLStringList &GetCreationOptions() const
202 : {
203 4 : return m_aosCreationOptions;
204 : }
205 :
206 : static constexpr int OGR_FID_INDEX = -2;
207 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
208 : int iOGRField, // or OGR_FID_INDEX
209 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
210 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
211 : OGRFieldType &eType, OGRFieldSubType &eSubType,
212 : std::string &osMinTmp,
213 : std::string &osMaxTmp) const;
214 :
215 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
216 : int iCol,
217 : const std::shared_ptr<arrow::DataType>
218 : &arrowType, // potentially nullptr
219 : bool bComputeMin, OGRField &sMin,
220 : bool &bFoundMin, bool bComputeMax,
221 : OGRField &sMax, bool &bFoundMax,
222 : OGRFieldType &eType, OGRFieldSubType &eSubType,
223 : std::string &osMinTmp,
224 : std::string &osMaxTmp) const;
225 :
226 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
227 : int &iParquetXMax, int &iParquetYMax) const;
228 : };
229 :
230 : /************************************************************************/
231 : /* OGRParquetDatasetLayer */
232 : /************************************************************************/
233 :
234 : #ifdef GDAL_USE_ARROWDATASET
235 :
236 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
237 : {
238 : bool m_bIsVSI = false;
239 : bool m_bRebuildScanner = true;
240 : bool m_bSkipFilterGeometry = false;
241 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
242 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
243 : std::vector<std::string> m_aosProjectedFields{};
244 :
245 : void EstablishFeatureDefn();
246 : void
247 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
248 : const CPLJSONObject &oJSONGeometryColumn);
249 :
250 : void BuildScanner();
251 :
252 : //! Translate a OGR SQL expression into an Arrow one
253 : // bFullyTranslated should be set to true before calling this method.
254 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
255 : bool &bFullyTranslated);
256 :
257 : protected:
258 311 : std::string GetDriverUCName() const override
259 : {
260 311 : return "PARQUET";
261 : }
262 :
263 : bool ReadNextBatch() override;
264 :
265 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
266 :
267 : public:
268 : OGRParquetDatasetLayer(
269 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
270 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
271 : CSLConstList papszOpenOptions);
272 :
273 : OGRFeature *GetNextFeature() override;
274 :
275 : GIntBig GetFeatureCount(int bForce) override;
276 : OGRErr IGetExtent(int iGeomField, OGREnvelope *psExtent,
277 : bool bForce) override;
278 :
279 : OGRErr ISetSpatialFilter(int iGeomField,
280 : const OGRGeometry *poGeom) override;
281 :
282 : OGRErr SetAttributeFilter(const char *pszFilter) override;
283 :
284 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
285 :
286 : int TestCapability(const char *) const override;
287 :
288 : // TODO
289 : std::unique_ptr<OGRFieldDomain>
290 0 : BuildDomain(const std::string & /*osDomainName*/,
291 : int /*iFieldIndex*/) const override
292 : {
293 0 : return nullptr;
294 : }
295 : };
296 :
297 : #endif
298 :
299 : /************************************************************************/
300 : /* OGRParquetDataset */
301 : /************************************************************************/
302 :
303 : class OGRParquetDataset final : public OGRArrowDataset
304 : {
305 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
306 :
307 : public:
308 : explicit OGRParquetDataset();
309 : ~OGRParquetDataset() override;
310 :
311 : CPLErr Close() override;
312 :
313 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
314 : OGRGeometry *poSpatialFilter,
315 : const char *pszDialect) override;
316 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
317 :
318 : int TestCapability(const char *) const override;
319 :
320 274 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
321 : {
322 274 : m_poFS = fs;
323 274 : }
324 :
325 : std::unique_ptr<OGRParquetLayer>
326 : CreateReaderLayer(const std::string &osFilename, VSILFILE *&fpIn,
327 : CSLConstList papszOpenOptionsIn);
328 : };
329 :
330 : /************************************************************************/
331 : /* OGRParquetWriterLayer */
332 : /************************************************************************/
333 :
334 : class OGRParquetWriterDataset;
335 :
336 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
337 : {
338 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
339 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
340 :
341 : OGRParquetWriterDataset *m_poDataset = nullptr;
342 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
343 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
344 : bool m_bForceCounterClockwiseOrientation = false;
345 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
346 :
347 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
348 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
349 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
350 : OGRLayer *m_poTmpGPKGLayer = nullptr;
351 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
352 : GIntBig m_nTmpFeatureCount = 0;
353 :
354 : //! Whether to write "geo" footer metadata;
355 : bool m_bWriteGeoMetadata = true;
356 :
357 : CPLStringList m_aosCreationOptions{};
358 :
359 721 : bool IsFileWriterCreated() const override
360 : {
361 721 : return m_poFileWriter != nullptr;
362 : }
363 :
364 : void CreateWriter() override;
365 : bool CloseFileWriter() override;
366 :
367 : void CreateSchema() override;
368 : void PerformStepsBeforeFinalFlushGroup() override;
369 :
370 : bool FlushGroup() override;
371 :
372 310 : std::string GetDriverUCName() const override
373 : {
374 310 : return "PARQUET";
375 : }
376 :
377 : virtual bool
378 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
379 :
380 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
381 : size_t nLen) override;
382 : void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
383 :
384 26 : bool IsSRSRequired() const override
385 : {
386 26 : return false;
387 : }
388 :
389 : std::string GetGeoMetadata() const;
390 :
391 : //! Copy temporary GeoPackage layer to final Parquet file
392 : bool CopyTmpGpkgLayerToFinalFile();
393 :
394 : public:
395 : OGRParquetWriterLayer(
396 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
397 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
398 : const char *pszLayerName);
399 :
400 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
401 :
402 : bool SetOptions(const OGRGeomFieldDefn *poSrcGeomFieldDefn,
403 : CSLConstList papszOptions);
404 :
405 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
406 : int bApproxOK = TRUE) override;
407 :
408 : int TestCapability(const char *pszCap) const override;
409 : #if PARQUET_VERSION_MAJOR <= 10
410 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
411 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
412 : CSLConstList papszOptions,
413 : std::string &osErrorMsg) const override
414 : {
415 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
416 : osErrorMsg);
417 : }
418 :
419 : bool
420 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
421 : CSLConstList papszOptions = nullptr) override
422 : {
423 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
424 : }
425 :
426 : bool WriteArrowBatch(const struct ArrowSchema *schema,
427 : struct ArrowArray *array,
428 : CSLConstList papszOptions = nullptr) override
429 : {
430 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
431 : }
432 : #else
433 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
434 : CSLConstList papszOptions,
435 : std::string &osErrorMsg) const override;
436 : bool
437 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
438 : CSLConstList papszOptions = nullptr) override;
439 : bool WriteArrowBatch(const struct ArrowSchema *schema,
440 : struct ArrowArray *array,
441 : CSLConstList papszOptions = nullptr) override;
442 : #endif
443 :
444 : GDALDataset *GetDataset() override;
445 :
446 : protected:
447 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
448 :
449 : friend class OGRParquetWriterDataset;
450 : bool Close();
451 : };
452 :
453 : /************************************************************************/
454 : /* OGRParquetWriterDataset */
455 : /************************************************************************/
456 :
457 : class OGRParquetWriterDataset final : public GDALPamDataset
458 : {
459 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
460 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
461 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
462 :
463 : public:
464 : explicit OGRParquetWriterDataset(
465 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
466 :
467 : ~OGRParquetWriterDataset() override;
468 :
469 : arrow::MemoryPool *GetMemoryPool() const
470 : {
471 : return m_poMemoryPool.get();
472 : }
473 :
474 : CPLErr Close() override;
475 :
476 : int GetLayerCount() const override;
477 : const OGRLayer *GetLayer(int idx) const override;
478 : int TestCapability(const char *pszCap) const override;
479 : std::vector<std::string> GetFieldDomainNames(
480 : CSLConstList /*papszOptions*/ = nullptr) const override;
481 : const OGRFieldDomain *
482 : GetFieldDomain(const std::string &name) const override;
483 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
484 : std::string &failureReason) override;
485 :
486 282 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
487 : {
488 282 : return oMDMD;
489 : }
490 :
491 : protected:
492 : OGRLayer *ICreateLayer(const char *pszName,
493 : const OGRGeomFieldDefn *poGeomFieldDefn,
494 : CSLConstList papszOptions) override;
495 : };
496 :
497 : #endif // OGR_PARQUET_H
|