Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <map>
17 : #include <mutex>
18 :
19 : #include "ogr_feather.h"
20 : #include "../arrow_common/ograrrowrandomaccessfile.h"
21 : #include "../arrow_common/ograrrowwritablefile.h"
22 : #include "../arrow_common/ograrrowdataset.hpp"
23 :
24 : #include "ogrfeatherdrivercore.h"
25 :
26 : /************************************************************************/
27 : /* IsArrowIPCStream() */
28 : /************************************************************************/
29 :
30 548 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
31 : {
32 : // WARNING: if making changes in that method, reflect them in
33 : // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
34 :
35 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
36 3 : return true;
37 :
38 545 : constexpr int CONTINUATION_SIZE = 4; // 0xFFFFFFFF
39 545 : constexpr int METADATA_SIZE_SIZE = 4;
40 :
41 : // See
42 : // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
43 545 : if (poOpenInfo->fpL != nullptr &&
44 545 : poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
45 545 : memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
46 : 0)
47 : {
48 9 : const char *pszExt = poOpenInfo->osExtension.c_str();
49 9 : if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
50 3 : return true;
51 :
52 6 : const uint32_t nMetadataSize =
53 6 : CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
54 6 : if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
55 : {
56 1 : if (poOpenInfo->IsSingleAllowedDriver("ARROW"))
57 1 : return true;
58 :
59 : // Padding after metadata and before body is not necessarily present
60 : // but the body must be at least 4 bytes
61 0 : constexpr int PADDING_MAX_SIZE = 4;
62 :
63 : // /vsistdin/ cannot seek back beyond first MB
64 0 : if (nMetadataSize >
65 : 1024 * 1024 -
66 : (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
67 : {
68 0 : return false;
69 : }
70 0 : const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
71 0 : nMetadataSize + PADDING_MAX_SIZE;
72 0 : if (!poOpenInfo->TryToIngest(nSizeToRead))
73 : {
74 0 : return false;
75 : }
76 :
77 : const std::string osTmpFilename(
78 0 : VSIMemGenerateHiddenFilename("arrow"));
79 : auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
80 : osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
81 0 : false));
82 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
83 0 : osTmpFilename.c_str(), std::move(fp));
84 0 : auto options = arrow::ipc::IpcReadOptions::Defaults();
85 : auto result =
86 0 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
87 0 : CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
88 0 : result.status().message().c_str());
89 0 : VSIUnlink(osTmpFilename.c_str());
90 0 : return result.ok();
91 : }
92 :
93 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
94 5 : const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
95 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
96 5 : if (nMetadataSize >
97 5 : nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
98 0 : return false;
99 :
100 : // Do not give ownership of poOpenInfo->fpL to infile
101 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
102 10 : poOpenInfo->pszFilename, poOpenInfo->fpL, false);
103 10 : auto options = arrow::ipc::IpcReadOptions::Defaults();
104 : auto result =
105 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
106 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
107 5 : return result.ok();
108 : }
109 536 : return false;
110 : }
111 :
112 : /************************************************************************/
113 : /* Open() */
114 : /************************************************************************/
115 :
116 548 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
117 : {
118 548 : if (poOpenInfo->eAccess == GA_Update)
119 : {
120 0 : return nullptr;
121 : }
122 :
123 : #if ARROW_VERSION_MAJOR >= 21
124 : // Register geoarrow.wkb extension only if requested for Arrow driver
125 : if (CPLTestBool(CPLGetConfigOption(
126 : "OGR_ARROW_REGISTER_GEOARROW_WKB_EXTENSION", "NO")) &&
127 : arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB))
128 : {
129 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
130 : std::make_shared<OGRGeoArrowWkbExtensionType>(
131 : std::move(arrow::binary()), std::string())));
132 : }
133 : #endif
134 :
135 548 : GDALOpenInfo *poOpenInfoForIdentify = poOpenInfo;
136 548 : std::unique_ptr<GDALOpenInfo> poOpenInfoTmp;
137 548 : if (STARTS_WITH(poOpenInfo->pszFilename, "gdalvsi://"))
138 : {
139 1 : poOpenInfoTmp = std::make_unique<GDALOpenInfo>(poOpenInfo->pszFilename +
140 : strlen("gdalvsi://"),
141 1 : poOpenInfo->nOpenFlags);
142 1 : poOpenInfoForIdentify = poOpenInfoTmp.get();
143 : }
144 :
145 548 : const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfoForIdentify);
146 1084 : if (!bIsStreamingFormat &&
147 536 : !OGRFeatherDriverIsArrowFileFormat(poOpenInfoForIdentify))
148 : {
149 0 : return nullptr;
150 : }
151 :
152 548 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
153 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
154 : {
155 3 : const std::string osFilename(poOpenInfo->pszFilename +
156 3 : strlen("ARROW_IPC_STREAM:"));
157 : auto fp =
158 3 : VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
159 3 : if (fp == nullptr)
160 : {
161 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
162 : osFilename.c_str());
163 1 : return nullptr;
164 : }
165 4 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename.c_str(),
166 4 : std::move(fp));
167 : }
168 953 : else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
169 408 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
170 : {
171 137 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
172 137 : poOpenInfo->fpL = nullptr;
173 137 : infile = std::make_shared<OGRArrowRandomAccessFile>(
174 137 : poOpenInfo->pszFilename, std::move(fp));
175 : }
176 : else
177 : {
178 : // FileSystemFromUriOrPath() doesn't like relative paths
179 : // so transform them to absolute.
180 408 : std::string osPath(poOpenInfo->pszFilename);
181 408 : if (CPLIsFilenameRelative(osPath.c_str()))
182 : {
183 407 : char *pszCurDir = CPLGetCurrentDir();
184 407 : if (pszCurDir == nullptr)
185 0 : return nullptr;
186 407 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
187 407 : CPLFree(pszCurDir);
188 : }
189 :
190 408 : std::string osFSPath;
191 : auto poFS =
192 816 : arrow::fs::FileSystemFromUriOrPath(osPath.c_str(), &osFSPath);
193 408 : if (!poFS.ok())
194 : {
195 0 : CPLError(CE_Failure, CPLE_AppDefined,
196 : "arrow::fs::FileSystemFromUriOrPath failed with %s",
197 0 : poFS.status().message().c_str());
198 0 : return nullptr;
199 : }
200 408 : auto result = (*poFS)->OpenInputFile(osFSPath);
201 408 : if (!result.ok())
202 : {
203 0 : CPLError(CE_Failure, CPLE_AppDefined,
204 : "OpenInputFile() failed with %s",
205 0 : result.status().message().c_str());
206 0 : return nullptr;
207 : }
208 408 : infile = *result;
209 : }
210 :
211 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
212 1094 : arrow::MemoryPool::CreateDefault().release());
213 1094 : auto options = arrow::ipc::IpcReadOptions::Defaults();
214 547 : options.memory_pool = poMemoryPool.get();
215 :
216 1094 : auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
217 547 : if (bIsStreamingFormat)
218 : {
219 : auto result =
220 11 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
221 11 : if (!result.ok())
222 : {
223 1 : CPLError(CE_Failure, CPLE_AppDefined,
224 : "RecordBatchStreamReader::Open() failed with %s",
225 1 : result.status().message().c_str());
226 1 : return nullptr;
227 : }
228 20 : auto poRecordBatchStreamReader = *result;
229 10 : const bool bSeekable =
230 18 : !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
231 8 : strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
232 20 : std::string osLayername = CPLGetBasenameSafe(poOpenInfo->pszFilename);
233 10 : if (osLayername.empty())
234 1 : osLayername = "layer";
235 : auto poLayer = std::make_unique<OGRFeatherLayer>(
236 10 : poDS.get(), osLayername.c_str(), infile, bSeekable, options,
237 20 : poRecordBatchStreamReader);
238 10 : poDS->SetLayer(std::move(poLayer));
239 :
240 : // Pre-load field domains, as this depends on the first record batch
241 10 : auto poLayerPtr = poDS->GetLayer(0);
242 10 : const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
243 10 : bool bHasReadBatch = false;
244 536 : for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
245 : {
246 526 : const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
247 526 : const auto &osDomainName = poFieldDefn->GetDomainName();
248 526 : if (!osDomainName.empty())
249 : {
250 6 : if (!bHasReadBatch)
251 : {
252 6 : bHasReadBatch = true;
253 6 : delete poLayerPtr->GetNextFeature();
254 6 : poLayerPtr->ResetReading();
255 : }
256 6 : poDS->GetFieldDomain(osDomainName);
257 : }
258 : }
259 : }
260 : else
261 : {
262 536 : auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
263 536 : if (!result.ok())
264 : {
265 1 : CPLError(CE_Failure, CPLE_AppDefined,
266 : "RecordBatchFileReader::Open() failed with %s",
267 1 : result.status().message().c_str());
268 1 : return nullptr;
269 : }
270 1070 : auto poRecordBatchReader = *result;
271 : auto poLayer = std::make_unique<OGRFeatherLayer>(
272 1070 : poDS.get(), CPLGetBasenameSafe(poOpenInfo->pszFilename).c_str(),
273 535 : poRecordBatchReader);
274 535 : poDS->SetLayer(std::move(poLayer));
275 : }
276 545 : return poDS.release();
277 : }
278 :
279 : /************************************************************************/
280 : /* Create() */
281 : /************************************************************************/
282 :
283 151 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
284 : int nYSize, int nBands,
285 : GDALDataType eType,
286 : char ** /* papszOptions */)
287 : {
288 151 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
289 0 : return nullptr;
290 :
291 151 : std::shared_ptr<arrow::io::OutputStream> out_file;
292 169 : if (STARTS_WITH(pszName, "/vsi") ||
293 18 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
294 : {
295 151 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
296 151 : if (fp == nullptr)
297 : {
298 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
299 1 : return nullptr;
300 : }
301 150 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
302 : }
303 : else
304 : {
305 0 : auto result = arrow::io::FileOutputStream::Open(pszName);
306 0 : if (!result.ok())
307 : {
308 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
309 0 : result.status().message().c_str());
310 0 : return nullptr;
311 : }
312 0 : out_file = *result;
313 : }
314 :
315 150 : return new OGRFeatherWriterDataset(pszName, out_file);
316 : }
317 :
318 : /************************************************************************/
319 : /* OGRFeatherDriver() */
320 : /************************************************************************/
321 :
322 : class OGRFeatherDriver final : public GDALDriver
323 : {
324 : std::mutex m_oMutex{};
325 : bool m_bMetadataInitialized = false;
326 : void InitMetadata();
327 :
328 : public:
329 : const char *GetMetadataItem(const char *pszName,
330 : const char *pszDomain) override;
331 :
332 140 : char **GetMetadata(const char *pszDomain) override
333 : {
334 280 : std::lock_guard oLock(m_oMutex);
335 140 : InitMetadata();
336 280 : return GDALDriver::GetMetadata(pszDomain);
337 : }
338 : };
339 :
340 1597 : const char *OGRFeatherDriver::GetMetadataItem(const char *pszName,
341 : const char *pszDomain)
342 : {
343 3194 : std::lock_guard oLock(m_oMutex);
344 1597 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
345 : {
346 271 : InitMetadata();
347 : }
348 3194 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
349 : }
350 :
351 411 : void OGRFeatherDriver::InitMetadata()
352 : {
353 411 : if (m_bMetadataInitialized)
354 401 : return;
355 10 : m_bMetadataInitialized = true;
356 :
357 : CPLXMLTreeCloser oTree(
358 20 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
359 :
360 20 : std::vector<const char *> apszCompressionMethods;
361 10 : bool bHasLZ4 = false;
362 30 : for (const char *pszMethod : {"ZSTD", "LZ4"})
363 : {
364 : auto oResult = arrow::util::Codec::GetCompressionType(
365 40 : CPLString(pszMethod).tolower());
366 20 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
367 : {
368 20 : if (EQUAL(pszMethod, "LZ4"))
369 10 : bHasLZ4 = true;
370 20 : apszCompressionMethods.emplace_back(pszMethod);
371 : }
372 : }
373 :
374 : {
375 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
376 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
377 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
378 10 : CPLAddXMLAttributeAndValue(psOption, "description",
379 : "File format variant");
380 30 : for (const char *pszEncoding : {"FILE", "STREAM"})
381 : {
382 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
383 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
384 : }
385 : }
386 :
387 : {
388 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
389 10 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
390 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
391 10 : CPLAddXMLAttributeAndValue(psOption, "description",
392 : "Compression method");
393 10 : CPLAddXMLAttributeAndValue(psOption, "default",
394 : bHasLZ4 ? "LZ4" : "NONE");
395 : {
396 10 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
397 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
398 10 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
399 : }
400 30 : for (const char *pszMethod : apszCompressionMethods)
401 : {
402 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
403 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
404 : }
405 : }
406 :
407 : {
408 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
409 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
410 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
411 10 : CPLAddXMLAttributeAndValue(psOption, "description",
412 : "Encoding of geometry columns");
413 10 : CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
414 40 : for (const char *pszEncoding :
415 50 : {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
416 : {
417 40 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
418 40 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
419 40 : if (EQUAL(pszEncoding, "GEOARROW"))
420 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
421 : "GEOARROW_STRUCT");
422 : }
423 : }
424 :
425 : {
426 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
427 10 : CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
428 10 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
429 10 : CPLAddXMLAttributeAndValue(psOption, "description",
430 : "Maximum number of rows per batch");
431 10 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
432 : }
433 :
434 : {
435 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
436 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
437 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
438 10 : CPLAddXMLAttributeAndValue(psOption, "description",
439 : "Name of geometry column");
440 10 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
441 : }
442 :
443 : {
444 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
445 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
446 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
447 10 : CPLAddXMLAttributeAndValue(psOption, "description",
448 : "Name of the FID column to create");
449 : }
450 :
451 10 : char *pszXML = CPLSerializeXMLTree(oTree.get());
452 10 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
453 10 : CPLFree(pszXML);
454 : }
455 :
456 : /************************************************************************/
457 : /* RegisterOGRArrow() */
458 : /************************************************************************/
459 :
460 14 : void RegisterOGRArrow()
461 : {
462 14 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
463 0 : return;
464 :
465 28 : auto poDriver = std::make_unique<OGRFeatherDriver>();
466 :
467 14 : OGRFeatherDriverSetCommonMetadata(poDriver.get());
468 :
469 14 : poDriver->pfnOpen = OGRFeatherDriverOpen;
470 14 : poDriver->pfnCreate = OGRFeatherDriverCreate;
471 :
472 14 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
473 :
474 14 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
475 :
476 : #if ARROW_VERSION_MAJOR >= 16
477 : // Mostly for tests
478 : const char *pszPath =
479 14 : CPLGetConfigOption("OGR_ARROW_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
480 14 : if (pszPath)
481 : {
482 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
483 0 : if (!result.ok())
484 : {
485 0 : CPLError(CE_Warning, CPLE_AppDefined,
486 : "arrow::fs::LoadFileSystemFactories() failed with %s",
487 0 : result.message().c_str());
488 : }
489 : }
490 : #endif
491 : }
|