Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver. Virtual file system for
5 : * https://fsspec.github.io/kerchunk/spec.html#version-1
6 : * Author: Even Rouault <even dot rouault at spatialys.com>
7 : *
8 : ******************************************************************************
9 : * Copyright (c) 2025, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #undef _REENTRANT
15 :
16 : #include "vsikerchunk.h"
17 : #include "vsikerchunk_inline.hpp"
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_json.h"
21 : #include "cpl_json_streaming_parser.h"
22 : #include "cpl_json_streaming_writer.h"
23 : #include "cpl_mem_cache.h"
24 : #include "cpl_multiproc.h" // CPLSleep()
25 : #include "cpl_vsi_error.h"
26 : #include "cpl_vsi_virtual.h"
27 :
28 : #include "gdal_priv.h"
29 : #include "ogrsf_frmts.h"
30 :
31 : #include <cerrno>
32 : #include <cinttypes>
33 : #include <limits>
34 : #include <mutex>
35 : #include <set>
36 : #include <utility>
37 :
38 : /************************************************************************/
39 : /* VSIKerchunkKeyInfo */
40 : /************************************************************************/
41 :
42 : struct VSIKerchunkKeyInfo
43 : {
44 : // points to an element in VSIKerchunkRefFile::m_oSetURI
45 : const std::string *posURI = nullptr;
46 :
47 : uint64_t nOffset = 0;
48 : uint32_t nSize = 0;
49 : std::vector<GByte> abyValue{};
50 : };
51 :
52 : /************************************************************************/
53 : /* VSIKerchunkRefFile */
54 : /************************************************************************/
55 :
56 : class VSIKerchunkRefFile
57 : {
58 : private:
59 : std::set<std::string> m_oSetURI{};
60 : std::map<std::string, VSIKerchunkKeyInfo> m_oMapKeys{};
61 :
62 : public:
63 548 : const std::map<std::string, VSIKerchunkKeyInfo> &GetMapKeys() const
64 : {
65 548 : return m_oMapKeys;
66 : }
67 :
68 125 : void AddInlineContent(const std::string &key, std::vector<GByte> &&abyValue)
69 : {
70 250 : VSIKerchunkKeyInfo info;
71 125 : info.abyValue = std::move(abyValue);
72 125 : m_oMapKeys[key] = std::move(info);
73 125 : }
74 :
75 128 : bool AddInlineContent(const std::string &key, const std::string_view &str)
76 : {
77 256 : std::vector<GByte> abyValue;
78 128 : if (cpl::starts_with(str, "base64:"))
79 : {
80 : abyValue.insert(
81 0 : abyValue.end(),
82 4 : reinterpret_cast<const GByte *>(str.data()) + strlen("base64:"),
83 4 : reinterpret_cast<const GByte *>(str.data()) + str.size());
84 4 : abyValue.push_back(0);
85 4 : const int nSize = CPLBase64DecodeInPlace(abyValue.data());
86 4 : if (nSize == 0)
87 : {
88 3 : CPLError(CE_Failure, CPLE_AppDefined,
89 : "VSIKerchunkJSONRefFileSystem: Base64 decoding "
90 : "failed for key '%s'",
91 : key.c_str());
92 3 : return false;
93 : }
94 1 : abyValue.resize(nSize);
95 : }
96 : else
97 : {
98 : abyValue.insert(
99 124 : abyValue.end(), reinterpret_cast<const GByte *>(str.data()),
100 248 : reinterpret_cast<const GByte *>(str.data()) + str.size());
101 : }
102 :
103 125 : AddInlineContent(key, std::move(abyValue));
104 125 : return true;
105 : }
106 :
107 20 : void AddReferencedContent(const std::string &key, const std::string &osURI,
108 : uint64_t nOffset, uint32_t nSize)
109 : {
110 20 : auto oPair = m_oSetURI.insert(osURI);
111 :
112 40 : VSIKerchunkKeyInfo info;
113 20 : info.posURI = &(*(oPair.first));
114 20 : info.nOffset = nOffset;
115 20 : info.nSize = nSize;
116 20 : m_oMapKeys[key] = std::move(info);
117 20 : }
118 :
119 : bool ConvertToParquetRef(const std::string &osCacheDir,
120 : GDALProgressFunc pfnProgress, void *pProgressData);
121 : };
122 :
123 : /************************************************************************/
124 : /* VSIKerchunkJSONRefFileSystem */
125 : /************************************************************************/
126 :
127 : class VSIKerchunkJSONRefFileSystem final : public VSIFilesystemHandler
128 : {
129 : public:
130 1750 : VSIKerchunkJSONRefFileSystem()
131 1750 : {
132 1750 : IsFileSystemInstantiated() = true;
133 1750 : }
134 :
135 2244 : ~VSIKerchunkJSONRefFileSystem() override
136 1122 : {
137 1122 : IsFileSystemInstantiated() = false;
138 2244 : }
139 :
140 4622 : static bool &IsFileSystemInstantiated()
141 : {
142 : static bool bIsFileSystemInstantiated = false;
143 4622 : return bIsFileSystemInstantiated;
144 : }
145 :
146 : VSIVirtualHandleUniquePtr Open(const char *pszFilename,
147 : const char *pszAccess, bool bSetError,
148 : CSLConstList papszOptions) override;
149 :
150 : int Stat(const char *pszFilename, VSIStatBufL *pStatBuf,
151 : int nFlags) override;
152 :
153 : char **ReadDirEx(const char *pszDirname, int nMaxFiles) override;
154 :
155 : private:
156 : friend bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
157 : const char *pszDstDirname,
158 : GDALProgressFunc pfnProgress,
159 : void *pProgressData);
160 :
161 : lru11::Cache<std::string, std::shared_ptr<VSIKerchunkRefFile>, std::mutex>
162 : m_oCache{};
163 :
164 : static std::pair<std::string, std::string>
165 : SplitFilename(const char *pszFilename);
166 :
167 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
168 : Load(const std::string &osJSONFilename, bool bUseCache);
169 : std::shared_ptr<VSIKerchunkRefFile>
170 : LoadInternal(const std::string &osJSONFilename,
171 : GDALProgressFunc pfnProgress, void *pProgressData);
172 : std::shared_ptr<VSIKerchunkRefFile>
173 : LoadStreaming(const std::string &osJSONFilename,
174 : GDALProgressFunc pfnProgress, void *pProgressData);
175 : };
176 :
177 : /************************************************************************/
178 : /* VSIKerchunkJSONRefFileSystem::SplitFilename() */
179 : /************************************************************************/
180 :
181 : /*static*/
182 : std::pair<std::string, std::string>
183 356 : VSIKerchunkJSONRefFileSystem::SplitFilename(const char *pszFilename)
184 : {
185 356 : if (STARTS_WITH(pszFilename, JSON_REF_FS_PREFIX))
186 302 : pszFilename += strlen(JSON_REF_FS_PREFIX);
187 54 : else if (STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX))
188 54 : pszFilename += strlen(JSON_REF_CACHED_FS_PREFIX);
189 : else
190 0 : return {std::string(), std::string()};
191 :
192 712 : std::string osJSONFilename;
193 :
194 357 : if (*pszFilename == '{')
195 : {
196 : // Parse /vsikerchunk_json_ref/{/path/to/some.json}[key]
197 329 : int nLevel = 1;
198 329 : ++pszFilename;
199 23021 : for (; *pszFilename; ++pszFilename)
200 : {
201 23016 : if (*pszFilename == '{')
202 : {
203 0 : ++nLevel;
204 : }
205 23016 : else if (*pszFilename == '}')
206 : {
207 319 : --nLevel;
208 319 : if (nLevel == 0)
209 : {
210 319 : ++pszFilename;
211 319 : break;
212 : }
213 : }
214 22697 : osJSONFilename += *pszFilename;
215 : }
216 324 : if (nLevel != 0)
217 : {
218 10 : CPLError(CE_Failure, CPLE_AppDefined,
219 : "Invalid %s syntax: should be "
220 : "%s{/path/to/some/file}[/optional_key]",
221 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
222 10 : return {std::string(), std::string()};
223 : }
224 :
225 : return {osJSONFilename,
226 632 : *pszFilename == '/' ? pszFilename + 1 : pszFilename};
227 : }
228 : else
229 : {
230 28 : int nCountDotJson = 0;
231 28 : const char *pszIter = pszFilename;
232 28 : const char *pszAfterJSON = nullptr;
233 46 : while ((pszIter = strstr(pszIter, ".json")) != nullptr)
234 : {
235 18 : ++nCountDotJson;
236 18 : if (nCountDotJson == 1)
237 18 : pszAfterJSON = pszIter + strlen(".json");
238 : else
239 0 : pszAfterJSON = nullptr;
240 18 : pszIter += strlen(".json");
241 : }
242 28 : if (!pszAfterJSON)
243 : {
244 10 : if (nCountDotJson >= 2)
245 : {
246 0 : CPLError(CE_Failure, CPLE_AppDefined,
247 : "Ambiguous %s syntax: should be "
248 : "%s{/path/to/some/file}[/optional_key]",
249 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX);
250 : }
251 : else
252 : {
253 10 : CPLError(CE_Failure, CPLE_AppDefined,
254 : "Invalid %s syntax: should be "
255 : "%s/path/to/some.json[/optional_key] or "
256 : "%s{/path/to/some/file}[/optional_key]",
257 : JSON_REF_FS_PREFIX, JSON_REF_FS_PREFIX,
258 : JSON_REF_FS_PREFIX);
259 : }
260 10 : return {std::string(), std::string()};
261 : }
262 36 : return {std::string(pszFilename, pszAfterJSON - pszFilename),
263 36 : *pszAfterJSON == '/' ? pszAfterJSON + 1 : pszAfterJSON};
264 : }
265 : }
266 :
267 : /************************************************************************/
268 : /* class VSIKerchunkJSONRefParser */
269 : /************************************************************************/
270 :
271 : namespace
272 : {
273 : class VSIKerchunkJSONRefParser final : public CPLJSonStreamingParser
274 : {
275 : public:
276 71 : explicit VSIKerchunkJSONRefParser(
277 : const std::shared_ptr<VSIKerchunkRefFile> &refFile)
278 71 : : m_refFile(refFile)
279 : {
280 71 : m_oWriter.SetPrettyFormatting(false);
281 71 : }
282 :
283 71 : ~VSIKerchunkJSONRefParser() override
284 71 : {
285 : // In case the parsing would be stopped, the writer may be in
286 : // an inconsistent state. This avoids assertion in debug mode.
287 71 : m_oWriter.clear();
288 71 : }
289 :
290 : protected:
291 78 : void String(std::string_view sValue) override
292 : {
293 78 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 0)
294 : {
295 9 : size_t nLength = sValue.size();
296 9 : if (nLength > 0 && sValue[nLength - 1] == 0)
297 2 : --nLength;
298 :
299 9 : if (!m_refFile->AddInlineContent(
300 9 : m_osCurKey, std::string_view(sValue.data(), nLength)))
301 : {
302 2 : StopParsing();
303 : }
304 :
305 9 : m_oWriter.clear();
306 :
307 9 : m_osCurKey.clear();
308 : }
309 69 : else if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
310 : {
311 42 : if (m_iArrayMemberIdx == 0)
312 : {
313 36 : m_osURI = sValue;
314 : }
315 : else
316 : {
317 6 : UnexpectedContentInArray();
318 : }
319 : }
320 27 : else if (m_nLevel > m_nKeyLevel)
321 : {
322 25 : m_oWriter.Add(sValue);
323 : }
324 78 : }
325 :
326 161 : void Number(std::string_view sValue) override
327 : {
328 161 : if (m_nLevel == m_nKeyLevel)
329 : {
330 52 : if (m_nArrayLevel == 1)
331 : {
332 48 : if (m_iArrayMemberIdx == 1)
333 : {
334 27 : m_osTmpForNumber = sValue;
335 27 : errno = 0;
336 27 : m_nOffset =
337 27 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
338 50 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
339 23 : m_osTmpForNumber.find('.') != std::string::npos)
340 : {
341 6 : CPLError(
342 : CE_Failure, CPLE_AppDefined,
343 : "VSIKerchunkJSONRefFileSystem: array value at "
344 : "index 1 for key '%s' is not an unsigned 64 bit "
345 : "integer",
346 : m_osCurKey.c_str());
347 6 : StopParsing();
348 : }
349 : }
350 21 : else if (m_iArrayMemberIdx == 2)
351 : {
352 19 : m_osTmpForNumber = sValue;
353 19 : errno = 0;
354 : const uint64_t nSize =
355 19 : std::strtoull(m_osTmpForNumber.c_str(), nullptr, 10);
356 19 : if (errno != 0 || m_osTmpForNumber[0] == '-' ||
357 53 : nSize > std::numeric_limits<uint32_t>::max() ||
358 15 : m_osTmpForNumber.find('.') != std::string::npos)
359 : {
360 6 : CPLError(
361 : CE_Failure, CPLE_AppDefined,
362 : "VSIKerchunkJSONRefFileSystem: array value at "
363 : "index 2 for key '%s' is not an unsigned 32 bit "
364 : "integer",
365 : m_osCurKey.c_str());
366 6 : StopParsing();
367 : }
368 : else
369 : {
370 13 : m_nSize = static_cast<uint32_t>(nSize);
371 : }
372 : }
373 : else
374 : {
375 2 : UnexpectedContentInArray();
376 : }
377 : }
378 : else
379 : {
380 4 : UnexpectedContent();
381 : }
382 : }
383 109 : else if (m_nLevel > m_nKeyLevel)
384 : {
385 108 : m_oWriter.AddSerializedValue(sValue);
386 : }
387 161 : }
388 :
389 4 : void Boolean(bool b) override
390 : {
391 4 : if (m_nLevel == m_nKeyLevel)
392 : {
393 4 : UnexpectedContent();
394 : }
395 0 : else if (m_nLevel > m_nKeyLevel)
396 : {
397 0 : m_oWriter.Add(b);
398 : }
399 4 : }
400 :
401 28 : void Null() override
402 : {
403 28 : if (m_nLevel == m_nKeyLevel)
404 : {
405 4 : UnexpectedContent();
406 : }
407 24 : else if (m_nLevel > m_nKeyLevel)
408 : {
409 24 : m_oWriter.AddNull();
410 : }
411 28 : }
412 :
413 168 : void StartObject() override
414 : {
415 168 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
416 : {
417 2 : UnexpectedContentInArray();
418 : }
419 : else
420 : {
421 166 : if (m_nLevel >= m_nKeyLevel)
422 : {
423 95 : m_oWriter.StartObj();
424 : }
425 166 : ++m_nLevel;
426 166 : m_bFirstMember = true;
427 : }
428 168 : }
429 :
430 124 : void EndObject() override
431 : {
432 124 : if (m_nLevel == m_nKeyLevel)
433 : {
434 28 : FinishObjectValueProcessing();
435 : }
436 124 : --m_nLevel;
437 124 : if (m_nLevel >= m_nKeyLevel)
438 : {
439 95 : m_oWriter.EndObj();
440 : }
441 124 : }
442 :
443 330 : void StartObjectMember(std::string_view sKey) override
444 : {
445 330 : if (m_nLevel == 1 && m_bFirstMember)
446 : {
447 81 : if (sKey == "version")
448 : {
449 3 : m_nKeyLevel = 2;
450 : }
451 : else
452 : {
453 78 : m_nKeyLevel = 1;
454 : }
455 : }
456 249 : else if (m_nLevel == 1 && m_nKeyLevel == 2 && sKey == "templates")
457 : {
458 1 : CPLError(CE_Failure, CPLE_NotSupported,
459 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
460 : "not supported");
461 1 : StopParsing();
462 : }
463 248 : else if (m_nLevel == 1 && m_nKeyLevel == 2 && sKey == "gen")
464 : {
465 1 : CPLError(CE_Failure, CPLE_NotSupported,
466 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
467 : "supported");
468 1 : StopParsing();
469 : }
470 :
471 330 : if (m_nLevel == m_nKeyLevel)
472 : {
473 158 : FinishObjectValueProcessing();
474 158 : m_osCurKey = sKey;
475 : }
476 172 : else if (m_nLevel > m_nKeyLevel)
477 : {
478 164 : m_oWriter.AddObjKey(sKey);
479 : }
480 330 : m_bFirstMember = false;
481 330 : }
482 :
483 83 : void StartArray() override
484 : {
485 83 : if (m_nLevel == m_nKeyLevel)
486 : {
487 46 : if (m_nArrayLevel == 0)
488 : {
489 46 : m_iArrayMemberIdx = -1;
490 46 : m_osURI.clear();
491 46 : m_nOffset = 0;
492 46 : m_nSize = 0;
493 46 : m_nArrayLevel = 1;
494 : }
495 : else
496 : {
497 0 : UnexpectedContentInArray();
498 : }
499 : }
500 37 : else if (m_nLevel > m_nKeyLevel)
501 : {
502 37 : m_oWriter.StartArray();
503 37 : ++m_nArrayLevel;
504 : }
505 83 : }
506 :
507 57 : void EndArray() override
508 : {
509 57 : if (m_nLevel == m_nKeyLevel && m_nArrayLevel == 1)
510 : {
511 20 : if (m_iArrayMemberIdx == -1)
512 : {
513 2 : CPLError(CE_Failure, CPLE_AppDefined,
514 : "VSIKerchunkJSONRefFileSystem: array value for key "
515 : "'%s' is not of size 1 or 3",
516 : m_osCurKey.c_str());
517 2 : StopParsing();
518 : }
519 : else
520 : {
521 18 : m_refFile->AddReferencedContent(m_osCurKey, m_osURI, m_nOffset,
522 : m_nSize);
523 18 : --m_nArrayLevel;
524 18 : m_oWriter.clear();
525 18 : m_osCurKey.clear();
526 : }
527 : }
528 37 : else if (m_nLevel >= m_nKeyLevel)
529 : {
530 37 : --m_nArrayLevel;
531 37 : if (m_nLevel > m_nKeyLevel)
532 37 : m_oWriter.EndArray();
533 : }
534 57 : }
535 :
536 126 : void StartArrayMember() override
537 : {
538 126 : if (m_nLevel >= m_nKeyLevel)
539 126 : ++m_iArrayMemberIdx;
540 126 : }
541 :
542 2 : void Exception(const char *pszMessage) override
543 : {
544 2 : CPLError(CE_Failure, CPLE_AppDefined, "%s", pszMessage);
545 2 : }
546 :
547 : private:
548 : std::shared_ptr<VSIKerchunkRefFile> m_refFile{};
549 : int m_nLevel = 0;
550 : int m_nArrayLevel = 0;
551 : int m_iArrayMemberIdx = -1;
552 : bool m_bFirstMember = false;
553 : int m_nKeyLevel = std::numeric_limits<int>::max();
554 : std::string m_osCurKey{};
555 : std::string m_osURI{};
556 : std::string m_osTmpForNumber{};
557 : uint64_t m_nOffset = 0;
558 : uint32_t m_nSize = 0;
559 :
560 : CPLJSonStreamingWriter m_oWriter{nullptr, nullptr};
561 :
562 186 : void FinishObjectValueProcessing()
563 : {
564 186 : if (!m_osCurKey.empty())
565 : {
566 93 : const std::string &osStr = m_oWriter.GetString();
567 93 : CPL_IGNORE_RET_VAL(m_refFile->AddInlineContent(m_osCurKey, osStr));
568 :
569 93 : m_oWriter.clear();
570 :
571 93 : m_osCurKey.clear();
572 : }
573 186 : }
574 :
575 12 : void UnexpectedContent()
576 : {
577 12 : CPLError(CE_Failure, CPLE_AppDefined, "Unexpected content");
578 12 : StopParsing();
579 12 : }
580 :
581 10 : void UnexpectedContentInArray()
582 : {
583 10 : CPLError(CE_Failure, CPLE_AppDefined,
584 : "Unexpected content at position %d of array",
585 : m_iArrayMemberIdx);
586 10 : StopParsing();
587 10 : }
588 : };
589 : } // namespace
590 :
591 : /************************************************************************/
592 : /* VSIKerchunkJSONRefFileSystem::LoadStreaming() */
593 : /************************************************************************/
594 :
595 : std::shared_ptr<VSIKerchunkRefFile>
596 71 : VSIKerchunkJSONRefFileSystem::LoadStreaming(const std::string &osJSONFilename,
597 : GDALProgressFunc pfnProgress,
598 : void *pProgressData)
599 : {
600 142 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
601 142 : VSIKerchunkJSONRefParser parser(refFile);
602 :
603 71 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
604 : "Using streaming parser for %s", osJSONFilename.c_str());
605 :
606 : // For network file systems, get the streaming version of the filename,
607 : // as we don't need arbitrary seeking in the file
608 : const std::string osFilename =
609 71 : VSIFileManager::GetHandler(osJSONFilename.c_str())
610 142 : ->GetStreamingFilename(osJSONFilename);
611 :
612 142 : auto f = VSIVirtualHandleUniquePtr(VSIFOpenL(osFilename.c_str(), "rb"));
613 71 : if (!f)
614 : {
615 2 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
616 : osJSONFilename.c_str());
617 2 : return nullptr;
618 : }
619 69 : uint64_t nTotalSize = 0;
620 69 : if (!cpl::starts_with(osFilename, "/vsigzip/"))
621 : {
622 69 : f->Seek(0, SEEK_END);
623 69 : nTotalSize = f->Tell();
624 69 : f->Seek(0, SEEK_SET);
625 : }
626 138 : std::string sBuffer;
627 69 : constexpr size_t BUFFER_SIZE = 10 * 1024 * 1024; // Arbitrary
628 69 : sBuffer.resize(BUFFER_SIZE);
629 : while (true)
630 : {
631 70 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
632 70 : const bool bFinished = nRead < sBuffer.size();
633 : try
634 : {
635 70 : if (!parser.Parse(std::string_view(sBuffer.data(), nRead),
636 : bFinished))
637 : {
638 : // The parser will have emitted an error
639 42 : return nullptr;
640 : }
641 : }
642 0 : catch (const std::exception &e)
643 : {
644 : // Out-of-memory typically
645 0 : CPLError(CE_Failure, CPLE_AppDefined,
646 : "Exception occurred while parsing %s: %s",
647 0 : osJSONFilename.c_str(), e.what());
648 0 : return nullptr;
649 : }
650 28 : if (nTotalSize)
651 : {
652 27 : const double dfProgressRatio = static_cast<double>(f->Tell()) /
653 27 : static_cast<double>(nTotalSize);
654 27 : CPLDebug("VSIKerchunkJSONRefFileSystem", "%02.1f %% of %s read",
655 : 100 * dfProgressRatio, osJSONFilename.c_str());
656 28 : if (pfnProgress &&
657 1 : !pfnProgress(dfProgressRatio, "Parsing of JSON file",
658 : pProgressData))
659 : {
660 0 : return nullptr;
661 : }
662 : }
663 : else
664 : {
665 1 : CPLDebug("VSIKerchunkJSONRefFileSystem",
666 : "%" PRIu64 " bytes read in %s",
667 1 : static_cast<uint64_t>(f->Tell()), osJSONFilename.c_str());
668 : }
669 28 : if (nRead < sBuffer.size())
670 : {
671 27 : break;
672 : }
673 1 : }
674 27 : if (f->Tell() == 0)
675 : {
676 1 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
677 : osJSONFilename.c_str());
678 1 : return nullptr;
679 : }
680 26 : if (pfnProgress)
681 1 : pfnProgress(1.0, "Parsing of JSON file", pProgressData);
682 :
683 26 : return refFile;
684 : }
685 :
686 : /************************************************************************/
687 : /* VSIKerchunkJSONRefFileSystem::LoadInternal() */
688 : /************************************************************************/
689 :
690 : std::shared_ptr<VSIKerchunkRefFile>
691 109 : VSIKerchunkJSONRefFileSystem::LoadInternal(const std::string &osJSONFilename,
692 : GDALProgressFunc pfnProgress,
693 : void *pProgressData)
694 : {
695 109 : const char *pszUseStreamingParser = VSIGetPathSpecificOption(
696 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_STREAMING_PARSER", "AUTO");
697 109 : if (EQUAL(pszUseStreamingParser, "AUTO"))
698 : {
699 : auto f =
700 50 : VSIVirtualHandleUniquePtr(VSIFOpenL(osJSONFilename.c_str(), "rb"));
701 50 : if (!f)
702 : {
703 6 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
704 : osJSONFilename.c_str());
705 6 : return nullptr;
706 : }
707 44 : std::string sBuffer;
708 44 : constexpr size_t HEADER_SIZE = 1024; // Arbitrary
709 44 : sBuffer.resize(HEADER_SIZE);
710 44 : const size_t nRead = f->Read(sBuffer.data(), 1, sBuffer.size());
711 44 : sBuffer.resize(nRead);
712 44 : if (ZARRIsLikelyStreamableKerchunkJSONRefContent(sBuffer))
713 : {
714 41 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
715 : }
716 : }
717 59 : else if (CPLTestBool(pszUseStreamingParser))
718 : {
719 30 : return LoadStreaming(osJSONFilename, pfnProgress, pProgressData);
720 : }
721 :
722 64 : CPLJSONDocument oDoc;
723 : {
724 : #if SIZEOF_VOIDP > 4
725 32 : CPLConfigOptionSetter oSetter("CPL_JSON_MAX_SIZE", "1GB", true);
726 : #endif
727 32 : if (!oDoc.Load(osJSONFilename))
728 : {
729 5 : CPLError(CE_Failure, CPLE_AppDefined,
730 : "VSIKerchunkJSONRefFileSystem: cannot open %s",
731 : osJSONFilename.c_str());
732 5 : return nullptr;
733 : }
734 : }
735 :
736 54 : auto oRoot = oDoc.GetRoot();
737 81 : const auto oVersion = oRoot.GetObj("version");
738 54 : CPLJSONObject oRefs;
739 27 : if (!oVersion.IsValid())
740 : {
741 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-0
742 :
743 23 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
744 : "'version' key not found. Assuming version 0");
745 23 : oRefs = std::move(oRoot);
746 23 : if (!oRefs.GetObj(".zgroup").IsValid())
747 : {
748 0 : CPLError(CE_Failure, CPLE_AppDefined,
749 : "VSIKerchunkJSONRefFileSystem: '.zgroup' key not found");
750 0 : return nullptr;
751 : }
752 : }
753 4 : else if (oVersion.GetType() != CPLJSONObject::Type::Integer)
754 : {
755 4 : CPLError(CE_Failure, CPLE_AppDefined,
756 : "VSIKerchunkJSONRefFileSystem: 'version' key not integer");
757 4 : return nullptr;
758 : }
759 0 : else if (oVersion.ToInteger() != 1)
760 : {
761 0 : CPLError(CE_Failure, CPLE_NotSupported,
762 : "VSIKerchunkJSONRefFileSystem: 'version' = %d not handled",
763 : oVersion.ToInteger());
764 0 : return nullptr;
765 : }
766 : else
767 : {
768 : // Cf https://fsspec.github.io/kerchunk/spec.html#version-1
769 :
770 0 : if (oRoot.GetObj("templates").IsValid())
771 : {
772 0 : CPLError(CE_Failure, CPLE_NotSupported,
773 : "VSIKerchunkJSONRefFileSystem: 'templates' key found, but "
774 : "not supported");
775 0 : return nullptr;
776 : }
777 :
778 0 : if (oRoot.GetObj("gen").IsValid())
779 : {
780 0 : CPLError(CE_Failure, CPLE_NotSupported,
781 : "VSIKerchunkJSONRefFileSystem: 'gen' key found, but not "
782 : "supported");
783 0 : return nullptr;
784 : }
785 :
786 0 : oRefs = oRoot.GetObj("refs");
787 0 : if (!oRefs.IsValid())
788 : {
789 0 : CPLError(CE_Failure, CPLE_AppDefined,
790 : "VSIKerchunkJSONRefFileSystem: 'refs' key not found");
791 0 : return nullptr;
792 : }
793 :
794 0 : if (oRoot.GetObj("templates").IsValid())
795 : {
796 0 : CPLError(CE_Failure, CPLE_NotSupported,
797 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
798 : "not supported");
799 0 : return nullptr;
800 : }
801 :
802 0 : if (oRoot.GetObj("templates").IsValid())
803 : {
804 0 : CPLError(CE_Failure, CPLE_NotSupported,
805 : "VSIKerchunkJSONRefFileSystem: 'templates' key found but "
806 : "not supported");
807 0 : return nullptr;
808 : }
809 : }
810 :
811 23 : if (oRefs.GetType() != CPLJSONObject::Type::Object)
812 : {
813 0 : CPLError(CE_Failure, CPLE_AppDefined,
814 : "VSIKerchunkJSONRefFileSystem: value of 'refs' is not a dict");
815 0 : return nullptr;
816 : }
817 :
818 46 : auto refFile = std::make_shared<VSIKerchunkRefFile>();
819 50 : for (const auto &oEntry : oRefs.GetChildren())
820 : {
821 45 : const std::string osKeyName = oEntry.GetName();
822 45 : if (oEntry.GetType() == CPLJSONObject::Type::String)
823 : {
824 2 : if (!refFile->AddInlineContent(osKeyName, oEntry.ToString()))
825 : {
826 1 : return nullptr;
827 : }
828 : }
829 43 : else if (oEntry.GetType() == CPLJSONObject::Type::Object)
830 : {
831 : const std::string osSerialized =
832 24 : oEntry.Format(CPLJSONObject::PrettyFormat::Plain);
833 24 : CPL_IGNORE_RET_VAL(
834 24 : refFile->AddInlineContent(osKeyName, osSerialized));
835 : }
836 19 : else if (oEntry.GetType() == CPLJSONObject::Type::Array)
837 : {
838 15 : const auto oArray = oEntry.ToArray();
839 : // Some files such as https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json
840 : // (pointed by https://guide.cloudnativegeo.org/kerchunk/kerchunk-in-practice.html)
841 : // have array entries with just the URL, and no offset/size
842 : // This is when the whole file needs to be read
843 15 : if (oArray.Size() != 1 && oArray.Size() != 3)
844 : {
845 3 : CPLError(CE_Failure, CPLE_AppDefined,
846 : "VSIKerchunkJSONRefFileSystem: array value for key "
847 : "'%s' is not of size 1 or 3",
848 : osKeyName.c_str());
849 3 : return nullptr;
850 : }
851 12 : if (oArray[0].GetType() != CPLJSONObject::Type::String)
852 : {
853 4 : CPLError(CE_Failure, CPLE_AppDefined,
854 : "VSIKerchunkJSONRefFileSystem: array value at index 0 "
855 : "for key '%s' is not a string",
856 : osKeyName.c_str());
857 4 : return nullptr;
858 : }
859 8 : if (oArray.Size() == 3)
860 : {
861 8 : if ((oArray[1].GetType() != CPLJSONObject::Type::Integer &&
862 23 : oArray[1].GetType() != CPLJSONObject::Type::Long) ||
863 15 : !(oArray[1].ToLong() >= 0))
864 : {
865 2 : CPLError(
866 : CE_Failure, CPLE_AppDefined,
867 : "VSIKerchunkJSONRefFileSystem: array value at index 1 "
868 : "for key '%s' is not an unsigned 64 bit integer",
869 : osKeyName.c_str());
870 2 : return nullptr;
871 : }
872 6 : if ((oArray[2].GetType() != CPLJSONObject::Type::Integer &&
873 16 : oArray[2].GetType() != CPLJSONObject::Type::Long) ||
874 10 : !(oArray[2].ToLong() >= 0 &&
875 9 : static_cast<uint64_t>(oArray[2].ToLong()) <=
876 3 : std::numeric_limits<uint32_t>::max()))
877 : {
878 4 : CPLError(
879 : CE_Failure, CPLE_AppDefined,
880 : "VSIKerchunkJSONRefFileSystem: array value at index 2 "
881 : "for key '%s' is not an unsigned 32 bit integer",
882 : osKeyName.c_str());
883 4 : return nullptr;
884 : }
885 : }
886 :
887 4 : refFile->AddReferencedContent(
888 4 : osKeyName, oArray[0].ToString(),
889 4 : oArray.Size() == 3 ? oArray[1].ToLong() : 0,
890 4 : oArray.Size() == 3 ? static_cast<uint32_t>(oArray[2].ToLong())
891 : : 0);
892 : }
893 : else
894 : {
895 4 : CPLError(
896 : CE_Failure, CPLE_AppDefined,
897 : "VSIKerchunkJSONRefFileSystem: invalid value type for key '%s'",
898 : osKeyName.c_str());
899 4 : return nullptr;
900 : }
901 : }
902 :
903 5 : return refFile;
904 : }
905 :
906 : /************************************************************************/
907 : /* VSIKerchunkJSONRefFileSystem::Load() */
908 : /************************************************************************/
909 :
910 : std::pair<std::shared_ptr<VSIKerchunkRefFile>, std::string>
911 337 : VSIKerchunkJSONRefFileSystem::Load(const std::string &osJSONFilename,
912 : bool bUseCache)
913 : {
914 338 : std::shared_ptr<VSIKerchunkRefFile> refFile;
915 337 : if (m_oCache.tryGet(osJSONFilename, refFile))
916 214 : return {refFile, std::string()};
917 :
918 : // Deal with local file cache
919 124 : const char *pszUseCache = VSIGetPathSpecificOption(
920 : osJSONFilename.c_str(), "VSIKERCHUNK_USE_CACHE", "NO");
921 124 : if (bUseCache || CPLTestBool(pszUseCache))
922 : {
923 26 : if (GDALGetDriverByName("PARQUET") == nullptr)
924 : {
925 0 : CPLError(CE_Failure, CPLE_NotSupported,
926 : "VSIKERCHUNK_USE_CACHE=YES only enabled if PARQUET driver "
927 : "is available");
928 0 : return {nullptr, std::string()};
929 : }
930 :
931 : VSIStatBufL sStat;
932 52 : if (VSIStatL(osJSONFilename.c_str(), &sStat) != 0 ||
933 26 : VSI_ISDIR(sStat.st_mode))
934 : {
935 0 : CPLError(CE_Failure, CPLE_FileIO, "Load json file %s failed",
936 : osJSONFilename.c_str());
937 0 : return {nullptr, std::string()};
938 : }
939 :
940 26 : std::string osCacheSubDir = CPLGetBasenameSafe(osJSONFilename.c_str());
941 : osCacheSubDir += CPLSPrintf("_%" PRIu64 "_%" PRIu64,
942 26 : static_cast<uint64_t>(sStat.st_size),
943 26 : static_cast<uint64_t>(sStat.st_mtime));
944 :
945 26 : const std::string osRootCacheDir = GDALGetCacheDirectory();
946 26 : if (!osRootCacheDir.empty())
947 : {
948 : const std::string osKerchunkCacheDir = VSIGetPathSpecificOption(
949 : osJSONFilename.c_str(), "VSIKERCHUNK_CACHE_DIR",
950 52 : CPLFormFilenameSafe(osRootCacheDir.c_str(),
951 : "zarr_kerchunk_cache", nullptr)
952 78 : .c_str());
953 : const std::string osCacheDir = CPLFormFilenameSafe(
954 52 : osKerchunkCacheDir.c_str(), osCacheSubDir.c_str(), "zarr");
955 26 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Using cache dir %s",
956 : osCacheDir.c_str());
957 :
958 52 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
959 : nullptr)
960 : .c_str(),
961 26 : &sStat) == 0)
962 : {
963 9 : CPLDebug("VSIKerchunkJSONRefFileSystem",
964 : "Using Kerchunk Parquet cache %s", osCacheDir.c_str());
965 9 : return {nullptr, osCacheDir};
966 : }
967 :
968 23 : if (VSIMkdirRecursive(osCacheDir.c_str(), 0755) != 0 &&
969 6 : !(VSIStatL(osCacheDir.c_str(), &sStat) == 0 &&
970 0 : VSI_ISDIR(sStat.st_mode)))
971 : {
972 6 : CPLError(CE_Failure, CPLE_AppDefined,
973 : "Cannot create directory %s", osCacheDir.c_str());
974 6 : return {nullptr, std::string()};
975 : }
976 :
977 : const std::string osLockFilename =
978 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".lock", nullptr);
979 :
980 11 : CPLLockFileHandle hLockHandle = nullptr;
981 22 : CPLStringList aosOptions;
982 11 : aosOptions.SetNameValue("VERBOSE_WAIT_MESSAGE", "YES");
983 : const char *pszKerchunkDebug =
984 11 : CPLGetConfigOption("VSIKERCHUNK_FOR_TESTS", nullptr);
985 11 : if (pszKerchunkDebug &&
986 4 : strstr(pszKerchunkDebug, "SHORT_DELAY_STALLED_LOCK"))
987 : {
988 2 : aosOptions.SetNameValue("STALLED_DELAY", "1");
989 : }
990 :
991 11 : CPLDebug("VSIKerchunkJSONRefFileSystem", "Acquiring lock");
992 11 : switch (CPLLockFileEx(osLockFilename.c_str(), &hLockHandle,
993 11 : aosOptions.List()))
994 : {
995 10 : case CLFS_OK:
996 10 : break;
997 1 : case CLFS_CANNOT_CREATE_LOCK:
998 1 : CPLError(CE_Failure, CPLE_FileIO, "Cannot create lock %s",
999 : osLockFilename.c_str());
1000 1 : break;
1001 0 : case CLFS_LOCK_BUSY:
1002 0 : CPLAssert(false); // cannot happen with infinite wait time
1003 : break;
1004 0 : case CLFS_API_MISUSE:
1005 0 : CPLAssert(false);
1006 : break;
1007 0 : case CLFS_THREAD_CREATION_FAILED:
1008 0 : CPLError(CE_Failure, CPLE_AppDefined,
1009 : "Thread creation failed for refresh of %s",
1010 : osLockFilename.c_str());
1011 0 : break;
1012 : }
1013 11 : if (!hLockHandle)
1014 : {
1015 1 : return {nullptr, std::string()};
1016 : }
1017 :
1018 : struct LockFileHolder
1019 : {
1020 : CPLLockFileHandle m_hLockHandle = nullptr;
1021 :
1022 10 : explicit LockFileHolder(CPLLockFileHandle hLockHandleIn)
1023 10 : : m_hLockHandle(hLockHandleIn)
1024 : {
1025 10 : }
1026 :
1027 10 : ~LockFileHolder()
1028 10 : {
1029 10 : release();
1030 10 : }
1031 :
1032 20 : void release()
1033 : {
1034 20 : if (m_hLockHandle)
1035 : {
1036 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1037 : "Releasing lock");
1038 10 : CPLUnlockFileEx(m_hLockHandle);
1039 10 : m_hLockHandle = nullptr;
1040 : }
1041 20 : }
1042 :
1043 : CPL_DISALLOW_COPY_ASSIGN(LockFileHolder)
1044 : };
1045 :
1046 20 : LockFileHolder lockFileHolder(hLockHandle);
1047 :
1048 20 : if (VSIStatL(CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata",
1049 : nullptr)
1050 : .c_str(),
1051 10 : &sStat) == 0)
1052 : {
1053 0 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1054 : "Using Kerchunk Parquet cache %s (after lock taking)",
1055 : osCacheDir.c_str());
1056 0 : return {nullptr, osCacheDir};
1057 : }
1058 :
1059 10 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1060 :
1061 10 : if (refFile)
1062 : {
1063 10 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1064 : "Generating Kerchunk Parquet cache %s...",
1065 : osCacheDir.c_str());
1066 :
1067 10 : if (pszKerchunkDebug &&
1068 3 : strstr(pszKerchunkDebug,
1069 : "WAIT_BEFORE_CONVERT_TO_PARQUET_REF"))
1070 : {
1071 1 : CPLSleep(0.5);
1072 : }
1073 :
1074 10 : if (refFile->ConvertToParquetRef(osCacheDir, nullptr, nullptr))
1075 : {
1076 4 : CPLDebug("VSIKerchunkJSONRefFileSystem",
1077 : "Generation Kerchunk Parquet cache %s: OK",
1078 : osCacheDir.c_str());
1079 : }
1080 : else
1081 : {
1082 6 : CPLError(CE_Failure, CPLE_AppDefined,
1083 : "Generation of Kerchunk Parquet cache %s failed",
1084 : osCacheDir.c_str());
1085 6 : refFile.reset();
1086 : }
1087 :
1088 10 : lockFileHolder.release();
1089 10 : m_oCache.insert(osJSONFilename, refFile);
1090 : }
1091 :
1092 10 : return {refFile, std::string()};
1093 : }
1094 : }
1095 :
1096 98 : refFile = LoadInternal(osJSONFilename, nullptr, nullptr);
1097 98 : if (refFile)
1098 20 : m_oCache.insert(osJSONFilename, refFile);
1099 98 : return {refFile, std::string()};
1100 : }
1101 :
1102 : /************************************************************************/
1103 : /* VSIKerchunkRefFile::ConvertToParquetRef() */
1104 : /************************************************************************/
1105 :
1106 11 : bool VSIKerchunkRefFile::ConvertToParquetRef(const std::string &osCacheDir,
1107 : GDALProgressFunc pfnProgress,
1108 : void *pProgressData)
1109 : {
1110 : struct Serializer
1111 : {
1112 : VSIVirtualHandle *m_poFile = nullptr;
1113 :
1114 11 : explicit Serializer(VSIVirtualHandle *poFile) : m_poFile(poFile)
1115 : {
1116 11 : }
1117 :
1118 289 : static void func(const char *pszTxt, void *pUserData)
1119 : {
1120 289 : Serializer *self = static_cast<Serializer *>(pUserData);
1121 289 : self->m_poFile->Write(pszTxt, 1, strlen(pszTxt));
1122 289 : }
1123 : };
1124 :
1125 : const std::string osZMetadataFilename =
1126 22 : CPLFormFilenameSafe(osCacheDir.c_str(), ".zmetadata", nullptr);
1127 22 : const std::string osZMetadataTmpFilename = osZMetadataFilename + ".tmp";
1128 : auto poFile = std::unique_ptr<VSIVirtualHandle>(
1129 22 : VSIFOpenL(osZMetadataTmpFilename.c_str(), "wb"));
1130 11 : if (!poFile)
1131 0 : return false;
1132 11 : Serializer serializer(poFile.get());
1133 :
1134 : struct ZarrArrayInfo
1135 : {
1136 : std::vector<uint64_t> anChunkCount{};
1137 : std::map<uint64_t, const VSIKerchunkKeyInfo *> chunkInfo{};
1138 : };
1139 :
1140 22 : std::map<std::string, ZarrArrayInfo> zarrArrays;
1141 :
1142 : // First pass on keys to write JSON objects in .zmetadata
1143 22 : CPLJSonStreamingWriter oWriter(Serializer::func, &serializer);
1144 11 : oWriter.StartObj();
1145 11 : oWriter.AddObjKey("metadata");
1146 11 : oWriter.StartObj();
1147 :
1148 11 : bool bOK = true;
1149 11 : size_t nCurObjectIter = 0;
1150 11 : const size_t nTotalObjects = m_oMapKeys.size();
1151 44 : for (const auto &[key, info] : m_oMapKeys)
1152 : {
1153 54 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1154 16 : cpl::ends_with(key, ".zattrs"))
1155 : {
1156 32 : CPLJSONDocument oDoc;
1157 32 : std::string osStr;
1158 32 : osStr.append(reinterpret_cast<const char *>(info.abyValue.data()),
1159 64 : info.abyValue.size());
1160 32 : if (!oDoc.LoadMemory(osStr.c_str()))
1161 : {
1162 1 : CPLError(CE_Failure, CPLE_AppDefined,
1163 : "Cannot parse JSON content for %s", key.c_str());
1164 1 : bOK = false;
1165 1 : break;
1166 : }
1167 :
1168 31 : if (cpl::ends_with(key, ".zarray"))
1169 : {
1170 10 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1171 :
1172 20 : const auto oShape = oDoc.GetRoot().GetArray("shape");
1173 20 : const auto oChunks = oDoc.GetRoot().GetArray("chunks");
1174 20 : if (oShape.IsValid() && oChunks.IsValid() &&
1175 10 : oShape.Size() == oChunks.Size())
1176 : {
1177 18 : std::vector<uint64_t> anChunkCount;
1178 9 : uint64_t nTotalChunkCount = 1;
1179 17 : for (int i = 0; i < oShape.Size(); ++i)
1180 : {
1181 11 : const auto nShape = oShape[i].ToLong();
1182 11 : const auto nChunk = oChunks[i].ToLong();
1183 11 : if (nShape == 0 || nChunk == 0)
1184 : {
1185 2 : bOK = false;
1186 3 : break;
1187 : }
1188 9 : const uint64_t nChunkCount =
1189 9 : DIV_ROUND_UP(nShape, nChunk);
1190 9 : if (nChunkCount > std::numeric_limits<uint64_t>::max() /
1191 : nTotalChunkCount)
1192 : {
1193 1 : bOK = false;
1194 1 : break;
1195 : }
1196 8 : anChunkCount.push_back(nChunkCount);
1197 8 : nTotalChunkCount *= nChunkCount;
1198 : }
1199 9 : zarrArrays[osArrayName].anChunkCount =
1200 18 : std::move(anChunkCount);
1201 : }
1202 : else
1203 : {
1204 1 : bOK = false;
1205 : }
1206 10 : if (!bOK)
1207 : {
1208 4 : CPLError(CE_Failure, CPLE_AppDefined,
1209 : "Invalid Zarr array definition for %s",
1210 : osArrayName.c_str());
1211 4 : oWriter.clear();
1212 4 : break;
1213 : }
1214 : }
1215 :
1216 27 : oWriter.AddObjKey(key);
1217 27 : oWriter.AddSerializedValue(osStr);
1218 :
1219 27 : ++nCurObjectIter;
1220 31 : if (pfnProgress &&
1221 4 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1222 4 : static_cast<double>(nTotalObjects),
1223 : "", pProgressData))
1224 : {
1225 0 : return false;
1226 : }
1227 : }
1228 : }
1229 :
1230 11 : constexpr int nRecordSize = 100000;
1231 :
1232 11 : if (bOK)
1233 : {
1234 6 : oWriter.EndObj();
1235 6 : oWriter.AddObjKey("record_size");
1236 6 : oWriter.Add(nRecordSize);
1237 6 : oWriter.EndObj();
1238 : }
1239 :
1240 11 : bOK = (poFile->Close() == 0) && bOK;
1241 11 : poFile.reset();
1242 :
1243 11 : if (!bOK)
1244 : {
1245 5 : oWriter.clear();
1246 5 : VSIUnlink(osZMetadataTmpFilename.c_str());
1247 5 : return false;
1248 : }
1249 :
1250 : // Second pass to retrieve chunks
1251 33 : for (const auto &[key, info] : m_oMapKeys)
1252 : {
1253 44 : if (cpl::ends_with(key, ".zarray") || cpl::ends_with(key, ".zgroup") ||
1254 16 : cpl::ends_with(key, ".zattrs"))
1255 : {
1256 : // already done
1257 : }
1258 : else
1259 : {
1260 6 : const std::string osArrayName = CPLGetDirnameSafe(key.c_str());
1261 6 : auto oIter = zarrArrays.find(osArrayName);
1262 6 : if (oIter != zarrArrays.end())
1263 : {
1264 6 : auto &oArrayInfo = oIter->second;
1265 : const CPLStringList aosIndices(
1266 6 : CSLTokenizeString2(CPLGetFilename(key.c_str()), ".", 0));
1267 6 : if ((static_cast<size_t>(aosIndices.size()) ==
1268 6 : oArrayInfo.anChunkCount.size()) ||
1269 0 : (aosIndices.size() == 1 &&
1270 0 : strcmp(aosIndices[0], "0") == 0 &&
1271 0 : oArrayInfo.anChunkCount.empty()))
1272 : {
1273 6 : std::vector<uint64_t> anIndices;
1274 11 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1275 : {
1276 6 : char *endptr = nullptr;
1277 6 : anIndices.push_back(
1278 6 : std::strtoull(aosIndices[i], &endptr, 10));
1279 6 : if (aosIndices[i][0] == '-' ||
1280 12 : endptr != aosIndices[i] + strlen(aosIndices[i]) ||
1281 6 : anIndices[i] >= oArrayInfo.anChunkCount[i])
1282 : {
1283 1 : CPLError(CE_Failure, CPLE_AppDefined,
1284 : "Invalid key indices: %s", key.c_str());
1285 1 : return false;
1286 : }
1287 : }
1288 :
1289 5 : uint64_t nLinearIndex = 0;
1290 5 : uint64_t nMulFactor = 1;
1291 10 : for (size_t i = anIndices.size(); i > 0;)
1292 : {
1293 5 : --i;
1294 5 : nLinearIndex += anIndices[i] * nMulFactor;
1295 5 : nMulFactor *= oArrayInfo.anChunkCount[i];
1296 : }
1297 :
1298 : #ifdef DEBUG_VERBOSE
1299 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem",
1300 : "Chunk %" PRIu64 " of array %s found",
1301 : nLinearIndex, osArrayName.c_str());
1302 : #endif
1303 5 : oArrayInfo.chunkInfo[nLinearIndex] = &info;
1304 : }
1305 : }
1306 : }
1307 : }
1308 :
1309 5 : auto poDrv = GetGDALDriverManager()->GetDriverByName("PARQUET");
1310 5 : if (!poDrv)
1311 : {
1312 : // shouldn't happen given earlier checks
1313 0 : CPLAssert(false);
1314 : return false;
1315 : }
1316 :
1317 : // Third pass to create Parquet files
1318 10 : CPLStringList aosLayerCreationOptions;
1319 : aosLayerCreationOptions.SetNameValue("ROW_GROUP_SIZE",
1320 5 : CPLSPrintf("%d", nRecordSize));
1321 :
1322 10 : for (const auto &[osArrayName, oArrayInfo] : zarrArrays)
1323 : {
1324 5 : uint64_t nChunkCount = 1;
1325 10 : for (size_t i = 0; i < oArrayInfo.anChunkCount.size(); ++i)
1326 : {
1327 5 : nChunkCount *= oArrayInfo.anChunkCount[i];
1328 : }
1329 :
1330 0 : std::unique_ptr<GDALDataset> poDS;
1331 5 : OGRLayer *poLayer = nullptr;
1332 :
1333 10 : for (uint64_t i = 0; i < nChunkCount; ++i)
1334 : {
1335 5 : if ((i % nRecordSize) == 0)
1336 : {
1337 5 : if (poDS)
1338 : {
1339 0 : if (poDS->Close() != CE_None)
1340 : {
1341 0 : CPLError(CE_Failure, CPLE_AppDefined,
1342 : "Close() on %s failed",
1343 0 : poDS->GetDescription());
1344 0 : return false;
1345 : }
1346 0 : poDS.reset();
1347 : }
1348 5 : if (CPLHasPathTraversal(osArrayName.c_str()))
1349 : {
1350 0 : CPLError(CE_Failure, CPLE_AppDefined,
1351 : "Path traversal detected in %s",
1352 : osArrayName.c_str());
1353 0 : return false;
1354 : }
1355 : const std::string osParqFilename = CPLFormFilenameSafe(
1356 5 : CPLFormFilenameSafe(osCacheDir.c_str(), osArrayName.c_str(),
1357 : nullptr)
1358 : .c_str(),
1359 : CPLSPrintf("refs.%" PRIu64 ".parq", i / nRecordSize),
1360 10 : nullptr);
1361 5 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Creating %s",
1362 : osParqFilename.c_str());
1363 5 : VSIMkdirRecursive(
1364 10 : CPLGetPathSafe(osParqFilename.c_str()).c_str(), 0755);
1365 5 : poDS.reset(poDrv->Create(osParqFilename.c_str(), 0, 0, 0,
1366 : GDT_Unknown, nullptr));
1367 5 : if (!poDS)
1368 0 : return false;
1369 10 : poLayer = poDS->CreateLayer(
1370 10 : CPLGetBasenameSafe(osParqFilename.c_str()).c_str(), nullptr,
1371 5 : wkbNone, aosLayerCreationOptions.List());
1372 5 : if (poLayer)
1373 : {
1374 : {
1375 10 : OGRFieldDefn oFieldDefn("path", OFTString);
1376 5 : poLayer->CreateField(&oFieldDefn);
1377 : }
1378 : {
1379 10 : OGRFieldDefn oFieldDefn("offset", OFTInteger64);
1380 5 : poLayer->CreateField(&oFieldDefn);
1381 : }
1382 : {
1383 10 : OGRFieldDefn oFieldDefn("size", OFTInteger64);
1384 5 : poLayer->CreateField(&oFieldDefn);
1385 : }
1386 : {
1387 10 : OGRFieldDefn oFieldDefn("raw", OFTBinary);
1388 5 : poLayer->CreateField(&oFieldDefn);
1389 : }
1390 : }
1391 : }
1392 5 : if (!poLayer)
1393 0 : return false;
1394 :
1395 : auto poFeature =
1396 5 : std::make_unique<OGRFeature>(poLayer->GetLayerDefn());
1397 5 : auto oIter = oArrayInfo.chunkInfo.find(i);
1398 5 : if (oIter != oArrayInfo.chunkInfo.end())
1399 : {
1400 5 : const auto *chunkInfo = oIter->second;
1401 5 : if (chunkInfo->posURI)
1402 5 : poFeature->SetField(0, chunkInfo->posURI->c_str());
1403 5 : poFeature->SetField(1,
1404 5 : static_cast<GIntBig>(chunkInfo->nOffset));
1405 5 : poFeature->SetField(2, static_cast<GIntBig>(chunkInfo->nSize));
1406 5 : if (!chunkInfo->abyValue.empty())
1407 : {
1408 0 : if (chunkInfo->abyValue.size() >
1409 : static_cast<size_t>(INT_MAX))
1410 : {
1411 0 : CPLError(CE_Failure, CPLE_NotSupported,
1412 : "Too big blob for chunk %" PRIu64
1413 : " of array %s",
1414 : i, osArrayName.c_str());
1415 0 : return false;
1416 : }
1417 0 : poFeature->SetField(
1418 0 : 3, static_cast<int>(chunkInfo->abyValue.size()),
1419 0 : static_cast<const void *>(chunkInfo->abyValue.data()));
1420 : }
1421 : }
1422 :
1423 5 : if (poLayer->CreateFeature(poFeature.get()) != OGRERR_NONE)
1424 : {
1425 0 : CPLError(CE_Failure, CPLE_AppDefined,
1426 : "CreateFeature() on %s failed",
1427 0 : poDS->GetDescription());
1428 0 : return false;
1429 : }
1430 :
1431 5 : ++nCurObjectIter;
1432 5 : if (pfnProgress && (nCurObjectIter % 1000) == 0 &&
1433 0 : !pfnProgress(static_cast<double>(nCurObjectIter) /
1434 0 : static_cast<double>(nTotalObjects),
1435 : "", pProgressData))
1436 : {
1437 0 : return false;
1438 : }
1439 : }
1440 :
1441 5 : if (poDS)
1442 : {
1443 5 : if (poDS->Close() != CE_None)
1444 : {
1445 0 : CPLError(CE_Failure, CPLE_AppDefined, "Close() on %s failed",
1446 0 : poDS->GetDescription());
1447 0 : return false;
1448 : }
1449 5 : poDS.reset();
1450 : }
1451 : }
1452 :
1453 5 : if (VSIRename(osZMetadataTmpFilename.c_str(),
1454 5 : osZMetadataFilename.c_str()) != 0)
1455 : {
1456 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot rename %s to %s",
1457 : osZMetadataTmpFilename.c_str(), osZMetadataFilename.c_str());
1458 0 : return false;
1459 : }
1460 :
1461 5 : if (pfnProgress)
1462 1 : pfnProgress(1.0, "", pProgressData);
1463 :
1464 5 : return true;
1465 : }
1466 :
1467 : /************************************************************************/
1468 : /* VSIKerchunkConvertJSONToParquet() */
1469 : /************************************************************************/
1470 :
1471 1 : bool VSIKerchunkConvertJSONToParquet(const char *pszSrcJSONFilename,
1472 : const char *pszDstDirname,
1473 : GDALProgressFunc pfnProgress,
1474 : void *pProgressData)
1475 : {
1476 1 : if (GDALGetDriverByName("PARQUET") == nullptr)
1477 : {
1478 0 : CPLError(CE_Failure, CPLE_NotSupported,
1479 : "Conversion to a Parquet reference store is not possible "
1480 : "because the PARQUET driver is not available.");
1481 0 : return false;
1482 : }
1483 :
1484 1 : auto poFS = cpl::down_cast<VSIKerchunkJSONRefFileSystem *>(
1485 : VSIFileManager::GetHandler(JSON_REF_FS_PREFIX));
1486 1 : std::shared_ptr<VSIKerchunkRefFile> refFile;
1487 1 : if (!poFS->m_oCache.tryGet(pszSrcJSONFilename, refFile))
1488 : {
1489 : void *pScaledProgressData =
1490 1 : GDALCreateScaledProgress(0.0, 0.5, pfnProgress, pProgressData);
1491 : try
1492 : {
1493 2 : refFile = poFS->LoadInternal(
1494 : pszSrcJSONFilename,
1495 : pScaledProgressData ? GDALScaledProgress : nullptr,
1496 1 : pScaledProgressData);
1497 : }
1498 0 : catch (const std::exception &e)
1499 : {
1500 0 : CPLError(CE_Failure, CPLE_AppDefined,
1501 : "VSIKerchunkJSONRefFileSystem::Load() failed: %s",
1502 0 : e.what());
1503 0 : GDALDestroyScaledProgress(pScaledProgressData);
1504 0 : return false;
1505 : }
1506 1 : GDALDestroyScaledProgress(pScaledProgressData);
1507 : }
1508 1 : if (!refFile)
1509 : {
1510 0 : CPLError(CE_Failure, CPLE_AppDefined,
1511 : "%s is not a Kerchunk JSON reference store",
1512 : pszSrcJSONFilename);
1513 0 : return false;
1514 : }
1515 :
1516 1 : VSIMkdir(pszDstDirname, 0755);
1517 :
1518 : void *pScaledProgressData =
1519 1 : GDALCreateScaledProgress(0.5, 1.0, pfnProgress, pProgressData);
1520 1 : const bool bRet = refFile->ConvertToParquetRef(
1521 : pszDstDirname, pScaledProgressData ? GDALScaledProgress : nullptr,
1522 : pScaledProgressData);
1523 1 : GDALDestroyScaledProgress(pScaledProgressData);
1524 1 : return bRet;
1525 : }
1526 :
1527 : /************************************************************************/
1528 : /* VSIKerchunkJSONRefFileSystem::Open() */
1529 : /************************************************************************/
1530 :
1531 : VSIVirtualHandleUniquePtr
1532 86 : VSIKerchunkJSONRefFileSystem::Open(const char *pszFilename,
1533 : const char *pszAccess, bool /* bSetError */,
1534 : CSLConstList /* papszOptions */)
1535 : {
1536 86 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Open(%s)", pszFilename);
1537 87 : if (strcmp(pszAccess, "r") != 0 && strcmp(pszAccess, "rb") != 0)
1538 0 : return nullptr;
1539 :
1540 172 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1541 87 : if (osJSONFilename.empty())
1542 4 : return nullptr;
1543 :
1544 83 : const auto [refFile, osParqFilename] = Load(
1545 163 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1546 83 : if (!refFile)
1547 : {
1548 6 : if (osParqFilename.empty())
1549 2 : return nullptr;
1550 :
1551 : return VSIFilesystemHandler::OpenStatic(
1552 8 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1553 : osParqFilename.c_str()),
1554 : osKey.c_str(), nullptr)
1555 : .c_str(),
1556 4 : pszAccess);
1557 : }
1558 :
1559 77 : const auto oIter = refFile->GetMapKeys().find(osKey);
1560 77 : if (oIter == refFile->GetMapKeys().end())
1561 6 : return nullptr;
1562 :
1563 71 : const auto &keyInfo = oIter->second;
1564 71 : if (!keyInfo.posURI)
1565 : {
1566 : return VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
1567 52 : nullptr, const_cast<GByte *>(keyInfo.abyValue.data()),
1568 104 : keyInfo.abyValue.size(), /* bTakeOwnership = */ false));
1569 : }
1570 : else
1571 : {
1572 : std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1573 38 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1574 19 : if (osVSIPath.empty())
1575 2 : return nullptr;
1576 : const std::string osPath =
1577 17 : keyInfo.nSize
1578 7 : ? CPLSPrintf("/vsisubfile/%" PRIu64 "_%u,%s", keyInfo.nOffset,
1579 7 : keyInfo.nSize, osVSIPath.c_str())
1580 34 : : std::move(osVSIPath);
1581 16 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Opening %s",
1582 : osPath.c_str());
1583 : CPLConfigOptionSetter oSetter("GDAL_DISABLE_READDIR_ON_OPEN",
1584 33 : "EMPTY_DIR", false);
1585 33 : auto fp = VSIFilesystemHandler::OpenStatic(osPath.c_str(), "rb", true);
1586 16 : if (!fp)
1587 : {
1588 1 : if (!VSIToCPLError(CE_Failure, CPLE_FileIO))
1589 0 : CPLError(CE_Failure, CPLE_FileIO, "Cannot open %s",
1590 : osPath.c_str());
1591 : }
1592 16 : return fp;
1593 : }
1594 : }
1595 :
1596 : /************************************************************************/
1597 : /* VSIKerchunkJSONRefFileSystem::Stat() */
1598 : /************************************************************************/
1599 :
1600 254 : int VSIKerchunkJSONRefFileSystem::Stat(const char *pszFilename,
1601 : VSIStatBufL *pStatBuf, int nFlags)
1602 : {
1603 254 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "Stat(%s)", pszFilename);
1604 254 : memset(pStatBuf, 0, sizeof(VSIStatBufL));
1605 :
1606 508 : const auto [osJSONFilename, osKey] = SplitFilename(pszFilename);
1607 253 : if (osJSONFilename.empty())
1608 12 : return -1;
1609 :
1610 242 : const auto [refFile, osParqFilename] = Load(
1611 483 : osJSONFilename, STARTS_WITH(pszFilename, JSON_REF_CACHED_FS_PREFIX));
1612 242 : if (!refFile)
1613 : {
1614 91 : if (osParqFilename.empty())
1615 86 : return -1;
1616 :
1617 5 : return VSIStatExL(
1618 10 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1619 : osParqFilename.c_str()),
1620 : osKey.c_str(), nullptr)
1621 : .c_str(),
1622 5 : pStatBuf, nFlags);
1623 : }
1624 :
1625 151 : if (osKey.empty())
1626 : {
1627 27 : pStatBuf->st_mode = S_IFDIR;
1628 27 : return 0;
1629 : }
1630 :
1631 124 : const auto oIter = refFile->GetMapKeys().find(osKey);
1632 124 : if (oIter == refFile->GetMapKeys().end())
1633 : {
1634 204 : if (cpl::contains(refFile->GetMapKeys(), osKey + "/.zgroup") ||
1635 136 : cpl::contains(refFile->GetMapKeys(), osKey + "/.zarray"))
1636 : {
1637 8 : pStatBuf->st_mode = S_IFDIR;
1638 8 : return 0;
1639 : }
1640 :
1641 60 : return -1;
1642 : }
1643 :
1644 56 : const auto &keyInfo = oIter->second;
1645 56 : if (!(keyInfo.posURI))
1646 : {
1647 48 : pStatBuf->st_size = keyInfo.abyValue.size();
1648 : }
1649 : else
1650 : {
1651 8 : if (keyInfo.nSize)
1652 : {
1653 4 : pStatBuf->st_size = keyInfo.nSize;
1654 : }
1655 : else
1656 : {
1657 : const std::string osVSIPath = VSIKerchunkMorphURIToVSIPath(
1658 8 : *(keyInfo.posURI), CPLGetPathSafe(osJSONFilename.c_str()));
1659 4 : if (osVSIPath.empty())
1660 0 : return -1;
1661 4 : return VSIStatExL(osVSIPath.c_str(), pStatBuf, nFlags);
1662 : }
1663 : }
1664 52 : pStatBuf->st_mode = S_IFREG;
1665 :
1666 52 : return 0;
1667 : }
1668 :
1669 : /************************************************************************/
1670 : /* VSIKerchunkJSONRefFileSystem::ReadDirEx() */
1671 : /************************************************************************/
1672 :
1673 17 : char **VSIKerchunkJSONRefFileSystem::ReadDirEx(const char *pszDirname,
1674 : int nMaxFiles)
1675 : {
1676 17 : CPLDebugOnly("VSIKerchunkJSONRefFileSystem", "ReadDir(%s)", pszDirname);
1677 :
1678 34 : const auto [osJSONFilename, osAskedKey] = SplitFilename(pszDirname);
1679 17 : if (osJSONFilename.empty())
1680 4 : return nullptr;
1681 :
1682 13 : const auto [refFile, osParqFilename] = Load(
1683 26 : osJSONFilename, STARTS_WITH(pszDirname, JSON_REF_CACHED_FS_PREFIX));
1684 13 : if (!refFile)
1685 : {
1686 3 : if (osParqFilename.empty())
1687 3 : return nullptr;
1688 :
1689 0 : return VSIReadDirEx(
1690 0 : CPLFormFilenameSafe(CPLSPrintf("%s{%s}", PARQUET_REF_FS_PREFIX,
1691 : osParqFilename.c_str()),
1692 : osAskedKey.c_str(), nullptr)
1693 : .c_str(),
1694 0 : nMaxFiles);
1695 : }
1696 :
1697 20 : std::set<std::string> set;
1698 60 : for (const auto &[key, value] : refFile->GetMapKeys())
1699 : {
1700 50 : if (osAskedKey.empty())
1701 : {
1702 20 : const auto nPos = key.find('/');
1703 20 : if (nPos == std::string::npos)
1704 8 : set.insert(key);
1705 : else
1706 12 : set.insert(key.substr(0, nPos));
1707 : }
1708 30 : else if (key.size() > osAskedKey.size() &&
1709 42 : cpl::starts_with(key, osAskedKey) &&
1710 12 : key[osAskedKey.size()] == '/')
1711 : {
1712 24 : std::string subKey = key.substr(osAskedKey.size() + 1);
1713 12 : const auto nPos = subKey.find('/');
1714 12 : if (nPos == std::string::npos)
1715 12 : set.insert(std::move(subKey));
1716 : else
1717 0 : set.insert(subKey.substr(0, nPos));
1718 : }
1719 : }
1720 :
1721 20 : CPLStringList aosRet;
1722 34 : for (const std::string &v : set)
1723 : {
1724 : // CPLDebugOnly("VSIKerchunkJSONRefFileSystem", ".. %s", v.c_str());
1725 24 : aosRet.AddString(v.c_str());
1726 : }
1727 10 : return aosRet.StealList();
1728 : }
1729 :
1730 : /************************************************************************/
1731 : /* VSIInstallKerchunkJSONRefFileSystem() */
1732 : /************************************************************************/
1733 :
1734 1750 : void VSIInstallKerchunkJSONRefFileSystem()
1735 : {
1736 : static std::mutex oMutex;
1737 3500 : std::lock_guard<std::mutex> oLock(oMutex);
1738 : // cppcheck-suppress knownConditionTrueFalse
1739 1750 : if (!VSIKerchunkJSONRefFileSystem::IsFileSystemInstantiated())
1740 : {
1741 1750 : auto fs = std::make_unique<VSIKerchunkJSONRefFileSystem>().release();
1742 1750 : VSIFileManager::InstallHandler(JSON_REF_FS_PREFIX, fs);
1743 1750 : VSIFileManager::InstallHandler(JSON_REF_CACHED_FS_PREFIX, fs);
1744 : }
1745 1750 : }
|