Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_feather.h"
14 :
15 : #include "../arrow_common/ograrrowwriterlayer.hpp"
16 :
17 : /************************************************************************/
18 : /* OGRFeatherWriterLayer() */
19 : /************************************************************************/
20 :
21 150 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
22 : GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
23 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
24 150 : const char *pszLayerName)
25 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
26 150 : m_poDS(poDS)
27 : {
28 150 : m_bWriteFieldArrowExtensionName = true;
29 150 : }
30 :
31 : /************************************************************************/
32 : /* Close() */
33 : /************************************************************************/
34 :
35 141 : bool OGRFeatherWriterLayer::Close()
36 : {
37 141 : if (m_bInitializationOK)
38 : {
39 141 : if (!FinalizeWriting())
40 0 : return false;
41 : }
42 :
43 141 : return true;
44 : }
45 :
46 : /************************************************************************/
47 : /* IsSupportedGeometryType() */
48 : /************************************************************************/
49 :
50 144 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
51 : OGRwkbGeometryType eGType) const
52 : {
53 144 : if (eGType != wkbFlatten(eGType))
54 : {
55 : const auto osConfigOptionName =
56 170 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
57 85 : if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
58 : {
59 7 : CPLError(CE_Failure, CPLE_NotSupported,
60 : "Only 2D geometry types are supported (unless the "
61 : "%s configuration option is set to YES)",
62 : osConfigOptionName.c_str());
63 7 : return false;
64 : }
65 : }
66 137 : return true;
67 : }
68 :
69 : /************************************************************************/
70 : /* SetOptions() */
71 : /************************************************************************/
72 :
73 150 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
74 : CSLConstList papszOptions,
75 : const OGRSpatialReference *poSpatialRef,
76 : OGRwkbGeometryType eGType)
77 : {
78 : const char *pszDefaultFormat =
79 300 : (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
80 150 : STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
81 450 : ? "STREAM"
82 150 : : "FILE";
83 150 : m_bStreamFormat =
84 150 : EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
85 : "STREAM");
86 :
87 : const char *pszGeomEncoding =
88 150 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
89 150 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
90 150 : if (pszGeomEncoding)
91 : {
92 121 : if (EQUAL(pszGeomEncoding, "WKB"))
93 31 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
94 90 : else if (EQUAL(pszGeomEncoding, "WKT"))
95 29 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
96 61 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
97 30 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
98 31 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
99 6 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
100 31 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
101 : else
102 : {
103 0 : CPLError(CE_Failure, CPLE_NotSupported,
104 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
105 0 : return false;
106 : }
107 : }
108 :
109 150 : if (eGType != wkbNone)
110 : {
111 144 : if (!IsSupportedGeometryType(eGType))
112 : {
113 9 : return false;
114 : }
115 :
116 137 : if (poSpatialRef == nullptr)
117 : {
118 8 : CPLError(CE_Warning, CPLE_AppDefined,
119 : "Geometry column should have an associated CRS");
120 : }
121 :
122 137 : m_poFeatureDefn->SetGeomType(eGType);
123 137 : auto eGeomEncoding = m_eGeomEncoding;
124 137 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
125 107 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
126 : {
127 77 : const auto eEncodingType = eGeomEncoding;
128 77 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
129 77 : if (eGeomEncoding == eEncodingType)
130 2 : return false;
131 : }
132 135 : m_aeGeomEncoding.push_back(eGeomEncoding);
133 135 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
134 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
135 135 : if (poSpatialRef)
136 : {
137 129 : auto poSRS = poSpatialRef->Clone();
138 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
139 129 : poSRS->Release();
140 : }
141 : }
142 :
143 141 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
144 :
145 141 : const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
146 141 : if (pszCompression == nullptr)
147 : {
148 414 : auto oResult = arrow::util::Codec::GetCompressionType("lz4");
149 138 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
150 : {
151 138 : pszCompression = "LZ4";
152 : }
153 : else
154 : {
155 0 : pszCompression = "NONE";
156 : }
157 : }
158 :
159 141 : if (EQUAL(pszCompression, "NONE"))
160 0 : pszCompression = "UNCOMPRESSED";
161 : auto oResult = arrow::util::Codec::GetCompressionType(
162 282 : CPLString(pszCompression).tolower());
163 141 : if (!oResult.ok())
164 : {
165 0 : CPLError(CE_Failure, CPLE_NotSupported,
166 : "Unrecognized compression method: %s", pszCompression);
167 0 : return false;
168 : }
169 141 : m_eCompression = *oResult;
170 141 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
171 : {
172 0 : CPLError(CE_Failure, CPLE_NotSupported,
173 : "Compression method %s is known, but libarrow has not "
174 : "been built with support for it",
175 : pszCompression);
176 0 : return false;
177 : }
178 :
179 141 : const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
180 141 : if (pszRowGroupSize)
181 : {
182 3 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
183 3 : if (nRowGroupSize > 0)
184 : {
185 3 : if (nRowGroupSize > INT_MAX)
186 0 : nRowGroupSize = INT_MAX;
187 3 : m_nRowGroupSize = nRowGroupSize;
188 : }
189 : }
190 :
191 141 : m_bInitializationOK = true;
192 141 : return true;
193 : }
194 :
195 : /************************************************************************/
196 : /* CloseFileWriter() */
197 : /************************************************************************/
198 :
199 141 : bool OGRFeatherWriterLayer::CloseFileWriter()
200 : {
201 282 : auto status = m_poFileWriter->Close();
202 141 : if (!status.ok())
203 : {
204 0 : CPLError(CE_Failure, CPLE_AppDefined,
205 : "FileWriter::Close() failed with %s",
206 0 : status.message().c_str());
207 : }
208 282 : return status.ok();
209 : }
210 :
211 : /************************************************************************/
212 : /* CreateSchema() */
213 : /************************************************************************/
214 :
215 141 : void OGRFeatherWriterLayer::CreateSchema()
216 : {
217 141 : CreateSchemaCommon();
218 :
219 276 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
220 135 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
221 : {
222 268 : CPLJSONObject oRoot;
223 134 : oRoot.Add("schema_version", "0.1.0");
224 134 : oRoot.Add("primary_column",
225 134 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
226 268 : CPLJSONObject oColumns;
227 134 : oRoot.Add("columns", oColumns);
228 268 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
229 : {
230 134 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
231 268 : CPLJSONObject oColumn;
232 134 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
233 134 : oColumn.Add("encoding",
234 134 : GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
235 :
236 134 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
237 134 : if (poSRS)
238 : {
239 128 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
240 : "MULTILINE=NO", nullptr};
241 128 : char *pszWKT = nullptr;
242 128 : poSRS->exportToWkt(&pszWKT, apszOptions);
243 128 : if (pszWKT)
244 128 : oColumn.Add("crs", pszWKT);
245 128 : CPLFree(pszWKT);
246 :
247 128 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
248 128 : if (dfCoordEpoch > 0)
249 2 : oColumn.Add("epoch", dfCoordEpoch);
250 : }
251 :
252 : #if 0
253 : if( m_aoEnvelopes[i].IsInit() &&
254 : CPLTestBool(CPLGetConfigOption(
255 : "OGR_ARROW_WRITE_BBOX", "YES")) )
256 : {
257 : CPLJSONArray oBBOX;
258 : oBBOX.Add(m_aoEnvelopes[i].MinX);
259 : oBBOX.Add(m_aoEnvelopes[i].MinY);
260 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
261 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
262 : oColumn.Add("bbox", oBBOX);
263 : }
264 : #endif
265 134 : const auto eType = poGeomFieldDefn->GetType();
266 134 : if (CPLTestBool(CPLGetConfigOption(
267 268 : "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
268 134 : eType == wkbFlatten(eType))
269 : {
270 : // Geometry type, place under a temporary "gdal:geometry_type"
271 : // property pending acceptance of proposal at
272 : // https://github.com/opengeospatial/geoparquet/issues/41
273 56 : const char *pszType = "mixed";
274 56 : if (wkbPoint == eType)
275 19 : pszType = "Point";
276 37 : else if (wkbLineString == eType)
277 7 : pszType = "LineString";
278 30 : else if (wkbPolygon == eType)
279 7 : pszType = "Polygon";
280 23 : else if (wkbMultiPoint == eType)
281 7 : pszType = "MultiPoint";
282 16 : else if (wkbMultiLineString == eType)
283 7 : pszType = "MultiLineString";
284 9 : else if (wkbMultiPolygon == eType)
285 7 : pszType = "MultiPolygon";
286 2 : else if (wkbGeometryCollection == eType)
287 2 : pszType = "GeometryCollection";
288 56 : oColumn.Add("gdal:geometry_type", pszType);
289 : }
290 : }
291 :
292 134 : auto kvMetadata = m_poSchema->metadata()
293 2 : ? m_poSchema->metadata()->Copy()
294 270 : : std::make_shared<arrow::KeyValueMetadata>();
295 268 : kvMetadata->Append("geo",
296 268 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
297 134 : m_poSchema = m_poSchema->WithMetadata(kvMetadata);
298 134 : CPLAssert(m_poSchema);
299 : }
300 141 : }
301 :
302 : /************************************************************************/
303 : /* CreateWriter() */
304 : /************************************************************************/
305 :
306 141 : void OGRFeatherWriterLayer::CreateWriter()
307 : {
308 141 : CPLAssert(m_poFileWriter == nullptr);
309 :
310 141 : if (m_poSchema == nullptr)
311 : {
312 15 : CreateSchema();
313 : }
314 : else
315 : {
316 126 : FinalizeSchema();
317 : }
318 :
319 282 : auto options = arrow::ipc::IpcWriteOptions::Defaults();
320 141 : options.memory_pool = m_poMemoryPool;
321 :
322 : {
323 282 : auto result = arrow::util::Codec::Create(m_eCompression);
324 141 : if (!result.ok())
325 : {
326 0 : CPLError(CE_Failure, CPLE_AppDefined,
327 : "Codec::Create() failed with %s",
328 0 : result.status().message().c_str());
329 : }
330 : else
331 : {
332 141 : options.codec.reset(result->release());
333 : }
334 : }
335 :
336 141 : if (m_bStreamFormat)
337 : {
338 : auto result =
339 8 : arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
340 4 : if (!result.ok())
341 : {
342 0 : CPLError(CE_Failure, CPLE_AppDefined,
343 : "arrow::ipc::MakeStreamWriter() failed with %s",
344 0 : result.status().message().c_str());
345 : }
346 : else
347 : {
348 4 : m_poFileWriter = *result;
349 : }
350 : }
351 : else
352 : {
353 : m_poFooterKeyValueMetadata =
354 137 : std::make_shared<arrow::KeyValueMetadata>();
355 : auto result = arrow::ipc::MakeFileWriter(
356 411 : m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
357 137 : if (!result.ok())
358 : {
359 0 : CPLError(CE_Failure, CPLE_AppDefined,
360 : "arrow::ipc::MakeFileWriter() failed with %s",
361 0 : result.status().message().c_str());
362 : }
363 : else
364 : {
365 137 : m_poFileWriter = *result;
366 : }
367 : }
368 141 : }
369 :
370 : /************************************************************************/
371 : /* PerformStepsBeforeFinalFlushGroup() */
372 : /************************************************************************/
373 :
374 : // Add a gdal:geo extension metadata for now, which embeds a bbox
375 141 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
376 : {
377 278 : if (m_poFooterKeyValueMetadata &&
378 273 : m_poFeatureDefn->GetGeomFieldCount() != 0 &&
379 132 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
380 : {
381 258 : CPLJSONObject oRoot;
382 129 : oRoot.Add("primary_column",
383 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
384 129 : CPLJSONObject oColumns;
385 129 : oRoot.Add("columns", oColumns);
386 258 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
387 : {
388 129 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
389 258 : CPLJSONObject oColumn;
390 129 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
391 129 : oColumn.Add("encoding",
392 129 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
393 :
394 129 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
395 129 : if (poSRS)
396 : {
397 123 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
398 : "MULTILINE=NO", nullptr};
399 123 : char *pszWKT = nullptr;
400 123 : poSRS->exportToWkt(&pszWKT, apszOptions);
401 123 : if (pszWKT)
402 123 : oColumn.Add("crs", pszWKT);
403 123 : CPLFree(pszWKT);
404 :
405 123 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
406 123 : if (dfCoordEpoch > 0)
407 1 : oColumn.Add("epoch", dfCoordEpoch);
408 : }
409 :
410 129 : if (m_aoEnvelopes[i].IsInit())
411 : {
412 116 : CPLJSONArray oBBOX;
413 116 : oBBOX.Add(m_aoEnvelopes[i].MinX);
414 116 : oBBOX.Add(m_aoEnvelopes[i].MinY);
415 116 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
416 116 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
417 116 : oColumn.Add("bbox", oBBOX);
418 : }
419 : }
420 :
421 258 : m_poFooterKeyValueMetadata->Append(
422 : GDAL_GEO_FOOTER_KEY,
423 258 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
424 : }
425 141 : }
426 :
427 : /************************************************************************/
428 : /* FlushGroup() */
429 : /************************************************************************/
430 :
431 20 : bool OGRFeatherWriterLayer::FlushGroup()
432 : {
433 20 : std::vector<std::shared_ptr<arrow::Array>> columns;
434 20 : auto ret = WriteArrays(
435 840 : [&columns](const std::shared_ptr<arrow::Field> &,
436 840 : const std::shared_ptr<arrow::Array> &array)
437 : {
438 840 : columns.emplace_back(array);
439 840 : return true;
440 : });
441 :
442 20 : if (ret)
443 : {
444 : auto poRecordBatch = arrow::RecordBatch::Make(
445 60 : m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
446 40 : auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
447 20 : if (!status.ok())
448 : {
449 0 : CPLError(CE_Failure, CPLE_AppDefined,
450 : "WriteRecordBatch() failed with %s",
451 0 : status.message().c_str());
452 0 : ret = false;
453 : }
454 : }
455 :
456 20 : ClearArrayBuilers();
457 40 : return ret;
458 : }
459 :
460 : /************************************************************************/
461 : /* WriteArrowBatch() */
462 : /************************************************************************/
463 :
464 : inline bool
465 112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
466 : struct ArrowArray *array,
467 : CSLConstList papszOptions)
468 : {
469 224 : return WriteArrowBatchInternal(
470 : schema, array, papszOptions,
471 112 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
472 : {
473 224 : auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
474 112 : if (!status.ok())
475 : {
476 0 : CPLError(CE_Failure, CPLE_AppDefined,
477 : "WriteRecordBatch() failed: %s",
478 0 : status.message().c_str());
479 0 : return false;
480 : }
481 :
482 112 : return true;
483 224 : });
484 : }
|