Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "gdal_pam.h"
14 : #include "ogrsf_frmts.h"
15 :
16 : #include <map>
17 : #include <mutex>
18 :
19 : #include "ogr_feather.h"
20 : #include "../arrow_common/ograrrowrandomaccessfile.h"
21 : #include "../arrow_common/ograrrowwritablefile.h"
22 : #include "../arrow_common/ograrrowdataset.hpp"
23 :
24 : #include "ogrfeatherdrivercore.h"
25 :
26 : /************************************************************************/
27 : /* IsArrowIPCStream() */
28 : /************************************************************************/
29 :
30 548 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
31 : {
32 : // WARNING: if making changes in that method, reflect them in
33 : // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
34 :
35 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
36 3 : return true;
37 :
38 545 : constexpr int CONTINUATION_SIZE = 4; // 0xFFFFFFFF
39 545 : constexpr int METADATA_SIZE_SIZE = 4;
40 :
41 : // See
42 : // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
43 545 : if (poOpenInfo->fpL != nullptr &&
44 545 : poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
45 545 : memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
46 : 0)
47 : {
48 9 : const char *pszExt = poOpenInfo->osExtension.c_str();
49 9 : if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
50 3 : return true;
51 :
52 6 : const uint32_t nMetadataSize =
53 6 : CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
54 6 : if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
55 : {
56 1 : if (poOpenInfo->IsSingleAllowedDriver("ARROW"))
57 1 : return true;
58 :
59 : // Padding after metadata and before body is not necessarily present
60 : // but the body must be at least 4 bytes
61 0 : constexpr int PADDING_MAX_SIZE = 4;
62 :
63 : // /vsistdin/ cannot seek back beyond first MB
64 0 : if (nMetadataSize >
65 : 1024 * 1024 -
66 : (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
67 : {
68 0 : return false;
69 : }
70 0 : const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
71 0 : nMetadataSize + PADDING_MAX_SIZE;
72 0 : if (!poOpenInfo->TryToIngest(nSizeToRead))
73 : {
74 0 : return false;
75 : }
76 :
77 : const std::string osTmpFilename(
78 0 : VSIMemGenerateHiddenFilename("arrow"));
79 : auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
80 : osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
81 0 : false));
82 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
83 0 : osTmpFilename.c_str(), std::move(fp));
84 0 : auto options = arrow::ipc::IpcReadOptions::Defaults();
85 : auto result =
86 0 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
87 0 : CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
88 0 : result.status().message().c_str());
89 0 : VSIUnlink(osTmpFilename.c_str());
90 0 : return result.ok();
91 : }
92 :
93 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
94 5 : const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
95 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
96 5 : if (nMetadataSize >
97 5 : nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
98 0 : return false;
99 :
100 : // Do not give ownership of poOpenInfo->fpL to infile
101 : auto infile = std::make_shared<OGRArrowRandomAccessFile>(
102 10 : poOpenInfo->pszFilename, poOpenInfo->fpL, false);
103 10 : auto options = arrow::ipc::IpcReadOptions::Defaults();
104 : auto result =
105 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
106 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
107 5 : return result.ok();
108 : }
109 536 : return false;
110 : }
111 :
112 : /************************************************************************/
113 : /* Open() */
114 : /************************************************************************/
115 :
116 548 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
117 : {
118 548 : if (poOpenInfo->eAccess == GA_Update)
119 : {
120 0 : return nullptr;
121 : }
122 :
123 548 : GDALOpenInfo *poOpenInfoForIdentify = poOpenInfo;
124 548 : std::unique_ptr<GDALOpenInfo> poOpenInfoTmp;
125 548 : if (STARTS_WITH(poOpenInfo->pszFilename, "gdalvsi://"))
126 : {
127 1 : poOpenInfoTmp = std::make_unique<GDALOpenInfo>(poOpenInfo->pszFilename +
128 : strlen("gdalvsi://"),
129 1 : poOpenInfo->nOpenFlags);
130 1 : poOpenInfoForIdentify = poOpenInfoTmp.get();
131 : }
132 :
133 548 : const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfoForIdentify);
134 1084 : if (!bIsStreamingFormat &&
135 536 : !OGRFeatherDriverIsArrowFileFormat(poOpenInfoForIdentify))
136 : {
137 0 : return nullptr;
138 : }
139 :
140 548 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
141 548 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
142 : {
143 3 : const std::string osFilename(poOpenInfo->pszFilename +
144 3 : strlen("ARROW_IPC_STREAM:"));
145 : auto fp =
146 3 : VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
147 3 : if (fp == nullptr)
148 : {
149 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
150 : osFilename.c_str());
151 1 : return nullptr;
152 : }
153 4 : infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename.c_str(),
154 4 : std::move(fp));
155 : }
156 953 : else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
157 408 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
158 : {
159 137 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
160 137 : poOpenInfo->fpL = nullptr;
161 137 : infile = std::make_shared<OGRArrowRandomAccessFile>(
162 137 : poOpenInfo->pszFilename, std::move(fp));
163 : }
164 : else
165 : {
166 : // FileSystemFromUriOrPath() doesn't like relative paths
167 : // so transform them to absolute.
168 408 : std::string osPath(poOpenInfo->pszFilename);
169 408 : if (CPLIsFilenameRelative(osPath.c_str()))
170 : {
171 407 : char *pszCurDir = CPLGetCurrentDir();
172 407 : if (pszCurDir == nullptr)
173 0 : return nullptr;
174 407 : osPath = CPLFormFilenameSafe(pszCurDir, osPath.c_str(), nullptr);
175 407 : CPLFree(pszCurDir);
176 : }
177 :
178 408 : std::string osFSPath;
179 : auto poFS =
180 816 : arrow::fs::FileSystemFromUriOrPath(osPath.c_str(), &osFSPath);
181 408 : if (!poFS.ok())
182 : {
183 0 : CPLError(CE_Failure, CPLE_AppDefined,
184 : "arrow::fs::FileSystemFromUriOrPath failed with %s",
185 0 : poFS.status().message().c_str());
186 0 : return nullptr;
187 : }
188 408 : auto result = (*poFS)->OpenInputFile(osFSPath);
189 408 : if (!result.ok())
190 : {
191 0 : CPLError(CE_Failure, CPLE_AppDefined,
192 : "OpenInputFile() failed with %s",
193 0 : result.status().message().c_str());
194 0 : return nullptr;
195 : }
196 408 : infile = *result;
197 : }
198 :
199 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
200 1094 : arrow::MemoryPool::CreateDefault().release());
201 1094 : auto options = arrow::ipc::IpcReadOptions::Defaults();
202 547 : options.memory_pool = poMemoryPool.get();
203 :
204 1094 : auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
205 547 : if (bIsStreamingFormat)
206 : {
207 : auto result =
208 11 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
209 11 : if (!result.ok())
210 : {
211 1 : CPLError(CE_Failure, CPLE_AppDefined,
212 : "RecordBatchStreamReader::Open() failed with %s",
213 1 : result.status().message().c_str());
214 1 : return nullptr;
215 : }
216 20 : auto poRecordBatchStreamReader = *result;
217 10 : const bool bSeekable =
218 18 : !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
219 8 : strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
220 20 : std::string osLayername = CPLGetBasenameSafe(poOpenInfo->pszFilename);
221 10 : if (osLayername.empty())
222 1 : osLayername = "layer";
223 : auto poLayer = std::make_unique<OGRFeatherLayer>(
224 10 : poDS.get(), osLayername.c_str(), infile, bSeekable, options,
225 20 : poRecordBatchStreamReader);
226 10 : poDS->SetLayer(std::move(poLayer));
227 :
228 : // Pre-load field domains, as this depends on the first record batch
229 10 : auto poLayerPtr = poDS->GetLayer(0);
230 10 : const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
231 10 : bool bHasReadBatch = false;
232 536 : for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
233 : {
234 526 : const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
235 526 : const auto &osDomainName = poFieldDefn->GetDomainName();
236 526 : if (!osDomainName.empty())
237 : {
238 6 : if (!bHasReadBatch)
239 : {
240 6 : bHasReadBatch = true;
241 6 : delete poLayerPtr->GetNextFeature();
242 6 : poLayerPtr->ResetReading();
243 : }
244 6 : poDS->GetFieldDomain(osDomainName);
245 : }
246 : }
247 : }
248 : else
249 : {
250 536 : auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
251 536 : if (!result.ok())
252 : {
253 1 : CPLError(CE_Failure, CPLE_AppDefined,
254 : "RecordBatchFileReader::Open() failed with %s",
255 1 : result.status().message().c_str());
256 1 : return nullptr;
257 : }
258 1070 : auto poRecordBatchReader = *result;
259 : auto poLayer = std::make_unique<OGRFeatherLayer>(
260 1070 : poDS.get(), CPLGetBasenameSafe(poOpenInfo->pszFilename).c_str(),
261 535 : poRecordBatchReader);
262 535 : poDS->SetLayer(std::move(poLayer));
263 : }
264 545 : return poDS.release();
265 : }
266 :
267 : /************************************************************************/
268 : /* Create() */
269 : /************************************************************************/
270 :
271 151 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
272 : int nYSize, int nBands,
273 : GDALDataType eType,
274 : char ** /* papszOptions */)
275 : {
276 151 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
277 0 : return nullptr;
278 :
279 151 : std::shared_ptr<arrow::io::OutputStream> out_file;
280 169 : if (STARTS_WITH(pszName, "/vsi") ||
281 18 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
282 : {
283 151 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
284 151 : if (fp == nullptr)
285 : {
286 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
287 1 : return nullptr;
288 : }
289 150 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
290 : }
291 : else
292 : {
293 0 : auto result = arrow::io::FileOutputStream::Open(pszName);
294 0 : if (!result.ok())
295 : {
296 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
297 0 : result.status().message().c_str());
298 0 : return nullptr;
299 : }
300 0 : out_file = *result;
301 : }
302 :
303 150 : return new OGRFeatherWriterDataset(pszName, out_file);
304 : }
305 :
306 : /************************************************************************/
307 : /* OGRFeatherDriver() */
308 : /************************************************************************/
309 :
310 : class OGRFeatherDriver final : public GDALDriver
311 : {
312 : std::mutex m_oMutex{};
313 : bool m_bMetadataInitialized = false;
314 : void InitMetadata();
315 :
316 : public:
317 1597 : const char *GetMetadataItem(const char *pszName,
318 : const char *pszDomain) override
319 : {
320 3194 : std::lock_guard oLock(m_oMutex);
321 1597 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
322 : {
323 271 : InitMetadata();
324 : }
325 3194 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
326 : }
327 :
328 140 : char **GetMetadata(const char *pszDomain) override
329 : {
330 280 : std::lock_guard oLock(m_oMutex);
331 140 : InitMetadata();
332 280 : return GDALDriver::GetMetadata(pszDomain);
333 : }
334 : };
335 :
336 411 : void OGRFeatherDriver::InitMetadata()
337 : {
338 411 : if (m_bMetadataInitialized)
339 401 : return;
340 10 : m_bMetadataInitialized = true;
341 :
342 : CPLXMLTreeCloser oTree(
343 20 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
344 :
345 20 : std::vector<const char *> apszCompressionMethods;
346 10 : bool bHasLZ4 = false;
347 30 : for (const char *pszMethod : {"ZSTD", "LZ4"})
348 : {
349 : auto oResult = arrow::util::Codec::GetCompressionType(
350 40 : CPLString(pszMethod).tolower());
351 20 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
352 : {
353 20 : if (EQUAL(pszMethod, "LZ4"))
354 10 : bHasLZ4 = true;
355 20 : apszCompressionMethods.emplace_back(pszMethod);
356 : }
357 : }
358 :
359 : {
360 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
361 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
362 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
363 10 : CPLAddXMLAttributeAndValue(psOption, "description",
364 : "File format variant");
365 30 : for (const char *pszEncoding : {"FILE", "STREAM"})
366 : {
367 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
368 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
369 : }
370 : }
371 :
372 : {
373 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
374 10 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
375 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
376 10 : CPLAddXMLAttributeAndValue(psOption, "description",
377 : "Compression method");
378 10 : CPLAddXMLAttributeAndValue(psOption, "default",
379 : bHasLZ4 ? "LZ4" : "NONE");
380 : {
381 10 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
382 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
383 10 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
384 : }
385 30 : for (const char *pszMethod : apszCompressionMethods)
386 : {
387 20 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
388 20 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
389 : }
390 : }
391 :
392 : {
393 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
394 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
395 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
396 10 : CPLAddXMLAttributeAndValue(psOption, "description",
397 : "Encoding of geometry columns");
398 10 : CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
399 40 : for (const char *pszEncoding :
400 50 : {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
401 : {
402 40 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
403 40 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
404 40 : if (EQUAL(pszEncoding, "GEOARROW"))
405 10 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
406 : "GEOARROW_STRUCT");
407 : }
408 : }
409 :
410 : {
411 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
412 10 : CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
413 10 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
414 10 : CPLAddXMLAttributeAndValue(psOption, "description",
415 : "Maximum number of rows per batch");
416 10 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
417 : }
418 :
419 : {
420 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
421 10 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
422 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
423 10 : CPLAddXMLAttributeAndValue(psOption, "description",
424 : "Name of geometry column");
425 10 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
426 : }
427 :
428 : {
429 10 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
430 10 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
431 10 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
432 10 : CPLAddXMLAttributeAndValue(psOption, "description",
433 : "Name of the FID column to create");
434 : }
435 :
436 10 : char *pszXML = CPLSerializeXMLTree(oTree.get());
437 10 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
438 10 : CPLFree(pszXML);
439 : }
440 :
441 : /************************************************************************/
442 : /* RegisterOGRArrow() */
443 : /************************************************************************/
444 :
445 14 : void RegisterOGRArrow()
446 : {
447 14 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
448 0 : return;
449 :
450 28 : auto poDriver = std::make_unique<OGRFeatherDriver>();
451 :
452 14 : OGRFeatherDriverSetCommonMetadata(poDriver.get());
453 :
454 14 : poDriver->pfnOpen = OGRFeatherDriverOpen;
455 14 : poDriver->pfnCreate = OGRFeatherDriverCreate;
456 :
457 14 : poDriver->SetMetadataItem("ARROW_VERSION", ARROW_VERSION_STRING);
458 :
459 14 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
460 :
461 : #if ARROW_VERSION_MAJOR >= 16
462 : // Mostly for tests
463 : const char *pszPath =
464 14 : CPLGetConfigOption("OGR_ARROW_LOAD_FILE_SYSTEM_FACTORIES", nullptr);
465 14 : if (pszPath)
466 : {
467 0 : auto result = arrow::fs::LoadFileSystemFactories(pszPath);
468 0 : if (!result.ok())
469 : {
470 0 : CPLError(CE_Warning, CPLE_AppDefined,
471 : "arrow::fs::LoadFileSystemFactories() failed with %s",
472 0 : result.message().c_str());
473 : }
474 : }
475 : #endif
476 : }
|