Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_feather.h"
14 :
15 : #include "../arrow_common/ograrrowwriterlayer.hpp"
16 :
17 : /************************************************************************/
18 : /* OGRFeatherWriterLayer() */
19 : /************************************************************************/
20 :
21 152 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
22 : GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
23 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
24 152 : const char *pszLayerName)
25 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
26 152 : m_poDS(poDS)
27 : {
28 152 : m_bWriteFieldArrowExtensionName = true;
29 152 : }
30 :
31 : /************************************************************************/
32 : /* Close() */
33 : /************************************************************************/
34 :
35 143 : bool OGRFeatherWriterLayer::Close()
36 : {
37 143 : if (m_bInitializationOK)
38 : {
39 143 : if (!FinalizeWriting())
40 0 : return false;
41 : }
42 :
43 143 : return true;
44 : }
45 :
46 : /************************************************************************/
47 : /* IsSupportedGeometryType() */
48 : /************************************************************************/
49 :
50 146 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
51 : OGRwkbGeometryType eGType) const
52 : {
53 146 : if (eGType != wkbFlatten(eGType))
54 : {
55 : const auto osConfigOptionName =
56 170 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
57 85 : if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
58 : {
59 7 : CPLError(CE_Failure, CPLE_NotSupported,
60 : "Only 2D geometry types are supported (unless the "
61 : "%s configuration option is set to YES)",
62 : osConfigOptionName.c_str());
63 7 : return false;
64 : }
65 : }
66 139 : return true;
67 : }
68 :
69 : /************************************************************************/
70 : /* SetOptions() */
71 : /************************************************************************/
72 :
73 152 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
74 : CSLConstList papszOptions,
75 : const OGRSpatialReference *poSpatialRef,
76 : OGRwkbGeometryType eGType)
77 : {
78 152 : m_aosCreationOptions = papszOptions;
79 :
80 : const char *pszDefaultFormat =
81 304 : (EQUAL(CPLGetExtensionSafe(osFilename.c_str()).c_str(), "arrows") ||
82 152 : STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
83 456 : ? "STREAM"
84 152 : : "FILE";
85 152 : m_bStreamFormat =
86 152 : EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
87 : "STREAM");
88 :
89 : const char *pszGeomEncoding =
90 152 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
91 152 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
92 152 : if (pszGeomEncoding)
93 : {
94 121 : if (EQUAL(pszGeomEncoding, "WKB"))
95 31 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
96 90 : else if (EQUAL(pszGeomEncoding, "WKT"))
97 29 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
98 61 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
99 30 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
100 31 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
101 6 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
102 31 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
103 : else
104 : {
105 0 : CPLError(CE_Failure, CPLE_NotSupported,
106 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
107 0 : return false;
108 : }
109 : }
110 :
111 152 : if (eGType != wkbNone)
112 : {
113 146 : if (!IsSupportedGeometryType(eGType))
114 : {
115 9 : return false;
116 : }
117 :
118 139 : if (poSpatialRef == nullptr)
119 : {
120 10 : CPLError(CE_Warning, CPLE_AppDefined,
121 : "Geometry column should have an associated CRS");
122 : }
123 :
124 139 : m_poFeatureDefn->SetGeomType(eGType);
125 139 : auto eGeomEncoding = m_eGeomEncoding;
126 139 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
127 109 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
128 : {
129 79 : const auto eEncodingType = eGeomEncoding;
130 79 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
131 79 : if (eGeomEncoding == eEncodingType)
132 2 : return false;
133 : }
134 137 : m_aeGeomEncoding.push_back(eGeomEncoding);
135 137 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
136 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
137 137 : if (poSpatialRef)
138 : {
139 129 : auto poSRS = poSpatialRef->Clone();
140 129 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
141 129 : poSRS->Release();
142 : }
143 : }
144 :
145 143 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
146 :
147 : const char *pszCompression =
148 143 : CSLFetchNameValue(papszOptions, GDALMD_COMPRESSION);
149 143 : if (pszCompression == nullptr)
150 : {
151 420 : auto oResult = arrow::util::Codec::GetCompressionType("lz4");
152 140 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
153 : {
154 140 : pszCompression = "LZ4";
155 : }
156 : else
157 : {
158 0 : pszCompression = "NONE";
159 : }
160 : }
161 :
162 143 : if (EQUAL(pszCompression, "NONE"))
163 0 : pszCompression = "UNCOMPRESSED";
164 : auto oResult = arrow::util::Codec::GetCompressionType(
165 286 : CPLString(pszCompression).tolower());
166 143 : if (!oResult.ok())
167 : {
168 0 : CPLError(CE_Failure, CPLE_NotSupported,
169 : "Unrecognized compression method: %s", pszCompression);
170 0 : return false;
171 : }
172 143 : m_eCompression = *oResult;
173 143 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
174 : {
175 0 : CPLError(CE_Failure, CPLE_NotSupported,
176 : "Compression method %s is known, but libarrow has not "
177 : "been built with support for it",
178 : pszCompression);
179 0 : return false;
180 : }
181 :
182 143 : const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
183 143 : if (pszRowGroupSize)
184 : {
185 3 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
186 3 : if (nRowGroupSize > 0)
187 : {
188 3 : if (nRowGroupSize > INT_MAX)
189 0 : nRowGroupSize = INT_MAX;
190 3 : m_nRowGroupSize = nRowGroupSize;
191 : }
192 : }
193 :
194 143 : m_bInitializationOK = true;
195 143 : return true;
196 : }
197 :
198 : /************************************************************************/
199 : /* CloseFileWriter() */
200 : /************************************************************************/
201 :
202 143 : bool OGRFeatherWriterLayer::CloseFileWriter()
203 : {
204 286 : auto status = m_poFileWriter->Close();
205 143 : if (!status.ok())
206 : {
207 0 : CPLError(CE_Failure, CPLE_AppDefined,
208 : "FileWriter::Close() failed with %s",
209 0 : status.message().c_str());
210 : }
211 286 : return status.ok();
212 : }
213 :
214 : /************************************************************************/
215 : /* CreateSchema() */
216 : /************************************************************************/
217 :
218 143 : void OGRFeatherWriterLayer::CreateSchema()
219 : {
220 143 : CreateSchemaCommon();
221 :
222 280 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
223 137 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
224 : {
225 272 : CPLJSONObject oRoot;
226 136 : oRoot.Add("schema_version", "0.1.0");
227 136 : oRoot.Add("primary_column",
228 136 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
229 272 : CPLJSONObject oColumns;
230 136 : oRoot.Add("columns", oColumns);
231 272 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
232 : {
233 136 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
234 272 : CPLJSONObject oColumn;
235 136 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
236 136 : oColumn.Add("encoding",
237 136 : GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
238 :
239 136 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
240 136 : if (poSRS)
241 : {
242 128 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
243 : "MULTILINE=NO", nullptr};
244 128 : char *pszWKT = nullptr;
245 128 : poSRS->exportToWkt(&pszWKT, apszOptions);
246 128 : if (pszWKT)
247 128 : oColumn.Add("crs", pszWKT);
248 128 : CPLFree(pszWKT);
249 :
250 128 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
251 128 : if (dfCoordEpoch > 0)
252 2 : oColumn.Add("epoch", dfCoordEpoch);
253 : }
254 :
255 : #if 0
256 : if( m_aoEnvelopes[i].IsInit() &&
257 : CPLTestBool(CPLGetConfigOption(
258 : "OGR_ARROW_WRITE_BBOX", "YES")) )
259 : {
260 : CPLJSONArray oBBOX;
261 : oBBOX.Add(m_aoEnvelopes[i].MinX);
262 : oBBOX.Add(m_aoEnvelopes[i].MinY);
263 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
264 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
265 : oColumn.Add("bbox", oBBOX);
266 : }
267 : #endif
268 136 : const auto eType = poGeomFieldDefn->GetType();
269 136 : if (CPLTestBool(CPLGetConfigOption(
270 272 : "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
271 136 : eType == wkbFlatten(eType))
272 : {
273 : // Geometry type, place under a temporary "gdal:geometry_type"
274 : // property pending acceptance of proposal at
275 : // https://github.com/opengeospatial/geoparquet/issues/41
276 58 : const char *pszType = "mixed";
277 58 : if (wkbPoint == eType)
278 19 : pszType = "Point";
279 39 : else if (wkbLineString == eType)
280 7 : pszType = "LineString";
281 32 : else if (wkbPolygon == eType)
282 9 : pszType = "Polygon";
283 23 : else if (wkbMultiPoint == eType)
284 7 : pszType = "MultiPoint";
285 16 : else if (wkbMultiLineString == eType)
286 7 : pszType = "MultiLineString";
287 9 : else if (wkbMultiPolygon == eType)
288 7 : pszType = "MultiPolygon";
289 2 : else if (wkbGeometryCollection == eType)
290 2 : pszType = "GeometryCollection";
291 58 : oColumn.Add("gdal:geometry_type", pszType);
292 : }
293 : }
294 :
295 136 : auto kvMetadata = m_poSchema->metadata()
296 2 : ? m_poSchema->metadata()->Copy()
297 274 : : std::make_shared<arrow::KeyValueMetadata>();
298 272 : kvMetadata->Append("geo",
299 272 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
300 136 : m_poSchema = m_poSchema->WithMetadata(kvMetadata);
301 136 : CPLAssert(m_poSchema);
302 : }
303 143 : }
304 :
305 : /************************************************************************/
306 : /* CreateWriter() */
307 : /************************************************************************/
308 :
309 143 : void OGRFeatherWriterLayer::CreateWriter()
310 : {
311 143 : CPLAssert(m_poFileWriter == nullptr);
312 :
313 143 : if (m_poSchema == nullptr)
314 : {
315 15 : CreateSchema();
316 : }
317 : else
318 : {
319 128 : FinalizeSchema();
320 : }
321 :
322 286 : auto options = arrow::ipc::IpcWriteOptions::Defaults();
323 143 : options.memory_pool = m_poMemoryPool;
324 :
325 : {
326 286 : auto result = arrow::util::Codec::Create(m_eCompression);
327 143 : if (!result.ok())
328 : {
329 0 : CPLError(CE_Failure, CPLE_AppDefined,
330 : "Codec::Create() failed with %s",
331 0 : result.status().message().c_str());
332 : }
333 : else
334 : {
335 143 : options.codec.reset(result->release());
336 : }
337 : }
338 :
339 143 : if (m_bStreamFormat)
340 : {
341 : auto result =
342 8 : arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
343 4 : if (!result.ok())
344 : {
345 0 : CPLError(CE_Failure, CPLE_AppDefined,
346 : "arrow::ipc::MakeStreamWriter() failed with %s",
347 0 : result.status().message().c_str());
348 : }
349 : else
350 : {
351 4 : m_poFileWriter = *result;
352 : }
353 : }
354 : else
355 : {
356 : m_poFooterKeyValueMetadata =
357 139 : std::make_shared<arrow::KeyValueMetadata>();
358 : auto result = arrow::ipc::MakeFileWriter(
359 417 : m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
360 139 : if (!result.ok())
361 : {
362 0 : CPLError(CE_Failure, CPLE_AppDefined,
363 : "arrow::ipc::MakeFileWriter() failed with %s",
364 0 : result.status().message().c_str());
365 : }
366 : else
367 : {
368 139 : m_poFileWriter = *result;
369 : }
370 : }
371 143 : }
372 :
373 : /************************************************************************/
374 : /* PerformStepsBeforeFinalFlushGroup() */
375 : /************************************************************************/
376 :
377 : // Add a gdal:geo extension metadata for now, which embeds a bbox
378 143 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
379 : {
380 282 : if (m_poFooterKeyValueMetadata &&
381 277 : m_poFeatureDefn->GetGeomFieldCount() != 0 &&
382 134 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
383 : {
384 262 : CPLJSONObject oRoot;
385 131 : oRoot.Add("primary_column",
386 131 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
387 131 : CPLJSONObject oColumns;
388 131 : oRoot.Add("columns", oColumns);
389 262 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
390 : {
391 131 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
392 262 : CPLJSONObject oColumn;
393 131 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
394 131 : oColumn.Add("encoding",
395 131 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
396 :
397 131 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
398 131 : if (poSRS)
399 : {
400 123 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
401 : "MULTILINE=NO", nullptr};
402 123 : char *pszWKT = nullptr;
403 123 : poSRS->exportToWkt(&pszWKT, apszOptions);
404 123 : if (pszWKT)
405 123 : oColumn.Add("crs", pszWKT);
406 123 : CPLFree(pszWKT);
407 :
408 123 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
409 123 : if (dfCoordEpoch > 0)
410 1 : oColumn.Add("epoch", dfCoordEpoch);
411 : }
412 :
413 131 : if (m_aoEnvelopes[i].IsInit())
414 : {
415 116 : CPLJSONArray oBBOX;
416 116 : oBBOX.Add(m_aoEnvelopes[i].MinX);
417 116 : oBBOX.Add(m_aoEnvelopes[i].MinY);
418 116 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
419 116 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
420 116 : oColumn.Add("bbox", oBBOX);
421 : }
422 : }
423 :
424 262 : m_poFooterKeyValueMetadata->Append(
425 : GDAL_GEO_FOOTER_KEY,
426 262 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
427 : }
428 143 : }
429 :
430 : /************************************************************************/
431 : /* FlushGroup() */
432 : /************************************************************************/
433 :
434 22 : bool OGRFeatherWriterLayer::FlushGroup()
435 : {
436 22 : std::vector<std::shared_ptr<arrow::Array>> columns;
437 22 : auto ret = WriteArrays(
438 846 : [&columns](const std::shared_ptr<arrow::Field> &,
439 846 : const std::shared_ptr<arrow::Array> &array)
440 : {
441 846 : columns.emplace_back(array);
442 846 : return true;
443 : });
444 :
445 22 : if (ret)
446 : {
447 : auto poRecordBatch = arrow::RecordBatch::Make(
448 66 : m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
449 44 : auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
450 22 : if (!status.ok())
451 : {
452 0 : CPLError(CE_Failure, CPLE_AppDefined,
453 : "WriteRecordBatch() failed with %s",
454 0 : status.message().c_str());
455 0 : ret = false;
456 : }
457 : }
458 :
459 22 : ClearArrayBuilers();
460 44 : return ret;
461 : }
462 :
463 : /************************************************************************/
464 : /* WriteArrowBatch() */
465 : /************************************************************************/
466 :
467 : inline bool
468 112 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
469 : struct ArrowArray *array,
470 : CSLConstList papszOptions)
471 : {
472 224 : return WriteArrowBatchInternal(
473 : schema, array, papszOptions,
474 112 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
475 : {
476 224 : auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
477 112 : if (!status.ok())
478 : {
479 0 : CPLError(CE_Failure, CPLE_AppDefined,
480 : "WriteRecordBatch() failed: %s",
481 0 : status.message().c_str());
482 0 : return false;
483 : }
484 :
485 112 : return true;
486 224 : });
487 : }
|