Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Feather Translator
4 : * Purpose: Implements OGRFeatherDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "gdal_pam.h"
30 : #include "ogrsf_frmts.h"
31 :
32 : #include <map>
33 :
34 : #include "ogr_feather.h"
35 : #include "../arrow_common/ograrrowrandomaccessfile.h"
36 : #include "../arrow_common/ograrrowwritablefile.h"
37 : #include "../arrow_common/ograrrowdataset.hpp"
38 :
39 : #include "ogrfeatherdrivercore.h"
40 :
41 : /************************************************************************/
42 : /* IsArrowIPCStream() */
43 : /************************************************************************/
44 :
45 543 : static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
46 : {
47 : // WARNING: if making changes in that method, reflect them in
48 : // OGRFeatherDriverIsArrowIPCStreamBasic() in ogrfeatherdrivercore.cpp
49 :
50 543 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
51 3 : return true;
52 :
53 540 : constexpr int CONTINUATION_SIZE = 4; // 0xFFFFFFFF
54 540 : constexpr int METADATA_SIZE_SIZE = 4;
55 :
56 : // See
57 : // https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
58 540 : if (poOpenInfo->fpL != nullptr &&
59 540 : poOpenInfo->nHeaderBytes >= CONTINUATION_SIZE + METADATA_SIZE_SIZE &&
60 540 : memcmp(poOpenInfo->pabyHeader, "\xFF\xFF\xFF\xFF", CONTINUATION_SIZE) ==
61 : 0)
62 : {
63 8 : const char *pszExt = CPLGetExtension(poOpenInfo->pszFilename);
64 8 : if (EQUAL(pszExt, "arrows") || EQUAL(pszExt, "ipc"))
65 3 : return true;
66 :
67 5 : const uint32_t nMetadataSize =
68 5 : CPL_LSBUINT32PTR(poOpenInfo->pabyHeader + CONTINUATION_SIZE);
69 5 : if (strcmp(poOpenInfo->pszFilename, "/vsistdin/") == 0)
70 : {
71 : // Padding after metadata and before body is not necessarily present
72 : // but the body must be at least 4 bytes
73 0 : constexpr int PADDING_MAX_SIZE = 4;
74 :
75 : // /vsistdin/ cannot seek back beyond first MB
76 0 : if (nMetadataSize >
77 : 1024 * 1024 -
78 : (CONTINUATION_SIZE + METADATA_SIZE_SIZE + PADDING_MAX_SIZE))
79 : {
80 0 : return false;
81 : }
82 0 : const int nSizeToRead = CONTINUATION_SIZE + METADATA_SIZE_SIZE +
83 0 : nMetadataSize + PADDING_MAX_SIZE;
84 0 : if (!poOpenInfo->TryToIngest(nSizeToRead))
85 : {
86 0 : return false;
87 : }
88 :
89 : const std::string osTmpFilename(
90 0 : CPLSPrintf("/vsimem/_arrow/%p", poOpenInfo));
91 : auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
92 : osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
93 0 : false));
94 : auto infile =
95 0 : std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
96 0 : auto options = arrow::ipc::IpcReadOptions::Defaults();
97 : auto result =
98 0 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
99 0 : CPLDebug("ARROW", "RecordBatchStreamReader::Open(): %s",
100 0 : result.status().message().c_str());
101 0 : VSIUnlink(osTmpFilename.c_str());
102 0 : return result.ok();
103 : }
104 :
105 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_END);
106 5 : const auto nFileSize = VSIFTellL(poOpenInfo->fpL);
107 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
108 5 : if (nMetadataSize >
109 5 : nFileSize - (CONTINUATION_SIZE + METADATA_SIZE_SIZE))
110 0 : return false;
111 :
112 : // Do not give ownership of poOpenInfo->fpL to infile
113 : auto infile =
114 10 : std::make_shared<OGRArrowRandomAccessFile>(poOpenInfo->fpL, false);
115 10 : auto options = arrow::ipc::IpcReadOptions::Defaults();
116 : auto result =
117 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
118 5 : VSIFSeekL(poOpenInfo->fpL, 0, SEEK_SET);
119 5 : return result.ok();
120 : }
121 532 : return false;
122 : }
123 :
124 : /************************************************************************/
125 : /* Open() */
126 : /************************************************************************/
127 :
128 543 : static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
129 : {
130 543 : if (poOpenInfo->eAccess == GA_Update)
131 : {
132 0 : return nullptr;
133 : }
134 :
135 543 : const bool bIsStreamingFormat = IsArrowIPCStream(poOpenInfo);
136 543 : if (!bIsStreamingFormat && !OGRFeatherDriverIsArrowFileFormat(poOpenInfo))
137 : {
138 0 : return nullptr;
139 : }
140 :
141 543 : std::shared_ptr<arrow::io::RandomAccessFile> infile;
142 543 : if (STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:"))
143 : {
144 3 : const std::string osFilename(poOpenInfo->pszFilename +
145 3 : strlen("ARROW_IPC_STREAM:"));
146 : auto fp =
147 3 : VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
148 3 : if (fp == nullptr)
149 : {
150 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
151 : osFilename.c_str());
152 1 : return nullptr;
153 : }
154 2 : infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
155 : }
156 943 : else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
157 403 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
158 : {
159 137 : VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
160 137 : poOpenInfo->fpL = nullptr;
161 137 : infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
162 : }
163 : else
164 : {
165 806 : auto result = arrow::io::ReadableFile::Open(poOpenInfo->pszFilename);
166 403 : if (!result.ok())
167 : {
168 0 : CPLError(CE_Failure, CPLE_AppDefined,
169 : "ReadableFile::Open() failed with %s",
170 0 : result.status().message().c_str());
171 0 : return nullptr;
172 : }
173 403 : infile = *result;
174 : }
175 :
176 : auto poMemoryPool = std::shared_ptr<arrow::MemoryPool>(
177 1084 : arrow::MemoryPool::CreateDefault().release());
178 1084 : auto options = arrow::ipc::IpcReadOptions::Defaults();
179 542 : options.memory_pool = poMemoryPool.get();
180 :
181 1084 : auto poDS = std::make_unique<OGRFeatherDataset>(poMemoryPool);
182 542 : if (bIsStreamingFormat)
183 : {
184 : auto result =
185 10 : arrow::ipc::RecordBatchStreamReader::Open(infile, options);
186 10 : if (!result.ok())
187 : {
188 1 : CPLError(CE_Failure, CPLE_AppDefined,
189 : "RecordBatchStreamReader::Open() failed with %s",
190 1 : result.status().message().c_str());
191 1 : return nullptr;
192 : }
193 18 : auto poRecordBatchStreamReader = *result;
194 9 : const bool bSeekable =
195 16 : !STARTS_WITH_CI(poOpenInfo->pszFilename, "ARROW_IPC_STREAM:") &&
196 7 : strcmp(poOpenInfo->pszFilename, "/vsistdin/") != 0;
197 18 : std::string osLayername = CPLGetBasename(poOpenInfo->pszFilename);
198 9 : if (osLayername.empty())
199 0 : osLayername = "layer";
200 : auto poLayer = std::make_unique<OGRFeatherLayer>(
201 9 : poDS.get(), osLayername.c_str(), infile, bSeekable, options,
202 18 : poRecordBatchStreamReader);
203 9 : poDS->SetLayer(std::move(poLayer));
204 :
205 : // Pre-load field domains, as this depends on the first record batch
206 9 : auto poLayerPtr = poDS->GetLayer(0);
207 9 : const auto poFeatureDefn = poLayerPtr->GetLayerDefn();
208 9 : bool bHasReadBatch = false;
209 534 : for (int i = 0; i < poFeatureDefn->GetFieldCount(); ++i)
210 : {
211 525 : const auto poFieldDefn = poFeatureDefn->GetFieldDefn(i);
212 525 : const auto &osDomainName = poFieldDefn->GetDomainName();
213 525 : if (!osDomainName.empty())
214 : {
215 6 : if (!bHasReadBatch)
216 : {
217 6 : bHasReadBatch = true;
218 6 : delete poLayerPtr->GetNextFeature();
219 6 : poLayerPtr->ResetReading();
220 : }
221 6 : poDS->GetFieldDomain(osDomainName);
222 : }
223 : }
224 : }
225 : else
226 : {
227 532 : auto result = arrow::ipc::RecordBatchFileReader::Open(infile, options);
228 532 : if (!result.ok())
229 : {
230 1 : CPLError(CE_Failure, CPLE_AppDefined,
231 : "RecordBatchFileReader::Open() failed with %s",
232 1 : result.status().message().c_str());
233 1 : return nullptr;
234 : }
235 1062 : auto poRecordBatchReader = *result;
236 : auto poLayer = std::make_unique<OGRFeatherLayer>(
237 531 : poDS.get(), CPLGetBasename(poOpenInfo->pszFilename),
238 531 : poRecordBatchReader);
239 531 : poDS->SetLayer(std::move(poLayer));
240 : }
241 540 : return poDS.release();
242 : }
243 :
244 : /************************************************************************/
245 : /* Create() */
246 : /************************************************************************/
247 :
248 139 : static GDALDataset *OGRFeatherDriverCreate(const char *pszName, int nXSize,
249 : int nYSize, int nBands,
250 : GDALDataType eType,
251 : char ** /* papszOptions */)
252 : {
253 139 : if (!(nXSize == 0 && nYSize == 0 && nBands == 0 && eType == GDT_Unknown))
254 0 : return nullptr;
255 :
256 139 : std::shared_ptr<arrow::io::OutputStream> out_file;
257 144 : if (STARTS_WITH(pszName, "/vsi") ||
258 5 : CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "YES")))
259 : {
260 139 : VSILFILE *fp = VSIFOpenL(pszName, "wb");
261 139 : if (fp == nullptr)
262 : {
263 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s", pszName);
264 1 : return nullptr;
265 : }
266 138 : out_file = std::make_shared<OGRArrowWritableFile>(fp);
267 : }
268 : else
269 : {
270 0 : auto result = arrow::io::FileOutputStream::Open(pszName);
271 0 : if (!result.ok())
272 : {
273 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create %s: %s", pszName,
274 0 : result.status().message().c_str());
275 0 : return nullptr;
276 : }
277 0 : out_file = *result;
278 : }
279 :
280 138 : return new OGRFeatherWriterDataset(pszName, out_file);
281 : }
282 :
283 : /************************************************************************/
284 : /* OGRFeatherDriver() */
285 : /************************************************************************/
286 :
287 : class OGRFeatherDriver final : public GDALDriver
288 : {
289 : bool m_bMetadataInitialized = false;
290 : void InitMetadata();
291 :
292 : public:
293 1347 : const char *GetMetadataItem(const char *pszName,
294 : const char *pszDomain) override
295 : {
296 1347 : if (EQUAL(pszName, GDAL_DS_LAYER_CREATIONOPTIONLIST))
297 : {
298 259 : InitMetadata();
299 : }
300 1347 : return GDALDriver::GetMetadataItem(pszName, pszDomain);
301 : }
302 :
303 133 : char **GetMetadata(const char *pszDomain) override
304 : {
305 133 : InitMetadata();
306 133 : return GDALDriver::GetMetadata(pszDomain);
307 : }
308 : };
309 :
310 392 : void OGRFeatherDriver::InitMetadata()
311 : {
312 392 : if (m_bMetadataInitialized)
313 385 : return;
314 7 : m_bMetadataInitialized = true;
315 :
316 : CPLXMLTreeCloser oTree(
317 14 : CPLCreateXMLNode(nullptr, CXT_Element, "LayerCreationOptionList"));
318 :
319 14 : std::vector<const char *> apszCompressionMethods;
320 7 : bool bHasLZ4 = false;
321 21 : for (const char *pszMethod : {"ZSTD", "LZ4"})
322 : {
323 : auto oResult = arrow::util::Codec::GetCompressionType(
324 28 : CPLString(pszMethod).tolower());
325 14 : if (oResult.ok() && arrow::util::Codec::IsAvailable(*oResult))
326 : {
327 14 : if (EQUAL(pszMethod, "LZ4"))
328 7 : bHasLZ4 = true;
329 14 : apszCompressionMethods.emplace_back(pszMethod);
330 : }
331 : }
332 :
333 : {
334 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
335 7 : CPLAddXMLAttributeAndValue(psOption, "name", "FORMAT");
336 7 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
337 7 : CPLAddXMLAttributeAndValue(psOption, "description",
338 : "File format variant");
339 21 : for (const char *pszEncoding : {"FILE", "STREAM"})
340 : {
341 14 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
342 14 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
343 : }
344 : }
345 :
346 : {
347 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
348 7 : CPLAddXMLAttributeAndValue(psOption, "name", "COMPRESSION");
349 7 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
350 7 : CPLAddXMLAttributeAndValue(psOption, "description",
351 : "Compression method");
352 7 : CPLAddXMLAttributeAndValue(psOption, "default",
353 : bHasLZ4 ? "LZ4" : "NONE");
354 : {
355 7 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
356 7 : CPLAddXMLAttributeAndValue(poValueNode, "alias", "UNCOMPRESSED");
357 7 : CPLCreateXMLNode(poValueNode, CXT_Text, "NONE");
358 : }
359 21 : for (const char *pszMethod : apszCompressionMethods)
360 : {
361 14 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
362 14 : CPLCreateXMLNode(poValueNode, CXT_Text, pszMethod);
363 : }
364 : }
365 :
366 : {
367 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
368 7 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_ENCODING");
369 7 : CPLAddXMLAttributeAndValue(psOption, "type", "string-select");
370 7 : CPLAddXMLAttributeAndValue(psOption, "description",
371 : "Encoding of geometry columns");
372 7 : CPLAddXMLAttributeAndValue(psOption, "default", "GEOARROW");
373 28 : for (const char *pszEncoding :
374 35 : {"GEOARROW", "GEOARROW_INTERLEAVED", "WKB", "WKT"})
375 : {
376 28 : auto poValueNode = CPLCreateXMLNode(psOption, CXT_Element, "Value");
377 28 : CPLCreateXMLNode(poValueNode, CXT_Text, pszEncoding);
378 28 : if (EQUAL(pszEncoding, "GEOARROW"))
379 7 : CPLAddXMLAttributeAndValue(poValueNode, "alias",
380 : "GEOARROW_STRUCT");
381 : }
382 : }
383 :
384 : {
385 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
386 7 : CPLAddXMLAttributeAndValue(psOption, "name", "BATCH_SIZE");
387 7 : CPLAddXMLAttributeAndValue(psOption, "type", "integer");
388 7 : CPLAddXMLAttributeAndValue(psOption, "description",
389 : "Maximum number of rows per batch");
390 7 : CPLAddXMLAttributeAndValue(psOption, "default", "65536");
391 : }
392 :
393 : {
394 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
395 7 : CPLAddXMLAttributeAndValue(psOption, "name", "GEOMETRY_NAME");
396 7 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
397 7 : CPLAddXMLAttributeAndValue(psOption, "description",
398 : "Name of geometry column");
399 7 : CPLAddXMLAttributeAndValue(psOption, "default", "geometry");
400 : }
401 :
402 : {
403 7 : auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option");
404 7 : CPLAddXMLAttributeAndValue(psOption, "name", "FID");
405 7 : CPLAddXMLAttributeAndValue(psOption, "type", "string");
406 7 : CPLAddXMLAttributeAndValue(psOption, "description",
407 : "Name of the FID column to create");
408 : }
409 :
410 7 : char *pszXML = CPLSerializeXMLTree(oTree.get());
411 7 : GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML);
412 7 : CPLFree(pszXML);
413 : }
414 :
415 : /************************************************************************/
416 : /* RegisterOGRArrow() */
417 : /************************************************************************/
418 :
419 11 : void RegisterOGRArrow()
420 : {
421 11 : if (GDALGetDriverByName(DRIVER_NAME) != nullptr)
422 0 : return;
423 :
424 22 : auto poDriver = std::make_unique<OGRFeatherDriver>();
425 :
426 11 : OGRFeatherDriverSetCommonMetadata(poDriver.get());
427 :
428 11 : poDriver->pfnOpen = OGRFeatherDriverOpen;
429 11 : poDriver->pfnCreate = OGRFeatherDriverCreate;
430 :
431 11 : GetGDALDriverManager()->RegisterDriver(poDriver.release());
432 : }
|