Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "ogr_feather.h"
30 :
31 : #include "../arrow_common/ograrrowwriterlayer.hpp"
32 :
33 : /************************************************************************/
34 : /* OGRFeatherWriterLayer() */
35 : /************************************************************************/
36 :
37 138 : OGRFeatherWriterLayer::OGRFeatherWriterLayer(
38 : GDALDataset *poDS, arrow::MemoryPool *poMemoryPool,
39 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
40 138 : const char *pszLayerName)
41 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
42 138 : m_poDS(poDS)
43 : {
44 138 : m_bWriteFieldArrowExtensionName = true;
45 138 : }
46 :
47 : /************************************************************************/
48 : /* ~OGRFeatherWriterLayer() */
49 : /************************************************************************/
50 :
51 276 : OGRFeatherWriterLayer::~OGRFeatherWriterLayer()
52 : {
53 138 : if (m_bInitializationOK)
54 129 : FinalizeWriting();
55 276 : }
56 :
57 : /************************************************************************/
58 : /* IsSupportedGeometryType() */
59 : /************************************************************************/
60 :
61 133 : bool OGRFeatherWriterLayer::IsSupportedGeometryType(
62 : OGRwkbGeometryType eGType) const
63 : {
64 133 : if (eGType != wkbFlatten(eGType))
65 : {
66 : const auto osConfigOptionName =
67 170 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
68 85 : if (!CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
69 : {
70 7 : CPLError(CE_Failure, CPLE_NotSupported,
71 : "Only 2D geometry types are supported (unless the "
72 : "%s configuration option is set to YES)",
73 : osConfigOptionName.c_str());
74 7 : return false;
75 : }
76 : }
77 126 : return true;
78 : }
79 :
80 : /************************************************************************/
81 : /* SetOptions() */
82 : /************************************************************************/
83 :
84 138 : bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
85 : CSLConstList papszOptions,
86 : const OGRSpatialReference *poSpatialRef,
87 : OGRwkbGeometryType eGType)
88 : {
89 : const char *pszDefaultFormat =
90 138 : (EQUAL(CPLGetExtension(osFilename.c_str()), "arrows") ||
91 138 : STARTS_WITH_CI(osFilename.c_str(), "/vsistdout"))
92 276 : ? "STREAM"
93 138 : : "FILE";
94 138 : m_bStreamFormat =
95 138 : EQUAL(CSLFetchNameValueDef(papszOptions, "FORMAT", pszDefaultFormat),
96 : "STREAM");
97 :
98 : const char *pszGeomEncoding =
99 138 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
100 138 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
101 138 : if (pszGeomEncoding)
102 : {
103 110 : if (EQUAL(pszGeomEncoding, "WKB"))
104 32 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
105 78 : else if (EQUAL(pszGeomEncoding, "WKT"))
106 29 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
107 49 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
108 24 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
109 25 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
110 0 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
111 25 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
112 : else
113 : {
114 0 : CPLError(CE_Failure, CPLE_NotSupported,
115 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
116 0 : return false;
117 : }
118 : }
119 :
120 138 : if (eGType != wkbNone)
121 : {
122 133 : if (!IsSupportedGeometryType(eGType))
123 : {
124 9 : return false;
125 : }
126 :
127 126 : if (poSpatialRef == nullptr)
128 : {
129 8 : CPLError(CE_Warning, CPLE_AppDefined,
130 : "Geometry column should have an associated CRS");
131 : }
132 :
133 126 : m_poFeatureDefn->SetGeomType(eGType);
134 126 : auto eGeomEncoding = m_eGeomEncoding;
135 126 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
136 102 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
137 : {
138 65 : const auto eEncodingType = eGeomEncoding;
139 65 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
140 65 : if (eGeomEncoding == eEncodingType)
141 2 : return false;
142 : }
143 124 : m_aeGeomEncoding.push_back(eGeomEncoding);
144 124 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
145 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
146 124 : if (poSpatialRef)
147 : {
148 118 : auto poSRS = poSpatialRef->Clone();
149 118 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
150 118 : poSRS->Release();
151 : }
152 : }
153 :
154 129 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
155 :
156 129 : const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
157 129 : if (pszCompression == nullptr)
158 : {
159 378 : auto oResult = arrow::util::Codec::GetCompressionType("lz4");
160 126 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
161 : {
162 126 : pszCompression = "LZ4";
163 : }
164 : else
165 : {
166 0 : pszCompression = "NONE";
167 : }
168 : }
169 :
170 129 : if (EQUAL(pszCompression, "NONE"))
171 0 : pszCompression = "UNCOMPRESSED";
172 : auto oResult = arrow::util::Codec::GetCompressionType(
173 258 : CPLString(pszCompression).tolower());
174 129 : if (!oResult.ok())
175 : {
176 0 : CPLError(CE_Failure, CPLE_NotSupported,
177 : "Unrecognized compression method: %s", pszCompression);
178 0 : return false;
179 : }
180 129 : m_eCompression = *oResult;
181 129 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
182 : {
183 0 : CPLError(CE_Failure, CPLE_NotSupported,
184 : "Compression method %s is known, but libarrow has not "
185 : "been built with support for it",
186 : pszCompression);
187 0 : return false;
188 : }
189 :
190 129 : const char *pszRowGroupSize = CSLFetchNameValue(papszOptions, "BATCH_SIZE");
191 129 : if (pszRowGroupSize)
192 : {
193 3 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
194 3 : if (nRowGroupSize > 0)
195 : {
196 3 : if (nRowGroupSize > INT_MAX)
197 0 : nRowGroupSize = INT_MAX;
198 3 : m_nRowGroupSize = nRowGroupSize;
199 : }
200 : }
201 :
202 129 : m_bInitializationOK = true;
203 129 : return true;
204 : }
205 :
206 : /************************************************************************/
207 : /* CloseFileWriter() */
208 : /************************************************************************/
209 :
210 129 : bool OGRFeatherWriterLayer::CloseFileWriter()
211 : {
212 258 : auto status = m_poFileWriter->Close();
213 129 : if (!status.ok())
214 : {
215 0 : CPLError(CE_Failure, CPLE_AppDefined,
216 : "FileWriter::Close() failed with %s",
217 0 : status.message().c_str());
218 : }
219 258 : return status.ok();
220 : }
221 :
222 : /************************************************************************/
223 : /* CreateSchema() */
224 : /************************************************************************/
225 :
226 129 : void OGRFeatherWriterLayer::CreateSchema()
227 : {
228 129 : CreateSchemaCommon();
229 :
230 253 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 &&
231 124 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GEO", "YES")))
232 : {
233 246 : CPLJSONObject oRoot;
234 123 : oRoot.Add("schema_version", "0.1.0");
235 123 : oRoot.Add("primary_column",
236 123 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
237 246 : CPLJSONObject oColumns;
238 123 : oRoot.Add("columns", oColumns);
239 246 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
240 : {
241 123 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
242 246 : CPLJSONObject oColumn;
243 123 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
244 123 : oColumn.Add("encoding",
245 123 : GetGeomEncodingAsString(m_aeGeomEncoding[i], false));
246 :
247 123 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
248 123 : if (poSRS)
249 : {
250 117 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
251 : "MULTILINE=NO", nullptr};
252 117 : char *pszWKT = nullptr;
253 117 : poSRS->exportToWkt(&pszWKT, apszOptions);
254 117 : if (pszWKT)
255 117 : oColumn.Add("crs", pszWKT);
256 117 : CPLFree(pszWKT);
257 :
258 117 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
259 117 : if (dfCoordEpoch > 0)
260 2 : oColumn.Add("epoch", dfCoordEpoch);
261 : }
262 :
263 : #if 0
264 : if( m_aoEnvelopes[i].IsInit() &&
265 : CPLTestBool(CPLGetConfigOption(
266 : "OGR_ARROW_WRITE_BBOX", "YES")) )
267 : {
268 : CPLJSONArray oBBOX;
269 : oBBOX.Add(m_aoEnvelopes[i].MinX);
270 : oBBOX.Add(m_aoEnvelopes[i].MinY);
271 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
272 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
273 : oColumn.Add("bbox", oBBOX);
274 : }
275 : #endif
276 123 : const auto eType = poGeomFieldDefn->GetType();
277 123 : if (CPLTestBool(CPLGetConfigOption(
278 246 : "OGR_ARROW_WRITE_GDAL_GEOMETRY_TYPE", "YES")) &&
279 123 : eType == wkbFlatten(eType))
280 : {
281 : // Geometry type, place under a temporary "gdal:geometry_type"
282 : // property pending acceptance of proposal at
283 : // https://github.com/opengeospatial/geoparquet/issues/41
284 45 : const char *pszType = "mixed";
285 45 : if (wkbPoint == eType)
286 18 : pszType = "Point";
287 27 : else if (wkbLineString == eType)
288 5 : pszType = "LineString";
289 22 : else if (wkbPolygon == eType)
290 5 : pszType = "Polygon";
291 17 : else if (wkbMultiPoint == eType)
292 5 : pszType = "MultiPoint";
293 12 : else if (wkbMultiLineString == eType)
294 5 : pszType = "MultiLineString";
295 7 : else if (wkbMultiPolygon == eType)
296 5 : pszType = "MultiPolygon";
297 2 : else if (wkbGeometryCollection == eType)
298 2 : pszType = "GeometryCollection";
299 45 : oColumn.Add("gdal:geometry_type", pszType);
300 : }
301 : }
302 :
303 123 : auto kvMetadata = m_poSchema->metadata()
304 5 : ? m_poSchema->metadata()->Copy()
305 251 : : std::make_shared<arrow::KeyValueMetadata>();
306 246 : kvMetadata->Append("geo",
307 246 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
308 123 : m_poSchema = m_poSchema->WithMetadata(kvMetadata);
309 123 : CPLAssert(m_poSchema);
310 : }
311 129 : }
312 :
313 : /************************************************************************/
314 : /* CreateWriter() */
315 : /************************************************************************/
316 :
317 129 : void OGRFeatherWriterLayer::CreateWriter()
318 : {
319 129 : CPLAssert(m_poFileWriter == nullptr);
320 :
321 129 : if (m_poSchema == nullptr)
322 : {
323 3 : CreateSchema();
324 : }
325 : else
326 : {
327 126 : FinalizeSchema();
328 : }
329 :
330 258 : auto options = arrow::ipc::IpcWriteOptions::Defaults();
331 129 : options.memory_pool = m_poMemoryPool;
332 :
333 : {
334 258 : auto result = arrow::util::Codec::Create(m_eCompression);
335 129 : if (!result.ok())
336 : {
337 0 : CPLError(CE_Failure, CPLE_AppDefined,
338 : "Codec::Create() failed with %s",
339 0 : result.status().message().c_str());
340 : }
341 : else
342 : {
343 129 : options.codec.reset(result->release());
344 : }
345 : }
346 :
347 129 : if (m_bStreamFormat)
348 : {
349 : auto result =
350 6 : arrow::ipc::MakeStreamWriter(m_poOutputStream, m_poSchema, options);
351 3 : if (!result.ok())
352 : {
353 0 : CPLError(CE_Failure, CPLE_AppDefined,
354 : "arrow::ipc::MakeStreamWriter() failed with %s",
355 0 : result.status().message().c_str());
356 : }
357 : else
358 : {
359 3 : m_poFileWriter = *result;
360 : }
361 : }
362 : else
363 : {
364 : m_poFooterKeyValueMetadata =
365 126 : std::make_shared<arrow::KeyValueMetadata>();
366 : auto result = arrow::ipc::MakeFileWriter(
367 378 : m_poOutputStream, m_poSchema, options, m_poFooterKeyValueMetadata);
368 126 : if (!result.ok())
369 : {
370 0 : CPLError(CE_Failure, CPLE_AppDefined,
371 : "arrow::ipc::MakeFileWriter() failed with %s",
372 0 : result.status().message().c_str());
373 : }
374 : else
375 : {
376 126 : m_poFileWriter = *result;
377 : }
378 : }
379 129 : }
380 :
381 : /************************************************************************/
382 : /* PerformStepsBeforeFinalFlushGroup() */
383 : /************************************************************************/
384 :
385 : // Add a gdal:geo extension metadata for now, which embeds a bbox
386 129 : void OGRFeatherWriterLayer::PerformStepsBeforeFinalFlushGroup()
387 : {
388 255 : if (m_poFooterKeyValueMetadata &&
389 250 : m_poFeatureDefn->GetGeomFieldCount() != 0 &&
390 121 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_WRITE_GDAL_FOOTER", "YES")))
391 : {
392 236 : CPLJSONObject oRoot;
393 118 : oRoot.Add("primary_column",
394 118 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
395 118 : CPLJSONObject oColumns;
396 118 : oRoot.Add("columns", oColumns);
397 236 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
398 : {
399 118 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
400 236 : CPLJSONObject oColumn;
401 118 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
402 118 : oColumn.Add("encoding",
403 118 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
404 :
405 118 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
406 118 : if (poSRS)
407 : {
408 112 : const char *const apszOptions[] = {"FORMAT=WKT2_2019",
409 : "MULTILINE=NO", nullptr};
410 112 : char *pszWKT = nullptr;
411 112 : poSRS->exportToWkt(&pszWKT, apszOptions);
412 112 : if (pszWKT)
413 112 : oColumn.Add("crs", pszWKT);
414 112 : CPLFree(pszWKT);
415 :
416 112 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
417 112 : if (dfCoordEpoch > 0)
418 1 : oColumn.Add("epoch", dfCoordEpoch);
419 : }
420 :
421 118 : if (m_aoEnvelopes[i].IsInit())
422 : {
423 117 : CPLJSONArray oBBOX;
424 117 : oBBOX.Add(m_aoEnvelopes[i].MinX);
425 117 : oBBOX.Add(m_aoEnvelopes[i].MinY);
426 117 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
427 117 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
428 117 : oColumn.Add("bbox", oBBOX);
429 : }
430 : }
431 :
432 236 : m_poFooterKeyValueMetadata->Append(
433 : GDAL_GEO_FOOTER_KEY,
434 236 : oRoot.Format(CPLJSONObject::PrettyFormat::Plain));
435 : }
436 129 : }
437 :
438 : /************************************************************************/
439 : /* FlushGroup() */
440 : /************************************************************************/
441 :
442 19 : bool OGRFeatherWriterLayer::FlushGroup()
443 : {
444 19 : std::vector<std::shared_ptr<arrow::Array>> columns;
445 19 : auto ret = WriteArrays(
446 839 : [&columns](const std::shared_ptr<arrow::Field> &,
447 839 : const std::shared_ptr<arrow::Array> &array)
448 : {
449 839 : columns.emplace_back(array);
450 839 : return true;
451 : });
452 :
453 19 : if (ret)
454 : {
455 : auto poRecordBatch = arrow::RecordBatch::Make(
456 57 : m_poSchema, !columns.empty() ? columns[0]->length() : 0, columns);
457 38 : auto status = m_poFileWriter->WriteRecordBatch(*poRecordBatch);
458 19 : if (!status.ok())
459 : {
460 0 : CPLError(CE_Failure, CPLE_AppDefined,
461 : "WriteRecordBatch() failed with %s",
462 0 : status.message().c_str());
463 0 : ret = false;
464 : }
465 : }
466 :
467 19 : ClearArrayBuilers();
468 38 : return ret;
469 : }
470 :
471 : /************************************************************************/
472 : /* WriteArrowBatch() */
473 : /************************************************************************/
474 :
475 : inline bool
476 113 : OGRFeatherWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
477 : struct ArrowArray *array,
478 : CSLConstList papszOptions)
479 : {
480 226 : return WriteArrowBatchInternal(
481 : schema, array, papszOptions,
482 113 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
483 : {
484 226 : auto status = m_poFileWriter->WriteRecordBatch(*poBatch);
485 113 : if (!status.ok())
486 : {
487 0 : CPLError(CE_Failure, CPLE_AppDefined,
488 : "WriteRecordBatch() failed: %s",
489 0 : status.message().c_str());
490 0 : return false;
491 : }
492 :
493 113 : return true;
494 226 : });
495 : }
|