Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_feather.h"
14 :
15 : #include "../arrow_common/ograrrowwriterlayer.hpp"
16 :
17 : /************************************************************************/
18 : /* OGRFeatherWriterLayer() */
19 : /************************************************************************/
20 :
21 152 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
22 : GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
23 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
24 152 : const char *pszLayerName)
25 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
26 152 : m_poDS(poDS)
27 : {
28 152 : m_bWriteFieldArrowExtensionName = true;
29 152 : }
30 :
31 : /************************************************************************/
32 : /* Close() */
33 : /************************************************************************/
34 :
35 143 : bool OGRFeatherWriterLayer::Close()
36 : {
37 143 : if (m_bInitializationOK)
38 : {
39 143 : if (!FinalizeWriting())
40 0 : return false;
41 : }
42 :
43 143 : return true;
44 : }
45 :
46 : /************************************************************************/
47 : /* IsSupportedGeometryType() */
48 : /************************************************************************/
49 :
50 146 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
51 : OGRwkbGeometryType eGType) const
52 : {
53 146 : if (eGType != wkbFlatten(eGType))
54 : {
55 : const auto osConfigOptionName =
56 170 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
57 85 : if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
58 : {
59 7 : CPLError(CE_Failure, CPLE_NotSupported,
60 : "Only 2D geometry types are supported (unless the "
61 : "%s configuration option is set to YES)",
62 : osConfigOptionName.c_str());
63 7 : return false;
64 : }
65 : }
66 139 : return true;
67 : }
68 :
69 : /************************************************************************/
70 : /* SetOptions() */
71 : /************************************************************************/
72 :
73 152 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
74 : CSLConstList papszOptions,
75 : const OGRSpatialReference *poSpatialRef,
76 : OGRwkbGeometryType eGType)
77 : {
78 152 : m_aosCreationOptions = papszOptions;
79 :
80 : const char *pszDefaultFormat =
81 304 : (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
82 152 : STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
83 456 : ? "STREAM"
84 152 : : "FILE";
85 152 : m_bStreamFormat =
86 152 : EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
87 : "STREAM");
88 :
89 : const char *pszGeomEncoding =
90 152 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
91 152 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
92 152 : if (pszGeomEncoding)
93 : {
94 121 : if (EQUAL(pszGeomEncoding, "WKB"))
95 31 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
96 90 : else if (EQUAL(pszGeomEncoding, "WKT"))
97 29 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
98 61 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
99 30 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
100 31 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
101 6 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
102 31 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
103 : else
104 : {
105 0 : CPLError(CE_Failure, CPLE_NotSupported,
106 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
107 0 : return false;
108 : }
109 : }
110 :
111 152 : if (eGType != wkbNone)
112 : {
113 146 : if (!IsSupportedGeometryType(eGType))
114 : {
115 9 : return false;
116 : }
117 :
118 139 : if (poSpatialRef == nullptr)
119 : {
120 10 : CPLError(CE_Warning, CPLE_AppDefined,
121 : "Geometry column should have an associated CRS");
122 : }
123 :
124 139 : m_poFeatureDefn->SetGeomType(eGType);
125 139 : auto eGeomEncoding = m_eGeomEncoding;
126 139 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
127 109 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
128 : {
129 79 : const auto eEncodingType = eGeomEncoding;
130 79 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
131 79 : if (eGeomEncoding == eEncodingType)
132 2 : return false;
133 : }
134 137 : m_aeGeomEncoding.push_back(eGeomEncoding);
135 137 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
136 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
137 137 : if (poSpatialRef)
138 : {
139 129 : auto poSRS = poSpatialRef->Clone();
140 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
141 129 : poSRS->Release();
142 : }
143 : }
144 :
145 143 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
146 :
147 143 : const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
148 143 : if (pszCompression == nullptr)
149 : {
150 420 : auto oResult = arrow::util::Codec::GetCompressionType("lz4");
151 140 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
152 : {
153 140 : pszCompression = "LZ4";
154 : }
155 : else
156 : {
157 0 : pszCompression = "NONE";
158 : }
159 : }
160 :
161 143 : if (EQUAL(pszCompression, "NONE"))
162 0 : pszCompression = "UNCOMPRESSED";
163 : auto oResult = arrow::util::Codec::GetCompressionType(
164 286 : CPLString(pszCompression).tolower());
165 143 : if (!oResult.ok())
166 : {
167 0 : CPLError(CE_Failure, CPLE_NotSupported,
168 : "Unrecognized compression method: %s", pszCompression);
169 0 : return false;
170 : }
171 143 : m_eCompression = *oResult;
172 143 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
173 : {
174 0 : CPLError(CE_Failure, CPLE_NotSupported,
175 : "Compression method %s is known, but libarrow has not "
176 : "been built with support for it",
177 : pszCompression);
178 0 : return false;
179 : }
180 :
181 143 : const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
182 143 : if (pszRowGroupSize)
183 : {
184 3 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
185 3 : if (nRowGroupSize > 0)
186 : {
187 3 : if (nRowGroupSize > INT_MAX)
188 0 : nRowGroupSize = INT_MAX;
189 3 : m_nRowGroupSize = nRowGroupSize;
190 : }
191 : }
192 :
193 143 : m_bInitializationOK = true;
194 143 : return true;
195 : }
196 :
197 : /************************************************************************/
198 : /* CloseFileWriter() */
199 : /************************************************************************/
200 :
201 143 : bool OGRFeatherWriterLayer::CloseFileWriter()
202 : {
203 286 : auto status = m_poFileWriter->Close();
204 143 : if (!status.ok())
205 : {
206 0 : CPLError(CE_Failure, CPLE_AppDefined,
207 : "FileWriter::Close() failed with %s",
208 0 : status.message().c_str());
209 : }
210 286 : return status.ok();
211 : }
212 :
213 : /************************************************************************/
214 : /* CreateSchema() */
215 : /************************************************************************/
216 :
217 143 : void OGRFeatherWriterLayer::CreateSchema()
218 : {
219 143 : CreateSchemaCommon();
220 :
221 280 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
222 137 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
223 : {
224 272 : CPLJSONObject oRoot;
225 136 : oRoot.Add("schema_version", "0.1.0");
226 136 : oRoot.Add("primary_column",
227 136 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
228 272 : CPLJSONObject oColumns;
229 136 : oRoot.Add("columns", oColumns);
230 272 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
231 : {
232 136 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
233 272 : CPLJSONObject oColumn;
234 136 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
235 136 : oColumn.Add("encoding",
236 136 : GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
237 :
238 136 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
239 136 : if (poSRS)
240 : {
241 128 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
242 : "MULTILINE=NO", nullptr};
243 128 : char *pszWKT = nullptr;
244 128 : poSRS->exportToWkt(&pszWKT, apszOptions);
245 128 : if (pszWKT)
246 128 : oColumn.Add("crs", pszWKT);
247 128 : CPLFree(pszWKT);
248 :
249 128 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
250 128 : if (dfCoordEpoch > 0)
251 2 : oColumn.Add("epoch", dfCoordEpoch);
252 : }
253 :
254 : #if 0
255 : if( m_aoEnvelopes[i].IsInit() &&
256 : CPLTestBool(CPLGetConfigOption(
257 : "OGR_ARROW_WRITE_BBOX", "YES")) )
258 : {
259 : CPLJSONArray oBBOX;
260 : oBBOX.Add(m_aoEnvelopes[i].MinX);
261 : oBBOX.Add(m_aoEnvelopes[i].MinY);
262 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
263 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
264 : oColumn.Add("bbox", oBBOX);
265 : }
266 : #endif
267 136 : const auto eType = poGeomFieldDefn->GetType();
268 136 : if (CPLTestBool(CPLGetConfigOption(
269 272 : "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
270 136 : eType == wkbFlatten(eType))
271 : {
272 : // Geometry type, place under a temporary "gdal:geometry_type"
273 : // property pending acceptance of proposal at
274 : // https://github.com/opengeospatial/geoparquet/issues/41
275 58 : const char *pszType = "mixed";
276 58 : if (wkbPoint == eType)
277 19 : pszType = "Point";
278 39 : else if (wkbLineString == eType)
279 7 : pszType = "LineString";
280 32 : else if (wkbPolygon == eType)
281 9 : pszType = "Polygon";
282 23 : else if (wkbMultiPoint == eType)
283 7 : pszType = "MultiPoint";
284 16 : else if (wkbMultiLineString == eType)
285 7 : pszType = "MultiLineString";
286 9 : else if (wkbMultiPolygon == eType)
287 7 : pszType = "MultiPolygon";
288 2 : else if (wkbGeometryCollection == eType)
289 2 : pszType = "GeometryCollection";
290 58 : oColumn.Add("gdal:geometry_type", pszType);
291 : }
292 : }
293 :
294 136 : auto kvMetadata = m_poSchema->metadata()
295 2 : ? m_poSchema->metadata()->Copy()
296 274 : : std::make_shared<arrow::KeyValueMetadata>();
297 272 : kvMetadata->Append("geo",
298 272 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
299 136 : m_poSchema = m_poSchema->WithMetadata(kvMetadata);
300 136 : CPLAssert(m_poSchema);
301 : }
302 143 : }
303 :
304 : /************************************************************************/
305 : /* CreateWriter() */
306 : /************************************************************************/
307 :
308 143 : void OGRFeatherWriterLayer::CreateWriter()
309 : {
310 143 : CPLAssert(m_poFileWriter == nullptr);
311 :
312 143 : if (m_poSchema == nullptr)
313 : {
314 15 : CreateSchema();
315 : }
316 : else
317 : {
318 128 : FinalizeSchema();
319 : }
320 :
321 286 : auto options = arrow::ipc::IpcWriteOptions::Defaults();
322 143 : options.memory_pool = m_poMemoryPool;
323 :
324 : {
325 286 : auto result = arrow::util::Codec::Create(m_eCompression);
326 143 : if (!result.ok())
327 : {
328 0 : CPLError(CE_Failure, CPLE_AppDefined,
329 : "Codec::Create() failed with %s",
330 0 : result.status().message().c_str());
331 : }
332 : else
333 : {
334 143 : options.codec.reset(result->release());
335 : }
336 : }
337 :
338 143 : if (m_bStreamFormat)
339 : {
340 : auto result =
341 8 : arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
342 4 : if (!result.ok())
343 : {
344 0 : CPLError(CE_Failure, CPLE_AppDefined,
345 : "arrow::ipc::MakeStreamWriter() failed with %s",
346 0 : result.status().message().c_str());
347 : }
348 : else
349 : {
350 4 : m_poFileWriter = *result;
351 : }
352 : }
353 : else
354 : {
355 : m_poFooterKeyValueMetadata =
356 139 : std::make_shared<arrow::KeyValueMetadata>();
357 : auto result = arrow::ipc::MakeFileWriter(
358 417 : m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
359 139 : if (!result.ok())
360 : {
361 0 : CPLError(CE_Failure, CPLE_AppDefined,
362 : "arrow::ipc::MakeFileWriter() failed with %s",
363 0 : result.status().message().c_str());
364 : }
365 : else
366 : {
367 139 : m_poFileWriter = *result;
368 : }
369 : }
370 143 : }
371 :
372 : /************************************************************************/
373 : /* PerformStepsBeforeFinalFlushGroup() */
374 : /************************************************************************/
375 :
376 : // Add a gdal:geo extension metadata for now, which embeds a bbox
377 143 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
378 : {
379 282 : if (m_poFooterKeyValueMetadata &&
380 277 : m_poFeatureDefn->GetGeomFieldCount() != 0 &&
381 134 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
382 : {
383 262 : CPLJSONObject oRoot;
384 131 : oRoot.Add("primary_column",
385 131 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
386 131 : CPLJSONObject oColumns;
387 131 : oRoot.Add("columns", oColumns);
388 262 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
389 : {
390 131 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
391 262 : CPLJSONObject oColumn;
392 131 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
393 131 : oColumn.Add("encoding",
394 131 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
395 :
396 131 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
397 131 : if (poSRS)
398 : {
399 123 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
400 : "MULTILINE=NO", nullptr};
401 123 : char *pszWKT = nullptr;
402 123 : poSRS->exportToWkt(&pszWKT, apszOptions);
403 123 : if (pszWKT)
404 123 : oColumn.Add("crs", pszWKT);
405 123 : CPLFree(pszWKT);
406 :
407 123 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
408 123 : if (dfCoordEpoch > 0)
409 1 : oColumn.Add("epoch", dfCoordEpoch);
410 : }
411 :
412 131 : if (m_aoEnvelopes[i].IsInit())
413 : {
414 116 : CPLJSONArray oBBOX;
415 116 : oBBOX.Add(m_aoEnvelopes[i].MinX);
416 116 : oBBOX.Add(m_aoEnvelopes[i].MinY);
417 116 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
418 116 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
419 116 : oColumn.Add("bbox", oBBOX);
420 : }
421 : }
422 :
423 262 : m_poFooterKeyValueMetadata->Append(
424 : GDAL_GEO_FOOTER_KEY,
425 262 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
426 : }
427 143 : }
428 :
429 : /************************************************************************/
430 : /* FlushGroup() */
431 : /************************************************************************/
432 :
433 22 : bool OGRFeatherWriterLayer::FlushGroup()
434 : {
435 22 : std::vector<std::shared_ptr<arrow::Array>> columns;
436 22 : auto ret = WriteArrays(
437 846 : [&columns](const std::shared_ptr<arrow::Field> &,
438 846 : const std::shared_ptr<arrow::Array> &array)
439 : {
440 846 : columns.emplace_back(array);
441 846 : return true;
442 : });
443 :
444 22 : if (ret)
445 : {
446 : auto poRecordBatch = arrow::RecordBatch::Make(
447 66 : m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
448 44 : auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
449 22 : if (!status.ok())
450 : {
451 0 : CPLError(CE_Failure, CPLE_AppDefined,
452 : "WriteRecordBatch() failed with %s",
453 0 : status.message().c_str());
454 0 : ret = false;
455 : }
456 : }
457 :
458 22 : ClearArrayBuilers();
459 44 : return ret;
460 : }
461 :
462 : /************************************************************************/
463 : /* WriteArrowBatch() */
464 : /************************************************************************/
465 :
466 : inline bool
467 112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
468 : struct ArrowArray *array,
469 : CSLConstList papszOptions)
470 : {
471 224 : return WriteArrowBatchInternal(
472 : schema, array, papszOptions,
473 112 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
474 : {
475 224 : auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
476 112 : if (!status.ok())
477 : {
478 0 : CPLError(CE_Failure, CPLE_AppDefined,
479 : "WriteRecordBatch() failed: %s",
480 0 : status.message().c_str());
481 0 : return false;
482 : }
483 :
484 112 : return true;
485 224 : });
486 : }
|