Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #ifndef OGR_PARQUET_H
30 : #define OGR_PARQUET_H
31 :
32 : #include "ogrsf_frmts.h"
33 :
34 : #include <functional>
35 : #include <map>
36 :
37 : #include "../arrow_common/ogr_arrow.h"
38 : #include "ogr_include_parquet.h"
39 :
40 : /************************************************************************/
41 : /* OGRParquetLayerBase */
42 : /************************************************************************/
43 :
44 : class OGRParquetDataset;
45 :
46 : class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer
47 : {
48 : OGRParquetLayerBase(const OGRParquetLayerBase &) = delete;
49 : OGRParquetLayerBase &operator=(const OGRParquetLayerBase &) = delete;
50 :
51 : protected:
52 : OGRParquetLayerBase(OGRParquetDataset *poDS, const char *pszLayerName,
53 : CSLConstList papszOpenOptions);
54 :
55 : OGRParquetDataset *m_poDS = nullptr;
56 : std::shared_ptr<arrow::RecordBatchReader> m_poRecordBatchReader{};
57 : CPLStringList m_aosGeomPossibleNames{};
58 : std::string m_osCRS{};
59 :
60 : void LoadGeoMetadata(
61 : const std::shared_ptr<const arrow::KeyValueMetadata> &kv_metadata);
62 : bool DealWithGeometryColumn(
63 : int iFieldIdx, const std::shared_ptr<arrow::Field> &field,
64 : std::function<OGRwkbGeometryType(void)> computeGeometryTypeFun);
65 :
66 : public:
67 : int TestCapability(const char *) override;
68 :
69 : void ResetReading() override;
70 :
71 : GDALDataset *GetDataset() override;
72 : };
73 :
74 : /************************************************************************/
75 : /* OGRParquetLayer */
76 : /************************************************************************/
77 :
78 : class OGRParquetLayer final : public OGRParquetLayerBase
79 :
80 : {
81 : std::unique_ptr<parquet::arrow::FileReader> m_poArrowReader{};
82 : bool m_bSingleBatch = false;
83 : int m_iFIDParquetColumn = -1;
84 : std::shared_ptr<arrow::DataType> m_poFIDType{};
85 : std::vector<std::shared_ptr<arrow::DataType>>
86 : m_apoArrowDataTypes{}; // .size() == field ocunt
87 : std::vector<int> m_anMapFieldIndexToParquetColumn{};
88 : std::vector<std::vector<int>> m_anMapGeomFieldIndexToParquetColumns{};
89 : bool m_bHasMissingMappingToParquet = false;
90 :
91 : //! Contains pairs of (selected feature idx, total feature idx) break points.
92 : std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
93 : //! Iterator over m_asFeatureIdxRemapping
94 : std::vector<std::pair<int64_t, int64_t>>::iterator
95 : m_oFeatureIdxRemappingIter{};
96 : //! Feature index among the potentially restricted set of selected row groups
97 : int64_t m_nFeatureIdxSelected = 0;
98 : std::vector<int> m_anRequestedParquetColumns{}; // only valid when
99 : // m_bIgnoredFields is set
100 : #ifdef DEBUG
101 : int m_nExpectedBatchColumns =
102 : 0; // Should be equal to m_poBatch->num_columns() (when
103 : // m_bIgnoredFields is set)
104 : #endif
105 : CPLStringList m_aosFeatherMetadata{};
106 :
107 : //! Describe the bbox column of a geometry column
108 : struct GeomColBBOXParquet
109 : {
110 : int iParquetXMin = -1;
111 : int iParquetYMin = -1;
112 : int iParquetXMax = -1;
113 : int iParquetYMax = -1;
114 : std::vector<int> anParquetCols{};
115 : };
116 :
117 : //! Map from OGR geometry field index to GeomColBBOXParquet
118 : std::map<int, GeomColBBOXParquet>
119 : m_oMapGeomFieldIndexToGeomColBBOXParquet{};
120 :
121 : void EstablishFeatureDefn();
122 : void ProcessGeometryColumnCovering(
123 : const std::shared_ptr<arrow::Field> &field,
124 : const CPLJSONObject &oJSONGeometryColumn,
125 : const std::map<std::string, int> &oMapParquetColumnNameToIdx);
126 : bool CreateRecordBatchReader(int iStartingRowGroup);
127 : bool CreateRecordBatchReader(const std::vector<int> &anRowGroups);
128 : bool ReadNextBatch() override;
129 :
130 : void InvalidateCachedBatches() override;
131 :
132 : OGRwkbGeometryType ComputeGeometryColumnType(int iGeomCol,
133 : int iParquetCol) const;
134 : void CreateFieldFromSchema(
135 : const std::shared_ptr<arrow::Field> &field, bool bParquetColValid,
136 : int &iParquetCol, const std::vector<int> &path,
137 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
138 : &oMapFieldNameToGDALSchemaFieldDefn);
139 : bool CheckMatchArrowParquetColumnNames(
140 : int &iParquetCol, const std::shared_ptr<arrow::Field> &field) const;
141 : OGRFeature *GetFeatureExplicitFID(GIntBig nFID);
142 : OGRFeature *GetFeatureByIndex(GIntBig nFID);
143 :
144 1757 : virtual std::string GetDriverUCName() const override
145 : {
146 1757 : return "PARQUET";
147 : }
148 :
149 : bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;
150 :
151 : void IncrFeatureIdx() override;
152 :
153 : public:
154 : OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
155 : std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
156 : CSLConstList papszOpenOptions);
157 :
158 : void ResetReading() override;
159 : OGRFeature *GetFeature(GIntBig nFID) override;
160 : GIntBig GetFeatureCount(int bForce) override;
161 : int TestCapability(const char *pszCap) override;
162 : OGRErr SetIgnoredFields(CSLConstList papszFields) override;
163 : const char *GetMetadataItem(const char *pszName,
164 : const char *pszDomain = "") override;
165 : char **GetMetadata(const char *pszDomain = "") override;
166 : OGRErr SetNextByIndex(GIntBig nIndex) override;
167 :
168 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
169 : CSLConstList papszOptions = nullptr) override;
170 :
171 : std::unique_ptr<OGRFieldDomain> BuildDomain(const std::string &osDomainName,
172 : int iFieldIndex) const override;
173 :
174 1249 : parquet::arrow::FileReader *GetReader() const
175 : {
176 1249 : return m_poArrowReader.get();
177 : }
178 :
179 286 : const std::vector<int> &GetMapFieldIndexToParquetColumn() const
180 : {
181 286 : return m_anMapFieldIndexToParquetColumn;
182 : }
183 :
184 : const std::vector<std::shared_ptr<arrow::DataType>> &
185 237 : GetArrowFieldTypes() const
186 : {
187 237 : return m_apoArrowDataTypes;
188 : }
189 :
190 2 : int GetFIDParquetColumn() const
191 : {
192 2 : return m_iFIDParquetColumn;
193 : }
194 :
195 : static constexpr int OGR_FID_INDEX = -2;
196 : bool GetMinMaxForOGRField(int iRowGroup, // -1 for all
197 : int iOGRField, // or OGR_FID_INDEX
198 : bool bComputeMin, OGRField &sMin, bool &bFoundMin,
199 : bool bComputeMax, OGRField &sMax, bool &bFoundMax,
200 : OGRFieldType &eType, OGRFieldSubType &eSubType,
201 : std::string &osMinTmp,
202 : std::string &osMaxTmp) const;
203 :
204 : bool GetMinMaxForParquetCol(int iRowGroup, // -1 for all
205 : int iCol,
206 : const std::shared_ptr<arrow::DataType>
207 : &arrowType, // potentially nullptr
208 : bool bComputeMin, OGRField &sMin,
209 : bool &bFoundMin, bool bComputeMax,
210 : OGRField &sMax, bool &bFoundMax,
211 : OGRFieldType &eType, OGRFieldSubType &eSubType,
212 : std::string &osMinTmp,
213 : std::string &osMaxTmp) const;
214 :
215 : bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
216 : int &iParquetXMax, int &iParquetYMax) const;
217 : };
218 :
219 : /************************************************************************/
220 : /* OGRParquetDatasetLayer */
221 : /************************************************************************/
222 :
#ifdef GDAL_USE_ARROWDATASET

