Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_PARQUET_H
14 : #define OGR_PARQUET_H
15 :
16 : #include "ogrsf_frmts.h"
17 :
18 : #include "cpl_json.h"
19 :
20 : #include <functional>
21 : #include <map>
22 :
23 : #include "../arrow_common/ogr_arrow.h"
24 : #include "ogr_include_parquet.h"
25 :
26 : /************************************************************************/
27 : /* OGRParquetLayerBase */
28 : /************************************************************************/
29 :
30 : class OGRParquetDataset;
31 :
32 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
33 : {
34 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
35 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
36 :
37 : protected:
38 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
39 : CSLConstList papszOpenOptions);
40 :
41 : OGRParquetDataset *m_poDS = nullptr;
42 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
43 : CPLStringList m_aosGeomPossibleNames{};
44 : std::string m_osCRS{};
45 :
46 : static int GetNumCPUs();
47 :
48 : void LoadGeoMetadata(
49 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
50 : bool DealWithGeometryColumn(
51 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
52 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun);
53 :
54 : void InvalidateCachedBatches() override;
55 :
56 : static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef,
57 : std::string &osBBOXColumn,
58 : std::string &osXMin,
59 : std::string &osYMin,
60 : std::string &osXMax,
61 : std::string &osYMax);
62 :
63 : public:
64 : int TestCapability(const char *) override;
65 :
66 : void ResetReading() override;
67 :
68 : GDALDataset *GetDataset() override;
69 : };
70 :
71 : /************************************************************************/
72 : /* OGRParquetLayer */
73 : /************************************************************************/
74 :
75 : class OGRParquetLayer final : public OGRParquetLayerBase
76 :
77 : {
78 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
79 : bool m_bSingleBatch = false;
80 : int m_iFIDParquetColumn = -1;
81 : std::shared_ptr<arrow::DataType> m_poFIDType{};
82 : std::vector<std::shared_ptr<arrow::DataType>>
83 : m_apoArrowDataTypes{}; // .size() == field ocunt
84 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
85 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
86 : bool m_bHasMissingMappingToParquet = false;
87 :
88 : //! Contains pairs of (selected feature idx, total feature idx) break points.
89 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
90 : //! Iterator over m_asFeatureIdxRemapping
91 : std::vector<std::pair<int64_t, int64_t>>::iterator
92 : m_oFeatureIdxRemappingIter{};
93 : //! Feature index among the potentially restricted set of selected row groups
94 : int64_t m_nFeatureIdxSelected = 0;
95 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
96 : // m_bIgnoredFields is set
97 : CPLStringList m_aosFeatherMetadata{};
98 :
99 : //! Describe the bbox column of a geometry column
100 : struct GeomColBBOXParquet
101 : {
102 : int iParquetXMin = -1;
103 : int iParquetYMin = -1;
104 : int iParquetXMax = -1;
105 : int iParquetYMax = -1;
106 : std::vector<int> anParquetCols{};
107 : };
108 :
109 : //! Map from OGR geometry field index to GeomColBBOXParquet
110 : std::map<int, GeomColBBOXParquet>
111 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
112 :
113 : void EstablishFeatureDefn();
114 : void ProcessGeometryColumnCovering(
115 : const std::shared_ptr<arrow::Field> &field,
116 : const CPLJSONObject &oJSONGeometryColumn,
117 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
118 : bool CreateRecordBatchReader(int iStartingRowGroup);
119 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
120 : bool ReadNextBatch() override;
121 :
122 : void InvalidateCachedBatches() override;
123 :
124 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
125 : int iParquetCol) const;
126 : void CreateFieldFromSchema(
127 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
128 : int &iParquetCol, const std::vector<int> &path,
129 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
130 : &oMapFieldNameToGDALSchemaFieldDefn);
131 : bool CheckMatchArrowParquetColumnNames(
132 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
133 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
134 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
135 :
136 1351 : virtual std::string GetDriverUCName() const override
137 : {
138 1351 : return "PARQUET";
139 : }
140 :
141 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
142 :
143 : void IncrFeatureIdx() override;
144 :
145 : public:
146 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
147 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
148 : CSLConstList papszOpenOptions);
149 :
150 : void ResetReading() override;
151 : OGRFeature *GetFeature(GIntBig nFID) override;
152 : GIntBig GetFeatureCount(int bForce) override;
153 : int TestCapability(const char *pszCap) override;
154 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
155 : const char *GetMetadataItem(const char *pszName,
156 : const char *pszDomain = "") override;
157 : char **GetMetadata(const char *pszDomain = "") override;
158 : OGRErr SetNextByIndex(GIntBig nIndex) override;
159 :
160 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
161 : CSLConstList papszOptions = nullptr) override;
162 :
163 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
164 : int iFieldIndex) const override;
165 :
166 1297 : parquet::arrow::FileReader *GetReader() const
167 : {
168 1297 : return m_poArrowReader.get();
169 : }
170 :
171 306 : const std::vector<int> &GetMapFieldIndexToParquetColumn() const
172 : {
173 306 : return m_anMapFieldIndexToParquetColumn;
174 : }
175 :
176 : const std::vector<std::shared_ptr<arrow::DataType>> &
177 253 : GetArrowFieldTypes() const
178 : {
179 253 : return m_apoArrowDataTypes;
180 : }
181 :
182 2 : int GetFIDParquetColumn() const
183 : {
184 2 : return m_iFIDParquetColumn;
185 : }
186 :
187 : static constexpr int OGR_FID_INDEX = -2;
188 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
189 : int iOGRField, // or OGR_FID_INDEX
190 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
191 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
192 : OGRFieldType &eType, OGRFieldSubType &eSubType,
193 : std::string &osMinTmp,
194 : std::string &osMaxTmp) const;
195 :
196 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
197 : int iCol,
198 : const std::shared_ptr<arrow::DataType>
199 : &arrowType, // potentially nullptr
200 : bool bComputeMin, OGRField &sMin,
201 : bool &bFoundMin, bool bComputeMax,
202 : OGRField &sMax, bool &bFoundMax,
203 : OGRFieldType &eType, OGRFieldSubType &eSubType,
204 : std::string &osMinTmp,
205 : std::string &osMaxTmp) const;
206 :
207 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
208 : int &iParquetXMax, int &iParquetYMax) const;
209 : };
210 :
211 : /************************************************************************/
212 : /* OGRParquetDatasetLayer */
213 : /************************************************************************/
214 :
215 : #ifdef GDAL_USE_ARROWDATASET
216 :
217 : class OGRParquetDatasetLayer final : public OGRParquetLayerBase
218 : {
219 : bool m_bIsVSI = false;
220 : bool m_bRebuildScanner = true;
221 : bool m_bSkipFilterGeometry = false;
222 : std::shared_ptr<arrow::dataset::Dataset> m_poDataset{};
223 : std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};
224 : std::vector<std::string> m_aosProjectedFields{};
225 :
226 : void EstablishFeatureDefn();
227 : void
228 : ProcessGeometryColumnCovering(const std::shared_ptr<arrow::Field> &field,
229 : const CPLJSONObject &oJSONGeometryColumn);
230 :
231 : void BuildScanner();
232 :
233 : //! Translate a OGR SQL expression into an Arrow one
234 : // bFullyTranslated should be set to true before calling this method.
235 : arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode,
236 : bool &bFullyTranslated);
237 :
238 : protected:
239 311 : std::string GetDriverUCName() const override
240 : {
241 311 : return "PARQUET";
242 : }
243 :
244 : bool ReadNextBatch() override;
245 :
246 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
247 :
248 : public:
249 : OGRParquetDatasetLayer(
250 : OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI,
251 : const std::shared_ptr<arrow::dataset::Dataset> &dataset,
252 : CSLConstList papszOpenOptions);
253 :
254 : OGRFeature *GetNextFeature() override;
255 :
256 : GIntBig GetFeatureCount(int bForce) override;
257 : OGRErr GetExtent(OGREnvelope *psExtent, int bForce = TRUE) override;
258 : OGRErr GetExtent(int iGeomField, OGREnvelope *psExtent,
259 : int bForce = TRUE) override;
260 :
261 424 : void SetSpatialFilter(OGRGeometry *poGeom) override
262 : {
263 424 : SetSpatialFilter(0, poGeom);
264 424 : }
265 :
266 : void SetSpatialFilter(int iGeomField, OGRGeometry *poGeom) override;
267 :
268 : OGRErr SetAttributeFilter(const char *pszFilter) override;
269 :
270 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
271 :
272 : int TestCapability(const char *) override;
273 :
274 : // TODO
275 : std::unique_ptr<OGRFieldDomain>
276 0 : BuildDomain(const std::string & /*osDomainName*/,
277 : int /*iFieldIndex*/) const override
278 : {
279 0 : return nullptr;
280 : }
281 : };
282 :
283 : #endif
284 :
285 : /************************************************************************/
286 : /* OGRParquetDataset */
287 : /************************************************************************/
288 :
289 : class OGRParquetDataset final : public OGRArrowDataset
290 : {
291 : std::shared_ptr<arrow::fs::FileSystem> m_poFS{};
292 :
293 : public:
294 : explicit OGRParquetDataset(
295 : const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
296 : ~OGRParquetDataset();
297 :
298 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
299 : OGRGeometry *poSpatialFilter,
300 : const char *pszDialect) override;
301 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
302 :
303 : int TestCapability(const char *) override;
304 :
305 273 : void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
306 : {
307 273 : m_poFS = fs;
308 273 : }
309 : };
310 :
311 : /************************************************************************/
312 : /* OGRParquetWriterLayer */
313 : /************************************************************************/
314 :
315 : class OGRParquetWriterDataset;
316 :
317 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
318 : {
319 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
320 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
321 :
322 : OGRParquetWriterDataset *m_poDataset = nullptr;
323 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
324 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
325 : bool m_bForceCounterClockwiseOrientation = false;
326 : bool m_bEdgesSpherical = false;
327 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
328 :
329 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
330 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
331 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
332 : OGRLayer *m_poTmpGPKGLayer = nullptr;
333 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
334 : GIntBig m_nTmpFeatureCount = 0;
335 :
336 662 : virtual bool IsFileWriterCreated() const override
337 : {
338 662 : return m_poFileWriter != nullptr;
339 : }
340 :
341 : virtual void CreateWriter() override;
342 : virtual bool CloseFileWriter() override;
343 :
344 : virtual void CreateSchema() override;
345 : virtual void PerformStepsBeforeFinalFlushGroup() override;
346 :
347 : virtual bool FlushGroup() override;
348 :
349 286 : virtual std::string GetDriverUCName() const override
350 : {
351 286 : return "PARQUET";
352 : }
353 :
354 : virtual bool
355 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
356 :
357 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
358 : size_t nLen) override;
359 : virtual void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
360 :
361 26 : virtual bool IsSRSRequired() const override
362 : {
363 26 : return false;
364 : }
365 :
366 : std::string GetGeoMetadata() const;
367 :
368 : //! Copy temporary GeoPackage layer to final Parquet file
369 : bool CopyTmpGpkgLayerToFinalFile();
370 :
371 : public:
372 : OGRParquetWriterLayer(
373 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
374 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
375 : const char *pszLayerName);
376 :
377 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
378 :
379 : bool SetOptions(CSLConstList papszOptions,
380 : const OGRSpatialReference *poSpatialRef,
381 : OGRwkbGeometryType eGType);
382 :
383 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
384 : int bApproxOK = TRUE) override;
385 :
386 : int TestCapability(const char *pszCap) override;
387 : #if PARQUET_VERSION_MAJOR <= 10
388 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
389 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
390 : CSLConstList papszOptions,
391 : std::string &osErrorMsg) const override
392 : {
393 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
394 : osErrorMsg);
395 : }
396 :
397 : bool
398 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
399 : CSLConstList papszOptions = nullptr) override
400 : {
401 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
402 : }
403 :
404 : bool WriteArrowBatch(const struct ArrowSchema *schema,
405 : struct ArrowArray *array,
406 : CSLConstList papszOptions = nullptr) override
407 : {
408 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
409 : }
410 : #else
411 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
412 : CSLConstList papszOptions,
413 : std::string &osErrorMsg) const override;
414 : bool
415 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
416 : CSLConstList papszOptions = nullptr) override;
417 : bool WriteArrowBatch(const struct ArrowSchema *schema,
418 : struct ArrowArray *array,
419 : CSLConstList papszOptions = nullptr) override;
420 : #endif
421 :
422 : GDALDataset *GetDataset() override;
423 :
424 : protected:
425 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
426 :
427 : friend class OGRParquetWriterDataset;
428 : bool Close();
429 : };
430 :
431 : /************************************************************************/
432 : /* OGRParquetWriterDataset */
433 : /************************************************************************/
434 :
435 : class OGRParquetWriterDataset final : public GDALPamDataset
436 : {
437 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
438 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
439 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
440 :
441 : public:
442 : explicit OGRParquetWriterDataset(
443 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
444 :
445 : arrow::MemoryPool *GetMemoryPool() const
446 : {
447 : return m_poMemoryPool.get();
448 : }
449 :
450 : CPLErr Close() override;
451 :
452 : int GetLayerCount() override;
453 : OGRLayer *GetLayer(int idx) override;
454 : int TestCapability(const char *pszCap) override;
455 : std::vector<std::string> GetFieldDomainNames(
456 : CSLConstList /*papszOptions*/ = nullptr) const override;
457 : const OGRFieldDomain *
458 : GetFieldDomain(const std::string &name) const override;
459 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
460 : std::string &failureReason) override;
461 :
462 258 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
463 : {
464 258 : return oMDMD;
465 : }
466 :
467 : protected:
468 : OGRLayer *ICreateLayer(const char *pszName,
469 : const OGRGeomFieldDefn *poGeomFieldDefn,
470 : CSLConstList papszOptions) override;
471 : };
472 :
473 : #endif // OGR_PARQUET_H
|