Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_feather.h"
14 :
15 : #include "../arrow_common/ograrrowwriterlayer.hpp"
16 :
17 : /************************************************************************/
18 : /* OGRFeatherWriterLayer() */
19 : /************************************************************************/
20 :
21 150 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
22 : GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
23 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
24 150 : const char *pszLayerName)
25 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
26 150 : m_poDS(poDS)
27 : {
28 150 : m_bWriteFieldArrowExtensionName = true;
29 150 : }
30 :
31 : /************************************************************************/
32 : /* ~OGRFeatherWriterLayer() */
33 : /************************************************************************/
34 :
35 300 : OGRFeatherWriterLayer::~OGRFeatherWriterLayer()
36 : {
37 150 : if (m_bInitializationOK)
38 141 : FinalizeWriting();
39 300 : }
40 :
41 : /************************************************************************/
42 : /* IsSupportedGeometryType() */
43 : /************************************************************************/
44 :
45 144 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
46 : OGRwkbGeometryType eGType) const
47 : {
48 144 : if (eGType != wkbFlatten(eGType))
49 : {
50 : const auto osConfigOptionName =
51 170 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
52 85 : if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
53 : {
54 7 : CPLError(CE_Failure, CPLE_NotSupported,
55 : "Only 2D geometry types are supported (unless the "
56 : "%s configuration option is set to YES)",
57 : osConfigOptionName.c_str());
58 7 : return false;
59 : }
60 : }
61 137 : return true;
62 : }
63 :
64 : /************************************************************************/
65 : /* SetOptions() */
66 : /************************************************************************/
67 :
68 150 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
69 : CSLConstList papszOptions,
70 : const OGRSpatialReference *poSpatialRef,
71 : OGRwkbGeometryType eGType)
72 : {
73 : const char *pszDefaultFormat =
74 150 : (EQUAL(CPLGetExtension(osFilename.c_str()), "arrows") ||
75 150 : STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
76 300 : ? "STREAM"
77 150 : : "FILE";
78 150 : m_bStreamFormat =
79 150 : EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
80 : "STREAM");
81 :
82 : const char *pszGeomEncoding =
83 150 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
84 150 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
85 150 : if (pszGeomEncoding)
86 : {
87 121 : if (EQUAL(pszGeomEncoding, "WKB"))
88 31 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
89 90 : else if (EQUAL(pszGeomEncoding, "WKT"))
90 29 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
91 61 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
92 30 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
93 31 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
94 6 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
95 31 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
96 : else
97 : {
98 0 : CPLError(CE_Failure, CPLE_NotSupported,
99 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
100 0 : return false;
101 : }
102 : }
103 :
104 150 : if (eGType != wkbNone)
105 : {
106 144 : if (!IsSupportedGeometryType(eGType))
107 : {
108 9 : return false;
109 : }
110 :
111 137 : if (poSpatialRef == nullptr)
112 : {
113 8 : CPLError(CE_Warning, CPLE_AppDefined,
114 : "Geometry column should have an associated CRS");
115 : }
116 :
117 137 : m_poFeatureDefn->SetGeomType(eGType);
118 137 : auto eGeomEncoding = m_eGeomEncoding;
119 137 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
120 107 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
121 : {
122 77 : const auto eEncodingType = eGeomEncoding;
123 77 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
124 77 : if (eGeomEncoding == eEncodingType)
125 2 : return false;
126 : }
127 135 : m_aeGeomEncoding.push_back(eGeomEncoding);
128 135 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
129 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
130 135 : if (poSpatialRef)
131 : {
132 129 : auto poSRS = poSpatialRef->Clone();
133 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
134 129 : poSRS->Release();
135 : }
136 : }
137 :
138 141 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
139 :
140 141 : const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
141 141 : if (pszCompression == nullptr)
142 : {
143 414 : auto oResult = arrow::util::Codec::GetCompressionType("lz4");
144 138 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
145 : {
146 138 : pszCompression = "LZ4";
147 : }
148 : else
149 : {
150 0 : pszCompression = "NONE";
151 : }
152 : }
153 :
154 141 : if (EQUAL(pszCompression, "NONE"))
155 0 : pszCompression = "UNCOMPRESSED";
156 : auto oResult = arrow::util::Codec::GetCompressionType(
157 282 : CPLString(pszCompression).tolower());
158 141 : if (!oResult.ok())
159 : {
160 0 : CPLError(CE_Failure, CPLE_NotSupported,
161 : "Unrecognized compression method: %s", pszCompression);
162 0 : return false;
163 : }
164 141 : m_eCompression = *oResult;
165 141 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
166 : {
167 0 : CPLError(CE_Failure, CPLE_NotSupported,
168 : "Compression method %s is known, but libarrow has not "
169 : "been built with support for it",
170 : pszCompression);
171 0 : return false;
172 : }
173 :
174 141 : const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
175 141 : if (pszRowGroupSize)
176 : {
177 3 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
178 3 : if (nRowGroupSize > 0)
179 : {
180 3 : if (nRowGroupSize > INT_MAX)
181 0 : nRowGroupSize = INT_MAX;
182 3 : m_nRowGroupSize = nRowGroupSize;
183 : }
184 : }
185 :
186 141 : m_bInitializationOK = true;
187 141 : return true;
188 : }
189 :
190 : /************************************************************************/
191 : /* CloseFileWriter() */
192 : /************************************************************************/
193 :
194 141 : bool OGRFeatherWriterLayer::CloseFileWriter()
195 : {
196 282 : auto status = m_poFileWriter->Close();
197 141 : if (!status.ok())
198 : {
199 0 : CPLError(CE_Failure, CPLE_AppDefined,
200 : "FileWriter::Close() failed with %s",
201 0 : status.message().c_str());
202 : }
203 282 : return status.ok();
204 : }
205 :
206 : /************************************************************************/
207 : /* CreateSchema() */
208 : /************************************************************************/
209 :
210 141 : void OGRFeatherWriterLayer::CreateSchema()
211 : {
212 141 : CreateSchemaCommon();
213 :
214 276 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
215 135 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
216 : {
217 268 : CPLJSONObject oRoot;
218 134 : oRoot.Add("schema_version", "0.1.0");
219 134 : oRoot.Add("primary_column",
220 134 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
221 268 : CPLJSONObject oColumns;
222 134 : oRoot.Add("columns", oColumns);
223 268 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
224 : {
225 134 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
226 268 : CPLJSONObject oColumn;
227 134 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
228 134 : oColumn.Add("encoding",
229 134 : GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
230 :
231 134 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
232 134 : if (poSRS)
233 : {
234 128 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
235 : "MULTILINE=NO", nullptr};
236 128 : char *pszWKT = nullptr;
237 128 : poSRS->exportToWkt(&pszWKT, apszOptions);
238 128 : if (pszWKT)
239 128 : oColumn.Add("crs", pszWKT);
240 128 : CPLFree(pszWKT);
241 :
242 128 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
243 128 : if (dfCoordEpoch > 0)
244 2 : oColumn.Add("epoch", dfCoordEpoch);
245 : }
246 :
247 : #if 0
248 : if( m_aoEnvelopes[i].IsInit() &&
249 : CPLTestBool(CPLGetConfigOption(
250 : "OGR_ARROW_WRITE_BBOX", "YES")) )
251 : {
252 : CPLJSONArray oBBOX;
253 : oBBOX.Add(m_aoEnvelopes[i].MinX);
254 : oBBOX.Add(m_aoEnvelopes[i].MinY);
255 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
256 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
257 : oColumn.Add("bbox", oBBOX);
258 : }
259 : #endif
260 134 : const auto eType = poGeomFieldDefn->GetType();
261 134 : if (CPLTestBool(CPLGetConfigOption(
262 268 : "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
263 134 : eType == wkbFlatten(eType))
264 : {
265 : // Geometry type, place under a temporary "gdal:geometry_type"
266 : // property pending acceptance of proposal at
267 : // https://github.com/opengeospatial/geoparquet/issues/41
268 56 : const char *pszType = "mixed";
269 56 : if (wkbPoint == eType)
270 19 : pszType = "Point";
271 37 : else if (wkbLineString == eType)
272 7 : pszType = "LineString";
273 30 : else if (wkbPolygon == eType)
274 7 : pszType = "Polygon";
275 23 : else if (wkbMultiPoint == eType)
276 7 : pszType = "MultiPoint";
277 16 : else if (wkbMultiLineString == eType)
278 7 : pszType = "MultiLineString";
279 9 : else if (wkbMultiPolygon == eType)
280 7 : pszType = "MultiPolygon";
281 2 : else if (wkbGeometryCollection == eType)
282 2 : pszType = "GeometryCollection";
283 56 : oColumn.Add("gdal:geometry_type", pszType);
284 : }
285 : }
286 :
287 134 : auto kvMetadata = m_poSchema->metadata()
288 2 : ? m_poSchema->metadata()->Copy()
289 270 : : std::make_shared<arrow::KeyValueMetadata>();
290 268 : kvMetadata->Append("geo",
291 268 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
292 134 : m_poSchema = m_poSchema->WithMetadata(kvMetadata);
293 134 : CPLAssert(m_poSchema);
294 : }
295 141 : }
296 :
297 : /************************************************************************/
298 : /* CreateWriter() */
299 : /************************************************************************/
300 :
301 141 : void OGRFeatherWriterLayer::CreateWriter()
302 : {
303 141 : CPLAssert(m_poFileWriter == nullptr);
304 :
305 141 : if (m_poSchema == nullptr)
306 : {
307 15 : CreateSchema();
308 : }
309 : else
310 : {
311 126 : FinalizeSchema();
312 : }
313 :
314 282 : auto options = arrow::ipc::IpcWriteOptions::Defaults();
315 141 : options.memory_pool = m_poMemoryPool;
316 :
317 : {
318 282 : auto result = arrow::util::Codec::Create(m_eCompression);
319 141 : if (!result.ok())
320 : {
321 0 : CPLError(CE_Failure, CPLE_AppDefined,
322 : "Codec::Create() failed with %s",
323 0 : result.status().message().c_str());
324 : }
325 : else
326 : {
327 141 : options.codec.reset(result->release());
328 : }
329 : }
330 :
331 141 : if (m_bStreamFormat)
332 : {
333 : auto result =
334 8 : arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
335 4 : if (!result.ok())
336 : {
337 0 : CPLError(CE_Failure, CPLE_AppDefined,
338 : "arrow::ipc::MakeStreamWriter() failed with %s",
339 0 : result.status().message().c_str());
340 : }
341 : else
342 : {
343 4 : m_poFileWriter = *result;
344 : }
345 : }
346 : else
347 : {
348 : m_poFooterKeyValueMetadata =
349 137 : std::make_shared<arrow::KeyValueMetadata>();
350 : auto result = arrow::ipc::MakeFileWriter(
351 411 : m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
352 137 : if (!result.ok())
353 : {
354 0 : CPLError(CE_Failure, CPLE_AppDefined,
355 : "arrow::ipc::MakeFileWriter() failed with %s",
356 0 : result.status().message().c_str());
357 : }
358 : else
359 : {
360 137 : m_poFileWriter = *result;
361 : }
362 : }
363 141 : }
364 :
365 : /************************************************************************/
366 : /* PerformStepsBeforeFinalFlushGroup() */
367 : /************************************************************************/
368 :
369 : // Add a gdal:geo extension metadata for now, which embeds a bbox
370 141 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
371 : {
372 278 : if (m_poFooterKeyValueMetadata &&
373 273 : m_poFeatureDefn->GetGeomFieldCount() != 0 &&
374 132 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
375 : {
376 258 : CPLJSONObject oRoot;
377 129 : oRoot.Add("primary_column",
378 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
379 129 : CPLJSONObject oColumns;
380 129 : oRoot.Add("columns", oColumns);
381 258 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
382 : {
383 129 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
384 258 : CPLJSONObject oColumn;
385 129 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
386 129 : oColumn.Add("encoding",
387 129 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
388 :
389 129 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
390 129 : if (poSRS)
391 : {
392 123 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
393 : "MULTILINE=NO", nullptr};
394 123 : char *pszWKT = nullptr;
395 123 : poSRS->exportToWkt(&pszWKT, apszOptions);
396 123 : if (pszWKT)
397 123 : oColumn.Add("crs", pszWKT);
398 123 : CPLFree(pszWKT);
399 :
400 123 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
401 123 : if (dfCoordEpoch > 0)
402 1 : oColumn.Add("epoch", dfCoordEpoch);
403 : }
404 :
405 129 : if (m_aoEnvelopes[i].IsInit())
406 : {
407 116 : CPLJSONArray oBBOX;
408 116 : oBBOX.Add(m_aoEnvelopes[i].MinX);
409 116 : oBBOX.Add(m_aoEnvelopes[i].MinY);
410 116 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
411 116 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
412 116 : oColumn.Add("bbox", oBBOX);
413 : }
414 : }
415 :
416 258 : m_poFooterKeyValueMetadata->Append(
417 : GDAL_GEO_FOOTER_KEY,
418 258 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
419 : }
420 141 : }
421 :
422 : /************************************************************************/
423 : /* FlushGroup() */
424 : /************************************************************************/
425 :
426 20 : bool OGRFeatherWriterLayer::FlushGroup()
427 : {
428 20 : std::vector<std::shared_ptr<arrow::Array>> columns;
429 20 : auto ret = WriteArrays(
430 840 : [&columns](const std::shared_ptr<arrow::Field> &,
431 840 : const std::shared_ptr<arrow::Array> &array)
432 : {
433 840 : columns.emplace_back(array);
434 840 : return true;
435 : });
436 :
437 20 : if (ret)
438 : {
439 : auto poRecordBatch = arrow::RecordBatch::Make(
440 60 : m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
441 40 : auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
442 20 : if (!status.ok())
443 : {
444 0 : CPLError(CE_Failure, CPLE_AppDefined,
445 : "WriteRecordBatch() failed with %s",
446 0 : status.message().c_str());
447 0 : ret = false;
448 : }
449 : }
450 :
451 20 : ClearArrayBuilers();
452 40 : return ret;
453 : }
454 :
455 : /************************************************************************/
456 : /* WriteArrowBatch() */
457 : /************************************************************************/
458 :
459 : inline bool
460 112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
461 : struct ArrowArray *array,
462 : CSLConstList papszOptions)
463 : {
464 224 : return WriteArrowBatchInternal(
465 : schema, array, papszOptions,
466 112 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
467 : {
468 224 : auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
469 112 : if (!status.ok())
470 : {
471 0 : CPLError(CE_Failure, CPLE_AppDefined,
472 : "WriteRecordBatch() failed: %s",
473 0 : status.message().c_str());
474 0 : return false;
475 : }
476 :
477 112 : return true;
478 224 : });
479 : }
|