Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <map>
17 : #include <mutex>
18 :
19 : #include "ogr_feather.h"
20 : #include "../arrow_common/ograrrowrandomaccessfile.h"
21 : #include "../arrow_common/ograrrowwritablefile.h"
22 : #include "../arrow_common/ograrrowdataset.hpp"
23 :
24 : #include "ogrfeatherdrivercore.h"
25 :
26 : /************************************************************************/
27 : /* IsArrowIPCStream() */
28 : /************************************************************************/
29 :
30 548 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
31 : {
32 : // WARNING: if making changes in that method, reflect them in
33 : // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
34 :
35 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
36 3 : return true;
37 :
38 545 : constexpr int CONTINUATION_SIZE = 4; // 0xFFFFFFFF
39 545 : constexpr int METADATA_SIZE_SIZE = 4;
40 :
41 : // See
42 : // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
43 545 : if (poOpenInfo->fpL != nullptr &&
44 545 : poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
45 545 : memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
46 : 0)
47 : {
48 9 : const char *pszExt = poOpenInfo->osExtension.c_str();
49 9 : if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
50 3 : return true;
51 :
52 6 : const uint32_t nMetadataSize =
53 6 : CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
54 6 : if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
55 : {
56 1 : if (poOpenInfo->IsSingleAllowedDriver("ARROW"))
57 1 : return true;
58 :
59 : // Padding after metadata and before body is not necessarily present
60 : // but the body must be at least 4 bytes
61 0 : constexpr int PADDING_MAX_SIZE = 4;
62 :
63 : // /vsistdin/ cannot seek back beyond first MB
64 0 : if (nMetadataSize >
65 : 1024 * 1024 -
66 : (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
67 : {
68 0 : return false;
69 : }
70 0 : const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
71 0 : nMetadataSize + PADDING_MAX_SIZE;
72 0 : if (!poOpenInfo->TryToIngest(nSizeToRead))
73 : {
74 0 : return false;
75 : }
76 :
77 : const std::string osTmpFilename(
78 0 : VSIMemGenerateHiddenFilename("arrow"));
79 : auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
80 : osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
81 0 : false));
82 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
83 0 : osTmpFilename.c_str(), std::move(fp));
84 0 : auto options = arrow::ipc::IpcReadOptions::Defaults();
85 : auto result =
86 0 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
87 0 : CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
88 0 : result.status().message().c_str());
89 0 : VSIUnlink(osTmpFilename.c_str());
90 0 : return result.ok();
91 : }
92 :
93 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
94 5 : const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
95 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
96 5 : if (nMetadataSize >
97 5 : nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
98 0 : return false;
99 :
100 : // Do not give ownership of poOpenInfo->fpL to infile
101 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
102 10 : poOpenInfo->pszFilename, poOpenInfo->fpL, false);
103 10 : auto options = arrow::ipc::IpcReadOptions::Defaults();
104 : auto result =
105 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
106 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
107 5 : return result.ok();
108 : }
109 536 : return false;
110 : }
111 :
112 : /************************************************************************/
113 : /* Open() */
114 : /************************************************************************/
115 :
116 548 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
117 : {
118 548 : if (poOpenInfo->eAccess == GA_Update)
119 : {
120 0 : return nullptr;
121 : }
122 :
123 : #if ARROW_VERSION_MAJOR >= 21
124 : // Register geoarrow.wkb extension only if requested for Arrow driver
125 : if (CPLTestBool(CPLGetConfigOption(
126 : "OGR_ARROW_REGISTER_GEOARROW_WKB_EXTENSION", "NO")) &&
127 : arrow::GetExtensionType(EXTENSION_NAME_GEOARROW_WKB))
128 : {
129 : CPL_IGNORE_RET_VAL(arrow::RegisterExtensionType(
130 : std::make_shared<OGRGeoArrowWkbExtensionType>(
131 : std::move(arrow::binary()), std::string())));
132 : }
133 : #endif
134 :
135 548 : GDALOpenInfo *poOpenInfoForIdentify = poOpenInfo;
136 548 : std::unique_ptr<GDALOpenInfo> poOpenInfoTmp;
137 548 : if (STARTS_WITH(poOpenInfo->pszFilename, "gdalvsi://"))
138 : {
139 1 : poOpenInfoTmp = std::make_unique<GDALOpenInfo>(poOpenInfo->pszFilename +
140 : strlen("gdalvsi://"),
141 1 : poOpenInfo->nOpenFlags);
142 1 : poOpenInfoForIdentify = poOpenInfoTmp.get();
143 : }
144 :
145 548 : const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfoForIdentify);
146 1084 : if (!bIsStreamingFormat &&
147 536 : !OGRFeatherDriverIsArrowFileFormat(poOpenInfoForIdentify))
148 : {
149 0 : return nullptr;
150 : }
151 :
152 548 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
153 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
154 : {
155 3 : const std::string osFilename(poOpenInfo->pszFilename +
156 3 : strlen("ARROW_IPC_STREAM:"));
157 : auto fp =
158 3 : VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
159 3 : if (fp == nullptr)
160 : {
161 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
162 : osFilename.c_str());
163 1 : return nullptr;
164 : }
165 4 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename.c_str(),
166 4 : std::move(fp));
167 : }
168 953 : else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
169 408 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
170 : {
171 137 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
172 137 : poOpenInfo->fpL = nullptr;
173 137 : infile = std::make_shared<OGRArrowRandomAccessFile>(
174 137 : poOpenInfo->pszFilename, std::move(fp));
175 : }
176 : else
177 : {
178 : // FileSystemFromUriOrPath() doesn't like relative paths
179 : // so transform them to absolute.
180 408 : std::string osPath(poOpenInfo->pszFilename);
181 408 : if (CPLIsFilenameRelative(osPath.c_str()))
182 : {
183 407 : char *pszCurDir = CPLGetCurrentDir();
184 407 : if (pszCurDir == nullptr)
185 0 : return nullptr;
186 407 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
187 407 : CPLFree(pszCurDir);
188 : }
189 :
190 408 : std::string osFSPath;
191 : auto poFS =
192 816 : arrow::fs::FileSystemFromUriOrPath(osPath.c_str(), &osFSPath);
193 408 : if (!poFS.ok())
194 : {
195 0 : CPLError(CE_Failure, CPLE_AppDefined,
196 : "arrow::fs::FileSystemFromUriOrPath failed with %s",
197 0 : poFS.status().message().c_str());
198 0 : return nullptr;
199 : }
200 408 : auto result = (*poFS)->OpenInputFile(osFSPath);
201 408 : if (!result.ok())
202 : {
203 0 : CPLError(CE_Failure, CPLE_AppDefined,
204 : "OpenInputFile() failed with %s",
205 0 : result.status().message().c_str());
206 0 : return nullptr;
207 : }
208 408 : infile = *result;
209 : }
210 :
211 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
212 1094 : arrow::MemoryPool::CreateDefault().release());
213 1094 : auto options = arrow::ipc::IpcReadOptions::Defaults();
214 547 : options.memory_pool = poMemoryPool.get();
215 :
216 1094 : auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
217 547 : if (bIsStreamingFormat)
218 : {
219 : auto result =
220 11 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
221 11 : if (!result.ok())
222 : {
223 1 : CPLError(CE_Failure, CPLE_AppDefined,
224 : "RecordBatchStreamReader::Open() failed with %s",
225 1 : result.status().message().c_str());
226 1 : return nullptr;
227 : }
228 20 : auto poRecordBatchStreamReader = *result;
229 10 : const bool bSeekable =
230 18 : !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
231 8 : strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
232 20 : std::string osLayername = CPLGetBasenameSafe(poOpenInfo->pszFilename);
233 10 : if (osLayername.empty())
234 1 : osLayername = "layer";
235 : auto poLayer = std::make_unique<OGRFeatherLayer>(
236 10 : poDS.get(), osLayername.c_str(), infile, bSeekable, options,
237 20 : poRecordBatchStreamReader);
238 10 : poDS->SetLayer(std::move(poLayer));
239 :
240 : // Pre-load field domains, as this depends on the first record batch
241 10 : auto poLayerPtr = poDS->GetLayer(0);
242 10 : const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
243 10 : bool bHasReadBatch = false;
244 536 : for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
245 : {
246 526 : const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
247 526 : const auto &osDomainName = poFieldDefn->GetDomainName();
248 526 : if (!osDomainName.empty())
249 : {
250 6 : if (!bHasReadBatch)
251 : {
252 6 : bHasReadBatch = true;
253 6 : delete poLayerPtr->GetNextFeature();
254 6 : poLayerPtr->ResetReading();
255 : }
256 6 : poDS->GetFieldDomain(osDomainName);
257 : }
258 : }
259 : }
260 : else
261 : {
262 536 : auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
263 536 : if (!result.ok())
264 : {
265 1 : CPLError(CE_Failure, CPLE_AppDefined,
266 : "RecordBatchFileReader::Open() failed with %s",
267 1 : result.status().message().c_str());
268 1 : return nullptr;
269 : }
270 1070 : auto poRecordBatchReader = *result;
271 : auto poLayer = std::make_unique<OGRFeatherLayer>(
272 1070 : poDS.get(), CPLGetBasenameSafe(poOpenInfo->pszFilename).c_str(),
273 535 : poRecordBatchReader);
274 535 : poDS->SetLayer(std::move(poLayer));
275 : }
276 545 : return poDS.release();
277 : }
278 :
279 : /************************************************************************/
280 : /* Create() */
281 : /************************************************************************/
282 :
283 151 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
284 : int nYSize, int nBands,
285 : GDALDataType eType,
286 : char ** /* papszOptions */)
287 : {
288 151 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
289 0 : return nullptr;
290 :
291 151 : std::shared_ptr<arrow::io::OutputStream> out_file;
292 169 : if (STARTS_WITH(pszName, "/vsi") ||
293 18 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
294 : {
295 : VSIVirtualHandleUniquePtr fp =
296 151 : VSIFilesystemHandler::OpenStatic(pszName, "wb");
297 151 : if (fp == nullptr)
298 : {
299 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
300 1 : return nullptr;
301 : }
302 150 : out_file = std::make_shared<OGRArrowWritableFile>(std::move(fp));
303 : }
304 : else
305 : {
306 0 : auto result = arrow::io::FileOutputStream::Open(pszName);
307 0 : if (!result.ok())
308 : {
309 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
310 0 : result.status().message().c_str());
311 0 : return nullptr;
312 : }
313 0 : out_file = *result;
314 : }
315 :
316 150 : return new OGRFeatherWriterDataset(pszName, out_file);
317 : }
318 :
319 : /************************************************************************/
320 : /* OGRFeatherDriver() */
321 : /************************************************************************/
322 :
323 : class OGRFeatherDriver final : public GDALDriver
324 : {
325 : std::recursive_mutex m_oMutex{};
326 : bool m_bMetadataInitialized = false;
327 : void InitMetadata();
328 :
329 : public:
330 : const char *GetMetadataItem(const char *pszName,
331 : const char *pszDomain) override;
332 :
333 140 : char **GetMetadata(const char *pszDomain) override
334 : {
335 280 : std::lock_guard oLock(m_oMutex);
336 140 : InitMetadata();
337 280 : return GDALDriver::GetMetadata(pszDomain);
338 : }
339 : };
340 :
341 1627 : const char *OGRFeatherDriver::GetMetadataItem(const char *pszName,
342 : const char *pszDomain)
343 : {
344 3254 : std::lock_guard oLock(m_oMutex);
345 1627 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
346 : {
347 271 : InitMetadata();
348 : }
349 3254 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
350 : }
351 :
352 411 : void OGRFeatherDriver::InitMetadata()
353 : {
354 411 : if (m_bMetadataInitialized)
355 401 : return;
356 10 : m_bMetadataInitialized = true;
357 :
358 : CPLXMLTreeCloser oTree(
359 20 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
360 :
361 20 : std::vector<const char *> apszCompressionMethods;
362 10 : bool bHasLZ4 = false;
363 30 : for (const char *pszMethod : {"ZSTD", "LZ4"})
364 : {
365 : auto oResult = arrow::util::Codec::GetCompressionType(
366 40 : CPLString(pszMethod).tolower());
367 20 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
368 : {
369 20 : if (EQUAL(pszMethod, "LZ4"))
370 10 : bHasLZ4 = true;
371 20 : apszCompressionMethods.emplace_back(pszMethod);
372 : }
373 : }
374 :
375 : {
376 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
377 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
378 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
379 10 : CPLAddXMLAttributeAndValue(psOption, "description",
380 : "File format variant");
381 30 : for (const char *pszEncoding : {"FILE", "STREAM"})
382 : {
383 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
384 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
385 : }
386 : }
387 :
388 : {
389 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
390 10 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
391 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
392 10 : CPLAddXMLAttributeAndValue(psOption, "description",
393 : "Compression method");
394 10 : CPLAddXMLAttributeAndValue(psOption, "default",
395 : bHasLZ4 ? "LZ4" : "NONE");
396 : {
397 10 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
398 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
399 10 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
400 : }
401 30 : for (const char *pszMethod : apszCompressionMethods)
402 : {
403 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
404 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
405 : }
406 : }
407 :
408 : {
409 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
410 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
411 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
412 10 : CPLAddXMLAttributeAndValue(psOption, "description",
413 : "Encoding of geometry columns");
414 10 : CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
415 40 : for (const char *pszEncoding :
416 50 : {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
417 : {
418 40 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
419 40 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
420 40 : if (EQUAL(pszEncoding, "GEOARROW"))
421 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
422 : "GEOARROW_STRUCT");
423 : }
424 : }
425 :
426 : {
427 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
428 10 : CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
429 10 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
430 10 : CPLAddXMLAttributeAndValue(psOption, "description",
431 : "Maximum number of rows per batch");
432 10 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
433 : }
434 :
435 : {
436 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
437 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
438 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
439 10 : CPLAddXMLAttributeAndValue(psOption, "description",
440 : "Name of geometry column");
441 10 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
442 : }
443 :
444 : {
445 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
446 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
447 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
448 10 : CPLAddXMLAttributeAndValue(psOption, "description",
449 : "Name of the FID column to create");
450 : }
451 :
452 10 : char *pszXML = CPLSerializeXMLTree(oTree.get());
453 10 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
454 10 : CPLFree(pszXML);
455 : }
456 :
457 : /************************************************************************/
458 : /* RegisterOGRArrow() */
459 : /************************************************************************/
460 :
461 14 : void RegisterOGRArrow()
462 : {
463 14 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
464 0 : return;
465 :
466 28 : auto poDriver = std::make_unique<OGRFeatherDriver>();
467 :
468 14 : OGRFeatherDriverSetCommonMetadata(poDriver.get());
469 :
470 14 : poDriver->pfnOpen = OGRFeatherDriverOpen;
471 14 : poDriver->pfnCreate = OGRFeatherDriverCreate;
472 :
473 14 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
474 :
475 14 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
476 :
477 : #if ARROW_VERSION_MAJOR >= 16
478 : // Mostly for tests
479 : const char *pszPath =
480 14 : CPLGetConfigOption("OGR_ARROW_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
481 14 : if (pszPath)
482 : {
483 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
484 0 : if (!result.ok())
485 : {
486 0 : CPLError(CE_Warning, CPLE_AppDefined,
487 : "arrow::fs::LoadFileSystemFactories() failed with %s",
488 0 : result.message().c_str());
489 : }
490 : }
491 : #endif
492 : }
|