Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver. Virtual file system for
5 : * https://fsspec.github.io/kerchunk/spec.html#version-1
6 : * Author: Even Rouault <even dot rouault at spatialys.com>
7 : *
8 : ******************************************************************************
9 : * Copyright (c) 2025, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #undef _REENTRANT
15 :
16 : #include "vsikerchunk.h"
17 : #include "vsikerchunk_inline.hpp"
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_json.h"
21 : #include "cpl_json_streaming_parser.h"
22 : #include "cpl_json_streaming_writer.h"
23 : #include "cpl_mem_cache.h"
24 : #include "cpl_vsi_virtual.h"
25 :
26 : #include "gdal_priv.h"
27 : #include "ogrsf_frmts.h"
28 :
29 : #include <cerrno>
30 : #include <cinttypes>
31 : #include <limits>
32 : #include <mutex>
33 : #include <set>
34 : #include <utility>
35 :
36 : /************************************************************************/
37 : /* VSIKerchunkKeyInfo */
38 : /************************************************************************/
39 :
40 : struct VSIKerchunkKeyInfo
41 : {
42 : // points to an element in VSIKerchunkRefFile::m_oSetURI
43 : const std::string *posURI = nullptr;
44 :
45 : uint64_t nOffset = 0;
46 : uint32_t nSize = 0;
47 : std::vector<GByte> abyValue{};
48 : };
49 :
50 : /************************************************************************/
51 : /* VSIKerchunkRefFile */
52 : /************************************************************************/
53 :
54 : class VSIKerchunkRefFile
55 : {
56 : private:
57 : std::set<std::string> m_oSetURI{};
58 : std::map<std::string, VSIKerchunkKeyInfo> m_oMapKeys{};
59 :
60 : public:
61 536 : const std::map<std::string, VSIKerchunkKeyInfo> &GetMapKeys() const
62 : {
63 536 : return m_oMapKeys;
64 : }
65 :
66 125 : void AddInlineContent(const std::string &key, std::vector<GByte> &&abyValue)
67 : {
68 250 : VSIKerchunkKeyInfo info;
69 125 : info.abyValue = std::move(abyValue);
70 125 : m_oMapKeys[key] = std::move(info);
71 125 : }
72 :
73 128 : bool AddInlineContent(const std::string &key, const std::string_view &str)
74 : {
75 256 : std::vector<GByte> abyValue;
76 128 : if (cpl::starts_with(str, "base64:"))
77 : {
78 : abyValue.insert(
79 0 : abyValue.end(),
80 4 : reinterpret_cast<const GByte *>(str.data()) + strlen("base64:"),
81 4 : reinterpret_cast<const GByte *>(str.data()) + str.size());
82 4 : abyValue.push_back(0);
83 4 : const int nSize = CPLBase64DecodeInPlace(abyValue.data());
84 4 : if (nSize == 0)
85 : {
86 3 : CPLError(CE_Failure, CPLE_AppDefined,
87 : "VSIKerchunkJSONRefFileSystem: Base64 decoding "
88 : "failed for key '%s'",
89 : key.c_str());
90 3 : return false;
91 : }
92 1 : abyValue.resize(nSize);
93 : }
94 : else
95 : {
96 : abyValue.insert(
97 124 : abyValue.end(), reinterpret_cast<const GByte *>(str.data()),
98 248 : reinterpret_cast<const GByte *>(str.data()) + str.size());
99 : }
100 :
101 125 : AddInlineContent(key, std::move(abyValue));
102 125 : return true;
103 : }
104 :
105 20 : void AddReferencedContent(const std::string &key, const std::string &osURI,
106 : uint64_t nOffset, uint32_t nSize)
107 : {
108 20 : auto oPair = m_oSetURI.insert(osURI);
109 :
110 40 : VSIKerchunkKeyInfo info;
111 20 : info.posURI = &(*(oPair.first));
112 20 : info.nOffset = nOffset;
113 20 : info.nSize = nSize;
114 20 : m_oMapKeys[key] = std::move(info);
115 20 : }
116 :
117 : bool ConvertToParquetRef(const std::string &osCacheDir,
118 : GDALProgressFunc pfnProgress, void *pProgressData);
119 : };
120 :
121 : /************************************************************************/
122 : /* VSIKerchunkJSONRefFileSystem */
123 : /************************************************************************/
124 :
125 : class VSIKerchunkJSONRefFileSystem final : public VSIFilesystemHandler
126 : {
127 : public:
128 1741 : VSIKerchunkJSONRefFileSystem()
129 1741 : {
130 1741 : IsFileSystemInstantiated() = true;
131 1741 : }
132 :
133 2242 : ~VSIKerchunkJSONRefFileSystem() override
134 1121 : {
135 1121 : IsFileSystemInstantiated() = false;
136 2242 : }
137 :
138 4603 : static bool &IsFileSystemInstantiated()
139 : {
140 : static bool bIsFileSystemInstantiated = false;
141 4603 : return bIsFileSystemInstantiated;
142 : }
143 :
144 : VSIVirtualHandleUniquePtr Open(const char *pszFilename,
145 : const char *pszAccess, bool bSetError,
146 : CSLConstList papszOptions) override;
147 :
148 : int Stat(const char *pszFilename, VSIStatBufL *pStatBuf,
149 : int nFlags) override;
150 :
151 : char **ReadDirEx(const char *pszDirname, int nMaxFiles) override;
152 :
153 : private:
154 : friend bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
155 : const char *pszDstDirname,
156 : GDALProgressFunc pfnProgress,
157 : void *pProgressData);
158 :
159 : lru11::Cache<std::string, std::shared_ptr<VSIKerchunkRefFile>, std::mutex>
160 : m_oCache{};
161 :
162 : static std::pair<std::string, std::string>
163 : SplitFilename(const char *pszFilename);
164 :
165 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
166 : Load(const std::string &osJSONFilename, bool bUseCache);
167 : std::shared_ptr<VSIKerchunkRefFile>
168 : LoadInternal(const std::string &osJSONFilename,
169 : GDALProgressFunc pfnProgress, void *pProgressData);
170 : std::shared_ptr<VSIKerchunkRefFile>
171 : LoadStreaming(const std::string &osJSONFilename,
172 : GDALProgressFunc pfnProgress, void *pProgressData);
173 : };
174 :
175 : /************************************************************************/
176 : /* VSIKerchunkJSONRefFileSystem::SplitFilename() */
177 : /************************************************************************/
178 :
179 : /*static*/
180 : std::pair<std::string, std::string>
181 355 : VSIKerchunkJSONRefFileSystem::SplitFilename(const char *pszFilename)
182 : {
183 355 : if (STARTS_WITH(pszFilename, JSON_REF_FS_PREFIX))
184 299 : pszFilename += strlen(JSON_REF_FS_PREFIX);
185 56 : else if (STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX))
186 56 : pszFilename += strlen(JSON_REF_CACHED_FS_PREFIX);
187 : else
188 0 : return {std::string(), std::string()};
189 :
190 710 : std::string osJSONFilename;
191 :
192 355 : if (*pszFilename == '{')
193 : {
194 : // Parse /vsikerchunk_json_ref/{/path/to/some.json}[key]
195 327 : int nLevel = 1;
196 327 : ++pszFilename;
197 22973 : for (; *pszFilename; ++pszFilename)
198 : {
199 22963 : if (*pszFilename == '{')
200 : {
201 0 : ++nLevel;
202 : }
203 22963 : else if (*pszFilename == '}')
204 : {
205 317 : --nLevel;
206 317 : if (nLevel == 0)
207 : {
208 317 : ++pszFilename;
209 317 : break;
210 : }
211 : }
212 22646 : osJSONFilename += *pszFilename;
213 : }
214 327 : if (nLevel != 0)
215 : {
216 10 : CPLError(CE_Failure, CPLE_AppDefined,
217 : "Invalid %s syntax: should be "
218 : "%s{/path/to/some/file}[/optional_key]",
219 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
220 10 : return {std::string(), std::string()};
221 : }
222 :
223 : return {osJSONFilename,
224 634 : *pszFilename == '/' ? pszFilename + 1 : pszFilename};
225 : }
226 : else
227 : {
228 28 : int nCountDotJson = 0;
229 28 : const char *pszIter = pszFilename;
230 28 : const char *pszAfterJSON = nullptr;
231 46 : while ((pszIter = strstr(pszIter, ".json")) != nullptr)
232 : {
233 18 : ++nCountDotJson;
234 18 : if (nCountDotJson == 1)
235 18 : pszAfterJSON = pszIter + strlen(".json");
236 : else
237 0 : pszAfterJSON = nullptr;
238 18 : pszIter += strlen(".json");
239 : }
240 28 : if (!pszAfterJSON)
241 : {
242 10 : if (nCountDotJson >= 2)
243 : {
244 0 : CPLError(CE_Failure, CPLE_AppDefined,
245 : "Ambiguous %s syntax: should be "
246 : "%s{/path/to/some/file}[/optional_key]",
247 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
248 : }
249 : else
250 : {
251 10 : CPLError(CE_Failure, CPLE_AppDefined,
252 : "Invalid %s syntax: should be "
253 : "%s/path/to/some.json[/optional_key] or "
254 : "%s{/path/to/some/file}[/optional_key]",
255 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX,
256 : JSON_REF_FS_PREFIX);
257 : }
258 10 : return {std::string(), std::string()};
259 : }
260 36 : return {std::string(pszFilename, pszAfterJSON - pszFilename),
261 36 : *pszAfterJSON == '/' ? pszAfterJSON + 1 : pszAfterJSON};
262 : }
263 : }
264 :
265 : /************************************************************************/
266 : /* class VSIKerchunkJSONRefParser */
267 : /************************************************************************/
268 :
269 : namespace
270 : {
271 : class VSIKerchunkJSONRefParser final : public CPLJSonStreamingParser
272 : {
273 : public:
274 71 : explicit VSIKerchunkJSONRefParser(
275 : const std::shared_ptr<VSIKerchunkRefFile> &refFile)
276 71 : : m_refFile(refFile)
277 : {
278 71 : m_oWriter.SetPrettyFormatting(false);
279 71 : }
280 :
281 71 : ~VSIKerchunkJSONRefParser() override
282 71 : {
283 : // In case the parsing would be stopped, the writer may be in
284 : // an inconsistent state. This avoids assertion in debug mode.
285 71 : m_oWriter.clear();
286 71 : }
287 :
288 : protected:
289 77 : void String(const char *pszValue, size_t nLength) override
290 : {
291 77 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 0)
292 : {
293 9 : if (nLength > 0 && pszValue[nLength - 1] == 0)
294 2 : --nLength;
295 :
296 9 : if (!m_refFile->AddInlineContent(
297 9 : m_osCurKey, std::string_view(pszValue, nLength)))
298 : {
299 2 : StopParsing();
300 : }
301 :
302 9 : m_oWriter.clear();
303 :
304 9 : m_osCurKey.clear();
305 : }
306 68 : else if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
307 : {
308 42 : if (m_iArrayMemberIdx == 0)
309 : {
310 36 : m_osURI.assign(pszValue, nLength);
311 : }
312 : else
313 : {
314 6 : UnexpectedContentInArray();
315 : }
316 : }
317 26 : else if (m_nLevel > m_nKeyLevel)
318 : {
319 24 : m_oWriter.Add(std::string_view(pszValue, nLength));
320 : }
321 77 : }
322 :
323 161 : void Number(const char *pszValue, size_t nLength) override
324 : {
325 161 : if (m_nLevel == m_nKeyLevel)
326 : {
327 52 : if (m_nArrayLevel == 1)
328 : {
329 48 : if (m_iArrayMemberIdx == 1)
330 : {
331 27 : m_osTmpForNumber.assign(pszValue, nLength);
332 27 : errno = 0;
333 27 : m_nOffset =
334 27 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
335 50 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
336 23 : m_osTmpForNumber.find('.') != std::string::npos)
337 : {
338 6 : CPLError(
339 : CE_Failure, CPLE_AppDefined,
340 : "VSIKerchunkJSONRefFileSystem: array value at "
341 : "index 1 for key '%s' is not an unsigned 64 bit "
342 : "integer",
343 : m_osCurKey.c_str());
344 6 : StopParsing();
345 : }
346 : }
347 21 : else if (m_iArrayMemberIdx == 2)
348 : {
349 19 : m_osTmpForNumber.assign(pszValue, nLength);
350 19 : errno = 0;
351 : const uint64_t nSize =
352 19 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
353 19 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
354 53 : nSize > std::numeric_limits<uint32_t>::max() ||
355 15 : m_osTmpForNumber.find('.') != std::string::npos)
356 : {
357 6 : CPLError(
358 : CE_Failure, CPLE_AppDefined,
359 : "VSIKerchunkJSONRefFileSystem: array value at "
360 : "index 2 for key '%s' is not an unsigned 32 bit "
361 : "integer",
362 : m_osCurKey.c_str());
363 6 : StopParsing();
364 : }
365 : else
366 : {
367 13 : m_nSize = static_cast<uint32_t>(nSize);
368 : }
369 : }
370 : else
371 : {
372 2 : UnexpectedContentInArray();
373 : }
374 : }
375 : else
376 : {
377 4 : UnexpectedContent();
378 : }
379 : }
380 109 : else if (m_nLevel > m_nKeyLevel)
381 : {
382 108 : m_oWriter.AddSerializedValue(std::string_view(pszValue, nLength));
383 : }
384 161 : }
385 :
386 4 : void Boolean(bool b) override
387 : {
388 4 : if (m_nLevel == m_nKeyLevel)
389 : {
390 4 : UnexpectedContent();
391 : }
392 0 : else if (m_nLevel > m_nKeyLevel)
393 : {
394 0 : m_oWriter.Add(b);
395 : }
396 4 : }
397 :
398 28 : void Null() override
399 : {
400 28 : if (m_nLevel == m_nKeyLevel)
401 : {
402 4 : UnexpectedContent();
403 : }
404 24 : else if (m_nLevel > m_nKeyLevel)
405 : {
406 24 : m_oWriter.AddNull();
407 : }
408 28 : }
409 :
410 168 : void StartObject() override
411 : {
412 168 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
413 : {
414 2 : UnexpectedContentInArray();
415 : }
416 : else
417 : {
418 166 : if (m_nLevel >= m_nKeyLevel)
419 : {
420 95 : m_oWriter.StartObj();
421 : }
422 166 : ++m_nLevel;
423 166 : m_bFirstMember = true;
424 : }
425 168 : }
426 :
427 124 : void EndObject() override
428 : {
429 124 : if (m_nLevel == m_nKeyLevel)
430 : {
431 28 : FinishObjectValueProcessing();
432 : }
433 124 : --m_nLevel;
434 124 : if (m_nLevel >= m_nKeyLevel)
435 : {
436 95 : m_oWriter.EndObj();
437 : }
438 124 : }
439 :
440 329 : void StartObjectMember(const char *pszKey, size_t nLength) override
441 : {
442 329 : if (m_nLevel == 1 && m_bFirstMember)
443 : {
444 82 : if (nLength == strlen("version") &&
445 68 : memcmp(pszKey, "version", nLength) == 0)
446 : {
447 3 : m_nKeyLevel = 2;
448 : }
449 : else
450 : {
451 79 : m_nKeyLevel = 1;
452 : }
453 : }
454 247 : else if (m_nLevel == 1 && m_nKeyLevel == 2 &&
455 1 : nLength == strlen("templates") &&
456 1 : memcmp(pszKey, "templates", nLength) == 0)
457 : {
458 1 : CPLError(CE_Failure, CPLE_NotSupported,
459 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
460 : "not supported");
461 1 : StopParsing();
462 : }
463 246 : else if (m_nLevel == 1 && m_nKeyLevel == 2 &&
464 1 : nLength == strlen("gen") &&
465 1 : memcmp(pszKey, "gen", nLength) == 0)
466 : {
467 1 : CPLError(CE_Failure, CPLE_NotSupported,
468 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
469 : "supported");
470 1 : StopParsing();
471 : }
472 :
473 329 : if (m_nLevel == m_nKeyLevel)
474 : {
475 158 : FinishObjectValueProcessing();
476 158 : m_osCurKey.assign(pszKey, nLength);
477 : }
478 171 : else if (m_nLevel > m_nKeyLevel)
479 : {
480 163 : m_oWriter.AddObjKey(std::string_view(pszKey, nLength));
481 : }
482 329 : m_bFirstMember = false;
483 329 : }
484 :
485 82 : void StartArray() override
486 : {
487 82 : if (m_nLevel == m_nKeyLevel)
488 : {
489 46 : if (m_nArrayLevel == 0)
490 : {
491 46 : m_iArrayMemberIdx = -1;
492 46 : m_osURI.clear();
493 46 : m_nOffset = 0;
494 46 : m_nSize = 0;
495 46 : m_nArrayLevel = 1;
496 : }
497 : else
498 : {
499 0 : UnexpectedContentInArray();
500 : }
501 : }
502 36 : else if (m_nLevel > m_nKeyLevel)
503 : {
504 36 : m_oWriter.StartArray();
505 36 : ++m_nArrayLevel;
506 : }
507 82 : }
508 :
509 56 : void EndArray() override
510 : {
511 56 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
512 : {
513 20 : if (m_iArrayMemberIdx == -1)
514 : {
515 2 : CPLError(CE_Failure, CPLE_AppDefined,
516 : "VSIKerchunkJSONRefFileSystem: array value for key "
517 : "'%s' is not of size 1 or 3",
518 : m_osCurKey.c_str());
519 2 : StopParsing();
520 : }
521 : else
522 : {
523 18 : m_refFile->AddReferencedContent(m_osCurKey, m_osURI, m_nOffset,
524 : m_nSize);
525 18 : --m_nArrayLevel;
526 18 : m_oWriter.clear();
527 18 : m_osCurKey.clear();
528 : }
529 : }
530 36 : else if (m_nLevel >= m_nKeyLevel)
531 : {
532 36 : --m_nArrayLevel;
533 36 : if (m_nLevel > m_nKeyLevel)
534 36 : m_oWriter.EndArray();
535 : }
536 56 : }
537 :
538 125 : void StartArrayMember() override
539 : {
540 125 : if (m_nLevel >= m_nKeyLevel)
541 125 : ++m_iArrayMemberIdx;
542 125 : }
543 :
544 2 : void Exception(const char *pszMessage) override
545 : {
546 2 : CPLError(CE_Failure, CPLE_AppDefined, "%s", pszMessage);
547 2 : }
548 :
549 : private:
550 : std::shared_ptr<VSIKerchunkRefFile> m_refFile{};
551 : int m_nLevel = 0;
552 : int m_nArrayLevel = 0;
553 : int m_iArrayMemberIdx = -1;
554 : bool m_bFirstMember = false;
555 : int m_nKeyLevel = std::numeric_limits<int>::max();
556 : std::string m_osCurKey{};
557 : std::string m_osURI{};
558 : std::string m_osTmpForNumber{};
559 : uint64_t m_nOffset = 0;
560 : uint32_t m_nSize = 0;
561 :
562 : CPLJSonStreamingWriter m_oWriter{nullptr, nullptr};
563 :
564 186 : void FinishObjectValueProcessing()
565 : {
566 186 : if (!m_osCurKey.empty())
567 : {
568 93 : const std::string &osStr = m_oWriter.GetString();
569 93 : CPL_IGNORE_RET_VAL(m_refFile->AddInlineContent(m_osCurKey, osStr));
570 :
571 93 : m_oWriter.clear();
572 :
573 93 : m_osCurKey.clear();
574 : }
575 186 : }
576 :
577 12 : void UnexpectedContent()
578 : {
579 12 : CPLError(CE_Failure, CPLE_AppDefined, "Unexpected content");
580 12 : StopParsing();
581 12 : }
582 :
583 10 : void UnexpectedContentInArray()
584 : {
585 10 : CPLError(CE_Failure, CPLE_AppDefined,
586 : "Unexpected content at position %d of array",
587 : m_iArrayMemberIdx);
588 10 : StopParsing();
589 10 : }
590 : };
591 : } // namespace
592 :
593 : /************************************************************************/
594 : /* VSIKerchunkJSONRefFileSystem::LoadStreaming() */
595 : /************************************************************************/
596 :
597 : std::shared_ptr<VSIKerchunkRefFile>
598 71 : VSIKerchunkJSONRefFileSystem::LoadStreaming(const std::string &osJSONFilename,
599 : GDALProgressFunc pfnProgress,
600 : void *pProgressData)
601 : {
602 142 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
603 142 : VSIKerchunkJSONRefParser parser(refFile);
604 :
605 71 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
606 : "Using streaming parser for %s", osJSONFilename.c_str());
607 :
608 : // For network file systems, get the streaming version of the filename,
609 : // as we don't need arbitrary seeking in the file
610 : const std::string osFilename =
611 71 : VSIFileManager::GetHandler(osJSONFilename.c_str())
612 142 : ->GetStreamingFilename(osJSONFilename);
613 :
614 142 : auto f = VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
615 71 : if (!f)
616 : {
617 2 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
618 : osJSONFilename.c_str());
619 2 : return nullptr;
620 : }
621 69 : uint64_t nTotalSize = 0;
622 69 : if (!cpl::starts_with(osFilename, "/vsigzip/"))
623 : {
624 69 : f->Seek(0, SEEK_END);
625 69 : nTotalSize = f->Tell();
626 69 : f->Seek(0, SEEK_SET);
627 : }
628 138 : std::string sBuffer;
629 69 : constexpr size_t BUFFER_SIZE = 10 * 1024 * 1024; // Arbitrary
630 69 : sBuffer.resize(BUFFER_SIZE);
631 : while (true)
632 : {
633 70 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
634 70 : const bool bFinished = nRead < sBuffer.size();
635 : try
636 : {
637 70 : if (!parser.Parse(sBuffer.data(), nRead, bFinished))
638 : {
639 : // The parser will have emitted an error
640 42 : return nullptr;
641 : }
642 : }
643 0 : catch (const std::exception &e)
644 : {
645 : // Out-of-memory typically
646 0 : CPLError(CE_Failure, CPLE_AppDefined,
647 : "Exception occurred while parsing %s: %s",
648 0 : osJSONFilename.c_str(), e.what());
649 0 : return nullptr;
650 : }
651 28 : if (nTotalSize)
652 : {
653 27 : const double dfProgressRatio = static_cast<double>(f->Tell()) /
654 27 : static_cast<double>(nTotalSize);
655 27 : CPLDebug("VSIKerchunkJSONRefFileSystem", "%02.1f %% of %s read",
656 : 100 * dfProgressRatio, osJSONFilename.c_str());
657 28 : if (pfnProgress &&
658 1 : !pfnProgress(dfProgressRatio, "Parsing of JSON file",
659 : pProgressData))
660 : {
661 0 : return nullptr;
662 : }
663 : }
664 : else
665 : {
666 1 : CPLDebug("VSIKerchunkJSONRefFileSystem",
667 : "%" PRIu64 " bytes read in %s",
668 1 : static_cast<uint64_t>(f->Tell()), osJSONFilename.c_str());
669 : }
670 28 : if (nRead < sBuffer.size())
671 : {
672 27 : break;
673 : }
674 1 : }
675 27 : if (f->Tell() == 0)
676 : {
677 1 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
678 : osJSONFilename.c_str());
679 1 : return nullptr;
680 : }
681 26 : if (pfnProgress)
682 1 : pfnProgress(1.0, "Parsing of JSON file", pProgressData);
683 :
684 26 : return refFile;
685 : }
686 :
687 : /************************************************************************/
688 : /* VSIKerchunkJSONRefFileSystem::LoadInternal() */
689 : /************************************************************************/
690 :
691 : std::shared_ptr<VSIKerchunkRefFile>
692 109 : VSIKerchunkJSONRefFileSystem::LoadInternal(const std::string &osJSONFilename,
693 : GDALProgressFunc pfnProgress,
694 : void *pProgressData)
695 : {
696 109 : const char *pszUseStreamingParser = VSIGetPathSpecificOption(
697 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_STREAMING_PARSER", "AUTO");
698 109 : if (EQUAL(pszUseStreamingParser, "AUTO"))
699 : {
700 : auto f =
701 50 : VSIVirtualHandleUniquePtr(VSIFOpenL(osJSONFilename.c_str(), "rb"));
702 50 : if (!f)
703 : {
704 6 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
705 : osJSONFilename.c_str());
706 6 : return nullptr;
707 : }
708 44 : std::string sBuffer;
709 44 : constexpr size_t HEADER_SIZE = 1024; // Arbitrary
710 44 : sBuffer.resize(HEADER_SIZE);
711 44 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
712 44 : sBuffer.resize(nRead);
713 44 : if (ZARRIsLikelyStreamableKerchunkJSONRefContent(sBuffer))
714 : {
715 41 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
716 : }
717 : }
718 59 : else if (CPLTestBool(pszUseStreamingParser))
719 : {
720 30 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
721 : }
722 :
723 64 : CPLJSONDocument oDoc;
724 : {
725 : #if SIZEOF_VOIDP > 4
726 32 : CPLConfigOptionSetter oSetter("CPL_JSON_MAX_SIZE", "1GB", true);
727 : #endif
728 32 : if (!oDoc.Load(osJSONFilename))
729 : {
730 5 : CPLError(CE_Failure, CPLE_AppDefined,
731 : "VSIKerchunkJSONRefFileSystem: cannot open %s",
732 : osJSONFilename.c_str());
733 5 : return nullptr;
734 : }
735 : }
736 :
737 54 : auto oRoot = oDoc.GetRoot();
738 81 : const auto oVersion = oRoot.GetObj("version");
739 54 : CPLJSONObject oRefs;
740 27 : if (!oVersion.IsValid())
741 : {
742 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-0
743 :
744 23 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
745 : "'version' key not found. Assuming version 0");
746 23 : oRefs = std::move(oRoot);
747 23 : if (!oRefs.GetObj(".zgroup").IsValid())
748 : {
749 0 : CPLError(CE_Failure, CPLE_AppDefined,
750 : "VSIKerchunkJSONRefFileSystem: '.zgroup' key not found");
751 0 : return nullptr;
752 : }
753 : }
754 4 : else if (oVersion.GetType() != CPLJSONObject::Type::Integer)
755 : {
756 4 : CPLError(CE_Failure, CPLE_AppDefined,
757 : "VSIKerchunkJSONRefFileSystem: 'version' key not integer");
758 4 : return nullptr;
759 : }
760 0 : else if (oVersion.ToInteger() != 1)
761 : {
762 0 : CPLError(CE_Failure, CPLE_NotSupported,
763 : "VSIKerchunkJSONRefFileSystem: 'version' = %d not handled",
764 : oVersion.ToInteger());
765 0 : return nullptr;
766 : }
767 : else
768 : {
769 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-1
770 :
771 0 : if (oRoot.GetObj("templates").IsValid())
772 : {
773 0 : CPLError(CE_Failure, CPLE_NotSupported,
774 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
775 : "not supported");
776 0 : return nullptr;
777 : }
778 :
779 0 : if (oRoot.GetObj("gen").IsValid())
780 : {
781 0 : CPLError(CE_Failure, CPLE_NotSupported,
782 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
783 : "supported");
784 0 : return nullptr;
785 : }
786 :
787 0 : oRefs = oRoot.GetObj("refs");
788 0 : if (!oRefs.IsValid())
789 : {
790 0 : CPLError(CE_Failure, CPLE_AppDefined,
791 : "VSIKerchunkJSONRefFileSystem: 'refs' key not found");
792 0 : return nullptr;
793 : }
794 :
795 0 : if (oRoot.GetObj("templates").IsValid())
796 : {
797 0 : CPLError(CE_Failure, CPLE_NotSupported,
798 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
799 : "not supported");
800 0 : return nullptr;
801 : }
802 :
803 0 : if (oRoot.GetObj("templates").IsValid())
804 : {
805 0 : CPLError(CE_Failure, CPLE_NotSupported,
806 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
807 : "not supported");
808 0 : return nullptr;
809 : }
810 : }
811 :
812 23 : if (oRefs.GetType() != CPLJSONObject::Type::Object)
813 : {
814 0 : CPLError(CE_Failure, CPLE_AppDefined,
815 : "VSIKerchunkJSONRefFileSystem: value of 'refs' is not a dict");
816 0 : return nullptr;
817 : }
818 :
819 46 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
820 50 : for (const auto &oEntry : oRefs.GetChildren())
821 : {
822 45 : const std::string osKeyName = oEntry.GetName();
823 45 : if (oEntry.GetType() == CPLJSONObject::Type::String)
824 : {
825 2 : if (!refFile->AddInlineContent(osKeyName, oEntry.ToString()))
826 : {
827 1 : return nullptr;
828 : }
829 : }
830 43 : else if (oEntry.GetType() == CPLJSONObject::Type::Object)
831 : {
832 : const std::string osSerialized =
833 24 : oEntry.Format(CPLJSONObject::PrettyFormat::Plain);
834 24 : CPL_IGNORE_RET_VAL(
835 24 : refFile->AddInlineContent(osKeyName, osSerialized));
836 : }
837 19 : else if (oEntry.GetType() == CPLJSONObject::Type::Array)
838 : {
839 15 : const auto oArray = oEntry.ToArray();
840 : // Some files such as https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json
841 : // (pointed by https://guide.cloudnativegeo.org/kerchunk/kerchunk-in-practice.html)
842 : // have array entries with just the URL, and no offset/size
843 : // This is when the whole file needs to be read
844 15 : if (oArray.Size() != 1 && oArray.Size() != 3)
845 : {
846 3 : CPLError(CE_Failure, CPLE_AppDefined,
847 : "VSIKerchunkJSONRefFileSystem: array value for key "
848 : "'%s' is not of size 1 or 3",
849 : osKeyName.c_str());
850 3 : return nullptr;
851 : }
852 12 : if (oArray[0].GetType() != CPLJSONObject::Type::String)
853 : {
854 4 : CPLError(CE_Failure, CPLE_AppDefined,
855 : "VSIKerchunkJSONRefFileSystem: array value at index 0 "
856 : "for key '%s' is not a string",
857 : osKeyName.c_str());
858 4 : return nullptr;
859 : }
860 8 : if (oArray.Size() == 3)
861 : {
862 8 : if ((oArray[1].GetType() != CPLJSONObject::Type::Integer &&
863 23 : oArray[1].GetType() != CPLJSONObject::Type::Long) ||
864 15 : !(oArray[1].ToLong() >= 0))
865 : {
866 2 : CPLError(
867 : CE_Failure, CPLE_AppDefined,
868 : "VSIKerchunkJSONRefFileSystem: array value at index 1 "
869 : "for key '%s' is not an unsigned 64 bit integer",
870 : osKeyName.c_str());
871 2 : return nullptr;
872 : }
873 6 : if ((oArray[2].GetType() != CPLJSONObject::Type::Integer &&
874 16 : oArray[2].GetType() != CPLJSONObject::Type::Long) ||
875 10 : !(oArray[2].ToLong() >= 0 &&
876 9 : static_cast<uint64_t>(oArray[2].ToLong()) <=
877 3 : std::numeric_limits<uint32_t>::max()))
878 : {
879 4 : CPLError(
880 : CE_Failure, CPLE_AppDefined,
881 : "VSIKerchunkJSONRefFileSystem: array value at index 2 "
882 : "for key '%s' is not an unsigned 32 bit integer",
883 : osKeyName.c_str());
884 4 : return nullptr;
885 : }
886 : }
887 :
888 4 : refFile->AddReferencedContent(
889 4 : osKeyName, oArray[0].ToString(),
890 4 : oArray.Size() == 3 ? oArray[1].ToLong() : 0,
891 4 : oArray.Size() == 3 ? static_cast<uint32_t>(oArray[2].ToLong())
892 : : 0);
893 : }
894 : else
895 : {
896 4 : CPLError(
897 : CE_Failure, CPLE_AppDefined,
898 : "VSIKerchunkJSONRefFileSystem: invalid value type for key '%s'",
899 : osKeyName.c_str());
900 4 : return nullptr;
901 : }
902 : }
903 :
904 5 : return refFile;
905 : }
906 :
907 : /************************************************************************/
908 : /* VSIKerchunkJSONRefFileSystem::Load() */
909 : /************************************************************************/
910 :
911 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
912 335 : VSIKerchunkJSONRefFileSystem::Load(const std::string &osJSONFilename,
913 : bool bUseCache)
914 : {
915 335 : std::shared_ptr<VSIKerchunkRefFile> refFile;
916 335 : if (m_oCache.tryGet(osJSONFilename, refFile))
917 211 : return {refFile, std::string()};
918 :
919 : // Deal with local file cache
920 124 : const char *pszUseCache = VSIGetPathSpecificOption(
921 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_CACHE", "NO");
922 124 : if (bUseCache || CPLTestBool(pszUseCache))
923 : {
924 26 : if (GDALGetDriverByName("PARQUET") == nullptr)
925 : {
926 0 : CPLError(CE_Failure, CPLE_NotSupported,
927 : "VSIKERCHUNK_USE_CACHE=YES only enabled if PARQUET driver "
928 : "is available");
929 0 : return {nullptr, std::string()};
930 : }
931 :
932 : VSIStatBufL sStat;
933 52 : if (VSIStatL(osJSONFilename.c_str(), &sStat) != 0 ||
934 26 : VSI_ISDIR(sStat.st_mode))
935 : {
936 0 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
937 : osJSONFilename.c_str());
938 0 : return {nullptr, std::string()};
939 : }
940 :
941 26 : std::string osCacheSubDir = CPLGetBasenameSafe(osJSONFilename.c_str());
942 : osCacheSubDir += CPLSPrintf("_%" PRIu64 "_%" PRIu64,
943 26 : static_cast<uint64_t>(sStat.st_size),
944 26 : static_cast<uint64_t>(sStat.st_mtime));
945 :
946 26 : const std::string osRootCacheDir = GDALGetCacheDirectory();
947 26 : if (!osRootCacheDir.empty())
948 : {
949 : const std::string osKerchunkCacheDir = VSIGetPathSpecificOption(
950 : osJSONFilename.c_str(), "VSIKERCHUNK_CACHE_DIR",
951 52 : CPLFormFilenameSafe(osRootCacheDir.c_str(),
952 : "zarr_kerchunk_cache", nullptr)
953 78 : .c_str());
954 : const std::string osCacheDir = CPLFormFilenameSafe(
955 52 : osKerchunkCacheDir.c_str(), osCacheSubDir.c_str(), "zarr");
956 26 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Using cache dir %s",
957 : osCacheDir.c_str());
958 :
959 52 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
960 : nullptr)
961 : .c_str(),
962 26 : &sStat) == 0)
963 : {
964 9 : CPLDebug("VSIKerchunkJSONRefFileSystem",
965 : "Using Kerchunk Parquet cache %s", osCacheDir.c_str());
966 9 : return {nullptr, osCacheDir};
967 : }
968 :
969 23 : if (VSIMkdirRecursive(osCacheDir.c_str(), 0755) != 0 &&
970 6 : !(VSIStatL(osCacheDir.c_str(), &sStat) == 0 &&
971 0 : VSI_ISDIR(sStat.st_mode)))
972 : {
973 6 : CPLError(CE_Failure, CPLE_AppDefined,
974 : "Cannot create directory %s", osCacheDir.c_str());
975 6 : return {nullptr, std::string()};
976 : }
977 :
978 : const std::string osLockFilename =
979 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".lock", nullptr);
980 :
981 11 : CPLLockFileHandle hLockHandle = nullptr;
982 22 : CPLStringList aosOptions;
983 11 : aosOptions.SetNameValue("VERBOSE_WAIT_MESSAGE", "YES");
984 : const char *pszKerchunkDebug =
985 11 : CPLGetConfigOption("VSIKERCHUNK_FOR_TESTS", nullptr);
986 11 : if (pszKerchunkDebug &&
987 4 : strstr(pszKerchunkDebug, "SHORT_DELAY_STALLED_LOCK"))
988 : {
989 2 : aosOptions.SetNameValue("STALLED_DELAY", "1");
990 : }
991 :
992 11 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Acquiring lock");
993 11 : switch (CPLLockFileEx(osLockFilename.c_str(), &hLockHandle,
994 11 : aosOptions.List()))
995 : {
996 10 : case CLFS_OK:
997 10 : break;
998 1 : case CLFS_CANNOT_CREATE_LOCK:
999 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create lock %s",
1000 : osLockFilename.c_str());
1001 1 : break;
1002 0 : case CLFS_LOCK_BUSY:
1003 0 : CPLAssert(false); // cannot happen with infinite wait time
1004 : break;
1005 0 : case CLFS_API_MISUSE:
1006 0 : CPLAssert(false);
1007 : break;
1008 0 : case CLFS_THREAD_CREATION_FAILED:
1009 0 : CPLError(CE_Failure, CPLE_AppDefined,
1010 : "Thread creation failed for refresh of %s",
1011 : osLockFilename.c_str());
1012 0 : break;
1013 : }
1014 11 : if (!hLockHandle)
1015 : {
1016 1 : return {nullptr, std::string()};
1017 : }
1018 :
1019 : struct LockFileHolder
1020 : {
1021 : CPLLockFileHandle m_hLockHandle = nullptr;
1022 :
1023 10 : explicit LockFileHolder(CPLLockFileHandle hLockHandleIn)
1024 10 : : m_hLockHandle(hLockHandleIn)
1025 : {
1026 10 : }
1027 :
1028 10 : ~LockFileHolder()
1029 10 : {
1030 10 : release();
1031 10 : }
1032 :
1033 20 : void release()
1034 : {
1035 20 : if (m_hLockHandle)
1036 : {
1037 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1038 : "Releasing lock");
1039 10 : CPLUnlockFileEx(m_hLockHandle);
1040 10 : m_hLockHandle = nullptr;
1041 : }
1042 20 : }
1043 :
1044 : CPL_DISALLOW_COPY_ASSIGN(LockFileHolder)
1045 : };
1046 :
1047 20 : LockFileHolder lockFileHolder(hLockHandle);
1048 :
1049 20 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
1050 : nullptr)
1051 : .c_str(),
1052 10 : &sStat) == 0)
1053 : {
1054 0 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1055 : "Using Kerchunk Parquet cache %s (after lock taking)",
1056 : osCacheDir.c_str());
1057 0 : return {nullptr, osCacheDir};
1058 : }
1059 :
1060 10 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1061 :
1062 10 : if (refFile)
1063 : {
1064 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1065 : "Generating Kerchunk Parquet cache %s...",
1066 : osCacheDir.c_str());
1067 :
1068 10 : if (pszKerchunkDebug &&
1069 3 : strstr(pszKerchunkDebug,
1070 : "WAIT_BEFORE_CONVERT_TO_PARQUET_REF"))
1071 : {
1072 1 : CPLSleep(0.5);
1073 : }
1074 :
1075 10 : if (refFile->ConvertToParquetRef(osCacheDir, nullptr, nullptr))
1076 : {
1077 4 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1078 : "Generation Kerchunk Parquet cache %s: OK",
1079 : osCacheDir.c_str());
1080 : }
1081 : else
1082 : {
1083 6 : CPLError(CE_Failure, CPLE_AppDefined,
1084 : "Generation of Kerchunk Parquet cache %s failed",
1085 : osCacheDir.c_str());
1086 6 : refFile.reset();
1087 : }
1088 :
1089 10 : lockFileHolder.release();
1090 10 : m_oCache.insert(osJSONFilename, refFile);
1091 : }
1092 :
1093 10 : return {refFile, std::string()};
1094 : }
1095 : }
1096 :
1097 98 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1098 98 : if (refFile)
1099 20 : m_oCache.insert(osJSONFilename, refFile);
1100 98 : return {refFile, std::string()};
1101 : }
1102 :
1103 : /************************************************************************/
1104 : /* VSIKerchunkRefFile::ConvertToParquetRef() */
1105 : /************************************************************************/
1106 :
1107 11 : bool VSIKerchunkRefFile::ConvertToParquetRef(const std::string &osCacheDir,
1108 : GDALProgressFunc pfnProgress,
1109 : void *pProgressData)
1110 : {
1111 : struct Serializer
1112 : {
1113 : VSIVirtualHandle *m_poFile = nullptr;
1114 :
1115 11 : explicit Serializer(VSIVirtualHandle *poFile) : m_poFile(poFile)
1116 : {
1117 11 : }
1118 :
1119 289 : static void func(const char *pszTxt, void *pUserData)
1120 : {
1121 289 : Serializer *self = static_cast<Serializer *>(pUserData);
1122 289 : self->m_poFile->Write(pszTxt, 1, strlen(pszTxt));
1123 289 : }
1124 : };
1125 :
1126 : const std::string osZMetadataFilename =
1127 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata", nullptr);
1128 22 : const std::string osZMetadataTmpFilename = osZMetadataFilename + ".tmp";
1129 : auto poFile = std::unique_ptr<VSIVirtualHandle>(
1130 22 : VSIFOpenL(osZMetadataTmpFilename.c_str(), "wb"));
1131 11 : if (!poFile)
1132 0 : return false;
1133 11 : Serializer serializer(poFile.get());
1134 :
1135 : struct ZarrArrayInfo
1136 : {
1137 : std::vector<uint64_t> anChunkCount{};
1138 : std::map<uint64_t, const VSIKerchunkKeyInfo *> chunkInfo{};
1139 : };
1140 :
1141 22 : std::map<std::string, ZarrArrayInfo> zarrArrays;
1142 :
1143 : // First pass on keys to write JSON objects in .zmetadata
1144 22 : CPLJSonStreamingWriter oWriter(Serializer::func, &serializer);
1145 11 : oWriter.StartObj();
1146 11 : oWriter.AddObjKey("metadata");
1147 11 : oWriter.StartObj();
1148 :
1149 11 : bool bOK = true;
1150 11 : size_t nCurObjectIter = 0;
1151 11 : const size_t nTotalObjects = m_oMapKeys.size();
1152 44 : for (const auto &[key, info] : m_oMapKeys)
1153 : {
1154 54 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1155 16 : cpl::ends_with(key, ".zattrs"))
1156 : {
1157 32 : CPLJSONDocument oDoc;
1158 32 : std::string osStr;
1159 32 : osStr.append(reinterpret_cast<const char *>(info.abyValue.data()),
1160 64 : info.abyValue.size());
1161 32 : if (!oDoc.LoadMemory(osStr.c_str()))
1162 : {
1163 1 : CPLError(CE_Failure, CPLE_AppDefined,
1164 : "Cannot parse JSON content for %s", key.c_str());
1165 1 : bOK = false;
1166 1 : break;
1167 : }
1168 :
1169 31 : if (cpl::ends_with(key, ".zarray"))
1170 : {
1171 10 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1172 :
1173 20 : const auto oShape = oDoc.GetRoot().GetArray("shape");
1174 20 : const auto oChunks = oDoc.GetRoot().GetArray("chunks");
1175 20 : if (oShape.IsValid() && oChunks.IsValid() &&
1176 10 : oShape.Size() == oChunks.Size())
1177 : {
1178 18 : std::vector<uint64_t> anChunkCount;
1179 9 : uint64_t nTotalChunkCount = 1;
1180 17 : for (int i = 0; i < oShape.Size(); ++i)
1181 : {
1182 11 : const auto nShape = oShape[i].ToLong();
1183 11 : const auto nChunk = oChunks[i].ToLong();
1184 11 : if (nShape == 0 || nChunk == 0)
1185 : {
1186 2 : bOK = false;
1187 3 : break;
1188 : }
1189 9 : const uint64_t nChunkCount =
1190 9 : DIV_ROUND_UP(nShape, nChunk);
1191 9 : if (nChunkCount > std::numeric_limits<uint64_t>::max() /
1192 : nTotalChunkCount)
1193 : {
1194 1 : bOK = false;
1195 1 : break;
1196 : }
1197 8 : anChunkCount.push_back(nChunkCount);
1198 8 : nTotalChunkCount *= nChunkCount;
1199 : }
1200 9 : zarrArrays[osArrayName].anChunkCount =
1201 18 : std::move(anChunkCount);
1202 : }
1203 : else
1204 : {
1205 1 : bOK = false;
1206 : }
1207 10 : if (!bOK)
1208 : {
1209 4 : CPLError(CE_Failure, CPLE_AppDefined,
1210 : "Invalid Zarr array definition for %s",
1211 : osArrayName.c_str());
1212 4 : oWriter.clear();
1213 4 : break;
1214 : }
1215 : }
1216 :
1217 27 : oWriter.AddObjKey(key);
1218 27 : oWriter.AddSerializedValue(osStr);
1219 :
1220 27 : ++nCurObjectIter;
1221 31 : if (pfnProgress &&
1222 4 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1223 4 : static_cast<double>(nTotalObjects),
1224 : "", pProgressData))
1225 : {
1226 0 : return false;
1227 : }
1228 : }
1229 : }
1230 :
1231 11 : constexpr int nRecordSize = 100000;
1232 :
1233 11 : if (bOK)
1234 : {
1235 6 : oWriter.EndObj();
1236 6 : oWriter.AddObjKey("record_size");
1237 6 : oWriter.Add(nRecordSize);
1238 6 : oWriter.EndObj();
1239 : }
1240 :
1241 11 : bOK = (poFile->Close() == 0) && bOK;
1242 11 : poFile.reset();
1243 :
1244 11 : if (!bOK)
1245 : {
1246 5 : oWriter.clear();
1247 5 : VSIUnlink(osZMetadataTmpFilename.c_str());
1248 5 : return false;
1249 : }
1250 :
1251 : // Second pass to retrieve chunks
1252 33 : for (const auto &[key, info] : m_oMapKeys)
1253 : {
1254 44 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1255 16 : cpl::ends_with(key, ".zattrs"))
1256 : {
1257 : // already done
1258 : }
1259 : else
1260 : {
1261 6 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1262 6 : auto oIter = zarrArrays.find(osArrayName);
1263 6 : if (oIter != zarrArrays.end())
1264 : {
1265 6 : auto &oArrayInfo = oIter->second;
1266 : const CPLStringList aosIndices(
1267 6 : CSLTokenizeString2(CPLGetFilename(key.c_str()), ".", 0));
1268 6 : if ((static_cast<size_t>(aosIndices.size()) ==
1269 6 : oArrayInfo.anChunkCount.size()) ||
1270 0 : (aosIndices.size() == 1 &&
1271 0 : strcmp(aosIndices[0], "0") == 0 &&
1272 0 : oArrayInfo.anChunkCount.empty()))
1273 : {
1274 6 : std::vector<uint64_t> anIndices;
1275 11 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1276 : {
1277 6 : char *endptr = nullptr;
1278 6 : anIndices.push_back(
1279 6 : std::strtoull(aosIndices[i], &endptr, 10));
1280 6 : if (aosIndices[i][0] == '-' ||
1281 12 : endptr != aosIndices[i] + strlen(aosIndices[i]) ||
1282 6 : anIndices[i] >= oArrayInfo.anChunkCount[i])
1283 : {
1284 1 : CPLError(CE_Failure, CPLE_AppDefined,
1285 : "Invalid key indices: %s", key.c_str());
1286 1 : return false;
1287 : }
1288 : }
1289 :
1290 5 : uint64_t nLinearIndex = 0;
1291 5 : uint64_t nMulFactor = 1;
1292 10 : for (size_t i = anIndices.size(); i > 0;)
1293 : {
1294 5 : --i;
1295 5 : nLinearIndex += anIndices[i] * nMulFactor;
1296 5 : nMulFactor *= oArrayInfo.anChunkCount[i];
1297 : }
1298 :
1299 : #ifdef DEBUG_VERBOSE
1300 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
1301 : "Chunk %" PRIu64 " of array %s found",
1302 : nLinearIndex, osArrayName.c_str());
1303 : #endif
1304 5 : oArrayInfo.chunkInfo[nLinearIndex] = &info;
1305 : }
1306 : }
1307 : }
1308 : }
1309 :
1310 5 : auto poDrv = GetGDALDriverManager()->GetDriverByName("PARQUET");
1311 5 : if (!poDrv)
1312 : {
1313 : // shouldn't happen given earlier checks
1314 0 : CPLAssert(false);
1315 : return false;
1316 : }
1317 :
1318 : // Third pass to create Parquet files
1319 10 : CPLStringList aosLayerCreationOptions;
1320 : aosLayerCreationOptions.SetNameValue("ROW_GROUP_SIZE",
1321 5 : CPLSPrintf("%d", nRecordSize));
1322 :
1323 10 : for (const auto &[osArrayName, oArrayInfo] : zarrArrays)
1324 : {
1325 5 : uint64_t nChunkCount = 1;
1326 10 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1327 : {
1328 5 : nChunkCount *= oArrayInfo.anChunkCount[i];
1329 : }
1330 :
1331 0 : std::unique_ptr<GDALDataset> poDS;
1332 5 : OGRLayer *poLayer = nullptr;
1333 :
1334 10 : for (uint64_t i = 0; i < nChunkCount; ++i)
1335 : {
1336 5 : if ((i % nRecordSize) == 0)
1337 : {
1338 5 : if (poDS)
1339 : {
1340 0 : if (poDS->Close() != CE_None)
1341 : {
1342 0 : CPLError(CE_Failure, CPLE_AppDefined,
1343 : "Close() on %s failed",
1344 0 : poDS->GetDescription());
1345 0 : return false;
1346 : }
1347 0 : poDS.reset();
1348 : }
1349 5 : if (CPLHasPathTraversal(osArrayName.c_str()))
1350 : {
1351 0 : CPLError(CE_Failure, CPLE_AppDefined,
1352 : "Path traversal detected in %s",
1353 : osArrayName.c_str());
1354 0 : return false;
1355 : }
1356 : const std::string osParqFilename = CPLFormFilenameSafe(
1357 5 : CPLFormFilenameSafe(osCacheDir.c_str(), osArrayName.c_str(),
1358 : nullptr)
1359 : .c_str(),
1360 : CPLSPrintf("refs.%" PRIu64 ".parq", i / nRecordSize),
1361 10 : nullptr);
1362 5 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Creating %s",
1363 : osParqFilename.c_str());
1364 5 : VSIMkdirRecursive(
1365 10 : CPLGetPathSafe(osParqFilename.c_str()).c_str(), 0755);
1366 5 : poDS.reset(poDrv->Create(osParqFilename.c_str(), 0, 0, 0,
1367 : GDT_Unknown, nullptr));
1368 5 : if (!poDS)
1369 0 : return false;
1370 10 : poLayer = poDS->CreateLayer(
1371 10 : CPLGetBasenameSafe(osParqFilename.c_str()).c_str(), nullptr,
1372 5 : wkbNone, aosLayerCreationOptions.List());
1373 5 : if (poLayer)
1374 : {
1375 : {
1376 10 : OGRFieldDefn oFieldDefn("path", OFTString);
1377 5 : poLayer->CreateField(&oFieldDefn);
1378 : }
1379 : {
1380 10 : OGRFieldDefn oFieldDefn("offset", OFTInteger64);
1381 5 : poLayer->CreateField(&oFieldDefn);
1382 : }
1383 : {
1384 10 : OGRFieldDefn oFieldDefn("size", OFTInteger64);
1385 5 : poLayer->CreateField(&oFieldDefn);
1386 : }
1387 : {
1388 10 : OGRFieldDefn oFieldDefn("raw", OFTBinary);
1389 5 : poLayer->CreateField(&oFieldDefn);
1390 : }
1391 : }
1392 : }
1393 5 : if (!poLayer)
1394 0 : return false;
1395 :
1396 : auto poFeature =
1397 5 : std::make_unique<OGRFeature>(poLayer->GetLayerDefn());
1398 5 : auto oIter = oArrayInfo.chunkInfo.find(i);
1399 5 : if (oIter != oArrayInfo.chunkInfo.end())
1400 : {
1401 5 : const auto *chunkInfo = oIter->second;
1402 5 : if (chunkInfo->posURI)
1403 5 : poFeature->SetField(0, chunkInfo->posURI->c_str());
1404 5 : poFeature->SetField(1,
1405 5 : static_cast<GIntBig>(chunkInfo->nOffset));
1406 5 : poFeature->SetField(2, static_cast<GIntBig>(chunkInfo->nSize));
1407 5 : if (!chunkInfo->abyValue.empty())
1408 : {
1409 0 : if (chunkInfo->abyValue.size() >
1410 : static_cast<size_t>(INT_MAX))
1411 : {
1412 0 : CPLError(CE_Failure, CPLE_NotSupported,
1413 : "Too big blob for chunk %" PRIu64
1414 : " of array %s",
1415 : i, osArrayName.c_str());
1416 0 : return false;
1417 : }
1418 0 : poFeature->SetField(
1419 0 : 3, static_cast<int>(chunkInfo->abyValue.size()),
1420 0 : static_cast<const void *>(chunkInfo->abyValue.data()));
1421 : }
1422 : }
1423 :
1424 5 : if (poLayer->CreateFeature(poFeature.get()) != OGRERR_NONE)
1425 : {
1426 0 : CPLError(CE_Failure, CPLE_AppDefined,
1427 : "CreateFeature() on %s failed",
1428 0 : poDS->GetDescription());
1429 0 : return false;
1430 : }
1431 :
1432 5 : ++nCurObjectIter;
1433 5 : if (pfnProgress && (nCurObjectIter % 1000) == 0 &&
1434 0 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1435 0 : static_cast<double>(nTotalObjects),
1436 : "", pProgressData))
1437 : {
1438 0 : return false;
1439 : }
1440 : }
1441 :
1442 5 : if (poDS)
1443 : {
1444 5 : if (poDS->Close() != CE_None)
1445 : {
1446 0 : CPLError(CE_Failure, CPLE_AppDefined, "Close() on %s failed",
1447 0 : poDS->GetDescription());
1448 0 : return false;
1449 : }
1450 5 : poDS.reset();
1451 : }
1452 : }
1453 :
1454 5 : if (VSIRename(osZMetadataTmpFilename.c_str(),
1455 5 : osZMetadataFilename.c_str()) != 0)
1456 : {
1457 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot rename %s to %s",
1458 : osZMetadataTmpFilename.c_str(), osZMetadataFilename.c_str());
1459 0 : return false;
1460 : }
1461 :
1462 5 : if (pfnProgress)
1463 1 : pfnProgress(1.0, "", pProgressData);
1464 :
1465 5 : return true;
1466 : }
1467 :
1468 : /************************************************************************/
1469 : /* VSIKerchunkConvertJSONToParquet() */
1470 : /************************************************************************/
1471 :
1472 1 : bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
1473 : const char *pszDstDirname,
1474 : GDALProgressFunc pfnProgress,
1475 : void *pProgressData)
1476 : {
1477 1 : if (GDALGetDriverByName("PARQUET") == nullptr)
1478 : {
1479 0 : CPLError(CE_Failure, CPLE_NotSupported,
1480 : "Conversion to a Parquet reference store is not possible "
1481 : "because the PARQUET driver is not available.");
1482 0 : return false;
1483 : }
1484 :
1485 1 : auto poFS = cpl::down_cast<VSIKerchunkJSONRefFileSystem *>(
1486 : VSIFileManager::GetHandler(JSON_REF_FS_PREFIX));
1487 1 : std::shared_ptr<VSIKerchunkRefFile> refFile;
1488 1 : if (!poFS->m_oCache.tryGet(pszSrcJSONFilename, refFile))
1489 : {
1490 : void *pScaledProgressData =
1491 1 : GDALCreateScaledProgress(0.0, 0.5, pfnProgress, pProgressData);
1492 : try
1493 : {
1494 2 : refFile = poFS->LoadInternal(
1495 : pszSrcJSONFilename,
1496 : pScaledProgressData ? GDALScaledProgress : nullptr,
1497 1 : pScaledProgressData);
1498 : }
1499 0 : catch (const std::exception &e)
1500 : {
1501 0 : CPLError(CE_Failure, CPLE_AppDefined,
1502 : "VSIKerchunkJSONRefFileSystem::Load() failed: %s",
1503 0 : e.what());
1504 0 : GDALDestroyScaledProgress(pScaledProgressData);
1505 0 : return false;
1506 : }
1507 1 : GDALDestroyScaledProgress(pScaledProgressData);
1508 : }
1509 1 : if (!refFile)
1510 : {
1511 0 : CPLError(CE_Failure, CPLE_AppDefined,
1512 : "%s is not a Kerchunk JSON reference store",
1513 : pszSrcJSONFilename);
1514 0 : return false;
1515 : }
1516 :
1517 1 : VSIMkdir(pszDstDirname, 0755);
1518 :
1519 : void *pScaledProgressData =
1520 1 : GDALCreateScaledProgress(0.5, 1.0, pfnProgress, pProgressData);
1521 1 : const bool bRet = refFile->ConvertToParquetRef(
1522 : pszDstDirname, pScaledProgressData ? GDALScaledProgress : nullptr,
1523 : pScaledProgressData);
1524 1 : GDALDestroyScaledProgress(pScaledProgressData);
1525 1 : return bRet;
1526 : }
1527 :
1528 : /************************************************************************/
1529 : /* VSIKerchunkJSONRefFileSystem::Open() */
1530 : /************************************************************************/
1531 :
1532 : VSIVirtualHandleUniquePtr
1533 87 : VSIKerchunkJSONRefFileSystem::Open(const char *pszFilename,
1534 : const char *pszAccess, bool /* bSetError */,
1535 : CSLConstList /* papszOptions */)
1536 : {
1537 87 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Open(%s)", pszFilename);
1538 87 : if (strcmp(pszAccess, "r") != 0 && strcmp(pszAccess, "rb") != 0)
1539 0 : return nullptr;
1540 :
1541 174 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1542 87 : if (osJSONFilename.empty())
1543 4 : return nullptr;
1544 :
1545 83 : const auto [refFile, osParqFilename] = Load(
1546 166 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1547 83 : if (!refFile)
1548 : {
1549 6 : if (osParqFilename.empty())
1550 2 : return nullptr;
1551 :
1552 : return VSIFilesystemHandler::OpenStatic(
1553 8 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1554 : osParqFilename.c_str()),
1555 : osKey.c_str(), nullptr)
1556 : .c_str(),
1557 4 : pszAccess);
1558 : }
1559 :
1560 77 : const auto oIter = refFile->GetMapKeys().find(osKey);
1561 77 : if (oIter == refFile->GetMapKeys().end())
1562 6 : return nullptr;
1563 :
1564 71 : const auto &keyInfo = oIter->second;
1565 71 : if (!keyInfo.posURI)
1566 : {
1567 : return VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
1568 52 : nullptr, const_cast<GByte *>(keyInfo.abyValue.data()),
1569 104 : keyInfo.abyValue.size(), /* bTakeOwnership = */ false));
1570 : }
1571 : else
1572 : {
1573 : std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1574 38 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1575 19 : if (osVSIPath.empty())
1576 2 : return nullptr;
1577 : const std::string osPath =
1578 17 : keyInfo.nSize
1579 7 : ? CPLSPrintf("/vsisubfile/%" PRIu64 "_%u,%s", keyInfo.nOffset,
1580 7 : keyInfo.nSize, osVSIPath.c_str())
1581 34 : : std::move(osVSIPath);
1582 17 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Opening %s",
1583 : osPath.c_str());
1584 : CPLConfigOptionSetter oSetter("GDAL_DISABLE_READDIR_ON_OPEN",
1585 34 : "EMPTY_DIR", false);
1586 17 : return VSIFilesystemHandler::OpenStatic(osPath.c_str(), "rb");
1587 : }
1588 : }
1589 :
1590 : /************************************************************************/
1591 : /* VSIKerchunkJSONRefFileSystem::Stat() */
1592 : /************************************************************************/
1593 :
1594 251 : int VSIKerchunkJSONRefFileSystem::Stat(const char *pszFilename,
1595 : VSIStatBufL *pStatBuf, int nFlags)
1596 : {
1597 251 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Stat(%s)", pszFilename);
1598 251 : memset(pStatBuf, 0, sizeof(VSIStatBufL));
1599 :
1600 502 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1601 251 : if (osJSONFilename.empty())
1602 12 : return -1;
1603 :
1604 239 : const auto [refFile, osParqFilename] = Load(
1605 478 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1606 239 : if (!refFile)
1607 : {
1608 91 : if (osParqFilename.empty())
1609 86 : return -1;
1610 :
1611 5 : return VSIStatExL(
1612 10 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1613 : osParqFilename.c_str()),
1614 : osKey.c_str(), nullptr)
1615 : .c_str(),
1616 5 : pStatBuf, nFlags);
1617 : }
1618 :
1619 148 : if (osKey.empty())
1620 : {
1621 27 : pStatBuf->st_mode = S_IFDIR;
1622 27 : return 0;
1623 : }
1624 :
1625 121 : const auto oIter = refFile->GetMapKeys().find(osKey);
1626 121 : if (oIter == refFile->GetMapKeys().end())
1627 : {
1628 195 : if (cpl::contains(refFile->GetMapKeys(), osKey + "/.zgroup") ||
1629 130 : cpl::contains(refFile->GetMapKeys(), osKey + "/.zarray"))
1630 : {
1631 8 : pStatBuf->st_mode = S_IFDIR;
1632 8 : return 0;
1633 : }
1634 :
1635 57 : return -1;
1636 : }
1637 :
1638 56 : const auto &keyInfo = oIter->second;
1639 56 : if (!(keyInfo.posURI))
1640 : {
1641 48 : pStatBuf->st_size = keyInfo.abyValue.size();
1642 : }
1643 : else
1644 : {
1645 8 : if (keyInfo.nSize)
1646 : {
1647 4 : pStatBuf->st_size = keyInfo.nSize;
1648 : }
1649 : else
1650 : {
1651 : const std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1652 8 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1653 4 : if (osVSIPath.empty())
1654 0 : return -1;
1655 4 : return VSIStatExL(osVSIPath.c_str(), pStatBuf, nFlags);
1656 : }
1657 : }
1658 52 : pStatBuf->st_mode = S_IFREG;
1659 :
1660 52 : return 0;
1661 : }
1662 :
1663 : /************************************************************************/
1664 : /* VSIKerchunkJSONRefFileSystem::ReadDirEx() */
1665 : /************************************************************************/
1666 :
1667 17 : char **VSIKerchunkJSONRefFileSystem::ReadDirEx(const char *pszDirname,
1668 : int nMaxFiles)
1669 : {
1670 17 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "ReadDir(%s)", pszDirname);
1671 :
1672 34 : const auto [osJSONFilename, osAskedKey] = SplitFilename(pszDirname);
1673 17 : if (osJSONFilename.empty())
1674 4 : return nullptr;
1675 :
1676 13 : const auto [refFile, osParqFilename] = Load(
1677 26 : osJSONFilename, STARTS_WITH(pszDirname, JSON_REF_CACHED_FS_PREFIX));
1678 13 : if (!refFile)
1679 : {
1680 3 : if (osParqFilename.empty())
1681 3 : return nullptr;
1682 :
1683 0 : return VSIReadDirEx(
1684 0 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1685 : osParqFilename.c_str()),
1686 : osAskedKey.c_str(), nullptr)
1687 : .c_str(),
1688 0 : nMaxFiles);
1689 : }
1690 :
1691 20 : std::set<std::string> set;
1692 60 : for (const auto &[key, value] : refFile->GetMapKeys())
1693 : {
1694 50 : if (osAskedKey.empty())
1695 : {
1696 20 : const auto nPos = key.find('/');
1697 20 : if (nPos == std::string::npos)
1698 8 : set.insert(key);
1699 : else
1700 12 : set.insert(key.substr(0, nPos));
1701 : }
1702 30 : else if (key.size() > osAskedKey.size() &&
1703 42 : cpl::starts_with(key, osAskedKey) &&
1704 12 : key[osAskedKey.size()] == '/')
1705 : {
1706 24 : std::string subKey = key.substr(osAskedKey.size() + 1);
1707 12 : const auto nPos = subKey.find('/');
1708 12 : if (nPos == std::string::npos)
1709 12 : set.insert(std::move(subKey));
1710 : else
1711 0 : set.insert(subKey.substr(0, nPos));
1712 : }
1713 : }
1714 :
1715 20 : CPLStringList aosRet;
1716 34 : for (const std::string &v : set)
1717 : {
1718 : // CPLDebugOnly("VSIKerchunkJSONRefFileSystem", ".. %s", v.c_str());
1719 24 : aosRet.AddString(v.c_str());
1720 : }
1721 10 : return aosRet.StealList();
1722 : }
1723 :
1724 : /************************************************************************/
1725 : /* VSIInstallKerchunkJSONRefFileSystem() */
1726 : /************************************************************************/
1727 :
1728 1741 : void VSIInstallKerchunkJSONRefFileSystem()
1729 : {
1730 : static std::mutex oMutex;
1731 3482 : std::lock_guard<std::mutex> oLock(oMutex);
1732 : // cppcheck-suppress knownConditionTrueFalse
1733 1741 : if (!VSIKerchunkJSONRefFileSystem::IsFileSystemInstantiated())
1734 : {
1735 1741 : auto fs = std::make_unique<VSIKerchunkJSONRefFileSystem>().release();
1736 1741 : VSIFileManager::InstallHandler(JSON_REF_FS_PREFIX, fs);
1737 1741 : VSIFileManager::InstallHandler(JSON_REF_CACHED_FS_PREFIX, fs);
1738 : }
1739 1741 : }
|