/** Parquet layer whose batches come from an arrow::dataset::Scanner.
 * Only compiled when GDAL_USE_ARROWDATASET is defined. */
class OGRParquetDatasetLayer final : public OGRParquetLayerBase
{
  public:
    OGRParquetDatasetLayer(
        OGRParquetDataset *poDS, const char *pszLayerName,
        const std::shared_ptr<arrow::dataset::Scanner> &scanner,
        const std::shared_ptr<arrow::Schema> &schema,
        CSLConstList papszOpenOptions);

    GIntBig GetFeatureCount(int bForce) override;

    OGRErr GetExtent(OGREnvelope *psExtent, int bForce = TRUE) override;
    OGRErr GetExtent(int iGeomField, OGREnvelope *psExtent,
                     int bForce = TRUE) override;

    // TODO: field domains are not implemented for this layer type yet;
    // no domain is ever returned.
    std::unique_ptr<OGRFieldDomain>
    BuildDomain(const std::string & /*osDomainName*/,
                int /*iFieldIndex*/) const override
    {
        return nullptr;
    }

  protected:
    std::string GetDriverUCName() const override
    {
        return "PARQUET";
    }

    bool ReadNextBatch() override;
    void InvalidateCachedBatches() override;
    bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;

  private:
    //! Scanner given to the constructor.
    std::shared_ptr<arrow::dataset::Scanner> m_poScanner{};

