Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver. Virtual file system for
5 : * https://fsspec.github.io/kerchunk/spec.html#version-1
6 : * Author: Even Rouault <even dot rouault at spatialys.com>
7 : *
8 : ******************************************************************************
9 : * Copyright (c) 2025, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #undef _REENTRANT
15 :
16 : #include "vsikerchunk.h"
17 : #include "vsikerchunk_inline.hpp"
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_json.h"
21 : #include "cpl_json_streaming_parser.h"
22 : #include "cpl_json_streaming_writer.h"
23 : #include "cpl_mem_cache.h"
24 : #include "cpl_vsi_virtual.h"
25 :
26 : #include "gdal_priv.h"
27 : #include "ogrsf_frmts.h"
28 :
29 : #include <cerrno>
30 : #include <cinttypes>
31 : #include <limits>
32 : #include <mutex>
33 : #include <set>
34 : #include <utility>
35 :
36 : /************************************************************************/
37 : /* VSIKerchunkKeyInfo */
38 : /************************************************************************/
39 :
40 : struct VSIKerchunkKeyInfo
41 : {
42 : // points to an element in VSIKerchunkRefFile::m_oSetURI
43 : const std::string *posURI = nullptr;
44 :
45 : uint64_t nOffset = 0;
46 : uint32_t nSize = 0;
47 : std::vector<GByte> abyValue{};
48 : };
49 :
50 : /************************************************************************/
51 : /* VSIKerchunkRefFile */
52 : /************************************************************************/
53 :
54 : class VSIKerchunkRefFile
55 : {
56 : private:
57 : std::set<std::string> m_oSetURI{};
58 : std::map<std::string, VSIKerchunkKeyInfo> m_oMapKeys{};
59 :
60 : public:
61 536 : const std::map<std::string, VSIKerchunkKeyInfo> &GetMapKeys() const
62 : {
63 536 : return m_oMapKeys;
64 : }
65 :
66 125 : void AddInlineContent(const std::string &key, std::vector<GByte> &&abyValue)
67 : {
68 250 : VSIKerchunkKeyInfo info;
69 125 : info.abyValue = std::move(abyValue);
70 125 : m_oMapKeys[key] = std::move(info);
71 125 : }
72 :
73 128 : bool AddInlineContent(const std::string &key, const std::string_view &str)
74 : {
75 256 : std::vector<GByte> abyValue;
76 128 : if (cpl::starts_with(str, "base64:"))
77 : {
78 : abyValue.insert(
79 0 : abyValue.end(),
80 4 : reinterpret_cast<const GByte *>(str.data()) + strlen("base64:"),
81 4 : reinterpret_cast<const GByte *>(str.data()) + str.size());
82 4 : abyValue.push_back(0);
83 4 : const int nSize = CPLBase64DecodeInPlace(abyValue.data());
84 4 : if (nSize == 0)
85 : {
86 3 : CPLError(CE_Failure, CPLE_AppDefined,
87 : "VSIKerchunkJSONRefFileSystem: Base64 decoding "
88 : "failed for key '%s'",
89 : key.c_str());
90 3 : return false;
91 : }
92 1 : abyValue.resize(nSize);
93 : }
94 : else
95 : {
96 : abyValue.insert(
97 124 : abyValue.end(), reinterpret_cast<const GByte *>(str.data()),
98 248 : reinterpret_cast<const GByte *>(str.data()) + str.size());
99 : }
100 :
101 125 : AddInlineContent(key, std::move(abyValue));
102 125 : return true;
103 : }
104 :
105 20 : void AddReferencedContent(const std::string &key, const std::string &osURI,
106 : uint64_t nOffset, uint32_t nSize)
107 : {
108 20 : auto oPair = m_oSetURI.insert(osURI);
109 :
110 40 : VSIKerchunkKeyInfo info;
111 20 : info.posURI = &(*(oPair.first));
112 20 : info.nOffset = nOffset;
113 20 : info.nSize = nSize;
114 20 : m_oMapKeys[key] = std::move(info);
115 20 : }
116 :
117 : bool ConvertToParquetRef(const std::string &osCacheDir,
118 : GDALProgressFunc pfnProgress, void *pProgressData);
119 : };
120 :
121 : /************************************************************************/
122 : /* VSIKerchunkJSONRefFileSystem */
123 : /************************************************************************/
124 :
125 : class VSIKerchunkJSONRefFileSystem final : public VSIFilesystemHandler
126 : {
127 : public:
128 1629 : VSIKerchunkJSONRefFileSystem()
129 1629 : {
130 1629 : IsFileSystemInstantiated() = true;
131 1629 : }
132 :
133 2226 : ~VSIKerchunkJSONRefFileSystem()
134 1113 : {
135 1113 : IsFileSystemInstantiated() = false;
136 2226 : }
137 :
138 4371 : static bool &IsFileSystemInstantiated()
139 : {
140 : static bool bIsFileSystemInstantiated = false;
141 4371 : return bIsFileSystemInstantiated;
142 : }
143 :
144 : VSIVirtualHandle *Open(const char *pszFilename, const char *pszAccess,
145 : bool bSetError, CSLConstList papszOptions) override;
146 :
147 : int Stat(const char *pszFilename, VSIStatBufL *pStatBuf,
148 : int nFlags) override;
149 :
150 : char **ReadDirEx(const char *pszDirname, int nMaxFiles) override;
151 :
152 : private:
153 : friend bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
154 : const char *pszDstDirname,
155 : GDALProgressFunc pfnProgress,
156 : void *pProgressData);
157 :
158 : lru11::Cache<std::string, std::shared_ptr<VSIKerchunkRefFile>, std::mutex>
159 : m_oCache{};
160 :
161 : static std::pair<std::string, std::string>
162 : SplitFilename(const char *pszFilename);
163 :
164 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
165 : Load(const std::string &osJSONFilename, bool bUseCache);
166 : std::shared_ptr<VSIKerchunkRefFile>
167 : LoadInternal(const std::string &osJSONFilename,
168 : GDALProgressFunc pfnProgress, void *pProgressData);
169 : std::shared_ptr<VSIKerchunkRefFile>
170 : LoadStreaming(const std::string &osJSONFilename,
171 : GDALProgressFunc pfnProgress, void *pProgressData);
172 : };
173 :
174 : /************************************************************************/
175 : /* VSIKerchunkJSONRefFileSystem::SplitFilename() */
176 : /************************************************************************/
177 :
178 : /*static*/
179 : std::pair<std::string, std::string>
180 355 : VSIKerchunkJSONRefFileSystem::SplitFilename(const char *pszFilename)
181 : {
182 355 : if (STARTS_WITH(pszFilename, JSON_REF_FS_PREFIX))
183 299 : pszFilename += strlen(JSON_REF_FS_PREFIX);
184 56 : else if (STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX))
185 56 : pszFilename += strlen(JSON_REF_CACHED_FS_PREFIX);
186 : else
187 0 : return {std::string(), std::string()};
188 :
189 710 : std::string osJSONFilename;
190 :
191 355 : if (*pszFilename == '{')
192 : {
193 : // Parse /vsikerchunk_json_ref/{/path/to/some.json}[key]
194 327 : int nLevel = 1;
195 327 : ++pszFilename;
196 22973 : for (; *pszFilename; ++pszFilename)
197 : {
198 22963 : if (*pszFilename == '{')
199 : {
200 0 : ++nLevel;
201 : }
202 22963 : else if (*pszFilename == '}')
203 : {
204 317 : --nLevel;
205 317 : if (nLevel == 0)
206 : {
207 317 : ++pszFilename;
208 317 : break;
209 : }
210 : }
211 22646 : osJSONFilename += *pszFilename;
212 : }
213 327 : if (nLevel != 0)
214 : {
215 10 : CPLError(CE_Failure, CPLE_AppDefined,
216 : "Invalid %s syntax: should be "
217 : "%s{/path/to/some/file}[/optional_key]",
218 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
219 10 : return {std::string(), std::string()};
220 : }
221 :
222 : return {osJSONFilename,
223 634 : *pszFilename == '/' ? pszFilename + 1 : pszFilename};
224 : }
225 : else
226 : {
227 28 : int nCountDotJson = 0;
228 28 : const char *pszIter = pszFilename;
229 28 : const char *pszAfterJSON = nullptr;
230 46 : while ((pszIter = strstr(pszIter, ".json")) != nullptr)
231 : {
232 18 : ++nCountDotJson;
233 18 : if (nCountDotJson == 1)
234 18 : pszAfterJSON = pszIter + strlen(".json");
235 : else
236 0 : pszAfterJSON = nullptr;
237 18 : pszIter += strlen(".json");
238 : }
239 28 : if (!pszAfterJSON)
240 : {
241 10 : if (nCountDotJson >= 2)
242 : {
243 0 : CPLError(CE_Failure, CPLE_AppDefined,
244 : "Ambiguous %s syntax: should be "
245 : "%s{/path/to/some/file}[/optional_key]",
246 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
247 : }
248 : else
249 : {
250 10 : CPLError(CE_Failure, CPLE_AppDefined,
251 : "Invalid %s syntax: should be "
252 : "%s/path/to/some.json[/optional_key] or "
253 : "%s{/path/to/some/file}[/optional_key]",
254 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX,
255 : JSON_REF_FS_PREFIX);
256 : }
257 10 : return {std::string(), std::string()};
258 : }
259 36 : return {std::string(pszFilename, pszAfterJSON - pszFilename),
260 36 : *pszAfterJSON == '/' ? pszAfterJSON + 1 : pszAfterJSON};
261 : }
262 : }
263 :
264 : /************************************************************************/
265 : /* class VSIKerchunkJSONRefParser */
266 : /************************************************************************/
267 :
268 : namespace
269 : {
270 : class VSIKerchunkJSONRefParser final : public CPLJSonStreamingParser
271 : {
272 : public:
273 71 : explicit VSIKerchunkJSONRefParser(
274 : const std::shared_ptr<VSIKerchunkRefFile> &refFile)
275 71 : : m_refFile(refFile)
276 : {
277 71 : m_oWriter.SetPrettyFormatting(false);
278 71 : }
279 :
280 71 : ~VSIKerchunkJSONRefParser()
281 71 : {
282 : // In case the parsing would be stopped, the writer may be in
283 : // an inconsistent state. This avoids assertion in debug mode.
284 71 : m_oWriter.clear();
285 71 : }
286 :
287 : protected:
288 77 : void String(const char *pszValue, size_t nLength) override
289 : {
290 77 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 0)
291 : {
292 9 : if (nLength > 0 && pszValue[nLength - 1] == 0)
293 2 : --nLength;
294 :
295 9 : if (!m_refFile->AddInlineContent(
296 9 : m_osCurKey, std::string_view(pszValue, nLength)))
297 : {
298 2 : StopParsing();
299 : }
300 :
301 9 : m_oWriter.clear();
302 :
303 9 : m_osCurKey.clear();
304 : }
305 68 : else if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
306 : {
307 42 : if (m_iArrayMemberIdx == 0)
308 : {
309 36 : m_osURI.assign(pszValue, nLength);
310 : }
311 : else
312 : {
313 6 : UnexpectedContentInArray();
314 : }
315 : }
316 26 : else if (m_nLevel > m_nKeyLevel)
317 : {
318 24 : m_oWriter.Add(std::string_view(pszValue, nLength));
319 : }
320 77 : }
321 :
322 161 : void Number(const char *pszValue, size_t nLength) override
323 : {
324 161 : if (m_nLevel == m_nKeyLevel)
325 : {
326 52 : if (m_nArrayLevel == 1)
327 : {
328 48 : if (m_iArrayMemberIdx == 1)
329 : {
330 27 : m_osTmpForNumber.assign(pszValue, nLength);
331 27 : errno = 0;
332 27 : m_nOffset =
333 27 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
334 50 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
335 23 : m_osTmpForNumber.find('.') != std::string::npos)
336 : {
337 6 : CPLError(
338 : CE_Failure, CPLE_AppDefined,
339 : "VSIKerchunkJSONRefFileSystem: array value at "
340 : "index 1 for key '%s' is not an unsigned 64 bit "
341 : "integer",
342 : m_osCurKey.c_str());
343 6 : StopParsing();
344 : }
345 : }
346 21 : else if (m_iArrayMemberIdx == 2)
347 : {
348 19 : m_osTmpForNumber.assign(pszValue, nLength);
349 19 : errno = 0;
350 : const uint64_t nSize =
351 19 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
352 19 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
353 53 : nSize > std::numeric_limits<uint32_t>::max() ||
354 15 : m_osTmpForNumber.find('.') != std::string::npos)
355 : {
356 6 : CPLError(
357 : CE_Failure, CPLE_AppDefined,
358 : "VSIKerchunkJSONRefFileSystem: array value at "
359 : "index 2 for key '%s' is not an unsigned 32 bit "
360 : "integer",
361 : m_osCurKey.c_str());
362 6 : StopParsing();
363 : }
364 : else
365 : {
366 13 : m_nSize = static_cast<uint32_t>(nSize);
367 : }
368 : }
369 : else
370 : {
371 2 : UnexpectedContentInArray();
372 : }
373 : }
374 : else
375 : {
376 4 : UnexpectedContent();
377 : }
378 : }
379 109 : else if (m_nLevel > m_nKeyLevel)
380 : {
381 108 : m_oWriter.AddSerializedValue(std::string_view(pszValue, nLength));
382 : }
383 161 : }
384 :
385 4 : void Boolean(bool b) override
386 : {
387 4 : if (m_nLevel == m_nKeyLevel)
388 : {
389 4 : UnexpectedContent();
390 : }
391 0 : else if (m_nLevel > m_nKeyLevel)
392 : {
393 0 : m_oWriter.Add(b);
394 : }
395 4 : }
396 :
397 28 : void Null() override
398 : {
399 28 : if (m_nLevel == m_nKeyLevel)
400 : {
401 4 : UnexpectedContent();
402 : }
403 24 : else if (m_nLevel > m_nKeyLevel)
404 : {
405 24 : m_oWriter.AddNull();
406 : }
407 28 : }
408 :
409 168 : void StartObject() override
410 : {
411 168 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
412 : {
413 2 : UnexpectedContentInArray();
414 : }
415 : else
416 : {
417 166 : if (m_nLevel >= m_nKeyLevel)
418 : {
419 95 : m_oWriter.StartObj();
420 : }
421 166 : ++m_nLevel;
422 166 : m_bFirstMember = true;
423 : }
424 168 : }
425 :
426 124 : void EndObject() override
427 : {
428 124 : if (m_nLevel == m_nKeyLevel)
429 : {
430 28 : FinishObjectValueProcessing();
431 : }
432 124 : --m_nLevel;
433 124 : if (m_nLevel >= m_nKeyLevel)
434 : {
435 95 : m_oWriter.EndObj();
436 : }
437 124 : }
438 :
439 329 : void StartObjectMember(const char *pszKey, size_t nLength) override
440 : {
441 329 : if (m_nLevel == 1 && m_bFirstMember)
442 : {
443 82 : if (nLength == strlen("version") &&
444 68 : memcmp(pszKey, "version", nLength) == 0)
445 : {
446 3 : m_nKeyLevel = 2;
447 : }
448 : else
449 : {
450 79 : m_nKeyLevel = 1;
451 : }
452 : }
453 247 : else if (m_nLevel == 1 && m_nKeyLevel == 2 &&
454 1 : nLength == strlen("templates") &&
455 1 : memcmp(pszKey, "templates", nLength) == 0)
456 : {
457 1 : CPLError(CE_Failure, CPLE_NotSupported,
458 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
459 : "not supported");
460 1 : StopParsing();
461 : }
462 246 : else if (m_nLevel == 1 && m_nKeyLevel == 2 &&
463 1 : nLength == strlen("gen") &&
464 1 : memcmp(pszKey, "gen", nLength) == 0)
465 : {
466 1 : CPLError(CE_Failure, CPLE_NotSupported,
467 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
468 : "supported");
469 1 : StopParsing();
470 : }
471 :
472 329 : if (m_nLevel == m_nKeyLevel)
473 : {
474 158 : FinishObjectValueProcessing();
475 158 : m_osCurKey.assign(pszKey, nLength);
476 : }
477 171 : else if (m_nLevel > m_nKeyLevel)
478 : {
479 163 : m_oWriter.AddObjKey(std::string_view(pszKey, nLength));
480 : }
481 329 : m_bFirstMember = false;
482 329 : }
483 :
484 82 : void StartArray() override
485 : {
486 82 : if (m_nLevel == m_nKeyLevel)
487 : {
488 46 : if (m_nArrayLevel == 0)
489 : {
490 46 : m_iArrayMemberIdx = -1;
491 46 : m_osURI.clear();
492 46 : m_nOffset = 0;
493 46 : m_nSize = 0;
494 46 : m_nArrayLevel = 1;
495 : }
496 : else
497 : {
498 0 : UnexpectedContentInArray();
499 : }
500 : }
501 36 : else if (m_nLevel > m_nKeyLevel)
502 : {
503 36 : m_oWriter.StartArray();
504 36 : ++m_nArrayLevel;
505 : }
506 82 : }
507 :
508 56 : void EndArray() override
509 : {
510 56 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
511 : {
512 20 : if (m_iArrayMemberIdx == -1)
513 : {
514 2 : CPLError(CE_Failure, CPLE_AppDefined,
515 : "VSIKerchunkJSONRefFileSystem: array value for key "
516 : "'%s' is not of size 1 or 3",
517 : m_osCurKey.c_str());
518 2 : StopParsing();
519 : }
520 : else
521 : {
522 18 : m_refFile->AddReferencedContent(m_osCurKey, m_osURI, m_nOffset,
523 : m_nSize);
524 18 : --m_nArrayLevel;
525 18 : m_oWriter.clear();
526 18 : m_osCurKey.clear();
527 : }
528 : }
529 36 : else if (m_nLevel >= m_nKeyLevel)
530 : {
531 36 : --m_nArrayLevel;
532 36 : if (m_nLevel > m_nKeyLevel)
533 36 : m_oWriter.EndArray();
534 : }
535 56 : }
536 :
537 125 : void StartArrayMember() override
538 : {
539 125 : if (m_nLevel >= m_nKeyLevel)
540 125 : ++m_iArrayMemberIdx;
541 125 : }
542 :
543 2 : void Exception(const char *pszMessage) override
544 : {
545 2 : CPLError(CE_Failure, CPLE_AppDefined, "%s", pszMessage);
546 2 : }
547 :
548 : private:
549 : std::shared_ptr<VSIKerchunkRefFile> m_refFile{};
550 : int m_nLevel = 0;
551 : int m_nArrayLevel = 0;
552 : int m_iArrayMemberIdx = -1;
553 : bool m_bFirstMember = false;
554 : int m_nKeyLevel = std::numeric_limits<int>::max();
555 : std::string m_osCurKey{};
556 : std::string m_osURI{};
557 : std::string m_osTmpForNumber{};
558 : uint64_t m_nOffset = 0;
559 : uint32_t m_nSize = 0;
560 :
561 : CPLJSonStreamingWriter m_oWriter{nullptr, nullptr};
562 :
563 186 : void FinishObjectValueProcessing()
564 : {
565 186 : if (!m_osCurKey.empty())
566 : {
567 93 : const std::string &osStr = m_oWriter.GetString();
568 93 : CPL_IGNORE_RET_VAL(m_refFile->AddInlineContent(m_osCurKey, osStr));
569 :
570 93 : m_oWriter.clear();
571 :
572 93 : m_osCurKey.clear();
573 : }
574 186 : }
575 :
576 12 : void UnexpectedContent()
577 : {
578 12 : CPLError(CE_Failure, CPLE_AppDefined, "Unexpected content");
579 12 : StopParsing();
580 12 : }
581 :
582 10 : void UnexpectedContentInArray()
583 : {
584 10 : CPLError(CE_Failure, CPLE_AppDefined,
585 : "Unexpected content at position %d of array",
586 : m_iArrayMemberIdx);
587 10 : StopParsing();
588 10 : }
589 : };
590 : } // namespace
591 :
592 : /************************************************************************/
593 : /* VSIKerchunkJSONRefFileSystem::LoadStreaming() */
594 : /************************************************************************/
595 :
596 : std::shared_ptr<VSIKerchunkRefFile>
597 71 : VSIKerchunkJSONRefFileSystem::LoadStreaming(const std::string &osJSONFilename,
598 : GDALProgressFunc pfnProgress,
599 : void *pProgressData)
600 : {
601 142 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
602 142 : VSIKerchunkJSONRefParser parser(refFile);
603 :
604 71 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
605 : "Using streaming parser for %s", osJSONFilename.c_str());
606 :
607 : // For network file systems, get the streaming version of the filename,
608 : // as we don't need arbitrary seeking in the file
609 : const std::string osFilename =
610 71 : VSIFileManager::GetHandler(osJSONFilename.c_str())
611 142 : ->GetStreamingFilename(osJSONFilename);
612 :
613 142 : auto f = VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
614 71 : if (!f)
615 : {
616 2 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
617 : osJSONFilename.c_str());
618 2 : return nullptr;
619 : }
620 69 : uint64_t nTotalSize = 0;
621 69 : if (!cpl::starts_with(osFilename, "/vsigzip/"))
622 : {
623 69 : f->Seek(0, SEEK_END);
624 69 : nTotalSize = f->Tell();
625 69 : f->Seek(0, SEEK_SET);
626 : }
627 138 : std::string sBuffer;
628 69 : constexpr size_t BUFFER_SIZE = 10 * 1024 * 1024; // Arbitrary
629 69 : sBuffer.resize(BUFFER_SIZE);
630 : while (true)
631 : {
632 70 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
633 70 : const bool bFinished = nRead < sBuffer.size();
634 : try
635 : {
636 70 : if (!parser.Parse(sBuffer.data(), nRead, bFinished))
637 : {
638 : // The parser will have emitted an error
639 42 : return nullptr;
640 : }
641 : }
642 0 : catch (const std::exception &e)
643 : {
644 : // Out-of-memory typically
645 0 : CPLError(CE_Failure, CPLE_AppDefined,
646 : "Exception occurred while parsing %s: %s",
647 0 : osJSONFilename.c_str(), e.what());
648 0 : return nullptr;
649 : }
650 28 : if (nTotalSize)
651 : {
652 27 : const double dfProgressRatio = static_cast<double>(f->Tell()) /
653 27 : static_cast<double>(nTotalSize);
654 27 : CPLDebug("VSIKerchunkJSONRefFileSystem", "%02.1f %% of %s read",
655 : 100 * dfProgressRatio, osJSONFilename.c_str());
656 28 : if (pfnProgress &&
657 1 : !pfnProgress(dfProgressRatio, "Parsing of JSON file",
658 : pProgressData))
659 : {
660 0 : return nullptr;
661 : }
662 : }
663 : else
664 : {
665 1 : CPLDebug("VSIKerchunkJSONRefFileSystem",
666 : "%" PRIu64 " bytes read in %s",
667 1 : static_cast<uint64_t>(f->Tell()), osJSONFilename.c_str());
668 : }
669 28 : if (nRead < sBuffer.size())
670 : {
671 27 : break;
672 : }
673 1 : }
674 27 : if (f->Tell() == 0)
675 : {
676 1 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
677 : osJSONFilename.c_str());
678 1 : return nullptr;
679 : }
680 26 : if (pfnProgress)
681 1 : pfnProgress(1.0, "Parsing of JSON file", pProgressData);
682 :
683 26 : return refFile;
684 : }
685 :
686 : /************************************************************************/
687 : /* VSIKerchunkJSONRefFileSystem::LoadInternal() */
688 : /************************************************************************/
689 :
690 : std::shared_ptr<VSIKerchunkRefFile>
691 109 : VSIKerchunkJSONRefFileSystem::LoadInternal(const std::string &osJSONFilename,
692 : GDALProgressFunc pfnProgress,
693 : void *pProgressData)
694 : {
695 109 : const char *pszUseStreamingParser = VSIGetPathSpecificOption(
696 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_STREAMING_PARSER", "AUTO");
697 109 : if (EQUAL(pszUseStreamingParser, "AUTO"))
698 : {
699 : auto f =
700 50 : VSIVirtualHandleUniquePtr(VSIFOpenL(osJSONFilename.c_str(), "rb"));
701 50 : if (!f)
702 : {
703 6 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
704 : osJSONFilename.c_str());
705 6 : return nullptr;
706 : }
707 44 : std::string sBuffer;
708 44 : constexpr size_t HEADER_SIZE = 1024; // Arbitrary
709 44 : sBuffer.resize(HEADER_SIZE);
710 44 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
711 44 : sBuffer.resize(nRead);
712 44 : if (ZARRIsLikelyStreamableKerchunkJSONRefContent(sBuffer))
713 : {
714 41 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
715 : }
716 : }
717 59 : else if (CPLTestBool(pszUseStreamingParser))
718 : {
719 30 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
720 : }
721 :
722 64 : CPLJSONDocument oDoc;
723 : {
724 : #if SIZEOF_VOIDP > 4
725 32 : CPLConfigOptionSetter oSetter("CPL_JSON_MAX_SIZE", "1GB", true);
726 : #endif
727 32 : if (!oDoc.Load(osJSONFilename))
728 : {
729 5 : CPLError(CE_Failure, CPLE_AppDefined,
730 : "VSIKerchunkJSONRefFileSystem: cannot open %s",
731 : osJSONFilename.c_str());
732 5 : return nullptr;
733 : }
734 : }
735 :
736 54 : auto oRoot = oDoc.GetRoot();
737 81 : const auto oVersion = oRoot.GetObj("version");
738 54 : CPLJSONObject oRefs;
739 27 : if (!oVersion.IsValid())
740 : {
741 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-0
742 :
743 23 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
744 : "'version' key not found. Assuming version 0");
745 23 : oRefs = std::move(oRoot);
746 23 : if (!oRefs.GetObj(".zgroup").IsValid())
747 : {
748 0 : CPLError(CE_Failure, CPLE_AppDefined,
749 : "VSIKerchunkJSONRefFileSystem: '.zgroup' key not found");
750 0 : return nullptr;
751 : }
752 : }
753 4 : else if (oVersion.GetType() != CPLJSONObject::Type::Integer)
754 : {
755 4 : CPLError(CE_Failure, CPLE_AppDefined,
756 : "VSIKerchunkJSONRefFileSystem: 'version' key not integer");
757 4 : return nullptr;
758 : }
759 0 : else if (oVersion.ToInteger() != 1)
760 : {
761 0 : CPLError(CE_Failure, CPLE_NotSupported,
762 : "VSIKerchunkJSONRefFileSystem: 'version' = %d not handled",
763 : oVersion.ToInteger());
764 0 : return nullptr;
765 : }
766 : else
767 : {
768 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-1
769 :
770 0 : if (oRoot.GetObj("templates").IsValid())
771 : {
772 0 : CPLError(CE_Failure, CPLE_NotSupported,
773 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
774 : "not supported");
775 0 : return nullptr;
776 : }
777 :
778 0 : if (oRoot.GetObj("gen").IsValid())
779 : {
780 0 : CPLError(CE_Failure, CPLE_NotSupported,
781 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
782 : "supported");
783 0 : return nullptr;
784 : }
785 :
786 0 : oRefs = oRoot.GetObj("refs");
787 0 : if (!oRefs.IsValid())
788 : {
789 0 : CPLError(CE_Failure, CPLE_AppDefined,
790 : "VSIKerchunkJSONRefFileSystem: 'refs' key not found");
791 0 : return nullptr;
792 : }
793 :
794 0 : if (oRoot.GetObj("templates").IsValid())
795 : {
796 0 : CPLError(CE_Failure, CPLE_NotSupported,
797 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
798 : "not supported");
799 0 : return nullptr;
800 : }
801 :
802 0 : if (oRoot.GetObj("templates").IsValid())
803 : {
804 0 : CPLError(CE_Failure, CPLE_NotSupported,
805 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
806 : "not supported");
807 0 : return nullptr;
808 : }
809 : }
810 :
811 23 : if (oRefs.GetType() != CPLJSONObject::Type::Object)
812 : {
813 0 : CPLError(CE_Failure, CPLE_AppDefined,
814 : "VSIKerchunkJSONRefFileSystem: value of 'refs' is not a dict");
815 0 : return nullptr;
816 : }
817 :
818 46 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
819 50 : for (const auto &oEntry : oRefs.GetChildren())
820 : {
821 45 : const std::string osKeyName = oEntry.GetName();
822 45 : if (oEntry.GetType() == CPLJSONObject::Type::String)
823 : {
824 2 : if (!refFile->AddInlineContent(osKeyName, oEntry.ToString()))
825 : {
826 1 : return nullptr;
827 : }
828 : }
829 43 : else if (oEntry.GetType() == CPLJSONObject::Type::Object)
830 : {
831 : const std::string osSerialized =
832 24 : oEntry.Format(CPLJSONObject::PrettyFormat::Plain);
833 24 : CPL_IGNORE_RET_VAL(
834 24 : refFile->AddInlineContent(osKeyName, osSerialized));
835 : }
836 19 : else if (oEntry.GetType() == CPLJSONObject::Type::Array)
837 : {
838 15 : const auto oArray = oEntry.ToArray();
839 : // Some files such as https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json
840 : // (pointed by https://guide.cloudnativegeo.org/kerchunk/kerchunk-in-practice.html)
841 : // have array entries with just the URL, and no offset/size
842 : // This is when the whole file needs to be read
843 15 : if (oArray.Size() != 1 && oArray.Size() != 3)
844 : {
845 3 : CPLError(CE_Failure, CPLE_AppDefined,
846 : "VSIKerchunkJSONRefFileSystem: array value for key "
847 : "'%s' is not of size 1 or 3",
848 : osKeyName.c_str());
849 3 : return nullptr;
850 : }
851 12 : if (oArray[0].GetType() != CPLJSONObject::Type::String)
852 : {
853 4 : CPLError(CE_Failure, CPLE_AppDefined,
854 : "VSIKerchunkJSONRefFileSystem: array value at index 0 "
855 : "for key '%s' is not a string",
856 : osKeyName.c_str());
857 4 : return nullptr;
858 : }
859 8 : if (oArray.Size() == 3)
860 : {
861 8 : if ((oArray[1].GetType() != CPLJSONObject::Type::Integer &&
862 23 : oArray[1].GetType() != CPLJSONObject::Type::Long) ||
863 15 : !(oArray[1].ToLong() >= 0))
864 : {
865 2 : CPLError(
866 : CE_Failure, CPLE_AppDefined,
867 : "VSIKerchunkJSONRefFileSystem: array value at index 1 "
868 : "for key '%s' is not an unsigned 64 bit integer",
869 : osKeyName.c_str());
870 2 : return nullptr;
871 : }
872 6 : if ((oArray[2].GetType() != CPLJSONObject::Type::Integer &&
873 16 : oArray[2].GetType() != CPLJSONObject::Type::Long) ||
874 10 : !(oArray[2].ToLong() >= 0 &&
875 9 : static_cast<uint64_t>(oArray[2].ToLong()) <=
876 3 : std::numeric_limits<uint32_t>::max()))
877 : {
878 4 : CPLError(
879 : CE_Failure, CPLE_AppDefined,
880 : "VSIKerchunkJSONRefFileSystem: array value at index 2 "
881 : "for key '%s' is not an unsigned 32 bit integer",
882 : osKeyName.c_str());
883 4 : return nullptr;
884 : }
885 : }
886 :
887 4 : refFile->AddReferencedContent(
888 4 : osKeyName, oArray[0].ToString(),
889 4 : oArray.Size() == 3 ? oArray[1].ToLong() : 0,
890 4 : oArray.Size() == 3 ? static_cast<uint32_t>(oArray[2].ToLong())
891 : : 0);
892 : }
893 : else
894 : {
895 4 : CPLError(
896 : CE_Failure, CPLE_AppDefined,
897 : "VSIKerchunkJSONRefFileSystem: invalid value type for key '%s'",
898 : osKeyName.c_str());
899 4 : return nullptr;
900 : }
901 : }
902 :
903 5 : return refFile;
904 : }
905 :
906 : /************************************************************************/
907 : /* VSIKerchunkJSONRefFileSystem::Load() */
908 : /************************************************************************/
909 :
910 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
911 335 : VSIKerchunkJSONRefFileSystem::Load(const std::string &osJSONFilename,
912 : bool bUseCache)
913 : {
914 335 : std::shared_ptr<VSIKerchunkRefFile> refFile;
915 335 : if (m_oCache.tryGet(osJSONFilename, refFile))
916 211 : return {refFile, std::string()};
917 :
918 : // Deal with local file cache
919 124 : const char *pszUseCache = VSIGetPathSpecificOption(
920 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_CACHE", "NO");
921 124 : if (bUseCache || CPLTestBool(pszUseCache))
922 : {
923 26 : if (GDALGetDriverByName("PARQUET") == nullptr)
924 : {
925 0 : CPLError(CE_Failure, CPLE_NotSupported,
926 : "VSIKERCHUNK_USE_CACHE=YES only enabled if PARQUET driver "
927 : "is available");
928 0 : return {nullptr, std::string()};
929 : }
930 :
931 : VSIStatBufL sStat;
932 52 : if (VSIStatL(osJSONFilename.c_str(), &sStat) != 0 ||
933 26 : VSI_ISDIR(sStat.st_mode))
934 : {
935 0 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
936 : osJSONFilename.c_str());
937 0 : return {nullptr, std::string()};
938 : }
939 :
940 26 : std::string osCacheSubDir = CPLGetBasenameSafe(osJSONFilename.c_str());
941 : osCacheSubDir += CPLSPrintf("_%" PRIu64 "_%" PRIu64,
942 26 : static_cast<uint64_t>(sStat.st_size),
943 26 : static_cast<uint64_t>(sStat.st_mtime));
944 :
945 26 : const std::string osRootCacheDir = GDALGetCacheDirectory();
946 26 : if (!osRootCacheDir.empty())
947 : {
948 : const std::string osKerchunkCacheDir = VSIGetPathSpecificOption(
949 : osJSONFilename.c_str(), "VSIKERCHUNK_CACHE_DIR",
950 52 : CPLFormFilenameSafe(osRootCacheDir.c_str(),
951 : "zarr_kerchunk_cache", nullptr)
952 78 : .c_str());
953 : const std::string osCacheDir = CPLFormFilenameSafe(
954 52 : osKerchunkCacheDir.c_str(), osCacheSubDir.c_str(), "zarr");
955 26 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Using cache dir %s",
956 : osCacheDir.c_str());
957 :
958 52 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
959 : nullptr)
960 : .c_str(),
961 26 : &sStat) == 0)
962 : {
963 9 : CPLDebug("VSIKerchunkJSONRefFileSystem",
964 : "Using Kerchunk Parquet cache %s", osCacheDir.c_str());
965 9 : return {nullptr, osCacheDir};
966 : }
967 :
968 23 : if (VSIMkdirRecursive(osCacheDir.c_str(), 0755) != 0 &&
969 6 : !(VSIStatL(osCacheDir.c_str(), &sStat) == 0 &&
970 0 : VSI_ISDIR(sStat.st_mode)))
971 : {
972 6 : CPLError(CE_Failure, CPLE_AppDefined,
973 : "Cannot create directory %s", osCacheDir.c_str());
974 6 : return {nullptr, std::string()};
975 : }
976 :
977 : const std::string osLockFilename =
978 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".lock", nullptr);
979 :
980 11 : CPLLockFileHandle hLockHandle = nullptr;
981 22 : CPLStringList aosOptions;
982 11 : aosOptions.SetNameValue("VERBOSE_WAIT_MESSAGE", "YES");
983 : const char *pszKerchunkDebug =
984 11 : CPLGetConfigOption("VSIKERCHUNK_FOR_TESTS", nullptr);
985 11 : if (pszKerchunkDebug &&
986 4 : strstr(pszKerchunkDebug, "SHORT_DELAY_STALLED_LOCK"))
987 : {
988 2 : aosOptions.SetNameValue("STALLED_DELAY", "1");
989 : }
990 :
991 11 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Acquiring lock");
992 11 : switch (CPLLockFileEx(osLockFilename.c_str(), &hLockHandle,
993 11 : aosOptions.List()))
994 : {
995 10 : case CLFS_OK:
996 10 : break;
997 1 : case CLFS_CANNOT_CREATE_LOCK:
998 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create lock %s",
999 : osLockFilename.c_str());
1000 1 : break;
1001 0 : case CLFS_LOCK_BUSY:
1002 0 : CPLAssert(false); // cannot happen with infinite wait time
1003 : break;
1004 0 : case CLFS_API_MISUSE:
1005 0 : CPLAssert(false);
1006 : break;
1007 0 : case CLFS_THREAD_CREATION_FAILED:
1008 0 : CPLError(CE_Failure, CPLE_AppDefined,
1009 : "Thread creation failed for refresh of %s",
1010 : osLockFilename.c_str());
1011 0 : break;
1012 : }
1013 11 : if (!hLockHandle)
1014 : {
1015 1 : return {nullptr, std::string()};
1016 : }
1017 :
1018 : struct LockFileHolder
1019 : {
1020 : CPLLockFileHandle m_hLockHandle = nullptr;
1021 :
1022 10 : explicit LockFileHolder(CPLLockFileHandle hLockHandleIn)
1023 10 : : m_hLockHandle(hLockHandleIn)
1024 : {
1025 10 : }
1026 :
1027 10 : ~LockFileHolder()
1028 10 : {
1029 10 : release();
1030 10 : }
1031 :
1032 20 : void release()
1033 : {
1034 20 : if (m_hLockHandle)
1035 : {
1036 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1037 : "Releasing lock");
1038 10 : CPLUnlockFileEx(m_hLockHandle);
1039 10 : m_hLockHandle = nullptr;
1040 : }
1041 20 : }
1042 :
1043 : CPL_DISALLOW_COPY_ASSIGN(LockFileHolder)
1044 : };
1045 :
1046 20 : LockFileHolder lockFileHolder(hLockHandle);
1047 :
1048 20 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
1049 : nullptr)
1050 : .c_str(),
1051 10 : &sStat) == 0)
1052 : {
1053 0 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1054 : "Using Kerchunk Parquet cache %s (after lock taking)",
1055 : osCacheDir.c_str());
1056 0 : return {nullptr, osCacheDir};
1057 : }
1058 :
1059 10 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1060 :
1061 10 : if (refFile)
1062 : {
1063 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1064 : "Generating Kerchunk Parquet cache %s...",
1065 : osCacheDir.c_str());
1066 :
1067 10 : if (pszKerchunkDebug &&
1068 3 : strstr(pszKerchunkDebug,
1069 : "WAIT_BEFORE_CONVERT_TO_PARQUET_REF"))
1070 : {
1071 1 : CPLSleep(0.5);
1072 : }
1073 :
1074 10 : if (refFile->ConvertToParquetRef(osCacheDir, nullptr, nullptr))
1075 : {
1076 4 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1077 : "Generation Kerchunk Parquet cache %s: OK",
1078 : osCacheDir.c_str());
1079 : }
1080 : else
1081 : {
1082 6 : CPLError(CE_Failure, CPLE_AppDefined,
1083 : "Generation of Kerchunk Parquet cache %s failed",
1084 : osCacheDir.c_str());
1085 6 : refFile.reset();
1086 : }
1087 :
1088 10 : lockFileHolder.release();
1089 10 : m_oCache.insert(osJSONFilename, refFile);
1090 : }
1091 :
1092 10 : return {refFile, std::string()};
1093 : }
1094 : }
1095 :
1096 98 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1097 98 : if (refFile)
1098 20 : m_oCache.insert(osJSONFilename, refFile);
1099 98 : return {refFile, std::string()};
1100 : }
1101 :
1102 : /************************************************************************/
1103 : /* VSIKerchunkRefFile::ConvertToParquetRef() */
1104 : /************************************************************************/
1105 :
1106 11 : bool VSIKerchunkRefFile::ConvertToParquetRef(const std::string &osCacheDir,
1107 : GDALProgressFunc pfnProgress,
1108 : void *pProgressData)
1109 : {
1110 : struct Serializer
1111 : {
1112 : VSIVirtualHandle *m_poFile = nullptr;
1113 :
1114 11 : explicit Serializer(VSIVirtualHandle *poFile) : m_poFile(poFile)
1115 : {
1116 11 : }
1117 :
1118 289 : static void func(const char *pszTxt, void *pUserData)
1119 : {
1120 289 : Serializer *self = static_cast<Serializer *>(pUserData);
1121 289 : self->m_poFile->Write(pszTxt, 1, strlen(pszTxt));
1122 289 : }
1123 : };
1124 :
1125 : const std::string osZMetadataFilename =
1126 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata", nullptr);
1127 22 : const std::string osZMetadataTmpFilename = osZMetadataFilename + ".tmp";
1128 : auto poFile = std::unique_ptr<VSIVirtualHandle>(
1129 22 : VSIFOpenL(osZMetadataTmpFilename.c_str(), "wb"));
1130 11 : if (!poFile)
1131 0 : return false;
1132 11 : Serializer serializer(poFile.get());
1133 :
1134 : struct ZarrArrayInfo
1135 : {
1136 : std::vector<uint64_t> anChunkCount{};
1137 : std::map<uint64_t, const VSIKerchunkKeyInfo *> chunkInfo{};
1138 : };
1139 :
1140 22 : std::map<std::string, ZarrArrayInfo> zarrArrays;
1141 :
1142 : // First pass on keys to write JSON objects in .zmetadata
1143 22 : CPLJSonStreamingWriter oWriter(Serializer::func, &serializer);
1144 11 : oWriter.StartObj();
1145 11 : oWriter.AddObjKey("metadata");
1146 11 : oWriter.StartObj();
1147 :
1148 11 : bool bOK = true;
1149 11 : size_t nCurObjectIter = 0;
1150 11 : const size_t nTotalObjects = m_oMapKeys.size();
1151 44 : for (const auto &[key, info] : m_oMapKeys)
1152 : {
1153 54 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1154 16 : cpl::ends_with(key, ".zattrs"))
1155 : {
1156 32 : CPLJSONDocument oDoc;
1157 32 : std::string osStr;
1158 32 : osStr.append(reinterpret_cast<const char *>(info.abyValue.data()),
1159 64 : info.abyValue.size());
1160 32 : if (!oDoc.LoadMemory(osStr.c_str()))
1161 : {
1162 1 : CPLError(CE_Failure, CPLE_AppDefined,
1163 : "Cannot parse JSON content for %s", key.c_str());
1164 1 : bOK = false;
1165 1 : break;
1166 : }
1167 :
1168 31 : if (cpl::ends_with(key, ".zarray"))
1169 : {
1170 10 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1171 :
1172 20 : const auto oShape = oDoc.GetRoot().GetArray("shape");
1173 20 : const auto oChunks = oDoc.GetRoot().GetArray("chunks");
1174 20 : if (oShape.IsValid() && oChunks.IsValid() &&
1175 10 : oShape.Size() == oChunks.Size())
1176 : {
1177 18 : std::vector<uint64_t> anChunkCount;
1178 9 : uint64_t nTotalChunkCount = 1;
1179 17 : for (int i = 0; i < oShape.Size(); ++i)
1180 : {
1181 11 : const auto nShape = oShape[i].ToLong();
1182 11 : const auto nChunk = oChunks[i].ToLong();
1183 11 : if (nShape == 0 || nChunk == 0)
1184 : {
1185 2 : bOK = false;
1186 3 : break;
1187 : }
1188 9 : const uint64_t nChunkCount =
1189 9 : DIV_ROUND_UP(nShape, nChunk);
1190 9 : if (nChunkCount > std::numeric_limits<uint64_t>::max() /
1191 : nTotalChunkCount)
1192 : {
1193 1 : bOK = false;
1194 1 : break;
1195 : }
1196 8 : anChunkCount.push_back(nChunkCount);
1197 8 : nTotalChunkCount *= nChunkCount;
1198 : }
1199 9 : zarrArrays[osArrayName].anChunkCount =
1200 18 : std::move(anChunkCount);
1201 : }
1202 : else
1203 : {
1204 1 : bOK = false;
1205 : }
1206 10 : if (!bOK)
1207 : {
1208 4 : CPLError(CE_Failure, CPLE_AppDefined,
1209 : "Invalid Zarr array definition for %s",
1210 : osArrayName.c_str());
1211 4 : oWriter.clear();
1212 4 : break;
1213 : }
1214 : }
1215 :
1216 27 : oWriter.AddObjKey(key);
1217 27 : oWriter.AddSerializedValue(osStr);
1218 :
1219 27 : ++nCurObjectIter;
1220 31 : if (pfnProgress &&
1221 4 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1222 4 : static_cast<double>(nTotalObjects),
1223 : "", pProgressData))
1224 : {
1225 0 : return false;
1226 : }
1227 : }
1228 : }
1229 :
1230 11 : constexpr int nRecordSize = 100000;
1231 :
1232 11 : if (bOK)
1233 : {
1234 6 : oWriter.EndObj();
1235 6 : oWriter.AddObjKey("record_size");
1236 6 : oWriter.Add(nRecordSize);
1237 6 : oWriter.EndObj();
1238 : }
1239 :
1240 11 : bOK = (poFile->Close() == 0) && bOK;
1241 11 : poFile.reset();
1242 :
1243 11 : if (!bOK)
1244 : {
1245 5 : oWriter.clear();
1246 5 : VSIUnlink(osZMetadataTmpFilename.c_str());
1247 5 : return false;
1248 : }
1249 :
1250 : // Second pass to retrieve chunks
1251 33 : for (const auto &[key, info] : m_oMapKeys)
1252 : {
1253 44 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1254 16 : cpl::ends_with(key, ".zattrs"))
1255 : {
1256 : // already done
1257 : }
1258 : else
1259 : {
1260 6 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1261 6 : auto oIter = zarrArrays.find(osArrayName);
1262 6 : if (oIter != zarrArrays.end())
1263 : {
1264 6 : auto &oArrayInfo = oIter->second;
1265 : const CPLStringList aosIndices(
1266 6 : CSLTokenizeString2(CPLGetFilename(key.c_str()), ".", 0));
1267 6 : if ((static_cast<size_t>(aosIndices.size()) ==
1268 6 : oArrayInfo.anChunkCount.size()) ||
1269 0 : (aosIndices.size() == 1 &&
1270 0 : strcmp(aosIndices[0], "0") == 0 &&
1271 0 : oArrayInfo.anChunkCount.empty()))
1272 : {
1273 6 : std::vector<uint64_t> anIndices;
1274 11 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1275 : {
1276 6 : char *endptr = nullptr;
1277 6 : anIndices.push_back(
1278 6 : std::strtoull(aosIndices[i], &endptr, 10));
1279 6 : if (aosIndices[i][0] == '-' ||
1280 12 : endptr != aosIndices[i] + strlen(aosIndices[i]) ||
1281 6 : anIndices[i] >= oArrayInfo.anChunkCount[i])
1282 : {
1283 1 : CPLError(CE_Failure, CPLE_AppDefined,
1284 : "Invalid key indices: %s", key.c_str());
1285 1 : return false;
1286 : }
1287 : }
1288 :
1289 5 : uint64_t nLinearIndex = 0;
1290 5 : uint64_t nMulFactor = 1;
1291 10 : for (size_t i = anIndices.size(); i > 0;)
1292 : {
1293 5 : --i;
1294 5 : nLinearIndex += anIndices[i] * nMulFactor;
1295 5 : nMulFactor *= oArrayInfo.anChunkCount[i];
1296 : }
1297 :
1298 : #ifdef DEBUG_VERBOSE
1299 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
1300 : "Chunk %" PRIu64 " of array %s found",
1301 : nLinearIndex, osArrayName.c_str());
1302 : #endif
1303 5 : oArrayInfo.chunkInfo[nLinearIndex] = &info;
1304 : }
1305 : }
1306 : }
1307 : }
1308 :
1309 5 : auto poDrv = GetGDALDriverManager()->GetDriverByName("PARQUET");
1310 5 : if (!poDrv)
1311 : {
1312 : // shouldn't happen given earlier checks
1313 0 : CPLAssert(false);
1314 : return false;
1315 : }
1316 :
1317 : // Third pass to create Parquet files
1318 10 : CPLStringList aosLayerCreationOptions;
1319 : aosLayerCreationOptions.SetNameValue("ROW_GROUP_SIZE",
1320 5 : CPLSPrintf("%d", nRecordSize));
1321 :
1322 10 : for (const auto &[osArrayName, oArrayInfo] : zarrArrays)
1323 : {
1324 5 : uint64_t nChunkCount = 1;
1325 10 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1326 : {
1327 5 : nChunkCount *= oArrayInfo.anChunkCount[i];
1328 : }
1329 :
1330 0 : std::unique_ptr<GDALDataset> poDS;
1331 5 : OGRLayer *poLayer = nullptr;
1332 :
1333 10 : for (uint64_t i = 0; i < nChunkCount; ++i)
1334 : {
1335 5 : if ((i % nRecordSize) == 0)
1336 : {
1337 5 : if (poDS)
1338 : {
1339 0 : if (poDS->Close() != CE_None)
1340 : {
1341 0 : CPLError(CE_Failure, CPLE_AppDefined,
1342 : "Close() on %s failed",
1343 0 : poDS->GetDescription());
1344 0 : return false;
1345 : }
1346 0 : poDS.reset();
1347 : }
1348 : const std::string osParqFilename = CPLFormFilenameSafe(
1349 5 : CPLFormFilenameSafe(osCacheDir.c_str(), osArrayName.c_str(),
1350 : nullptr)
1351 : .c_str(),
1352 : CPLSPrintf("refs.%" PRIu64 ".parq", i / nRecordSize),
1353 10 : nullptr);
1354 5 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Creating %s",
1355 : osParqFilename.c_str());
1356 5 : VSIMkdirRecursive(
1357 10 : CPLGetPathSafe(osParqFilename.c_str()).c_str(), 0755);
1358 5 : poDS.reset(poDrv->Create(osParqFilename.c_str(), 0, 0, 0,
1359 : GDT_Unknown, nullptr));
1360 5 : if (!poDS)
1361 0 : return false;
1362 10 : poLayer = poDS->CreateLayer(
1363 10 : CPLGetBasenameSafe(osParqFilename.c_str()).c_str(), nullptr,
1364 5 : wkbNone, aosLayerCreationOptions.List());
1365 5 : if (poLayer)
1366 : {
1367 : {
1368 10 : OGRFieldDefn oFieldDefn("path", OFTString);
1369 5 : poLayer->CreateField(&oFieldDefn);
1370 : }
1371 : {
1372 10 : OGRFieldDefn oFieldDefn("offset", OFTInteger64);
1373 5 : poLayer->CreateField(&oFieldDefn);
1374 : }
1375 : {
1376 10 : OGRFieldDefn oFieldDefn("size", OFTInteger64);
1377 5 : poLayer->CreateField(&oFieldDefn);
1378 : }
1379 : {
1380 10 : OGRFieldDefn oFieldDefn("raw", OFTBinary);
1381 5 : poLayer->CreateField(&oFieldDefn);
1382 : }
1383 : }
1384 : }
1385 5 : if (!poLayer)
1386 0 : return false;
1387 :
1388 : auto poFeature =
1389 5 : std::make_unique<OGRFeature>(poLayer->GetLayerDefn());
1390 5 : auto oIter = oArrayInfo.chunkInfo.find(i);
1391 5 : if (oIter != oArrayInfo.chunkInfo.end())
1392 : {
1393 5 : const auto *chunkInfo = oIter->second;
1394 5 : if (chunkInfo->posURI)
1395 5 : poFeature->SetField(0, chunkInfo->posURI->c_str());
1396 5 : poFeature->SetField(1,
1397 5 : static_cast<GIntBig>(chunkInfo->nOffset));
1398 5 : poFeature->SetField(2, static_cast<GIntBig>(chunkInfo->nSize));
1399 5 : if (!chunkInfo->abyValue.empty())
1400 : {
1401 0 : if (chunkInfo->abyValue.size() >
1402 : static_cast<size_t>(INT_MAX))
1403 : {
1404 0 : CPLError(CE_Failure, CPLE_NotSupported,
1405 : "Too big blob for chunk %" PRIu64
1406 : " of array %s",
1407 : i, osArrayName.c_str());
1408 0 : return false;
1409 : }
1410 0 : poFeature->SetField(
1411 0 : 3, static_cast<int>(chunkInfo->abyValue.size()),
1412 0 : static_cast<const void *>(chunkInfo->abyValue.data()));
1413 : }
1414 : }
1415 :
1416 5 : if (poLayer->CreateFeature(poFeature.get()) != OGRERR_NONE)
1417 : {
1418 0 : CPLError(CE_Failure, CPLE_AppDefined,
1419 : "CreateFeature() on %s failed",
1420 0 : poDS->GetDescription());
1421 0 : return false;
1422 : }
1423 :
1424 5 : ++nCurObjectIter;
1425 5 : if (pfnProgress && (nCurObjectIter % 1000) == 0 &&
1426 0 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1427 0 : static_cast<double>(nTotalObjects),
1428 : "", pProgressData))
1429 : {
1430 0 : return false;
1431 : }
1432 : }
1433 :
1434 5 : if (poDS)
1435 : {
1436 5 : if (poDS->Close() != CE_None)
1437 : {
1438 0 : CPLError(CE_Failure, CPLE_AppDefined, "Close() on %s failed",
1439 0 : poDS->GetDescription());
1440 0 : return false;
1441 : }
1442 5 : poDS.reset();
1443 : }
1444 : }
1445 :
1446 5 : if (VSIRename(osZMetadataTmpFilename.c_str(),
1447 5 : osZMetadataFilename.c_str()) != 0)
1448 : {
1449 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot rename %s to %s",
1450 : osZMetadataTmpFilename.c_str(), osZMetadataFilename.c_str());
1451 0 : return false;
1452 : }
1453 :
1454 5 : if (pfnProgress)
1455 1 : pfnProgress(1.0, "", pProgressData);
1456 :
1457 5 : return true;
1458 : }
1459 :
1460 : /************************************************************************/
1461 : /* VSIKerchunkConvertJSONToParquet() */
1462 : /************************************************************************/
1463 :
1464 1 : bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
1465 : const char *pszDstDirname,
1466 : GDALProgressFunc pfnProgress,
1467 : void *pProgressData)
1468 : {
1469 1 : if (GDALGetDriverByName("PARQUET") == nullptr)
1470 : {
1471 0 : CPLError(CE_Failure, CPLE_NotSupported,
1472 : "Conversion to a Parquet reference store is not possible "
1473 : "because the PARQUET driver is not available.");
1474 0 : return false;
1475 : }
1476 :
1477 1 : auto poFS = cpl::down_cast<VSIKerchunkJSONRefFileSystem *>(
1478 : VSIFileManager::GetHandler(JSON_REF_FS_PREFIX));
1479 1 : std::shared_ptr<VSIKerchunkRefFile> refFile;
1480 1 : if (!poFS->m_oCache.tryGet(pszSrcJSONFilename, refFile))
1481 : {
1482 : void *pScaledProgressData =
1483 1 : GDALCreateScaledProgress(0.0, 0.5, pfnProgress, pProgressData);
1484 : try
1485 : {
1486 2 : refFile = poFS->LoadInternal(
1487 : pszSrcJSONFilename,
1488 : pScaledProgressData ? GDALScaledProgress : nullptr,
1489 1 : pScaledProgressData);
1490 : }
1491 0 : catch (const std::exception &e)
1492 : {
1493 0 : CPLError(CE_Failure, CPLE_AppDefined,
1494 : "VSIKerchunkJSONRefFileSystem::Load() failed: %s",
1495 0 : e.what());
1496 0 : GDALDestroyScaledProgress(pScaledProgressData);
1497 0 : return false;
1498 : }
1499 1 : GDALDestroyScaledProgress(pScaledProgressData);
1500 : }
1501 1 : if (!refFile)
1502 : {
1503 0 : CPLError(CE_Failure, CPLE_AppDefined,
1504 : "%s is not a Kerchunk JSON reference store",
1505 : pszSrcJSONFilename);
1506 0 : return false;
1507 : }
1508 :
1509 1 : VSIMkdir(pszDstDirname, 0755);
1510 :
1511 : void *pScaledProgressData =
1512 1 : GDALCreateScaledProgress(0.5, 1.0, pfnProgress, pProgressData);
1513 1 : const bool bRet = refFile->ConvertToParquetRef(
1514 : pszDstDirname, pScaledProgressData ? GDALScaledProgress : nullptr,
1515 : pScaledProgressData);
1516 1 : GDALDestroyScaledProgress(pScaledProgressData);
1517 1 : return bRet;
1518 : }
1519 :
1520 : /************************************************************************/
1521 : /* VSIKerchunkJSONRefFileSystem::Open() */
1522 : /************************************************************************/
1523 :
1524 : VSIVirtualHandle *
1525 87 : VSIKerchunkJSONRefFileSystem::Open(const char *pszFilename,
1526 : const char *pszAccess, bool /* bSetError */,
1527 : CSLConstList /* papszOptions */)
1528 : {
1529 87 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Open(%s)", pszFilename);
1530 87 : if (strcmp(pszAccess, "r") != 0 && strcmp(pszAccess, "rb") != 0)
1531 0 : return nullptr;
1532 :
1533 174 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1534 87 : if (osJSONFilename.empty())
1535 4 : return nullptr;
1536 :
1537 83 : const auto [refFile, osParqFilename] = Load(
1538 166 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1539 83 : if (!refFile)
1540 : {
1541 6 : if (osParqFilename.empty())
1542 2 : return nullptr;
1543 :
1544 4 : return VSIFOpenL(
1545 8 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1546 : osParqFilename.c_str()),
1547 : osKey.c_str(), nullptr)
1548 : .c_str(),
1549 4 : pszAccess);
1550 : }
1551 :
1552 77 : const auto oIter = refFile->GetMapKeys().find(osKey);
1553 77 : if (oIter == refFile->GetMapKeys().end())
1554 6 : return nullptr;
1555 :
1556 71 : const auto &keyInfo = oIter->second;
1557 71 : if (!keyInfo.posURI)
1558 : {
1559 52 : return VSIFileFromMemBuffer(
1560 52 : nullptr, const_cast<GByte *>(keyInfo.abyValue.data()),
1561 104 : keyInfo.abyValue.size(), /* bTakeOwnership = */ false);
1562 : }
1563 : else
1564 : {
1565 : std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1566 38 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1567 19 : if (osVSIPath.empty())
1568 2 : return nullptr;
1569 : const std::string osPath =
1570 17 : keyInfo.nSize
1571 7 : ? CPLSPrintf("/vsisubfile/%" PRIu64 "_%u,%s", keyInfo.nOffset,
1572 7 : keyInfo.nSize, osVSIPath.c_str())
1573 34 : : std::move(osVSIPath);
1574 17 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Opening %s",
1575 : osPath.c_str());
1576 : CPLConfigOptionSetter oSetter("GDAL_DISABLE_READDIR_ON_OPEN",
1577 34 : "EMPTY_DIR", false);
1578 17 : return VSIFOpenL(osPath.c_str(), "rb");
1579 : }
1580 : }
1581 :
1582 : /************************************************************************/
1583 : /* VSIKerchunkJSONRefFileSystem::Stat() */
1584 : /************************************************************************/
1585 :
1586 251 : int VSIKerchunkJSONRefFileSystem::Stat(const char *pszFilename,
1587 : VSIStatBufL *pStatBuf, int nFlags)
1588 : {
1589 251 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Stat(%s)", pszFilename);
1590 251 : memset(pStatBuf, 0, sizeof(VSIStatBufL));
1591 :
1592 502 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1593 251 : if (osJSONFilename.empty())
1594 12 : return -1;
1595 :
1596 239 : const auto [refFile, osParqFilename] = Load(
1597 478 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1598 239 : if (!refFile)
1599 : {
1600 91 : if (osParqFilename.empty())
1601 86 : return -1;
1602 :
1603 5 : return VSIStatExL(
1604 10 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1605 : osParqFilename.c_str()),
1606 : osKey.c_str(), nullptr)
1607 : .c_str(),
1608 5 : pStatBuf, nFlags);
1609 : }
1610 :
1611 148 : if (osKey.empty())
1612 : {
1613 27 : pStatBuf->st_mode = S_IFDIR;
1614 27 : return 0;
1615 : }
1616 :
1617 121 : const auto oIter = refFile->GetMapKeys().find(osKey);
1618 121 : if (oIter == refFile->GetMapKeys().end())
1619 : {
1620 195 : if (cpl::contains(refFile->GetMapKeys(), osKey + "/.zgroup") ||
1621 130 : cpl::contains(refFile->GetMapKeys(), osKey + "/.zarray"))
1622 : {
1623 8 : pStatBuf->st_mode = S_IFDIR;
1624 8 : return 0;
1625 : }
1626 :
1627 57 : return -1;
1628 : }
1629 :
1630 56 : const auto &keyInfo = oIter->second;
1631 56 : if (!(keyInfo.posURI))
1632 : {
1633 48 : pStatBuf->st_size = keyInfo.abyValue.size();
1634 : }
1635 : else
1636 : {
1637 8 : if (keyInfo.nSize)
1638 : {
1639 4 : pStatBuf->st_size = keyInfo.nSize;
1640 : }
1641 : else
1642 : {
1643 : const std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1644 8 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1645 4 : if (osVSIPath.empty())
1646 0 : return -1;
1647 4 : return VSIStatExL(osVSIPath.c_str(), pStatBuf, nFlags);
1648 : }
1649 : }
1650 52 : pStatBuf->st_mode = S_IFREG;
1651 :
1652 52 : return 0;
1653 : }
1654 :
1655 : /************************************************************************/
1656 : /* VSIKerchunkJSONRefFileSystem::ReadDirEx() */
1657 : /************************************************************************/
1658 :
1659 17 : char **VSIKerchunkJSONRefFileSystem::ReadDirEx(const char *pszDirname,
1660 : int nMaxFiles)
1661 : {
1662 17 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "ReadDir(%s)", pszDirname);
1663 :
1664 34 : const auto [osJSONFilename, osAskedKey] = SplitFilename(pszDirname);
1665 17 : if (osJSONFilename.empty())
1666 4 : return nullptr;
1667 :
1668 13 : const auto [refFile, osParqFilename] = Load(
1669 26 : osJSONFilename, STARTS_WITH(pszDirname, JSON_REF_CACHED_FS_PREFIX));
1670 13 : if (!refFile)
1671 : {
1672 3 : if (osParqFilename.empty())
1673 3 : return nullptr;
1674 :
1675 0 : return VSIReadDirEx(
1676 0 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1677 : osParqFilename.c_str()),
1678 : osAskedKey.c_str(), nullptr)
1679 : .c_str(),
1680 0 : nMaxFiles);
1681 : }
1682 :
1683 20 : std::set<std::string> set;
1684 60 : for (const auto &[key, value] : refFile->GetMapKeys())
1685 : {
1686 50 : if (osAskedKey.empty())
1687 : {
1688 20 : const auto nPos = key.find('/');
1689 20 : if (nPos == std::string::npos)
1690 8 : set.insert(key);
1691 : else
1692 12 : set.insert(key.substr(0, nPos));
1693 : }
1694 30 : else if (key.size() > osAskedKey.size() &&
1695 42 : cpl::starts_with(key, osAskedKey) &&
1696 12 : key[osAskedKey.size()] == '/')
1697 : {
1698 24 : std::string subKey = key.substr(osAskedKey.size() + 1);
1699 12 : const auto nPos = subKey.find('/');
1700 12 : if (nPos == std::string::npos)
1701 12 : set.insert(std::move(subKey));
1702 : else
1703 0 : set.insert(subKey.substr(0, nPos));
1704 : }
1705 : }
1706 :
1707 20 : CPLStringList aosRet;
1708 34 : for (const std::string &v : set)
1709 : {
1710 : // CPLDebugOnly("VSIKerchunkJSONRefFileSystem", ".. %s", v.c_str());
1711 24 : aosRet.AddString(v.c_str());
1712 : }
1713 10 : return aosRet.StealList();
1714 : }
1715 :
1716 : /************************************************************************/
1717 : /* VSIInstallKerchunkJSONRefFileSystem() */
1718 : /************************************************************************/
1719 :
1720 1629 : void VSIInstallKerchunkJSONRefFileSystem()
1721 : {
1722 : static std::mutex oMutex;
1723 3258 : std::lock_guard<std::mutex> oLock(oMutex);
1724 : // cppcheck-suppress knownConditionTrueFalse
1725 1629 : if (!VSIKerchunkJSONRefFileSystem::IsFileSystemInstantiated())
1726 : {
1727 1629 : auto fs = std::make_unique<VSIKerchunkJSONRefFileSystem>().release();
1728 1629 : VSIFileManager::InstallHandler(JSON_REF_FS_PREFIX, fs);
1729 1629 : VSIFileManager::InstallHandler(JSON_REF_CACHED_FS_PREFIX, fs);
1730 : }
1731 1629 : }
|