Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <map>
17 :
18 : #include "ogr_feather.h"
19 : #include "../arrow_common/ograrrowrandomaccessfile.h"
20 : #include "../arrow_common/ograrrowwritablefile.h"
21 : #include "../arrow_common/ograrrowdataset.hpp"
22 :
23 : #include "ogrfeatherdrivercore.h"
24 :
25 : /************************************************************************/
26 : /* IsArrowIPCStream() */
27 : /************************************************************************/
28 :
29 548 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
30 : {
31 : // WARNING: if making changes in that method, reflect them in
32 : // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
33 :
34 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
35 3 : return true;
36 :
37 545 : constexpr int CONTINUATION_SIZE = 4; // 0xFFFFFFFF
38 545 : constexpr int METADATA_SIZE_SIZE = 4;
39 :
40 : // See
41 : // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
42 545 : if (poOpenInfo->fpL != nullptr &&
43 545 : poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
44 545 : memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
45 : 0)
46 : {
47 9 : const char *pszExt = poOpenInfo->osExtension.c_str();
48 9 : if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
49 3 : return true;
50 :
51 6 : const uint32_t nMetadataSize =
52 6 : CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
53 6 : if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
54 : {
55 1 : if (poOpenInfo->IsSingleAllowedDriver("ARROW"))
56 1 : return true;
57 :
58 : // Padding after metadata and before body is not necessarily present
59 : // but the body must be at least 4 bytes
60 0 : constexpr int PADDING_MAX_SIZE = 4;
61 :
62 : // /vsistdin/ cannot seek back beyond first MB
63 0 : if (nMetadataSize >
64 : 1024 * 1024 -
65 : (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
66 : {
67 0 : return false;
68 : }
69 0 : const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
70 0 : nMetadataSize + PADDING_MAX_SIZE;
71 0 : if (!poOpenInfo->TryToIngest(nSizeToRead))
72 : {
73 0 : return false;
74 : }
75 :
76 : const std::string osTmpFilename(
77 0 : VSIMemGenerateHiddenFilename("arrow"));
78 : auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
79 : osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
80 0 : false));
81 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
82 0 : osTmpFilename.c_str(), std::move(fp));
83 0 : auto options = arrow::ipc::IpcReadOptions::Defaults();
84 : auto result =
85 0 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
86 0 : CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
87 0 : result.status().message().c_str());
88 0 : VSIUnlink(osTmpFilename.c_str());
89 0 : return result.ok();
90 : }
91 :
92 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
93 5 : const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
94 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
95 5 : if (nMetadataSize >
96 5 : nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
97 0 : return false;
98 :
99 : // Do not give ownership of poOpenInfo->fpL to infile
100 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
101 10 : poOpenInfo->pszFilename, poOpenInfo->fpL, false);
102 10 : auto options = arrow::ipc::IpcReadOptions::Defaults();
103 : auto result =
104 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
105 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
106 5 : return result.ok();
107 : }
108 536 : return false;
109 : }
110 :
111 : /************************************************************************/
112 : /* Open() */
113 : /************************************************************************/
114 :
115 548 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
116 : {
117 548 : if (poOpenInfo->eAccess == GA_Update)
118 : {
119 0 : return nullptr;
120 : }
121 :
122 548 : GDALOpenInfo *poOpenInfoForIdentify = poOpenInfo;
123 548 : std::unique_ptr<GDALOpenInfo> poOpenInfoTmp;
124 548 : if (STARTS_WITH(poOpenInfo->pszFilename, "gdalvsi://"))
125 : {
126 1 : poOpenInfoTmp = std::make_unique<GDALOpenInfo>(poOpenInfo->pszFilename +
127 : strlen("gdalvsi://"),
128 1 : poOpenInfo->nOpenFlags);
129 1 : poOpenInfoForIdentify = poOpenInfoTmp.get();
130 : }
131 :
132 548 : const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfoForIdentify);
133 1084 : if (!bIsStreamingFormat &&
134 536 : !OGRFeatherDriverIsArrowFileFormat(poOpenInfoForIdentify))
135 : {
136 0 : return nullptr;
137 : }
138 :
139 548 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
140 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
141 : {
142 3 : const std::string osFilename(poOpenInfo->pszFilename +
143 3 : strlen("ARROW_IPC_STREAM:"));
144 : auto fp =
145 3 : VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
146 3 : if (fp == nullptr)
147 : {
148 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
149 : osFilename.c_str());
150 1 : return nullptr;
151 : }
152 4 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename.c_str(),
153 4 : std::move(fp));
154 : }
155 953 : else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
156 408 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
157 : {
158 137 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
159 137 : poOpenInfo->fpL = nullptr;
160 137 : infile = std::make_shared<OGRArrowRandomAccessFile>(
161 137 : poOpenInfo->pszFilename, std::move(fp));
162 : }
163 : else
164 : {
165 : // FileSystemFromUriOrPath() doesn't like relative paths
166 : // so transform them to absolute.
167 408 : std::string osPath(poOpenInfo->pszFilename);
168 408 : if (CPLIsFilenameRelative(osPath.c_str()))
169 : {
170 407 : char *pszCurDir = CPLGetCurrentDir();
171 407 : if (pszCurDir == nullptr)
172 0 : return nullptr;
173 407 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
174 407 : CPLFree(pszCurDir);
175 : }
176 :
177 408 : std::string osFSPath;
178 : auto poFS =
179 816 : arrow::fs::FileSystemFromUriOrPath(osPath.c_str(), &osFSPath);
180 408 : if (!poFS.ok())
181 : {
182 0 : CPLError(CE_Failure, CPLE_AppDefined,
183 : "arrow::fs::FileSystemFromUriOrPath failed with %s",
184 0 : poFS.status().message().c_str());
185 0 : return nullptr;
186 : }
187 408 : auto result = (*poFS)->OpenInputFile(osFSPath);
188 408 : if (!result.ok())
189 : {
190 0 : CPLError(CE_Failure, CPLE_AppDefined,
191 : "OpenInputFile() failed with %s",
192 0 : result.status().message().c_str());
193 0 : return nullptr;
194 : }
195 408 : infile = *result;
196 : }
197 :
198 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
199 1094 : arrow::MemoryPool::CreateDefault().release());
200 1094 : auto options = arrow::ipc::IpcReadOptions::Defaults();
201 547 : options.memory_pool = poMemoryPool.get();
202 :
203 1094 : auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
204 547 : if (bIsStreamingFormat)
205 : {
206 : auto result =
207 11 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
208 11 : if (!result.ok())
209 : {
210 1 : CPLError(CE_Failure, CPLE_AppDefined,
211 : "RecordBatchStreamReader::Open() failed with %s",
212 1 : result.status().message().c_str());
213 1 : return nullptr;
214 : }
215 20 : auto poRecordBatchStreamReader = *result;
216 10 : const bool bSeekable =
217 18 : !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
218 8 : strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
219 20 : std::string osLayername = CPLGetBasenameSafe(poOpenInfo->pszFilename);
220 10 : if (osLayername.empty())
221 1 : osLayername = "layer";
222 : auto poLayer = std::make_unique<OGRFeatherLayer>(
223 10 : poDS.get(), osLayername.c_str(), infile, bSeekable, options,
224 20 : poRecordBatchStreamReader);
225 10 : poDS->SetLayer(std::move(poLayer));
226 :
227 : // Pre-load field domains, as this depends on the first record batch
228 10 : auto poLayerPtr = poDS->GetLayer(0);
229 10 : const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
230 10 : bool bHasReadBatch = false;
231 536 : for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
232 : {
233 526 : const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
234 526 : const auto &osDomainName = poFieldDefn->GetDomainName();
235 526 : if (!osDomainName.empty())
236 : {
237 6 : if (!bHasReadBatch)
238 : {
239 6 : bHasReadBatch = true;
240 6 : delete poLayerPtr->GetNextFeature();
241 6 : poLayerPtr->ResetReading();
242 : }
243 6 : poDS->GetFieldDomain(osDomainName);
244 : }
245 : }
246 : }
247 : else
248 : {
249 536 : auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
250 536 : if (!result.ok())
251 : {
252 1 : CPLError(CE_Failure, CPLE_AppDefined,
253 : "RecordBatchFileReader::Open() failed with %s",
254 1 : result.status().message().c_str());
255 1 : return nullptr;
256 : }
257 1070 : auto poRecordBatchReader = *result;
258 : auto poLayer = std::make_unique<OGRFeatherLayer>(
259 1070 : poDS.get(), CPLGetBasenameSafe(poOpenInfo->pszFilename).c_str(),
260 535 : poRecordBatchReader);
261 535 : poDS->SetLayer(std::move(poLayer));
262 : }
263 545 : return poDS.release();
264 : }
265 :
266 : /************************************************************************/
267 : /* Create() */
268 : /************************************************************************/
269 :
270 151 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
271 : int nYSize, int nBands,
272 : GDALDataType eType,
273 : char ** /* papszOptions */)
274 : {
275 151 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
276 0 : return nullptr;
277 :
278 151 : std::shared_ptr<arrow::io::OutputStream> out_file;
279 169 : if (STARTS_WITH(pszName, "/vsi") ||
280 18 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
281 : {
282 151 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
283 151 : if (fp == nullptr)
284 : {
285 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
286 1 : return nullptr;
287 : }
288 150 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
289 : }
290 : else
291 : {
292 0 : auto result = arrow::io::FileOutputStream::Open(pszName);
293 0 : if (!result.ok())
294 : {
295 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
296 0 : result.status().message().c_str());
297 0 : return nullptr;
298 : }
299 0 : out_file = *result;
300 : }
301 :
302 150 : return new OGRFeatherWriterDataset(pszName, out_file);
303 : }
304 :
305 : /************************************************************************/
306 : /* OGRFeatherDriver() */
307 : /************************************************************************/
308 :
309 : class OGRFeatherDriver final : public GDALDriver
310 : {
311 : bool m_bMetadataInitialized = false;
312 : void InitMetadata();
313 :
314 : public:
315 1559 : const char *GetMetadataItem(const char *pszName,
316 : const char *pszDomain) override
317 : {
318 1559 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
319 : {
320 271 : InitMetadata();
321 : }
322 1559 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
323 : }
324 :
325 135 : char **GetMetadata(const char *pszDomain) override
326 : {
327 135 : InitMetadata();
328 135 : return GDALDriver::GetMetadata(pszDomain);
329 : }
330 : };
331 :
332 406 : void OGRFeatherDriver::InitMetadata()
333 : {
334 406 : if (m_bMetadataInitialized)
335 397 : return;
336 9 : m_bMetadataInitialized = true;
337 :
338 : CPLXMLTreeCloser oTree(
339 18 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
340 :
341 18 : std::vector<const char *> apszCompressionMethods;
342 9 : bool bHasLZ4 = false;
343 27 : for (const char *pszMethod : {"ZSTD", "LZ4"})
344 : {
345 : auto oResult = arrow::util::Codec::GetCompressionType(
346 36 : CPLString(pszMethod).tolower());
347 18 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
348 : {
349 18 : if (EQUAL(pszMethod, "LZ4"))
350 9 : bHasLZ4 = true;
351 18 : apszCompressionMethods.emplace_back(pszMethod);
352 : }
353 : }
354 :
355 : {
356 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
357 9 : CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
358 9 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
359 9 : CPLAddXMLAttributeAndValue(psOption, "description",
360 : "File format variant");
361 27 : for (const char *pszEncoding : {"FILE", "STREAM"})
362 : {
363 18 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
364 18 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
365 : }
366 : }
367 :
368 : {
369 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
370 9 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
371 9 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
372 9 : CPLAddXMLAttributeAndValue(psOption, "description",
373 : "Compression method");
374 9 : CPLAddXMLAttributeAndValue(psOption, "default",
375 : bHasLZ4 ? "LZ4" : "NONE");
376 : {
377 9 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
378 9 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
379 9 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
380 : }
381 27 : for (const char *pszMethod : apszCompressionMethods)
382 : {
383 18 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
384 18 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
385 : }
386 : }
387 :
388 : {
389 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
390 9 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
391 9 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
392 9 : CPLAddXMLAttributeAndValue(psOption, "description",
393 : "Encoding of geometry columns");
394 9 : CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
395 36 : for (const char *pszEncoding :
396 45 : {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
397 : {
398 36 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
399 36 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
400 36 : if (EQUAL(pszEncoding, "GEOARROW"))
401 9 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
402 : "GEOARROW_STRUCT");
403 : }
404 : }
405 :
406 : {
407 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
408 9 : CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
409 9 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
410 9 : CPLAddXMLAttributeAndValue(psOption, "description",
411 : "Maximum number of rows per batch");
412 9 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
413 : }
414 :
415 : {
416 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
417 9 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
418 9 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
419 9 : CPLAddXMLAttributeAndValue(psOption, "description",
420 : "Name of geometry column");
421 9 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
422 : }
423 :
424 : {
425 9 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
426 9 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
427 9 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
428 9 : CPLAddXMLAttributeAndValue(psOption, "description",
429 : "Name of the FID column to create");
430 : }
431 :
432 9 : char *pszXML = CPLSerializeXMLTree(oTree.get());
433 9 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
434 9 : CPLFree(pszXML);
435 : }
436 :
437 : /************************************************************************/
438 : /* RegisterOGRArrow() */
439 : /************************************************************************/
440 :
441 13 : void RegisterOGRArrow()
442 : {
443 13 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
444 0 : return;
445 :
446 26 : auto poDriver = std::make_unique<OGRFeatherDriver>();
447 :
448 13 : OGRFeatherDriverSetCommonMetadata(poDriver.get());
449 :
450 13 : poDriver->pfnOpen = OGRFeatherDriverOpen;
451 13 : poDriver->pfnCreate = OGRFeatherDriverCreate;
452 :
453 13 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
454 :
455 13 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
456 :
457 : #if ARROW_VERSION_MAJOR >= 16
458 : // Mostly for tests
459 : const char *pszPath =
460 13 : CPLGetConfigOption("OGR_ARROW_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
461 13 : if (pszPath)
462 : {
463 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
464 0 : if (!result.ok())
465 : {
466 0 : CPLError(CE_Warning, CPLE_AppDefined,
467 : "arrow::fs::LoadFileSystemFactories() failed with %s",
468 0 : result.message().c_str());
469 : }
470 : }
471 : #endif
472 : }
|