    void EstablishFeatureDefn();
};

#endif
265 :
266 : /************************************************************************/
267 : /* OGRParquetDataset */
268 : /************************************************************************/
269 :
270 : class OGRParquetDataset final : public OGRArrowDataset
271 : {
272 : public:
273 : explicit OGRParquetDataset(
274 : const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
275 :
276 : OGRLayer *ExecuteSQL(const char *pszSQLCommand,
277 : OGRGeometry *poSpatialFilter,
278 : const char *pszDialect) override;
279 : void ReleaseResultSet(OGRLayer *poResultsSet) override;
280 :
281 : int TestCapability(const char *) override;
282 : };
283 :
284 : /************************************************************************/
285 : /* OGRParquetWriterLayer */
286 : /************************************************************************/
287 :
288 : class OGRParquetWriterDataset;
289 :
290 : class OGRParquetWriterLayer final : public OGRArrowWriterLayer
291 : {
292 : OGRParquetWriterLayer(const OGRParquetWriterLayer &) = delete;
293 : OGRParquetWriterLayer &operator=(const OGRParquetWriterLayer &) = delete;
294 :
295 : OGRParquetWriterDataset *m_poDataset = nullptr;
296 : std::unique_ptr<parquet::arrow::FileWriter> m_poFileWriter{};
297 : std::shared_ptr<const arrow::KeyValueMetadata> m_poKeyValueMetadata{};
298 : bool m_bForceCounterClockwiseOrientation = false;
299 : bool m_bEdgesSpherical = false;
300 : parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};
301 :
302 : //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
303 : std::unique_ptr<GDALDataset> m_poTmpGPKG{};
304 : //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
305 : OGRLayer *m_poTmpGPKGLayer = nullptr;
306 : //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
307 : GIntBig m_nTmpFeatureCount = 0;
308 :
309 524 : virtual bool IsFileWriterCreated() const override
310 : {
311 524 : return m_poFileWriter != nullptr;
312 : }
313 :
314 : virtual void CreateWriter() override;
315 : virtual bool CloseFileWriter() override;
316 :
317 : virtual void CreateSchema() override;
318 : virtual void PerformStepsBeforeFinalFlushGroup() override;
319 :
320 : virtual bool FlushGroup() override;
321 :
322 220 : virtual std::string GetDriverUCName() const override
323 : {
324 220 : return "PARQUET";
325 : }
326 :
327 : virtual bool
328 : IsSupportedGeometryType(OGRwkbGeometryType eGType) const override;
329 :
330 : virtual void FixupWKBGeometryBeforeWriting(GByte *pabyWKB,
331 : size_t nLen) override;
332 : virtual void FixupGeometryBeforeWriting(OGRGeometry *poGeom) override;
333 :
334 26 : virtual bool IsSRSRequired() const override
335 : {
336 26 : return false;
337 : }
338 :
339 : std::string GetGeoMetadata() const;
340 :
341 : //! Copy temporary GeoPackage layer to final Parquet file
342 : bool CopyTmpGpkgLayerToFinalFile();
343 :
344 : public:
345 : OGRParquetWriterLayer(
346 : OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
347 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
348 : const char *pszLayerName);
349 :
350 : CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;
351 :
352 : bool SetOptions(CSLConstList papszOptions,
353 : const OGRSpatialReference *poSpatialRef,
354 : OGRwkbGeometryType eGType);
355 :
356 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
357 : int bApproxOK = TRUE) override;
358 :
359 : int TestCapability(const char *pszCap) override;
360 : #if PARQUET_VERSION_MAJOR <= 10
361 : // Parquet <= 10 doesn't support the WriteRecordBatch() API
362 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
363 : CSLConstList papszOptions,
364 : std::string &osErrorMsg) const override
365 : {
366 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
367 : osErrorMsg);
368 : }
369 :
370 : bool
371 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
372 : CSLConstList papszOptions = nullptr) override
373 : {
374 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
375 : }
376 :
377 : bool WriteArrowBatch(const struct ArrowSchema *schema,
378 : struct ArrowArray *array,
379 : CSLConstList papszOptions = nullptr) override
380 : {
381 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
382 : }
383 : #else
384 : bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
385 : CSLConstList papszOptions,
386 : std::string &osErrorMsg) const override;
387 : bool
388 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
389 : CSLConstList papszOptions = nullptr) override;
390 : bool WriteArrowBatch(const struct ArrowSchema *schema,
391 : struct ArrowArray *array,
392 : CSLConstList papszOptions = nullptr) override;
393 : #endif
394 :
395 : GDALDataset *GetDataset() override;
396 :
397 : protected:
398 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
399 :
400 : friend class OGRParquetWriterDataset;
401 : bool Close();
402 : };
403 :
404 : /************************************************************************/
405 : /* OGRParquetWriterDataset */
406 : /************************************************************************/
407 :
408 : class OGRParquetWriterDataset final : public GDALPamDataset
409 : {
410 : std::unique_ptr<arrow::MemoryPool> m_poMemoryPool{};
411 : std::unique_ptr<OGRParquetWriterLayer> m_poLayer{};
412 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
413 :
414 : public:
415 : explicit OGRParquetWriterDataset(
416 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream);
417 :
418 : arrow::MemoryPool *GetMemoryPool() const
419 : {
420 : return m_poMemoryPool.get();
421 : }
422 :
423 : CPLErr Close() override;
424 :
425 : int GetLayerCount() override;
426 : OGRLayer *GetLayer(int idx) override;
427 : int TestCapability(const char *pszCap) override;
428 : std::vector<std::string> GetFieldDomainNames(
429 : CSLConstList /*papszOptions*/ = nullptr) const override;
430 : const OGRFieldDomain *
431 : GetFieldDomain(const std::string &name) const override;
432 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
433 : std::string &failureReason) override;
434 :
435 194 : GDALMultiDomainMetadata &GetMultiDomainMetadata()
436 : {
437 194 : return oMDMD;
438 : }
439 :
440 : protected:
441 : OGRLayer *ICreateLayer(const char *pszName,
442 : const OGRGeomFieldDefn *poGeomFieldDefn,
443 : CSLConstList papszOptions) override;
444 : };
445 :
446 : #endif // OGR_PARQUET_H
|