Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifdef STANDALONE
14 : #include "gdal_version.h"
15 : #else
16 : #undef DO_NOT_DEFINE_GDAL_DATE_NAME
17 : #include "gdal_version_full/gdal_version.h"
18 : #endif
19 :
20 : #include "ogr_parquet.h"
21 :
22 : #include "../arrow_common/ograrrowwriterlayer.hpp"
23 :
24 : #include "ogr_wkb.h"
25 :
26 : #include <cassert>
27 : #include <utility>
28 :
29 : /************************************************************************/
30 : /* OGRParquetWriterLayer() */
31 : /************************************************************************/
32 :
33 272 : OGRParquetWriterLayer::OGRParquetWriterLayer(
34 : OGRParquetWriterDataset *poDataset, arrow::MemoryPool *poMemoryPool,
35 : const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
36 272 : const char *pszLayerName)
37 : : OGRArrowWriterLayer(poMemoryPool, poOutputStream, pszLayerName),
38 272 : m_poDataset(poDataset)
39 : {
40 272 : m_bWriteFieldArrowExtensionName = CPLTestBool(
41 : CPLGetConfigOption("OGR_PARQUET_WRITE_ARROW_EXTENSION_NAME", "NO"));
42 272 : }
43 :
44 : /************************************************************************/
45 : /* Close() */
46 : /************************************************************************/
47 :
48 269 : bool OGRParquetWriterLayer::Close()
49 : {
50 269 : if (m_poTmpGPKGLayer)
51 : {
52 2 : if (!CopyTmpGpkgLayerToFinalFile())
53 0 : return false;
54 : }
55 :
56 269 : if (m_bInitializationOK)
57 : {
58 269 : if (!FinalizeWriting())
59 0 : return false;
60 : }
61 :
62 269 : return true;
63 : }
64 :
65 : /************************************************************************/
66 : /* CopyTmpGpkgLayerToFinalFile() */
67 : /************************************************************************/
68 :
69 2 : bool OGRParquetWriterLayer::CopyTmpGpkgLayerToFinalFile()
70 : {
71 2 : if (!m_poTmpGPKGLayer)
72 : {
73 0 : return true;
74 : }
75 :
76 2 : CPLDebug("PARQUET", "CopyTmpGpkgLayerToFinalFile(): start...");
77 :
78 2 : VSIUnlink(m_poTmpGPKG->GetDescription());
79 :
80 4 : OGRFeature oFeat(m_poFeatureDefn);
81 :
82 : // Interval in terms of features between 2 debug progress report messages
83 2 : constexpr int PROGRESS_FC_INTERVAL = 100 * 1000;
84 :
85 : // First, write features without geometries
86 : {
87 2 : auto poTmpLayer = std::unique_ptr<OGRLayer>(m_poTmpGPKG->ExecuteSQL(
88 : "SELECT serialized_feature FROM tmp WHERE fid NOT IN (SELECT id "
89 : "FROM rtree_tmp_geom)",
90 2 : nullptr, nullptr));
91 2 : if (!poTmpLayer)
92 0 : return false;
93 1004 : for (const auto &poSrcFeature : poTmpLayer.get())
94 : {
95 1002 : int nBytesFeature = 0;
96 : const GByte *pabyFeatureData =
97 1002 : poSrcFeature->GetFieldAsBinary(0, &nBytesFeature);
98 1002 : if (!oFeat.DeserializeFromBinary(pabyFeatureData, nBytesFeature))
99 : {
100 0 : CPLError(CE_Failure, CPLE_AppDefined,
101 : "Cannot deserialize feature");
102 0 : return false;
103 : }
104 1002 : if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE)
105 : {
106 0 : return false;
107 : }
108 :
109 1002 : if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0)
110 : {
111 0 : CPLDebugProgress(
112 : "PARQUET",
113 : "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress",
114 0 : 100.0 * double(m_nFeatureCount) /
115 0 : double(m_nTmpFeatureCount));
116 : }
117 : }
118 :
119 2 : if (!FlushFeatures())
120 : {
121 0 : return false;
122 : }
123 : }
124 :
125 : // Now walk through the GPKG RTree for features with geometries
126 : // Cf https://github.com/sqlite/sqlite/blob/master/ext/rtree/rtree.c
127 : // for the description of the content of the rtree _node table
128 4 : std::vector<std::pair<int64_t, int>> aNodeNoDepthPair;
129 2 : int nTreeDepth = 0;
130 : // Queue the root node
131 : aNodeNoDepthPair.emplace_back(
132 2 : std::make_pair(/* nodeNo = */ 1, /* depth = */ 0));
133 2 : int nCountWrittenFeaturesSinceLastFlush = 0;
134 50 : while (!aNodeNoDepthPair.empty())
135 : {
136 48 : const auto &oLastPair = aNodeNoDepthPair.back();
137 48 : const int64_t nNodeNo = oLastPair.first;
138 48 : const int nCurDepth = oLastPair.second;
139 : //CPLDebug("PARQUET", "Reading nodeNode=%d, curDepth=%d", int(nNodeNo), nCurDepth);
140 48 : aNodeNoDepthPair.pop_back();
141 :
142 48 : auto poRTreeLayer = std::unique_ptr<OGRLayer>(m_poTmpGPKG->ExecuteSQL(
143 : CPLSPrintf("SELECT data FROM rtree_tmp_geom_node WHERE nodeno "
144 : "= " CPL_FRMT_GIB,
145 : static_cast<GIntBig>(nNodeNo)),
146 48 : nullptr, nullptr));
147 48 : if (!poRTreeLayer)
148 : {
149 0 : CPLError(CE_Failure, CPLE_AppDefined,
150 : "Cannot read node " CPL_FRMT_GIB,
151 : static_cast<GIntBig>(nNodeNo));
152 0 : return false;
153 : }
154 : const auto poRTreeFeature =
155 48 : std::unique_ptr<const OGRFeature>(poRTreeLayer->GetNextFeature());
156 48 : if (!poRTreeFeature)
157 : {
158 0 : CPLError(CE_Failure, CPLE_AppDefined,
159 : "Cannot read node " CPL_FRMT_GIB,
160 : static_cast<GIntBig>(nNodeNo));
161 0 : return false;
162 : }
163 :
164 48 : int nNodeBytes = 0;
165 : const GByte *pabyNodeData =
166 48 : poRTreeFeature->GetFieldAsBinary(0, &nNodeBytes);
167 48 : constexpr int BLOB_HEADER_SIZE = 4;
168 48 : if (nNodeBytes < BLOB_HEADER_SIZE)
169 : {
170 0 : CPLError(CE_Failure, CPLE_AppDefined,
171 : "Not enough bytes when reading node " CPL_FRMT_GIB,
172 : static_cast<GIntBig>(nNodeNo));
173 0 : return false;
174 : }
175 48 : if (nNodeNo == 1)
176 : {
177 : // Get the RTree depth from the root node
178 2 : nTreeDepth = (pabyNodeData[0] << 8) | pabyNodeData[1];
179 : //CPLDebug("PARQUET", "nTreeDepth = %d", nTreeDepth);
180 : }
181 :
182 48 : const int nCellCount = (pabyNodeData[2] << 8) | pabyNodeData[3];
183 48 : constexpr int SIZEOF_CELL = 24; // int64_t + 4 float
184 48 : if (nNodeBytes < BLOB_HEADER_SIZE + SIZEOF_CELL * nCellCount)
185 : {
186 0 : CPLError(CE_Failure, CPLE_AppDefined,
187 : "Not enough bytes when reading node " CPL_FRMT_GIB,
188 : static_cast<GIntBig>(nNodeNo));
189 0 : return false;
190 : }
191 :
192 48 : size_t nOffset = BLOB_HEADER_SIZE;
193 48 : if (nCurDepth == nTreeDepth)
194 : {
195 : // Leaf node: it references feature IDs.
196 :
197 : // If we are about to go above m_nRowGroupSize, flush past
198 : // features now, to improve the spatial compacity of the row group.
199 46 : if (m_nRowGroupSize > nCellCount &&
200 46 : nCountWrittenFeaturesSinceLastFlush + nCellCount >
201 46 : m_nRowGroupSize)
202 : {
203 14 : nCountWrittenFeaturesSinceLastFlush = 0;
204 14 : if (!FlushFeatures())
205 : {
206 0 : return false;
207 : }
208 : }
209 :
210 : // nCellCount shouldn't be over 51 normally, but even 65535
211 : // would be fine...
212 46 : assert(nCellCount <= 65535);
213 1248 : for (int i = 0; i < nCellCount; ++i)
214 : {
215 : int64_t nFID;
216 1202 : memcpy(&nFID, pabyNodeData + nOffset, sizeof(int64_t));
217 1202 : CPL_MSBPTR64(&nFID);
218 :
219 : const auto poSrcFeature = std::unique_ptr<const OGRFeature>(
220 1202 : m_poTmpGPKGLayer->GetFeature(nFID));
221 1202 : if (!poSrcFeature)
222 : {
223 0 : CPLError(CE_Failure, CPLE_AppDefined,
224 : "Cannot get feature " CPL_FRMT_GIB,
225 : static_cast<GIntBig>(nFID));
226 0 : return false;
227 : }
228 :
229 1202 : int nBytesFeature = 0;
230 : const GByte *pabyFeatureData =
231 1202 : poSrcFeature->GetFieldAsBinary(0, &nBytesFeature);
232 1202 : if (!oFeat.DeserializeFromBinary(pabyFeatureData,
233 : nBytesFeature))
234 : {
235 0 : CPLError(CE_Failure, CPLE_AppDefined,
236 : "Cannot deserialize feature");
237 0 : return false;
238 : }
239 1202 : if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE)
240 : {
241 0 : return false;
242 : }
243 :
244 1202 : nOffset += SIZEOF_CELL;
245 :
246 1202 : ++nCountWrittenFeaturesSinceLastFlush;
247 :
248 1202 : if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0 ||
249 1202 : m_nFeatureCount == m_nTmpFeatureCount / 2)
250 : {
251 2 : CPLDebugProgress(
252 : "PARQUET",
253 : "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress",
254 2 : 100.0 * double(m_nFeatureCount) /
255 2 : double(m_nTmpFeatureCount));
256 : }
257 : }
258 : }
259 : else
260 : {
261 : // Non-leaf node: it references child nodes.
262 :
263 : // nCellCount shouldn't be over 51 normally, but even 65535
264 : // would be fine...
265 2 : assert(nCellCount <= 65535);
266 48 : for (int i = 0; i < nCellCount; ++i)
267 : {
268 : int64_t nNode;
269 46 : memcpy(&nNode, pabyNodeData + nOffset, sizeof(int64_t));
270 46 : CPL_MSBPTR64(&nNode);
271 : aNodeNoDepthPair.emplace_back(
272 46 : std::make_pair(nNode, nCurDepth + 1));
273 46 : nOffset += SIZEOF_CELL;
274 : }
275 : }
276 : }
277 :
278 2 : CPLDebug("PARQUET",
279 : "CopyTmpGpkgLayerToFinalFile(): 100%%, successfully finished");
280 2 : return true;
281 : }
282 :
283 : /************************************************************************/
284 : /* IsSupportedGeometryType() */
285 : /************************************************************************/
286 :
287 271 : bool OGRParquetWriterLayer::IsSupportedGeometryType(
288 : OGRwkbGeometryType eGType) const
289 : {
290 271 : const auto eFlattenType = wkbFlatten(eGType);
291 271 : if (!OGR_GT_HasM(eGType) && eFlattenType <= wkbGeometryCollection)
292 : {
293 270 : return true;
294 : }
295 :
296 : const auto osConfigOptionName =
297 3 : "OGR_" + GetDriverUCName() + "_ALLOW_ALL_DIMS";
298 1 : if (CPLTestBool(CPLGetConfigOption(osConfigOptionName.c_str(), "NO")))
299 : {
300 0 : return true;
301 : }
302 :
303 1 : CPLError(CE_Failure, CPLE_NotSupported,
304 : "Only 2D and Z geometry types are supported (unless the "
305 : "%s configuration option is set to YES)",
306 : osConfigOptionName.c_str());
307 1 : return false;
308 : }
309 :
310 : /************************************************************************/
311 : /* SetOptions() */
312 : /************************************************************************/
313 :
314 272 : bool OGRParquetWriterLayer::SetOptions(CSLConstList papszOptions,
315 : const OGRSpatialReference *poSpatialRef,
316 : OGRwkbGeometryType eGType)
317 : {
318 272 : const char *pszWriteCoveringBBox = CSLFetchNameValueDef(
319 : papszOptions, "WRITE_COVERING_BBOX",
320 : CPLGetConfigOption("OGR_PARQUET_WRITE_COVERING_BBOX", nullptr));
321 272 : m_bWriteBBoxStruct =
322 272 : pszWriteCoveringBBox == nullptr || CPLTestBool(pszWriteCoveringBBox);
323 :
324 272 : if (CPLTestBool(CSLFetchNameValueDef(papszOptions, "SORT_BY_BBOX", "NO")))
325 : {
326 6 : const std::string osTmpGPKG(std::string(m_poDataset->GetDescription()) +
327 3 : ".tmp.gpkg");
328 3 : auto poGPKGDrv = GetGDALDriverManager()->GetDriverByName("GPKG");
329 3 : if (!poGPKGDrv)
330 : {
331 1 : CPLError(
332 : CE_Failure, CPLE_AppDefined,
333 : "Driver GPKG required for SORT_BY_BBOX layer creation option");
334 1 : return false;
335 : }
336 2 : m_poTmpGPKG.reset(poGPKGDrv->Create(osTmpGPKG.c_str(), 0, 0, 0,
337 : GDT_Unknown, nullptr));
338 2 : if (!m_poTmpGPKG)
339 0 : return false;
340 2 : m_poTmpGPKG->MarkSuppressOnClose();
341 2 : m_poTmpGPKGLayer = m_poTmpGPKG->CreateLayer("tmp");
342 2 : if (!m_poTmpGPKGLayer)
343 0 : return false;
344 : // Serialized feature
345 2 : m_poTmpGPKGLayer->CreateField(
346 2 : std::make_unique<OGRFieldDefn>("serialized_feature", OFTBinary)
347 2 : .get());
348 2 : CPL_IGNORE_RET_VAL(m_poTmpGPKGLayer->StartTransaction());
349 : }
350 :
351 : const char *pszGeomEncoding =
352 271 : CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING");
353 271 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
354 271 : if (pszGeomEncoding)
355 : {
356 148 : if (EQUAL(pszGeomEncoding, "WKB"))
357 0 : m_eGeomEncoding = OGRArrowGeomEncoding::WKB;
358 148 : else if (EQUAL(pszGeomEncoding, "WKT"))
359 8 : m_eGeomEncoding = OGRArrowGeomEncoding::WKT;
360 140 : else if (EQUAL(pszGeomEncoding, "GEOARROW_INTERLEAVED"))
361 : {
362 28 : CPLErrorOnce(
363 : CE_Warning, CPLE_AppDefined,
364 : "Use of GEOMETRY_ENCODING=GEOARROW_INTERLEAVED is not "
365 : "recommended. "
366 : "GeoParquet 1.1 uses GEOMETRY_ENCODING=GEOARROW (struct) "
367 : "instead.");
368 28 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC;
369 : }
370 112 : else if (EQUAL(pszGeomEncoding, "GEOARROW") ||
371 0 : EQUAL(pszGeomEncoding, "GEOARROW_STRUCT"))
372 112 : m_eGeomEncoding = OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC;
373 : else
374 : {
375 0 : CPLError(CE_Failure, CPLE_NotSupported,
376 : "Unsupported GEOMETRY_ENCODING = %s", pszGeomEncoding);
377 0 : return false;
378 : }
379 : }
380 :
381 : const char *pszCoordPrecision =
382 271 : CSLFetchNameValue(papszOptions, "COORDINATE_PRECISION");
383 271 : if (pszCoordPrecision)
384 0 : m_nWKTCoordinatePrecision = atoi(pszCoordPrecision);
385 :
386 271 : m_bForceCounterClockwiseOrientation =
387 271 : EQUAL(CSLFetchNameValueDef(papszOptions, "POLYGON_ORIENTATION",
388 : "COUNTERCLOCKWISE"),
389 : "COUNTERCLOCKWISE");
390 :
391 271 : if (eGType != wkbNone)
392 : {
393 245 : if (!IsSupportedGeometryType(eGType))
394 : {
395 1 : return false;
396 : }
397 :
398 244 : m_poFeatureDefn->SetGeomType(eGType);
399 244 : auto eGeomEncoding = m_eGeomEncoding;
400 244 : if (eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_FSL_GENERIC ||
401 216 : eGeomEncoding == OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC)
402 : {
403 140 : const auto eEncodingType = eGeomEncoding;
404 140 : eGeomEncoding = GetPreciseArrowGeomEncoding(eEncodingType, eGType);
405 140 : if (eGeomEncoding == eEncodingType)
406 0 : return false;
407 : }
408 244 : m_aeGeomEncoding.push_back(eGeomEncoding);
409 244 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetName(
410 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry"));
411 244 : if (poSpatialRef)
412 : {
413 27 : auto poSRS = poSpatialRef->Clone();
414 27 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS);
415 27 : poSRS->Release();
416 : }
417 : }
418 :
419 270 : m_osFIDColumn = CSLFetchNameValueDef(papszOptions, "FID", "");
420 :
421 270 : const char *pszCompression = CSLFetchNameValue(papszOptions, "COMPRESSION");
422 270 : if (pszCompression == nullptr)
423 : {
424 792 : auto oResult = arrow::util::Codec::GetCompressionType("snappy");
425 264 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
426 : {
427 264 : pszCompression = "SNAPPY";
428 : }
429 : else
430 : {
431 0 : pszCompression = "NONE";
432 : }
433 : }
434 :
435 270 : if (EQUAL(pszCompression, "NONE"))
436 0 : pszCompression = "UNCOMPRESSED";
437 : auto oResult = arrow::util::Codec::GetCompressionType(
438 540 : CPLString(pszCompression).tolower());
439 270 : if (!oResult.ok())
440 : {
441 1 : CPLError(CE_Failure, CPLE_NotSupported,
442 : "Unrecognized compression method: %s", pszCompression);
443 1 : return false;
444 : }
445 269 : m_eCompression = *oResult;
446 269 : if (!arrow::util::Codec::IsAvailable(m_eCompression))
447 : {
448 0 : CPLError(CE_Failure, CPLE_NotSupported,
449 : "Compression method %s is known, but libarrow has not "
450 : "been built with support for it",
451 : pszCompression);
452 0 : return false;
453 : }
454 269 : m_oWriterPropertiesBuilder.compression(m_eCompression);
455 :
456 : const char *pszCompressionLevel =
457 269 : CSLFetchNameValue(papszOptions, "COMPRESSION_LEVEL");
458 269 : if (pszCompressionLevel)
459 : {
460 2 : const int nCompressionLevel = atoi(pszCompressionLevel);
461 2 : if (nCompressionLevel != DEFAULT_COMPRESSION_LEVEL)
462 2 : m_oWriterPropertiesBuilder.compression_level(nCompressionLevel);
463 : }
464 267 : else if (EQUAL(pszCompression, "ZSTD"))
465 1 : m_oWriterPropertiesBuilder.compression_level(
466 : OGR_PARQUET_ZSTD_DEFAULT_COMPRESSION_LEVEL);
467 :
468 : const std::string osCreator =
469 269 : CSLFetchNameValueDef(papszOptions, "CREATOR", "");
470 269 : if (!osCreator.empty())
471 1 : m_oWriterPropertiesBuilder.created_by(osCreator);
472 : else
473 268 : m_oWriterPropertiesBuilder.created_by("GDAL " GDAL_RELEASE_NAME
474 : ", using " CREATED_BY_VERSION);
475 :
476 : // Undocumented option. Not clear it is useful besides unit test purposes
477 269 : if (!CPLTestBool(CSLFetchNameValueDef(papszOptions, "STATISTICS", "YES")))
478 1 : m_oWriterPropertiesBuilder.disable_statistics();
479 :
480 : #if PARQUET_VERSION_MAJOR >= 12
481 : // Undocumented option. Not clear it is useful to disable it.
482 269 : if (CPLTestBool(CSLFetchNameValueDef(papszOptions, "PAGE_INDEX", "YES")))
483 269 : m_oWriterPropertiesBuilder.enable_write_page_index();
484 : #endif
485 :
486 : const char *pszWriteGeo =
487 269 : CPLGetConfigOption("OGR_PARQUET_WRITE_GEO", nullptr);
488 269 : m_bWriteGeoMetadata = pszWriteGeo == nullptr || CPLTestBool(pszWriteGeo);
489 :
490 269 : if (m_eGeomEncoding == OGRArrowGeomEncoding::WKB && eGType != wkbNone)
491 : {
492 : #if ARROW_VERSION_MAJOR >= 21
493 : const char *pszUseParquetGeoTypes =
494 : CSLFetchNameValueDef(papszOptions, "USE_PARQUET_GEO_TYPES", "NO");
495 : if (EQUAL(pszUseParquetGeoTypes, "ONLY"))
496 : {
497 : m_bUseArrowWKBExtension = true;
498 : if (pszWriteGeo == nullptr)
499 : m_bWriteGeoMetadata = false;
500 : if (pszWriteCoveringBBox == nullptr)
501 : m_bWriteBBoxStruct = false;
502 : }
503 : else
504 : {
505 : m_bUseArrowWKBExtension = CPLTestBool(pszUseParquetGeoTypes);
506 : }
507 : #else
508 103 : m_oWriterPropertiesBuilder.disable_statistics(
509 309 : parquet::schema::ColumnPath::FromDotString(
510 103 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef()));
511 : #endif
512 : }
513 :
514 : const char *pszRowGroupSize =
515 269 : CSLFetchNameValue(papszOptions, "ROW_GROUP_SIZE");
516 269 : if (pszRowGroupSize)
517 : {
518 10 : auto nRowGroupSize = static_cast<int64_t>(atoll(pszRowGroupSize));
519 10 : if (nRowGroupSize > 0)
520 : {
521 10 : if (nRowGroupSize > INT_MAX)
522 0 : nRowGroupSize = INT_MAX;
523 10 : m_nRowGroupSize = nRowGroupSize;
524 : }
525 : }
526 :
527 269 : m_bEdgesSpherical = EQUAL(
528 : CSLFetchNameValueDef(papszOptions, "EDGES", "PLANAR"), "SPHERICAL");
529 :
530 269 : m_bInitializationOK = true;
531 269 : return true;
532 : }
533 :
534 : /************************************************************************/
535 : /* CloseFileWriter() */
536 : /************************************************************************/
537 :
538 269 : bool OGRParquetWriterLayer::CloseFileWriter()
539 : {
540 538 : auto status = m_poFileWriter->Close();
541 269 : if (!status.ok())
542 : {
543 0 : CPLError(CE_Failure, CPLE_AppDefined,
544 : "FileWriter::Close() failed with %s",
545 0 : status.message().c_str());
546 : }
547 538 : return status.ok();
548 : }
549 :
550 : /************************************************************************/
551 : /* GetGeoMetadata() */
552 : /************************************************************************/
553 :
554 269 : std::string OGRParquetWriterLayer::GetGeoMetadata() const
555 : {
556 : // Just for unit testing purposes
557 : const char *pszGeoMetadata =
558 269 : CPLGetConfigOption("OGR_PARQUET_GEO_METADATA", nullptr);
559 269 : if (pszGeoMetadata)
560 16 : return pszGeoMetadata;
561 :
562 253 : if (m_poFeatureDefn->GetGeomFieldCount() != 0 && m_bWriteGeoMetadata)
563 : {
564 472 : CPLJSONObject oRoot;
565 236 : oRoot.Add("version", "1.1.0");
566 236 : oRoot.Add("primary_column",
567 236 : m_poFeatureDefn->GetGeomFieldDefn(0)->GetNameRef());
568 472 : CPLJSONObject oColumns;
569 236 : oRoot.Add("columns", oColumns);
570 489 : for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i)
571 : {
572 253 : const auto poGeomFieldDefn = m_poFeatureDefn->GetGeomFieldDefn(i);
573 506 : CPLJSONObject oColumn;
574 253 : oColumns.Add(poGeomFieldDefn->GetNameRef(), oColumn);
575 253 : oColumn.Add("encoding",
576 253 : GetGeomEncodingAsString(m_aeGeomEncoding[i], true));
577 :
578 253 : if (CPLTestBool(CPLGetConfigOption("OGR_PARQUET_WRITE_CRS", "YES")))
579 : {
580 252 : const auto poSRS = poGeomFieldDefn->GetSpatialRef();
581 252 : if (poSRS)
582 : {
583 52 : OGRSpatialReference oSRSIdentified(IdentifyCRS(poSRS));
584 :
585 : const char *pszAuthName =
586 26 : oSRSIdentified.GetAuthorityName(nullptr);
587 : const char *pszAuthCode =
588 26 : oSRSIdentified.GetAuthorityCode(nullptr);
589 :
590 26 : bool bOmitCRS = false;
591 26 : if (pszAuthName != nullptr && pszAuthCode != nullptr &&
592 25 : ((EQUAL(pszAuthName, "EPSG") &&
593 22 : EQUAL(pszAuthCode, "4326")) ||
594 14 : (EQUAL(pszAuthName, "OGC") &&
595 3 : EQUAL(pszAuthCode, "CRS84"))))
596 : {
597 : // To make things less confusing for non-geo-aware
598 : // consumers, omit EPSG:4326 / OGC:CRS84 CRS by default
599 14 : bOmitCRS = CPLTestBool(CPLGetConfigOption(
600 : "OGR_PARQUET_CRS_OMIT_IF_WGS84", "YES"));
601 : }
602 :
603 26 : if (bOmitCRS)
604 : {
605 : // do nothing
606 : }
607 12 : else if (EQUAL(CPLGetConfigOption(
608 : "OGR_PARQUET_CRS_ENCODING", "PROJJSON"),
609 : "PROJJSON"))
610 : {
611 : // CRS encoded as PROJJSON for GeoParquet >= 0.4.0
612 12 : char *pszPROJJSON = nullptr;
613 12 : oSRSIdentified.exportToPROJJSON(&pszPROJJSON, nullptr);
614 24 : CPLJSONDocument oCRSDoc;
615 12 : CPL_IGNORE_RET_VAL(oCRSDoc.LoadMemory(pszPROJJSON));
616 12 : CPLFree(pszPROJJSON);
617 12 : CPLJSONObject oCRSRoot = oCRSDoc.GetRoot();
618 12 : RemoveIDFromMemberOfEnsembles(oCRSRoot);
619 12 : oColumn.Add("crs", oCRSRoot);
620 : }
621 : else
622 : {
623 : // WKT was used in GeoParquet <= 0.3.0
624 0 : const char *const apszOptions[] = {
625 : "FORMAT=WKT2_2019", "MULTILINE=NO", nullptr};
626 0 : char *pszWKT = nullptr;
627 0 : oSRSIdentified.exportToWkt(&pszWKT, apszOptions);
628 0 : if (pszWKT)
629 0 : oColumn.Add("crs", pszWKT);
630 0 : CPLFree(pszWKT);
631 : }
632 :
633 26 : const double dfCoordEpoch = poSRS->GetCoordinateEpoch();
634 26 : if (dfCoordEpoch > 0)
635 2 : oColumn.Add("epoch", dfCoordEpoch);
636 : }
637 : else
638 : {
639 226 : oColumn.AddNull("crs");
640 : }
641 : }
642 :
643 253 : if (m_bEdgesSpherical)
644 : {
645 1 : oColumn.Add("edges", "spherical");
646 : }
647 :
648 479 : if (m_aoEnvelopes[i].IsInit() &&
649 226 : CPLTestBool(
650 : CPLGetConfigOption("OGR_PARQUET_WRITE_BBOX", "YES")))
651 : {
652 226 : bool bHasZ = false;
653 411 : for (const auto eGeomType : m_oSetWrittenGeometryTypes[i])
654 : {
655 268 : bHasZ = OGR_GT_HasZ(eGeomType);
656 268 : if (bHasZ)
657 83 : break;
658 : }
659 226 : CPLJSONArray oBBOX;
660 226 : oBBOX.Add(m_aoEnvelopes[i].MinX);
661 226 : oBBOX.Add(m_aoEnvelopes[i].MinY);
662 226 : if (bHasZ)
663 83 : oBBOX.Add(m_aoEnvelopes[i].MinZ);
664 226 : oBBOX.Add(m_aoEnvelopes[i].MaxX);
665 226 : oBBOX.Add(m_aoEnvelopes[i].MaxY);
666 226 : if (bHasZ)
667 83 : oBBOX.Add(m_aoEnvelopes[i].MaxZ);
668 226 : oColumn.Add("bbox", oBBOX);
669 : }
670 :
671 : // Bounding box column definition
672 436 : if (m_bWriteBBoxStruct &&
673 183 : CPLTestBool(CPLGetConfigOption(
674 : "OGR_PARQUET_WRITE_COVERING_BBOX_IN_METADATA", "YES")))
675 : {
676 366 : CPLJSONObject oCovering;
677 183 : oColumn.Add("covering", oCovering);
678 366 : CPLJSONObject oBBOX;
679 183 : oCovering.Add("bbox", oBBOX);
680 : const auto AddComponent =
681 2196 : [this, i, &oBBOX](const char *pszComponent)
682 : {
683 732 : CPLJSONArray oArray;
684 732 : oArray.Add(m_apoFieldsBBOX[i]->name());
685 732 : oArray.Add(pszComponent);
686 732 : oBBOX.Add(pszComponent, oArray);
687 732 : };
688 183 : AddComponent("xmin");
689 183 : AddComponent("ymin");
690 183 : AddComponent("xmax");
691 183 : AddComponent("ymax");
692 : }
693 :
694 284 : const auto GetStringGeometryType = [](OGRwkbGeometryType eType)
695 : {
696 284 : const auto eFlattenType = wkbFlatten(eType);
697 284 : std::string osType = "Unknown";
698 284 : if (wkbPoint == eFlattenType)
699 66 : osType = "Point";
700 218 : else if (wkbLineString == eFlattenType)
701 34 : osType = "LineString";
702 184 : else if (wkbPolygon == eFlattenType)
703 55 : osType = "Polygon";
704 129 : else if (wkbMultiPoint == eFlattenType)
705 26 : osType = "MultiPoint";
706 103 : else if (wkbMultiLineString == eFlattenType)
707 29 : osType = "MultiLineString";
708 74 : else if (wkbMultiPolygon == eFlattenType)
709 69 : osType = "MultiPolygon";
710 5 : else if (wkbGeometryCollection == eFlattenType)
711 5 : osType = "GeometryCollection";
712 284 : if (osType != "Unknown")
713 : {
714 : // M and ZM not supported officially currently, but it
715 : // doesn't hurt to anticipate
716 284 : if (OGR_GT_HasZ(eType) && OGR_GT_HasM(eType))
717 8 : osType += " ZM";
718 276 : else if (OGR_GT_HasZ(eType))
719 91 : osType += " Z";
720 185 : else if (OGR_GT_HasM(eType))
721 8 : osType += " M";
722 : }
723 284 : return osType;
724 : };
725 :
726 253 : if (m_bForceCounterClockwiseOrientation)
727 252 : oColumn.Add("orientation", "counterclockwise");
728 :
729 253 : CPLJSONArray oArray;
730 537 : for (const auto eType : m_oSetWrittenGeometryTypes[i])
731 : {
732 284 : oArray.Add(GetStringGeometryType(eType));
733 : }
734 253 : oColumn.Add("geometry_types", oArray);
735 : }
736 :
737 236 : return oRoot.Format(CPLJSONObject::PrettyFormat::Plain);
738 : }
739 17 : return std::string();
740 : }
741 :
742 : /************************************************************************/
743 : /* PerformStepsBeforeFinalFlushGroup() */
744 : /************************************************************************/
745 :
746 269 : void OGRParquetWriterLayer::PerformStepsBeforeFinalFlushGroup()
747 : {
748 269 : if (m_poKeyValueMetadata)
749 : {
750 538 : std::string osGeoMetadata = GetGeoMetadata();
751 538 : auto poTmpSchema = m_poSchema;
752 269 : if (!osGeoMetadata.empty())
753 : {
754 : // HACK: it would be good for Arrow to provide a clean way to alter
755 : // key value metadata before finalizing.
756 : // We need to write metadata at end to write the bounding box.
757 252 : const_cast<arrow::KeyValueMetadata *>(m_poKeyValueMetadata.get())
758 252 : ->Append("geo", osGeoMetadata);
759 :
760 252 : auto kvMetadata = poTmpSchema->metadata()
761 11 : ? poTmpSchema->metadata()->Copy()
762 263 : : std::make_shared<arrow::KeyValueMetadata>();
763 252 : kvMetadata->Append("geo", std::move(osGeoMetadata));
764 252 : poTmpSchema = poTmpSchema->WithMetadata(kvMetadata);
765 : }
766 :
767 269 : if (CPLTestBool(
768 : CPLGetConfigOption("OGR_PARQUET_WRITE_ARROW_SCHEMA", "YES")))
769 : {
770 : auto status =
771 538 : ::arrow::ipc::SerializeSchema(*poTmpSchema, m_poMemoryPool);
772 269 : if (status.ok())
773 : {
774 : // The serialized schema is not UTF-8, which is required for
775 : // Thrift
776 538 : const std::string schema_as_string = (*status)->ToString();
777 : std::string schema_base64 =
778 269 : ::arrow::util::base64_encode(schema_as_string);
779 269 : static const std::string kArrowSchemaKey = "ARROW:schema";
780 : const_cast<arrow::KeyValueMetadata *>(
781 269 : m_poKeyValueMetadata.get())
782 269 : ->Append(kArrowSchemaKey, std::move(schema_base64));
783 : }
784 : }
785 :
786 : // Put GDAL metadata into a gdal:metadata domain
787 538 : CPLJSONObject oMultiMetadata;
788 269 : bool bHasMultiMetadata = false;
789 275 : auto &l_oMDMD = oMDMD.GetDomainList() && *(oMDMD.GetDomainList())
790 275 : ? oMDMD
791 263 : : m_poDataset->GetMultiDomainMetadata();
792 277 : for (CSLConstList papszDomainIter = l_oMDMD.GetDomainList();
793 277 : papszDomainIter && *papszDomainIter; ++papszDomainIter)
794 : {
795 8 : const char *pszDomain = *papszDomainIter;
796 8 : CSLConstList papszMD = l_oMDMD.GetMetadata(pszDomain);
797 8 : if (STARTS_WITH(pszDomain, "json:") && papszMD && papszMD[0])
798 : {
799 1 : CPLJSONDocument oDoc;
800 1 : if (oDoc.LoadMemory(papszMD[0]))
801 : {
802 1 : bHasMultiMetadata = true;
803 1 : oMultiMetadata.Add(pszDomain, oDoc.GetRoot());
804 1 : continue;
805 0 : }
806 : }
807 7 : else if (STARTS_WITH(pszDomain, "xml:") && papszMD && papszMD[0])
808 : {
809 1 : bHasMultiMetadata = true;
810 1 : oMultiMetadata.Add(pszDomain, papszMD[0]);
811 1 : continue;
812 : }
813 12 : CPLJSONObject oMetadata;
814 6 : bool bHasMetadata = false;
815 12 : for (CSLConstList papszMDIter = papszMD;
816 12 : papszMDIter && *papszMDIter; ++papszMDIter)
817 : {
818 6 : char *pszKey = nullptr;
819 6 : const char *pszValue = CPLParseNameValue(*papszMDIter, &pszKey);
820 6 : if (pszKey && pszValue)
821 : {
822 6 : bHasMetadata = true;
823 6 : bHasMultiMetadata = true;
824 6 : oMetadata.Add(pszKey, pszValue);
825 : }
826 6 : CPLFree(pszKey);
827 : }
828 6 : if (bHasMetadata)
829 6 : oMultiMetadata.Add(pszDomain, oMetadata);
830 : }
831 269 : if (bHasMultiMetadata)
832 : {
833 6 : const_cast<arrow::KeyValueMetadata *>(m_poKeyValueMetadata.get())
834 6 : ->Append(
835 : "gdal:metadata",
836 12 : oMultiMetadata.Format(CPLJSONObject::PrettyFormat::Plain));
837 : }
838 : }
839 269 : }
840 :
841 : /************************************************************************/
842 : /* Open() */
843 : /************************************************************************/
844 :
845 : // Same as parquet::arrow::FileWriter::Open(), except we also
846 : // return KeyValueMetadata
847 : static arrow::Status
848 269 : Open(const ::arrow::Schema &schema, ::arrow::MemoryPool *pool,
849 : std::shared_ptr<::arrow::io::OutputStream> sink,
850 : std::shared_ptr<parquet::WriterProperties> properties,
851 : std::shared_ptr<parquet::ArrowWriterProperties> arrow_properties,
852 : std::unique_ptr<parquet::arrow::FileWriter> *writer,
853 : std::shared_ptr<const arrow::KeyValueMetadata> *outMetadata)
854 : {
855 269 : std::shared_ptr<parquet::SchemaDescriptor> parquet_schema;
856 538 : RETURN_NOT_OK(parquet::arrow::ToParquetSchema(
857 : &schema, *properties, *arrow_properties, &parquet_schema));
858 :
859 : auto schema_node = std::static_pointer_cast<parquet::schema::GroupNode>(
860 538 : parquet_schema->schema_root());
861 :
862 269 : auto metadata = schema.metadata()
863 16 : ? schema.metadata()->Copy()
864 554 : : std::make_shared<arrow::KeyValueMetadata>();
865 269 : *outMetadata = metadata;
866 :
867 269 : std::unique_ptr<parquet::ParquetFileWriter> base_writer;
868 269 : PARQUET_CATCH_NOT_OK(base_writer = parquet::ParquetFileWriter::Open(
869 : std::move(sink), std::move(schema_node),
870 : std::move(properties), metadata));
871 :
872 269 : auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
873 : return parquet::arrow::FileWriter::Make(
874 538 : pool, std::move(base_writer), std::move(schema_ptr),
875 807 : std::move(arrow_properties), writer);
876 : }
877 :
878 : /************************************************************************/
879 : /* CreateSchema() */
880 : /************************************************************************/
881 :
882 269 : void OGRParquetWriterLayer::CreateSchema()
883 : {
884 269 : CreateSchemaCommon();
885 269 : }
886 :
887 : /************************************************************************/
888 : /* CreateGeomField() */
889 : /************************************************************************/
890 :
891 27 : OGRErr OGRParquetWriterLayer::CreateGeomField(const OGRGeomFieldDefn *poField,
892 : int bApproxOK)
893 : {
894 27 : OGRErr eErr = OGRArrowWriterLayer::CreateGeomField(poField, bApproxOK);
895 53 : if (eErr == OGRERR_NONE &&
896 26 : m_aeGeomEncoding.back() == OGRArrowGeomEncoding::WKB
897 : #if ARROW_VERSION_MAJOR < 21
898 : // Geostatistics in Arrow 21 do not support geographic type for now
899 53 : && m_bEdgesSpherical
900 : #endif
901 : )
902 : {
903 0 : m_oWriterPropertiesBuilder.disable_statistics(
904 0 : parquet::schema::ColumnPath::FromDotString(
905 0 : m_poFeatureDefn
906 0 : ->GetGeomFieldDefn(m_poFeatureDefn->GetGeomFieldCount() - 1)
907 : ->GetNameRef()));
908 : }
909 27 : return eErr;
910 : }
911 :
912 : /************************************************************************/
913 : /* CreateWriter() */
914 : /************************************************************************/
915 :
916 269 : void OGRParquetWriterLayer::CreateWriter()
917 : {
918 269 : CPLAssert(m_poFileWriter == nullptr);
919 :
920 269 : if (m_poSchema == nullptr)
921 : {
922 40 : CreateSchema();
923 : }
924 : else
925 : {
926 229 : FinalizeSchema();
927 : }
928 :
929 : auto arrowWriterProperties =
930 269 : parquet::ArrowWriterProperties::Builder().store_schema()->build();
931 807 : CPL_IGNORE_RET_VAL(Open(*m_poSchema, m_poMemoryPool, m_poOutputStream,
932 538 : m_oWriterPropertiesBuilder.build(),
933 269 : std::move(arrowWriterProperties), &m_poFileWriter,
934 : &m_poKeyValueMetadata));
935 269 : }
936 :
937 : /************************************************************************/
938 : /* ICreateFeature() */
939 : /************************************************************************/
940 :
941 3091 : OGRErr OGRParquetWriterLayer::ICreateFeature(OGRFeature *poFeature)
942 : {
943 : // If not using SORT_BY_BBOX=YES layer creation option, we can directly
944 : // write features to the final Parquet file
945 3091 : if (!m_poTmpGPKGLayer)
946 887 : return OGRArrowWriterLayer::ICreateFeature(poFeature);
947 :
948 : // SORT_BY_BBOX=YES case: we write for now a serialized version of poFeature
949 : // in a temporary GeoPackage file.
950 :
951 2204 : GIntBig nFID = poFeature->GetFID();
952 2204 : if (!m_osFIDColumn.empty() && nFID == OGRNullFID)
953 : {
954 1102 : nFID = m_nTmpFeatureCount;
955 1102 : poFeature->SetFID(nFID);
956 : }
957 2204 : ++m_nTmpFeatureCount;
958 :
959 4408 : std::vector<GByte> abyBuffer;
960 : // Serialize the source feature as a single array of bytes to preserve it
961 : // fully
962 2204 : if (!poFeature->SerializeToBinary(abyBuffer))
963 : {
964 0 : return OGRERR_FAILURE;
965 : }
966 :
967 : // SQLite3 limitation: a row must fit in slightly less than 1 GB.
968 2204 : constexpr int SOME_MARGIN = 128;
969 2204 : if (abyBuffer.size() > 1024 * 1024 * 1024 - SOME_MARGIN)
970 : {
971 0 : CPLError(CE_Failure, CPLE_NotSupported,
972 : "Features larger than 1 GB are not supported");
973 0 : return OGRERR_FAILURE;
974 : }
975 :
976 4408 : OGRFeature oFeat(m_poTmpGPKGLayer->GetLayerDefn());
977 2204 : oFeat.SetFID(nFID);
978 2204 : oFeat.SetField(0, static_cast<int>(abyBuffer.size()), abyBuffer.data());
979 2204 : const auto poSrcGeom = poFeature->GetGeometryRef();
980 2204 : if (poSrcGeom && !poSrcGeom->IsEmpty())
981 : {
982 : // For the purpose of building an RTree, just use the bounding box of
983 : // the geometry as the geometry.
984 1202 : OGREnvelope sEnvelope;
985 1202 : poSrcGeom->getEnvelope(&sEnvelope);
986 2404 : auto poPoly = std::make_unique<OGRPolygon>();
987 2404 : auto poLR = std::make_unique<OGRLinearRing>();
988 1202 : poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY);
989 1202 : poLR->addPoint(sEnvelope.MinX, sEnvelope.MaxY);
990 1202 : poLR->addPoint(sEnvelope.MaxX, sEnvelope.MaxY);
991 1202 : poLR->addPoint(sEnvelope.MaxX, sEnvelope.MinY);
992 1202 : poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY);
993 1202 : poPoly->addRingDirectly(poLR.release());
994 1202 : oFeat.SetGeometryDirectly(poPoly.release());
995 : }
996 2204 : return m_poTmpGPKGLayer->CreateFeature(&oFeat);
997 : }
998 :
999 : /************************************************************************/
1000 : /* FlushGroup() */
1001 : /************************************************************************/
1002 :
1003 253 : bool OGRParquetWriterLayer::FlushGroup()
1004 : {
1005 : #if PARQUET_VERSION_MAJOR >= 20
1006 : auto status = m_poFileWriter->NewRowGroup();
1007 : #else
1008 506 : auto status = m_poFileWriter->NewRowGroup(m_apoBuilders[0]->length());
1009 : #endif
1010 253 : if (!status.ok())
1011 : {
1012 0 : CPLError(CE_Failure, CPLE_AppDefined, "NewRowGroup() failed with %s",
1013 0 : status.message().c_str());
1014 0 : ClearArrayBuilers();
1015 0 : return false;
1016 : }
1017 :
1018 253 : auto ret = WriteArrays(
1019 1025 : [this](const std::shared_ptr<arrow::Field> &field,
1020 1025 : const std::shared_ptr<arrow::Array> &array)
1021 : {
1022 2050 : auto l_status = m_poFileWriter->WriteColumnChunk(*array);
1023 1025 : if (!l_status.ok())
1024 : {
1025 0 : CPLError(CE_Failure, CPLE_AppDefined,
1026 : "WriteColumnChunk() failed for field %s: %s",
1027 0 : field->name().c_str(), l_status.message().c_str());
1028 0 : return false;
1029 : }
1030 1025 : return true;
1031 : });
1032 :
1033 253 : ClearArrayBuilers();
1034 253 : return ret;
1035 : }
1036 :
1037 : /************************************************************************/
1038 : /* FixupWKBGeometryBeforeWriting() */
1039 : /************************************************************************/
1040 :
1041 43 : void OGRParquetWriterLayer::FixupWKBGeometryBeforeWriting(GByte *pabyWkb,
1042 : size_t nLen)
1043 : {
1044 43 : if (!m_bForceCounterClockwiseOrientation)
1045 0 : return;
1046 :
1047 43 : OGRWKBFixupCounterClockWiseExternalRing(pabyWkb, nLen);
1048 : }
1049 :
1050 : /************************************************************************/
1051 : /* FixupGeometryBeforeWriting() */
1052 : /************************************************************************/
1053 :
1054 1354 : void OGRParquetWriterLayer::FixupGeometryBeforeWriting(OGRGeometry *poGeom)
1055 : {
1056 1354 : if (!m_bForceCounterClockwiseOrientation)
1057 3 : return;
1058 :
1059 1351 : const auto eFlattenType = wkbFlatten(poGeom->getGeometryType());
1060 : // Polygon rings MUST follow the right-hand rule for orientation
1061 : // (counterclockwise external rings, clockwise internal rings)
1062 1351 : if (eFlattenType == wkbPolygon)
1063 : {
1064 64 : bool bFirstRing = true;
1065 131 : for (auto poRing : poGeom->toPolygon())
1066 : {
1067 75 : if ((bFirstRing && poRing->isClockwise()) ||
1068 8 : (!bFirstRing && !poRing->isClockwise()))
1069 : {
1070 62 : poRing->reversePoints();
1071 : }
1072 67 : bFirstRing = false;
1073 : }
1074 : }
1075 1287 : else if (eFlattenType == wkbMultiPolygon ||
1076 : eFlattenType == wkbGeometryCollection)
1077 : {
1078 35 : for (auto poSubGeom : poGeom->toGeometryCollection())
1079 : {
1080 21 : FixupGeometryBeforeWriting(poSubGeom);
1081 : }
1082 : }
1083 : }
1084 :
1085 : /************************************************************************/
1086 : /* WriteArrowBatch() */
1087 : /************************************************************************/
1088 :
1089 : #if PARQUET_VERSION_MAJOR > 10
1090 : inline bool
1091 14 : OGRParquetWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema,
1092 : struct ArrowArray *array,
1093 : CSLConstList papszOptions)
1094 : {
1095 14 : if (m_poTmpGPKGLayer)
1096 : {
1097 : // When using SORT_BY_BBOX=YES option, we can't directly write the
1098 : // input array, because we need to sort features. Hence we fallback
1099 : // to the OGRLayer base implementation, which will ultimately call
1100 : // OGRParquetWriterLayer::ICreateFeature()
1101 0 : return OGRLayer::WriteArrowBatch(schema, array, papszOptions);
1102 : }
1103 :
1104 28 : return WriteArrowBatchInternal(
1105 : schema, array, papszOptions,
1106 28 : [this](const std::shared_ptr<arrow::RecordBatch> &poBatch)
1107 : {
1108 28 : auto status = m_poFileWriter->NewBufferedRowGroup();
1109 14 : if (!status.ok())
1110 : {
1111 0 : CPLError(CE_Failure, CPLE_AppDefined,
1112 : "NewBufferedRowGroup() failed with %s",
1113 0 : status.message().c_str());
1114 0 : return false;
1115 : }
1116 :
1117 14 : status = m_poFileWriter->WriteRecordBatch(*poBatch);
1118 14 : if (!status.ok())
1119 : {
1120 0 : CPLError(CE_Failure, CPLE_AppDefined,
1121 : "WriteRecordBatch() failed: %s",
1122 0 : status.message().c_str());
1123 0 : return false;
1124 : }
1125 :
1126 14 : return true;
1127 14 : });
1128 : }
1129 : #endif
1130 :
1131 : /************************************************************************/
1132 : /* TestCapability() */
1133 : /************************************************************************/
1134 :
1135 491 : inline int OGRParquetWriterLayer::TestCapability(const char *pszCap)
1136 : {
1137 : #if PARQUET_VERSION_MAJOR <= 10
1138 : if (EQUAL(pszCap, OLCFastWriteArrowBatch))
1139 : return false;
1140 : #endif
1141 :
1142 491 : if (m_poTmpGPKGLayer && EQUAL(pszCap, OLCFastWriteArrowBatch))
1143 : {
1144 : // When using SORT_BY_BBOX=YES option, we can't directly write the
1145 : // input array, because we need to sort features. So this is not
1146 : // fast
1147 1 : return false;
1148 : }
1149 :
1150 490 : return OGRArrowWriterLayer::TestCapability(pszCap);
1151 : }
1152 :
1153 : /************************************************************************/
1154 : /* CreateFieldFromArrowSchema() */
1155 : /************************************************************************/
1156 :
1157 : #if PARQUET_VERSION_MAJOR > 10
1158 396 : bool OGRParquetWriterLayer::CreateFieldFromArrowSchema(
1159 : const struct ArrowSchema *schema, CSLConstList papszOptions)
1160 : {
1161 396 : if (m_poTmpGPKGLayer)
1162 : {
1163 : // When using SORT_BY_BBOX=YES option, we can't directly write the
1164 : // input array, because we need to sort features. But this process
1165 : // only supports the base Arrow types supported by
1166 : // OGRLayer::WriteArrowBatch()
1167 0 : return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions);
1168 : }
1169 :
1170 396 : return OGRArrowWriterLayer::CreateFieldFromArrowSchema(schema,
1171 396 : papszOptions);
1172 : }
1173 : #endif
1174 :
1175 : /************************************************************************/
1176 : /* IsArrowSchemaSupported() */
1177 : /************************************************************************/
1178 :
1179 : #if PARQUET_VERSION_MAJOR > 10
1180 1077 : bool OGRParquetWriterLayer::IsArrowSchemaSupported(
1181 : const struct ArrowSchema *schema, CSLConstList papszOptions,
1182 : std::string &osErrorMsg) const
1183 : {
1184 1077 : if (m_poTmpGPKGLayer)
1185 : {
1186 : // When using SORT_BY_BBOX=YES option, we can't directly write the
1187 : // input array, because we need to sort features. But this process
1188 : // only supports the base Arrow types supported by
1189 : // OGRLayer::WriteArrowBatch()
1190 0 : return OGRLayer::IsArrowSchemaSupported(schema, papszOptions,
1191 0 : osErrorMsg);
1192 : }
1193 :
1194 1077 : if (schema->format[0] == 'e' && schema->format[1] == 0)
1195 : {
1196 1 : osErrorMsg = "float16 not supported";
1197 1 : return false;
1198 : }
1199 1076 : if (schema->format[0] == 'v' && schema->format[1] == 'u')
1200 : {
1201 1 : osErrorMsg = "StringView not supported";
1202 1 : return false;
1203 : }
1204 1075 : if (schema->format[0] == 'v' && schema->format[1] == 'z')
1205 : {
1206 1 : osErrorMsg = "BinaryView not supported";
1207 1 : return false;
1208 : }
1209 1074 : if (schema->format[0] == '+' && schema->format[1] == 'v')
1210 : {
1211 0 : if (schema->format[2] == 'l')
1212 : {
1213 0 : osErrorMsg = "ListView not supported";
1214 0 : return false;
1215 : }
1216 0 : else if (schema->format[2] == 'L')
1217 : {
1218 0 : osErrorMsg = "LargeListView not supported";
1219 0 : return false;
1220 : }
1221 : }
1222 2136 : for (int64_t i = 0; i < schema->n_children; ++i)
1223 : {
1224 1065 : if (!IsArrowSchemaSupported(schema->children[i], papszOptions,
1225 : osErrorMsg))
1226 : {
1227 3 : return false;
1228 : }
1229 : }
1230 1071 : return true;
1231 : }
1232 : #endif
1233 :
1234 : /************************************************************************/
1235 : /* SetMetadata() */
1236 : /************************************************************************/
1237 :
1238 11 : CPLErr OGRParquetWriterLayer::SetMetadata(char **papszMetadata,
1239 : const char *pszDomain)
1240 : {
1241 11 : if (!pszDomain || !EQUAL(pszDomain, "SHAPEFILE"))
1242 : {
1243 7 : return OGRLayer::SetMetadata(papszMetadata, pszDomain);
1244 : }
1245 4 : return CE_None;
1246 : }
1247 :
1248 : /************************************************************************/
1249 : /* GetDataset() */
1250 : /************************************************************************/
1251 :
1252 23 : GDALDataset *OGRParquetWriterLayer::GetDataset()
1253 : {
1254 23 : return m_poDataset;
1255 : }
|