Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 : #include <set>
23 :
24 : #include "../arrow_common/ogr_arrow.h"
25 : #include "ogr_include_parquet.h"
26 :
27 : constexpr int DEFAULT_COMPRESSION_LEVEL = -1;
28 : constexpr int OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL = 9;
29 :
30 : /************************************************************************/
31 : /* OGRParquetLayerBase */
32 : /************************************************************************/
33 :
34 : class OGRParquetDataset;
35 :
36 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
37 : {
38 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
39 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
40 :
41 : protected:
42 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
43 : CSLConstList papszOpenOptions);
44 :
45 : OGRParquetDataset *m_poDS = nullptr;
46 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
47 : CPLStringList m_aosGeomPossibleNames{};
48 : std::string m_osCRS{};
49 :
50 : #if PARQUET_VERSION_MAJOR >= 21
51 : std::set<int>
52 : m_geoStatsWithBBOXAvailable{}; // key is index of OGR geometry column
53 : #endif
54 :
55 : void LoadGeoMetadata(
56 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
57 : bool DealWithGeometryColumn(
58 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
59 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun,
60 : const parquet::ColumnDescriptor *parquetColumn,
61 : const parquet::FileMetaData *metadata, int iColumn);
62 :
63 : void InvalidateCachedBatches() override;
64 :
65 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
66 : std::string &osBBOXColumn,
67 : std::string &osXMin,
68 : std::string &osYMin,
69 : std::string &osXMax,
70 : std::string &osYMax);
71 :
72 : public:
73 : int TestCapability(const char *) const override;
74 :
75 : void ResetReading() override;
76 :
77 : GDALDataset *GetDataset() override;
78 :
79 : static int GetNumCPUs();
80 : };
81 :
82 : /************************************************************************/
83 : /* OGRParquetLayer */
84 : /************************************************************************/
85 :
86 : class OGRParquetLayer final : public OGRParquetLayerBase
87 :
88 : {
89 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
90 : bool m_bSingleBatch = false;
91 : int m_iFIDParquetColumn = -1;
92 : std::shared_ptr<arrow::DataType> m_poFIDType{};
93 : std::vector<std::shared_ptr<arrow::DataType>>
94 : m_apoArrowDataTypes{}; // .size() == field ocunt
95 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
96 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
97 : bool m_bHasMissingMappingToParquet = false;
98 :
99 : //! Contains pairs of (selected feature idx, total feature idx) break points.
100 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
101 : //! Iterator over m_asFeatureIdxRemapping
102 : std::vector<std::pair<int64_t, int64_t>>::iterator
103 : m_oFeatureIdxRemappingIter{};
104 : //! Feature index among the potentially restricted set of selected row groups
105 : int64_t m_nFeatureIdxSelected = 0;
106 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
107 : // m_bIgnoredFields is set
108 : CPLStringList m_aosFeatherMetadata{};
109 :
110 : //! Describe the bbox column of a geometry column
111 : struct GeomColBBOXParquet
112 : {
113 : int iParquetXMin = -1;
114 : int iParquetYMin = -1;
115 : int iParquetXMax = -1;
116 : int iParquetYMax = -1;
117 : std::vector<int> anParquetCols{};
118 : };
119 :
120 : //! Map from OGR geometry field index to GeomColBBOXParquet
121 : std::map<int, GeomColBBOXParquet>
122 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
123 :
124 : //! GDAL creation options that were used to create the file (if done by GDAL)
125 : CPLStringList m_aosCreationOptions{};
126 :
127 : void EstablishFeatureDefn();
128 : void ProcessGeometryColumnCovering(
129 : const std::shared_ptr<arrow::Field> &field,
130 : const CPLJSONObject &oJSONGeometryColumn,
131 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
132 : bool CreateRecordBatchReader(int iStartingRowGroup);
133 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
134 : bool ReadNextBatch() override;
135 :
136 : void InvalidateCachedBatches() override;
137 :
138 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
139 : int iParquetCol) const;
140 : void CreateFieldFromSchema(
141 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
142 : int &iParquetCol, const std::vector<int> &path,
143 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
144 : &oMapFieldNameToGDALSchemaFieldDefn);
145 : bool CheckMatchArrowParquetColumnNames(
146 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
147 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
148 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
149 :
150 1385 : std::string GetDriverUCName() const override
151 : {
152 1385 : return "PARQUET";
153 : }
154 :
155 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
156 :
157 : void IncrFeatureIdx() override;
158 :
159 : public:
160 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
161 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
162 : CSLConstList papszOpenOptions);
163 :
164 : void ResetReading() override;
165 : OGRFeature *GetFeature(GIntBig nFID) override;
166 : GIntBig GetFeatureCount(int bForce) override;
167 : int TestCapability(const char *pszCap) const override;
168 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
169 : const char *GetMetadataItem(const char *pszName,
170 : const char *pszDomain = "") override;
171 : char **GetMetadata(const char *pszDomain = "") override;
172 : OGRErr SetNextByIndex(GIntBig nIndex) override;
173 :
174 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
175 : CSLConstList papszOptions = nullptr) override;
176 :
177 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
178 : int iFieldIndex) const override;
179 :
180 1316 : parquet::arrow::FileReader *GetReader() const
181 : {
182 1316 : return m_poArrowReader.get();
183 : }
184 :
185 : std::vector<int>
186 : GetParquetColumnIndicesForArrowField(const std::string &field_name) const;
187 :
188 : const std::vector<std::shared_ptr<arrow::DataType>> &
189 249 : GetArrowFieldTypes() const
190 : {
191 249 : return m_apoArrowDataTypes;
192 : }
193 :
194 2 : int GetFIDParquetColumn() const
195 : {
196 2 : return m_iFIDParquetColumn;
197 : }
198 :
199 4 : const CPLStringList &GetCreationOptions() const
200 : {
201 4 : return m_aosCreationOptions;
202 : }
203 :
204 : static constexpr int OGR_FID_INDEX = -2;
205 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
206 : int iOGRField, // or OGR_FID_INDEX
207 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
208 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
209 : OGRFieldType &eType, OGRFieldSubType &eSubType,
210 : std::string &osMinTmp,
211 : std::string &osMaxTmp) const;
212 :
213 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
214 : int iCol,
215 : const std::shared_ptr<arrow::DataType>
216 : &arrowType, // potentially nullptr
217 : bool bComputeMin, OGRField &sMin,
218 : bool &bFoundMin, bool bComputeMax,
219 : OGRField &sMax, bool &bFoundMax,
220 : OGRFieldType &eType, OGRFieldSubType &eSubType,
221 : std::string &osMinTmp,
222 : std::string &osMaxTmp) const;
223 :
224 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
225 : int &iParquetXMax, int &iParquetYMax) const;
226 : };
227 :
228 : /************************************************************************/
229 : /* OGRParquetDatasetLayer */
230 : /************************************************************************/
231 :
232 : #ifdef GDAL_USE_ARROWDATASET
233 :
234 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
235 : {
236 : bool m_bIsVSI = false;
237 : bool m_bRebuildScanner = true;
238 : bool m_bSkipFilterGeometry = false;
239 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
240 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
241 : std::vector<std::string> m_aosProjectedFields{};
242 :
243 : void EstablishFeatureDefn();
244 : void
245 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
246 : const CPLJSONObject &oJSONGeometryColumn);
247 :
248 : void BuildScanner();
249 :
250 : //! Translate a OGR SQL expression into an Arrow one
251 : // bFullyTranslated should be set to true before calling this method.
252 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
253 : bool &bFullyTranslated);
254 :
255 : protected:
256 311 : std::string GetDriverUCName() const override
257 : {
258 311 : return "PARQUET";
259 : }
260 :
261 : bool ReadNextBatch() override;
262 :
263 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
264 :
265 : public:
266 : OGRParquetDatasetLayer(
267 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
268 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
269 : CSLConstList papszOpenOptions);
270 :
271 : OGRFeature *GetNextFeature() override;
272 :
273 : GIntBig GetFeatureCount(int bForce) override;
274 : OGRErr IGetExtent(int iGeomField, OGREnvelope *psExtent,
275 : bool bForce) override;
276 :
277 : OGRErr ISetSpatialFilter(int iGeomField,
278 : const OGRGeometry *poGeom) override;
279 :
280 : OGRErr SetAttributeFilter(const char *pszFilter) override;
281 :
282 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
283 :
284 : int TestCapability(const char *) const override;
285 :
286 : // TODO
287 : std::unique_ptr<OGRFieldDomain>
288 0 : BuildDomain(const std::string & /*osDomainName*/,
289 : int /*iFieldIndex*/) const override
290 : {
291 0 : return nullptr;
292 : }
293 : };
294 :
295 : #endif
296 :
297 : /************************************************************************/
298 : /* OGRParquetDataset */
299 : /************************************************************************/
300 :
301 : class OGRParquetDataset final : public OGRArrowDataset
302 : {
303 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
304 :
305 : public:
306 : explicit OGRParquetDataset();
307 : ~OGRParquetDataset() override;
308 :
309 : CPLErr Close() override;
310 :
311 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
312 : OGRGeometry *poSpatialFilter,
313 : const char *pszDialect) override;
314 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
315 :
316 : int TestCapability(const char *) const override;
317 :
318 274 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
319 : {
320 274 : m_poFS = fs;
321 274 : }
322 :
323 : std::unique_ptr<OGRParquetLayer>
324 : CreateReaderLayer(const std::string &osFilename, VSILFILE *&fpIn,
325 : CSLConstList papszOpenOptionsIn);
326 : };
327 :
328 : /************************************************************************/
329 : /* OGRParquetWriterLayer */
330 : /************************************************************************/
331 :
332 : class OGRParquetWriterDataset;
333 :
334 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
335 : {
336 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
337 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
338 :
339 : OGRParquetWriterDataset *m_poDataset = nullptr;
340 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
341 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
342 : bool m_bForceCounterClockwiseOrientation = false;
343 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
344 :
345 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
346 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
347 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
348 : OGRLayer *m_poTmpGPKGLayer = nullptr;
349 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
350 : GIntBig m_nTmpFeatureCount = 0;
351 :
352 : //! Whether to write "geo" footer metadata;
353 : bool m_bWriteGeoMetadata = true;
354 :
355 : CPLStringList m_aosCreationOptions{};
356 :
357 723 : bool IsFileWriterCreated() const override
358 : {
359 723 : return m_poFileWriter != nullptr;
360 : }
361 :
362 : void CreateWriter() override;
363 : bool CloseFileWriter() override;
364 :
365 : void CreateSchema() override;
366 : void PerformStepsBeforeFinalFlushGroup() override;
367 :
368 : bool FlushGroup() override;
369 :
370 312 : std::string GetDriverUCName() const override
371 : {
372 312 : return "PARQUET";
373 : }
374 :
375 : virtual bool
376 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
377 :
378 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
379 : size_t nLen) override;
380 : void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
381 :
382 26 : bool IsSRSRequired() const override
383 : {
384 26 : return false;
385 : }
386 :
387 : std::string GetGeoMetadata() const;
388 :
389 : //! Copy temporary GeoPackage layer to final Parquet file
390 : bool CopyTmpGpkgLayerToFinalFile();
391 :
392 : public:
393 : OGRParquetWriterLayer(
394 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
395 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
396 : const char *pszLayerName);
397 :
398 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
399 :
400 : bool SetOptions(const OGRGeomFieldDefn *poSrcGeomFieldDefn,
401 : CSLConstList papszOptions);
402 :
403 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
404 : int bApproxOK = TRUE) override;
405 :
406 : int TestCapability(const char *pszCap) const override;
407 : #if PARQUET_VERSION_MAJOR <= 10
408 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
409 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
410 : CSLConstList papszOptions,
411 : std::string &osErrorMsg) const override
412 : {
413 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
414 : osErrorMsg);
415 : }
416 :
417 : bool
418 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
419 : CSLConstList papszOptions = nullptr) override
420 : {
421 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
422 : }
423 :
424 : bool WriteArrowBatch(const struct ArrowSchema *schema,
425 : struct ArrowArray *array,
426 : CSLConstList papszOptions = nullptr) override
427 : {
428 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
429 : }
430 : #else
431 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
432 : CSLConstList papszOptions,
433 : std::string &osErrorMsg) const override;
434 : bool
435 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
436 : CSLConstList papszOptions = nullptr) override;
437 : bool WriteArrowBatch(const struct ArrowSchema *schema,
438 : struct ArrowArray *array,
439 : CSLConstList papszOptions = nullptr) override;
440 : #endif
441 :
442 : GDALDataset *GetDataset() override;
443 :
444 : protected:
445 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
446 :
447 : friend class OGRParquetWriterDataset;
448 : bool Close();
449 : };
450 :
451 : /************************************************************************/
452 : /* OGRParquetWriterDataset */
453 : /************************************************************************/
454 :
455 : class OGRParquetWriterDataset final : public GDALPamDataset
456 : {
457 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
458 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
459 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
460 :
461 : public:
462 : explicit OGRParquetWriterDataset(
463 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
464 :
465 : ~OGRParquetWriterDataset() override;
466 :
467 : arrow::MemoryPool *GetMemoryPool() const
468 : {
469 : return m_poMemoryPool.get();
470 : }
471 :
472 : CPLErr Close() override;
473 :
474 : int GetLayerCount() const override;
475 : const OGRLayer *GetLayer(int idx) const override;
476 : int TestCapability(const char *pszCap) const override;
477 : std::vector<std::string> GetFieldDomainNames(
478 : CSLConstList /*papszOptions*/ = nullptr) const override;
479 : const OGRFieldDomain *
480 : GetFieldDomain(const std::string &name) const override;
481 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
482 : std::string &failureReason) override;
483 :
484 283 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
485 : {
486 283 : return oMDMD;
487 : }
488 :
489 : protected:
490 : OGRLayer *ICreateLayer(const char *pszName,
491 : const OGRGeomFieldDefn *poGeomFieldDefn,
492 : CSLConstList papszOptions) override;
493 : };
494 :
495 : #endif // OGR_PARQUET_H
|