Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Arrow generic code
4 : * Purpose: Arrow generic code
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #ifndef OGR_ARROW_H
30 : #define OGR_ARROW_H
31 :
32 : #include "gdal_pam.h"
33 : #include "ogrsf_frmts.h"
34 :
35 : #include <map>
36 : #include <set>
37 :
38 : #include "ogr_include_arrow.h"
39 :
/** Geometry encodings known to the Arrow-based readers and writers.
 *
 * Enumerators take implicit consecutive values starting at 0, in
 * declaration order.
 */
enum class OGRArrowGeomEncoding
{
    WKB,
    WKT,

    // F(ixed) S(ize) L(ist) of (x,y[,z][,m]) values / Interleaved layout
    GEOARROW_FSL_GENERIC,  // only used by OGRArrowWriterLayer::m_eGeomEncoding
    GEOARROW_FSL_POINT,
    GEOARROW_FSL_LINESTRING,
    GEOARROW_FSL_POLYGON,
    GEOARROW_FSL_MULTIPOINT,
    GEOARROW_FSL_MULTILINESTRING,
    GEOARROW_FSL_MULTIPOLYGON,

    // Struct of (x,y[,z][,m])
    GEOARROW_STRUCT_GENERIC,  // only used by OGRArrowWriterLayer::m_eGeomEncoding
    GEOARROW_STRUCT_POINT,
    GEOARROW_STRUCT_LINESTRING,
    GEOARROW_STRUCT_POLYGON,
    GEOARROW_STRUCT_MULTIPOINT,
    GEOARROW_STRUCT_MULTILINESTRING,
    GEOARROW_STRUCT_MULTIPOLYGON,
};
63 :
64 : /************************************************************************/
65 : /* OGRArrowLayer */
66 : /************************************************************************/
67 :
68 : class OGRArrowDataset;
69 :
70 : class OGRArrowLayer CPL_NON_FINAL
71 : : public OGRLayer,
72 : public OGRGetNextFeatureThroughRaw<OGRArrowLayer>
73 : {
74 : public:
75 : struct Constraint
76 : {
77 : enum class Type
78 : {
79 : Integer,
80 : Integer64,
81 : Real,
82 : String,
83 : };
84 : int iField = -1; // index to a OGRFeatureDefn OGRField
85 : int iArrayIdx = -1; // index to m_poBatchColumns
86 : int nOperation = -1; // SWQ_xxxx
87 : Type eType{};
88 : OGRField sValue{};
89 : std::string osValue{};
90 : };
91 :
92 : private:
93 : OGRArrowLayer(const OGRArrowLayer &) = delete;
94 : OGRArrowLayer &operator=(const OGRArrowLayer &) = delete;
95 :
96 : int m_nUseOptimizedAttributeFilter = -1;
97 : bool m_bSpatialFilterIntersectsLayerExtent = true;
98 : bool m_bUseRecordBatchBaseImplementation = false;
99 :
100 : // Modified by UseRecordBatchBaseImplementation()
101 : mutable struct ArrowSchema m_sCachedSchema = {};
102 :
103 : bool SkipToNextFeatureDueToAttributeFilter() const;
104 : void ExploreExprNode(const swq_expr_node *poNode);
105 : bool UseRecordBatchBaseImplementation() const;
106 :
107 : template <typename SourceOffset>
108 : static struct ArrowArray *
109 : CreateWKBArrayFromWKTArray(const struct ArrowArray *sourceArray);
110 :
111 : int GetArrowSchemaInternal(struct ArrowSchema *out) const;
112 :
113 : protected:
114 : OGRArrowDataset *m_poArrowDS = nullptr;
115 : arrow::MemoryPool *m_poMemoryPool = nullptr;
116 : OGRFeatureDefn *m_poFeatureDefn = nullptr;
117 : std::shared_ptr<arrow::Schema> m_poSchema{};
118 : std::string m_osFIDColumn{};
119 : int m_iFIDArrowColumn = -1;
120 : std::vector<std::vector<int>> m_anMapFieldIndexToArrowColumn{};
121 : std::vector<int> m_anMapGeomFieldIndexToArrowColumn{};
122 : std::vector<OGRArrowGeomEncoding> m_aeGeomEncoding{};
123 :
124 : //! Describe the bbox column of a geometry column
125 : struct GeomColBBOX
126 : {
127 : bool bIsFloat = false;
128 : int iArrowCol = -1;
129 : int iArrayIdx = -1; // only valid when m_bIgnoredFields == true
130 : int iArrowSubfieldXMin = -1;
131 : int iArrowSubfieldYMin = -1;
132 : int iArrowSubfieldXMax = -1;
133 : int iArrowSubfieldYMax = -1;
134 : };
135 :
136 : //! Map from OGR geometry field index to GeomColBBOX
137 : std::map<int, GeomColBBOX> m_oMapGeomFieldIndexToGeomColBBOX{};
138 :
139 : const arrow::BinaryArray *m_poArrayWKB = nullptr;
140 : const arrow::LargeBinaryArray *m_poArrayWKBLarge = nullptr;
141 : const arrow::Array *m_poArrayBBOX = nullptr;
142 : const arrow::DoubleArray *m_poArrayXMinDouble = nullptr;
143 : const arrow::DoubleArray *m_poArrayYMinDouble = nullptr;
144 : const arrow::DoubleArray *m_poArrayXMaxDouble = nullptr;
145 : const arrow::DoubleArray *m_poArrayYMaxDouble = nullptr;
146 : const arrow::FloatArray *m_poArrayXMinFloat = nullptr;
147 : const arrow::FloatArray *m_poArrayYMinFloat = nullptr;
148 : const arrow::FloatArray *m_poArrayXMaxFloat = nullptr;
149 : const arrow::FloatArray *m_poArrayYMaxFloat = nullptr;
150 :
151 : //! References values in range [0, m_poSchema->field_count()-1]
152 : std::set<int> m_oSetBBoxArrowColumns{};
153 :
154 : bool m_bIgnoredFields = false;
155 : std::vector<int>
156 : m_anMapFieldIndexToArrayIndex{}; // only valid when m_bIgnoredFields is
157 : // set
158 : std::vector<int> m_anMapGeomFieldIndexToArrayIndex{}; // only valid when
159 : // m_bIgnoredFields is set
160 : int m_nRequestedFIDColumn = -1; // only valid when m_bIgnoredFields is set
161 :
162 : bool m_bEOF = false;
163 : int64_t m_nFeatureIdx = 0;
164 : int64_t m_nIdxInBatch = 0;
165 : std::map<std::string, CPLJSONObject> m_oMapGeometryColumns{};
166 : mutable std::map<int, OGREnvelope> m_oMapExtents{};
167 : int m_iRecordBatch = -1;
168 : std::shared_ptr<arrow::RecordBatch> m_poBatch{};
169 : // m_poBatch->columns() is a relatively costly operation, so cache its
170 : // result
171 : std::vector<std::shared_ptr<arrow::Array>>
172 : m_poBatchColumns{}; // must always be == m_poBatch->columns()
173 : mutable std::shared_ptr<arrow::Array> m_poReadFeatureTmpArray{};
174 :
175 : std::vector<Constraint> m_asAttributeFilterConstraints{};
176 :
177 : std::map<std::string, std::unique_ptr<OGRFieldDefn>>
178 : LoadGDALSchema(const arrow::KeyValueMetadata *kv_metadata);
179 :
180 : void LoadGDALMetadata(const arrow::KeyValueMetadata *kv_metadata);
181 :
182 : OGRArrowLayer(OGRArrowDataset *poDS, const char *pszLayerName);
183 :
184 : virtual std::string GetDriverUCName() const = 0;
185 : static bool IsIntegerArrowType(arrow::Type::type typeId);
186 : static bool
187 : IsHandledListOrMapType(const std::shared_ptr<arrow::DataType> &valueType);
188 : static bool
189 : IsHandledListType(const std::shared_ptr<arrow::BaseListType> &listType);
190 : static bool
191 : IsHandledMapType(const std::shared_ptr<arrow::MapType> &mapType);
192 : static bool
193 : IsValidGeometryEncoding(const std::shared_ptr<arrow::Field> &field,
194 : const std::string &osEncoding,
195 : bool bWarnIfUnknownEncoding,
196 : OGRwkbGeometryType &eGeomTypeOut,
197 : OGRArrowGeomEncoding &eGeomEncodingOut);
198 : static OGRwkbGeometryType
199 : GetGeometryTypeFromString(const std::string &osType);
200 : bool
201 : MapArrowTypeToOGR(const std::shared_ptr<arrow::DataType> &type,
202 : const std::shared_ptr<arrow::Field> &field,
203 : OGRFieldDefn &oField, OGRFieldType &eType,
204 : OGRFieldSubType &eSubType, const std::vector<int> &path,
205 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
206 : &oMapFieldNameToGDALSchemaFieldDefn);
207 : void CreateFieldFromSchema(
208 : const std::shared_ptr<arrow::Field> &field,
209 : const std::vector<int> &path,
210 : const std::map<std::string, std::unique_ptr<OGRFieldDefn>>
211 : &oMapFieldNameToGDALSchemaFieldDefn);
212 : std::unique_ptr<OGRFieldDomain>
213 : BuildDomainFromBatch(const std::string &osDomainName,
214 : const std::shared_ptr<arrow::RecordBatch> &poBatch,
215 : int iCol) const;
216 : OGRwkbGeometryType ComputeGeometryColumnTypeProcessBatch(
217 : const std::shared_ptr<arrow::RecordBatch> &poBatch, int iGeomCol,
218 : int iBatchCol, OGRwkbGeometryType eGeomType) const;
219 : static bool ReadWKBBoundingBox(const uint8_t *data, size_t size,
220 : OGREnvelope &sEnvelope);
221 : OGRFeature *ReadFeature(
222 : int64_t nIdxInBatch,
223 : const std::vector<std::shared_ptr<arrow::Array>> &poColumnArrays) const;
224 : OGRGeometry *ReadGeometry(int iGeomField, const arrow::Array *array,
225 : int64_t nIdxInBatch) const;
226 : virtual bool ReadNextBatch() = 0;
227 : virtual void InvalidateCachedBatches() = 0;
228 : OGRFeature *GetNextRawFeature();
229 :
230 0 : virtual bool CanRunNonForcedGetExtent()
231 : {
232 0 : return true;
233 : }
234 :
235 : void SetBatch(const std::shared_ptr<arrow::RecordBatch> &poBatch);
236 :
237 : // Refreshes Constraint.iArrayIdx from iField. To be called by SetIgnoredFields()
238 : void ComputeConstraintsArrayIdx();
239 :
240 : virtual bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const;
241 : bool FastGetExtent3D(int iGeomField, OGREnvelope3D *psExtent) const;
242 : static OGRErr GetExtentFromMetadata(const CPLJSONObject &oJSONDef,
243 : OGREnvelope3D *psExtent);
244 :
245 : int GetArrowSchema(struct ArrowArrayStream *,
246 : struct ArrowSchema *out) override;
247 : int GetNextArrowArray(struct ArrowArrayStream *,
248 : struct ArrowArray *out) override;
249 :
250 2977 : virtual void IncrFeatureIdx()
251 : {
252 2977 : ++m_nFeatureIdx;
253 2977 : }
254 :
255 : public:
256 : virtual ~OGRArrowLayer() override;
257 :
258 12922 : OGRFeatureDefn *GetLayerDefn() override
259 : {
260 12922 : return m_poFeatureDefn;
261 : }
262 :
263 : void ResetReading() override;
264 :
265 1475 : const char *GetFIDColumn() override
266 : {
267 1475 : return m_osFIDColumn.c_str();
268 : }
269 8948 : DEFINE_GET_NEXT_FEATURE_THROUGH_RAW(OGRArrowLayer)
270 : OGRErr GetExtent(OGREnvelope *psExtent, int bForce = TRUE) override;
271 : OGRErr GetExtent(int iGeomField, OGREnvelope *psExtent,
272 : int bForce = TRUE) override;
273 : OGRErr GetExtent3D(int iGeomField, OGREnvelope3D *psExtent,
274 : int bForce = TRUE) override;
275 : OGRErr SetAttributeFilter(const char *pszFilter) override;
276 :
277 677 : void SetSpatialFilter(OGRGeometry *poGeom) override
278 : {
279 677 : SetSpatialFilter(0, poGeom);
280 677 : }
281 :
282 : void SetSpatialFilter(int iGeomField, OGRGeometry *poGeom) override;
283 :
284 : int TestCapability(const char *pszCap) override;
285 :
286 : bool GetArrowStream(struct ArrowArrayStream *out_stream,
287 : CSLConstList papszOptions = nullptr) override;
288 :
289 : virtual std::unique_ptr<OGRFieldDomain>
290 : BuildDomain(const std::string &osDomainName, int iFieldIndex) const = 0;
291 :
292 : static void TimestampToOGR(int64_t timestamp,
293 : const arrow::TimestampType *timestampType,
294 : int nTZFlag, OGRField *psField);
295 : };
296 :
297 : /************************************************************************/
298 : /* OGRArrowDataset */
299 : /************************************************************************/
300 :
301 : class OGRArrowDataset CPL_NON_FINAL : public GDALPamDataset
302 : {
303 : std::shared_ptr<arrow::MemoryPool> m_poMemoryPool{};
304 : std::unique_ptr<OGRArrowLayer> m_poLayer{};
305 : std::vector<std::string> m_aosDomainNames{};
306 : std::map<std::string, int> m_oMapDomainNameToCol{};
307 :
308 : public:
309 : explicit OGRArrowDataset(
310 : const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
311 :
312 1221 : inline arrow::MemoryPool *GetMemoryPool() const
313 : {
314 1221 : return m_poMemoryPool.get();
315 : }
316 :
317 678 : inline const std::shared_ptr<arrow::MemoryPool> &GetSharedMemoryPool() const
318 : {
319 678 : return m_poMemoryPool;
320 : }
321 :
322 : void SetLayer(std::unique_ptr<OGRArrowLayer> &&poLayer);
323 :
324 : void RegisterDomainName(const std::string &osDomainName, int iFieldIndex);
325 :
326 : std::vector<std::string> GetFieldDomainNames(
327 : CSLConstList /*papszOptions*/ = nullptr) const override;
328 : const OGRFieldDomain *
329 : GetFieldDomain(const std::string &name) const override;
330 :
331 : int GetLayerCount() override;
332 : OGRLayer *GetLayer(int idx) override;
333 : };
334 :
335 : /************************************************************************/
336 : /* OGRArrowWriterLayer */
337 : /************************************************************************/
338 :
339 : class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer
340 :
341 : {
342 : protected:
343 : OGRArrowWriterLayer(const OGRArrowWriterLayer &) = delete;
344 : OGRArrowWriterLayer &operator=(const OGRArrowWriterLayer &) = delete;
345 :
346 : arrow::MemoryPool *m_poMemoryPool = nullptr;
347 : bool m_bInitializationOK = false;
348 : std::shared_ptr<arrow::io::OutputStream> m_poOutputStream{};
349 : std::shared_ptr<arrow::Schema> m_poSchema{};
350 : OGRFeatureDefn *m_poFeatureDefn = nullptr;
351 : std::map<std::string, std::unique_ptr<OGRFieldDomain>> m_oMapFieldDomains{};
352 : std::map<std::string, std::shared_ptr<arrow::Array>>
353 : m_oMapFieldDomainToStringArray{};
354 :
355 : bool m_bWriteFieldArrowExtensionName = false;
356 : OGRArrowGeomEncoding m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
357 : std::vector<OGRArrowGeomEncoding> m_aeGeomEncoding{};
358 : int m_nWKTCoordinatePrecision = -1;
359 :
360 : //! Base struct data type for GeoArrow struct geometry columns.
361 : // Constraint: if not empty, m_apoBaseStructGeomType.size() == m_poFeatureDefn->GetGeomFieldCount()
362 : std::vector<std::shared_ptr<arrow::DataType>> m_apoBaseStructGeomType{};
363 :
364 : //! Whether to use a struct field with the values of the bounding box
365 : // of the geometries. Used by Parquet.
366 : bool m_bWriteBBoxStruct = false;
367 :
368 : //! Schema fields for bounding box of geometry columns.
369 : // Constraint: if not empty, m_apoFieldsBBOX.size() == m_poFeatureDefn->GetGeomFieldCount()
370 : std::vector<std::shared_ptr<arrow::Field>> m_apoFieldsBBOX{};
371 :
372 : //! Array builers for bounding box of geometry columns.
373 : // m_apoBuildersBBOXStruct is for the top-level field of type struct.
374 : // m_apoBuildersBBOX{XMin|YMin|XMax|YMax} are for the floating-point values
375 : // Constraint: if not empty, m_apoBuildersBBOX{Struct|XMin|YMin|XMax|YMax}.size() == m_poFeatureDefn->GetGeomFieldCount()
376 : std::vector<std::shared_ptr<arrow::StructBuilder>>
377 : m_apoBuildersBBOXStruct{};
378 : std::vector<std::shared_ptr<arrow::FloatBuilder>> m_apoBuildersBBOXXMin{};
379 : std::vector<std::shared_ptr<arrow::FloatBuilder>> m_apoBuildersBBOXYMin{};
380 : std::vector<std::shared_ptr<arrow::FloatBuilder>> m_apoBuildersBBOXXMax{};
381 : std::vector<std::shared_ptr<arrow::FloatBuilder>> m_apoBuildersBBOXYMax{};
382 :
383 : std::string m_osFIDColumn{};
384 : int64_t m_nFeatureCount = 0;
385 :
386 : int64_t m_nRowGroupSize = 64 * 1024;
387 : arrow::Compression::type m_eCompression = arrow::Compression::UNCOMPRESSED;
388 :
389 : std::vector<std::shared_ptr<arrow::Field>> m_apoFieldsFromArrowSchema{};
390 : std::vector<std::shared_ptr<arrow::ArrayBuilder>> m_apoBuilders{};
391 :
392 : std::vector<uint8_t> m_abyBuffer{};
393 :
394 : std::vector<int> m_anTZFlag{}; // size: GetFieldCount()
395 : std::vector<OGREnvelope3D> m_aoEnvelopes{}; // size: GetGeomFieldCount()
396 : std::vector<std::set<OGRwkbGeometryType>>
397 : m_oSetWrittenGeometryTypes{}; // size: GetGeomFieldCount()
398 :
399 : static OGRArrowGeomEncoding
400 : GetPreciseArrowGeomEncoding(OGRArrowGeomEncoding eEncodingType,
401 : OGRwkbGeometryType eGType);
402 : static const char *
403 : GetGeomEncodingAsString(OGRArrowGeomEncoding eGeomEncoding,
404 : bool bForParquetGeo);
405 :
406 : virtual bool IsSupportedGeometryType(OGRwkbGeometryType eGType) const = 0;
407 :
408 : virtual std::string GetDriverUCName() const = 0;
409 :
410 : virtual bool IsFileWriterCreated() const = 0;
411 : virtual void CreateWriter() = 0;
412 : virtual bool CloseFileWriter() = 0;
413 :
414 : void CreateSchemaCommon();
415 : void FinalizeSchema();
416 : virtual void CreateSchema() = 0;
417 :
418 0 : virtual void PerformStepsBeforeFinalFlushGroup()
419 : {
420 0 : }
421 :
422 : void CreateArrayBuilders();
423 :
424 : //! Clear array builders
425 : void ClearArrayBuilers();
426 :
427 : virtual bool FlushGroup() = 0;
428 : bool FinalizeWriting();
429 : bool WriteArrays(std::function<bool(const std::shared_ptr<arrow::Field> &,
430 : const std::shared_ptr<arrow::Array> &)>
431 : postProcessArray);
432 :
433 138 : virtual void FixupWKBGeometryBeforeWriting(GByte * /*pabyWKB*/,
434 : size_t /*nLen*/)
435 : {
436 138 : }
437 :
438 0 : virtual void FixupGeometryBeforeWriting(OGRGeometry * /* poGeom */)
439 : {
440 0 : }
441 :
442 : virtual bool IsSRSRequired() const = 0;
443 : bool WriteArrowBatchInternal(
444 : const struct ArrowSchema *schema, struct ArrowArray *array,
445 : CSLConstList papszOptions,
446 : std::function<bool(const std::shared_ptr<arrow::RecordBatch> &)>
447 : writeBatch);
448 :
449 : OGRErr BuildGeometry(OGRGeometry *poGeom, int iGeomField,
450 : arrow::ArrayBuilder *poBuilder);
451 :
452 : public:
453 : OGRArrowWriterLayer(
454 : arrow::MemoryPool *poMemoryPool,
455 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
456 : const char *pszLayerName);
457 :
458 : ~OGRArrowWriterLayer() override;
459 :
460 : bool AddFieldDomain(std::unique_ptr<OGRFieldDomain> &&domain,
461 : std::string &failureReason);
462 : std::vector<std::string> GetFieldDomainNames() const;
463 : const OGRFieldDomain *GetFieldDomain(const std::string &name) const;
464 :
465 7 : const char *GetFIDColumn() override
466 : {
467 7 : return m_osFIDColumn.c_str();
468 : }
469 :
470 5443 : OGRFeatureDefn *GetLayerDefn() override
471 : {
472 5443 : return m_poFeatureDefn;
473 : }
474 :
475 23 : void ResetReading() override
476 : {
477 23 : }
478 :
479 23 : OGRFeature *GetNextFeature() override
480 : {
481 23 : return nullptr;
482 : }
483 :
484 : int TestCapability(const char *pszCap) override;
485 : OGRErr CreateField(const OGRFieldDefn *poField,
486 : int bApproxOK = TRUE) override;
487 : OGRErr CreateGeomField(const OGRGeomFieldDefn *poField,
488 : int bApproxOK = TRUE) override;
489 : GIntBig GetFeatureCount(int bForce) override;
490 :
491 113 : bool IsArrowSchemaSupported(const struct ArrowSchema * /*schema*/,
492 : CSLConstList /* papszOptions */,
493 : std::string & /*osErrorMsg */) const override
494 : {
495 113 : return true;
496 : }
497 :
498 : bool
499 : CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
500 : CSLConstList papszOptions = nullptr) override;
501 : bool WriteArrowBatch(const struct ArrowSchema *schema,
502 : struct ArrowArray *array,
503 : CSLConstList papszOptions = nullptr) override = 0;
504 :
505 : protected:
506 : OGRErr ICreateFeature(OGRFeature *poFeature) override;
507 :
508 : bool FlushFeatures();
509 : };
510 :
511 : #endif // OGR_ARROW_H
|