Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver, "sharding_indexed" codec
5 : * Author: Even Rouault <even dot rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2026, Development Seed
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "zarr_v3_codec.h"
14 :
15 : #include "cpl_mem_cache.h"
16 : #include "cpl_vsi_virtual.h"
17 : #include "cpl_worker_thread_pool.h"
18 : #include "gdal_thread_pool.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <limits>
23 : #include <mutex>
24 :
25 : // Process-wide LRU cache for shard indices.
26 : // Key: shard filepath/URL. Value: flat byte buffer of Location entries
27 : // (each 16 bytes: uint64_t nOffset + uint64_t nSize, host byte order).
28 : // Populated lazily by BatchDecodePartial.
29 : // Shards whose index exceeds GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES
30 : // (default 1 MiB) fall back to per-entry ReadMultiRange.
31 : // Thread safety: lru11::Cache<..., std::mutex> serialises all access.
32 : static constexpr size_t SHARD_INDEX_CACHE_ENTRIES = 64;
33 : static lru11::Cache<std::string, std::vector<GByte>, std::mutex>
34 : g_oShardIndexCache{SHARD_INDEX_CACHE_ENTRIES, 0};
35 :
36 : // Implements https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/index.html
37 :
38 1006 : void ZarrClearShardIndexCache()
39 : {
40 1006 : g_oShardIndexCache.clear();
41 1006 : }
42 :
43 37 : void ZarrEraseShardIndexFromCache(const std::string &osFilename)
44 : {
45 37 : g_oShardIndexCache.remove(osFilename);
46 37 : }
47 :
48 : /************************************************************************/
49 : /* ZarrV3CodecShardingIndexed() */
50 : /************************************************************************/
51 :
52 718 : ZarrV3CodecShardingIndexed::ZarrV3CodecShardingIndexed() : ZarrV3Codec(NAME)
53 : {
54 718 : }
55 :
56 : /************************************************************************/
57 : /* ZarrV3CodecShardingIndexed::Clone() */
58 : /************************************************************************/
59 :
60 8 : std::unique_ptr<ZarrV3Codec> ZarrV3CodecShardingIndexed::Clone() const
61 : {
62 16 : auto psClone = std::make_unique<ZarrV3CodecShardingIndexed>();
63 16 : ZarrArrayMetadata oOutputArrayMetadata;
64 16 : psClone->InitFromConfiguration(std::string(), m_oConfiguration,
65 8 : m_oInputArrayMetadata, oOutputArrayMetadata,
66 : /* bEmitWarnings = */ false);
67 16 : return psClone;
68 : }
69 :
70 : /************************************************************************/
71 : /* ZarrV3CodecShardingIndexed::InitFromConfiguration() */
72 : /************************************************************************/
73 :
74 718 : bool ZarrV3CodecShardingIndexed::InitFromConfiguration(
75 : const std::string &osArrayName, const CPLJSONObject &configuration,
76 : const ZarrArrayMetadata &oInputArrayMetadata,
77 : ZarrArrayMetadata &oOutputArrayMetadata, bool bEmitWarnings)
78 : {
79 718 : if (oInputArrayMetadata.anBlockSizes.empty())
80 : {
81 0 : CPLError(
82 : CE_Failure, CPLE_AppDefined,
83 : "Codec sharding_indexed: sharding not supported for scalar array");
84 0 : return false;
85 : }
86 :
87 718 : m_oConfiguration = configuration.Clone();
88 718 : m_oInputArrayMetadata = oInputArrayMetadata;
89 :
90 1435 : if (!configuration.IsValid() ||
91 717 : configuration.GetType() != CPLJSONObject::Type::Object)
92 : {
93 2 : CPLError(
94 : CE_Failure, CPLE_AppDefined,
95 : "Codec sharding_indexed: configuration missing or not an object");
96 2 : return false;
97 : }
98 :
99 2148 : const auto oChunkShape = configuration["chunk_shape"].ToArray();
100 1430 : if (!oChunkShape.IsValid() ||
101 714 : oChunkShape.GetType() != CPLJSONObject::Type::Array)
102 : {
103 2 : CPLError(CE_Failure, CPLE_AppDefined,
104 : "Codec sharding_indexed: configuration.chunk_shape missing or "
105 : "not an array");
106 2 : return false;
107 : }
108 1428 : if (static_cast<size_t>(oChunkShape.Size()) !=
109 714 : m_oInputArrayMetadata.anBlockSizes.size())
110 : {
111 1 : CPLError(CE_Failure, CPLE_AppDefined,
112 : "Codec sharding_indexed: configuration.chunk_shape should "
113 : "have the same shape as the array");
114 1 : return false;
115 : }
116 1426 : std::vector<size_t> anCountInnerChunks;
117 2155 : for (int i = 0; i < oChunkShape.Size(); ++i)
118 : {
119 2890 : if (oChunkShape[i].GetType() != CPLJSONObject::Type::Integer &&
120 1445 : oChunkShape[i].GetType() != CPLJSONObject::Type::Long)
121 : {
122 0 : CPLError(CE_Failure, CPLE_AppDefined,
123 : "Codec sharding_indexed: configuration.chunk_shape[%d] "
124 : "should be an integer",
125 : i);
126 0 : return false;
127 : }
128 1445 : const int64_t nVal = oChunkShape[i].ToLong();
129 2888 : if (nVal <= 0 ||
130 1443 : static_cast<uint64_t>(nVal) >
131 2888 : m_oInputArrayMetadata.anBlockSizes[i] ||
132 1443 : (m_oInputArrayMetadata.anBlockSizes[i] % nVal) != 0)
133 : {
134 3 : CPLError(
135 : CE_Failure, CPLE_AppDefined,
136 : "Codec sharding_indexed: configuration.chunk_shape[%d]=%" PRId64
137 : " should be a strictly positive value that is a divisor of "
138 : "%" PRIu64,
139 : i, nVal,
140 3 : static_cast<uint64_t>(m_oInputArrayMetadata.anBlockSizes[i]));
141 3 : return false;
142 : }
143 : #ifndef __COVERITY__
144 : // The following cast is safe since ZarrArray::ParseChunkSize() has
145 : // previously validated that m_oInputArrayMetadata.anBlockSizes[i] fits
146 : // on size_t
147 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
148 : {
149 : // coverity[result_independent_of_operands]
150 : CPLAssert(nVal <= std::numeric_limits<size_t>::max());
151 : }
152 : #endif
153 1442 : m_anInnerBlockSize.push_back(static_cast<size_t>(nVal));
154 1442 : anCountInnerChunks.push_back(
155 1442 : static_cast<size_t>(m_oInputArrayMetadata.anBlockSizes[i] / nVal));
156 : }
157 :
158 2130 : const auto oCodecs = configuration["codecs"];
159 710 : if (!oCodecs.IsValid() || oCodecs.GetType() != CPLJSONObject::Type::Array)
160 : {
161 2 : CPLError(CE_Failure, CPLE_AppDefined,
162 : "Codec sharding_indexed: configuration.codecs missing or "
163 : "not an array");
164 2 : return false;
165 : }
166 708 : if (oCodecs.ToArray().Size() == 0)
167 : {
168 1 : CPLError(CE_Failure, CPLE_AppDefined,
169 : "Codec sharding_indexed: configuration.codecs[] is empty");
170 1 : return false;
171 : }
172 1414 : ZarrArrayMetadata inputArrayMetadataCodecs = m_oInputArrayMetadata;
173 707 : inputArrayMetadataCodecs.anBlockSizes = m_anInnerBlockSize;
174 : m_poCodecSequence =
175 707 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataCodecs);
176 707 : if (!m_poCodecSequence->InitFromJson(osArrayName, oCodecs,
177 : oOutputArrayMetadata))
178 : {
179 1 : CPLError(CE_Failure, CPLE_AppDefined,
180 : "Codec sharding_indexed: initialization of codecs failed");
181 1 : return false;
182 : }
183 :
184 706 : if (bEmitWarnings && m_poCodecSequence->SupportsPartialDecoding())
185 : {
186 : // Implementation limitation
187 1 : CPLError(CE_Warning, CPLE_AppDefined,
188 : "Nested sharding detected. For now, partial decoding is only "
189 : "implemented on the outer-most shard level");
190 : }
191 :
192 2118 : const auto oIndexCodecs = configuration["index_codecs"];
193 1411 : if (!oIndexCodecs.IsValid() ||
194 705 : oIndexCodecs.GetType() != CPLJSONObject::Type::Array)
195 : {
196 2 : CPLError(
197 : CE_Failure, CPLE_AppDefined,
198 : "Codec sharding_indexed: configuration.index_codecs missing or "
199 : "not an array");
200 2 : return false;
201 : }
202 704 : if (oIndexCodecs.ToArray().Size() == 0)
203 : {
204 1 : CPLError(
205 : CE_Failure, CPLE_AppDefined,
206 : "Codec sharding_indexed: configuration.index_codecs[] is empty");
207 1 : return false;
208 : }
209 1406 : ZarrArrayMetadata inputArrayMetadataIndex;
210 703 : inputArrayMetadataIndex.oElt.nativeType =
211 : DtypeElt::NativeType::UNSIGNED_INT;
212 703 : inputArrayMetadataIndex.oElt.nativeSize = sizeof(uint64_t);
213 : inputArrayMetadataIndex.oElt.gdalType =
214 703 : GDALExtendedDataType::Create(GDT_UInt64);
215 703 : inputArrayMetadataIndex.oElt.gdalSize = sizeof(uint64_t);
216 703 : inputArrayMetadataIndex.anBlockSizes = std::move(anCountInnerChunks);
217 : // 2 for offset and size
218 703 : inputArrayMetadataIndex.anBlockSizes.push_back(2);
219 : m_poIndexCodecSequence =
220 703 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataIndex);
221 1406 : ZarrArrayMetadata oOutputArrayMetadataIndex;
222 703 : if (!m_poIndexCodecSequence->InitFromJson(osArrayName, oIndexCodecs,
223 : oOutputArrayMetadataIndex))
224 : {
225 1 : CPLError(
226 : CE_Failure, CPLE_AppDefined,
227 : "Codec sharding_indexed: initialization of index_codecs failed");
228 1 : return false;
229 : }
230 702 : const auto &indexCodecs = m_poIndexCodecSequence->GetCodecs();
231 702 : if (indexCodecs.empty())
232 : {
233 : // ok, there is only a "bytes" codec, optimized away if the order
234 : // is the one of the native architecture
235 : }
236 1116 : else if (indexCodecs[0]->GetName() == ZarrV3CodecBytes::NAME ||
237 557 : indexCodecs[0]->GetName() == ZarrV3CodecCRC32C::NAME)
238 : {
239 : // ok
240 : }
241 1 : else if (indexCodecs.size() == 2 &&
242 0 : indexCodecs[1]->GetName() == ZarrV3CodecCRC32C::NAME)
243 : {
244 : // ok
245 : }
246 : else
247 : {
248 1 : CPLError(CE_Failure, CPLE_NotSupported,
249 : "Codec sharding_indexed: this implementation only supports "
250 : "Bytes, possibly followed by CRC32C, as index_codecs");
251 1 : return false;
252 : }
253 701 : m_bIndexHasCRC32 = (!indexCodecs.empty() && indexCodecs.back()->GetName() ==
254 : ZarrV3CodecCRC32C::NAME);
255 :
256 : const std::string osIndexLocation =
257 2103 : configuration.GetString("index_location", "end");
258 701 : if (osIndexLocation != "start" && osIndexLocation != "end")
259 : {
260 1 : CPLError(CE_Failure, CPLE_AppDefined,
261 : "Codec sharding_indexed: invalid value for index_location");
262 1 : return false;
263 : }
264 700 : m_bIndexLocationAtEnd = (osIndexLocation == "end");
265 :
266 700 : return true;
267 : }
268 :
269 : /************************************************************************/
270 : /* ExtractSubArrayFromLargerOne() */
271 : /************************************************************************/
272 :
273 : // Inverse of CopySubArrayIntoLargerOne(): extract a contiguous inner chunk
274 : // from its position in the larger shard buffer.
275 : static void
276 153 : ExtractSubArrayFromLargerOne(const ZarrByteVectorQuickResize &abySrc,
277 : const std::vector<size_t> &anSrcBlockSize,
278 : const std::vector<size_t> &anInnerBlockSize,
279 : const std::vector<size_t> &anInnerBlockIndices,
280 : ZarrByteVectorQuickResize &abyChunk,
281 : const size_t nDTSize)
282 : {
283 153 : const auto nDims = anInnerBlockSize.size();
284 153 : CPLAssert(nDims > 0);
285 153 : CPLAssert(nDims == anInnerBlockIndices.size());
286 153 : CPLAssert(nDims == anSrcBlockSize.size());
287 : // +1 to avoid gcc -Wnull-dereference false positives
288 306 : std::vector<const GByte *> srcPtrStack(nDims + 1);
289 306 : std::vector<size_t> count(nDims + 1);
290 306 : std::vector<size_t> srcStride(nDims + 1);
291 :
292 153 : size_t nSrcStride = nDTSize;
293 483 : for (size_t iDim = nDims; iDim > 0;)
294 : {
295 330 : --iDim;
296 330 : srcStride[iDim] = nSrcStride;
297 330 : nSrcStride *= anSrcBlockSize[iDim];
298 : }
299 :
300 153 : srcPtrStack[0] = abySrc.data();
301 483 : for (size_t iDim = 0; iDim < nDims; ++iDim)
302 : {
303 330 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
304 : anSrcBlockSize[iDim]);
305 660 : srcPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
306 330 : srcStride[iDim];
307 : }
308 153 : GByte *pabyDst = abyChunk.data();
309 :
310 153 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
311 153 : size_t dimIdx = 0;
312 1025 : lbl_next_depth:
313 1025 : if (dimIdx + 1 == nDims)
314 : {
315 848 : memcpy(pabyDst, srcPtrStack[dimIdx], nLastDimSize);
316 848 : pabyDst += nLastDimSize;
317 : }
318 : else
319 : {
320 177 : count[dimIdx] = anInnerBlockSize[dimIdx];
321 : while (true)
322 : {
323 872 : dimIdx++;
324 872 : srcPtrStack[dimIdx] = srcPtrStack[dimIdx - 1];
325 872 : goto lbl_next_depth;
326 872 : lbl_return_to_caller:
327 872 : dimIdx--;
328 872 : if (--count[dimIdx] == 0)
329 177 : break;
330 695 : srcPtrStack[dimIdx] += srcStride[dimIdx];
331 : }
332 : }
333 1025 : if (dimIdx > 0)
334 872 : goto lbl_return_to_caller;
335 153 : }
336 :
337 : /************************************************************************/
338 : /* IsAllNoData() */
339 : /************************************************************************/
340 :
341 : // Check if a buffer consists entirely of the fill value.
342 153 : static bool IsAllNoData(const ZarrByteVectorQuickResize &abyChunk,
343 : const ZarrArrayMetadata &metadata)
344 : {
345 153 : const size_t nDTSize = metadata.oElt.nativeSize;
346 153 : const size_t nBytes = abyChunk.size();
347 :
348 157 : if (metadata.abyNoData.empty() ||
349 157 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
350 : {
351 : // Zero fill value: byte-by-byte check (compiler auto-vectorizes)
352 149 : const GByte *p = abyChunk.data();
353 4700 : for (size_t i = 0; i < nBytes; ++i)
354 : {
355 4664 : if (p[i] != 0)
356 113 : return false;
357 : }
358 36 : return true;
359 : }
360 : else
361 : {
362 : // Non-zero fill value: element-wise compare
363 4 : CPLAssert(metadata.abyNoData.size() == nDTSize);
364 4 : const GByte *p = abyChunk.data();
365 4 : const size_t nCount = nBytes / nDTSize;
366 4 : for (size_t i = 0; i < nCount; ++i)
367 : {
368 4 : if (memcmp(p + i * nDTSize, metadata.abyNoData.data(), nDTSize) !=
369 : 0)
370 4 : return false;
371 : }
372 0 : return true;
373 : }
374 : }
375 :
376 : /************************************************************************/
377 : /* ZarrV3CodecShardingIndexed::Encode() */
378 : /************************************************************************/
379 :
380 37 : bool ZarrV3CodecShardingIndexed::Encode(const ZarrByteVectorQuickResize &abySrc,
381 : ZarrByteVectorQuickResize &abyDst) const
382 : {
383 : // Compute total number of inner chunks
384 37 : size_t nInnerChunks = 1;
385 115 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
386 : {
387 : const size_t nCountInnerChunksThisDim =
388 78 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
389 78 : nInnerChunks *= nCountInnerChunksThisDim;
390 : }
391 :
392 37 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
393 : const size_t nExpectedSrcSize =
394 37 : nDTSize * MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
395 37 : if (abySrc.size() != nExpectedSrcSize)
396 : {
397 0 : CPLError(CE_Failure, CPLE_AppDefined,
398 : "ZarrV3CodecShardingIndexed::Encode(): input buffer size "
399 : "(%" PRIu64 ") != expected (%" PRIu64 ")",
400 0 : static_cast<uint64_t>(abySrc.size()),
401 : static_cast<uint64_t>(nExpectedSrcSize));
402 0 : return false;
403 : }
404 :
405 : // Index: one Location per inner chunk, initially all empty
406 : std::vector<Location> anLocations(nInnerChunks,
407 : {std::numeric_limits<uint64_t>::max(),
408 74 : std::numeric_limits<uint64_t>::max()});
409 :
410 : // Accumulate encoded inner chunk data
411 74 : ZarrByteVectorQuickResize abyData;
412 : // Reserve approximate capacity: decoded chunk size is close to the upper
413 : // bound per chunk (compression may slightly increase size in pathological
414 : // cases), but avoids most reallocations.
415 : const size_t nDecodedChunkSize =
416 37 : nDTSize * MultiplyElements(m_anInnerBlockSize);
417 : // resize+clear = reserve (ZarrByteVectorQuickResize has no reserve())
418 : try
419 : {
420 37 : abyData.resize(nInnerChunks * nDecodedChunkSize);
421 : }
422 0 : catch (const std::exception &)
423 : {
424 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
425 : "Cannot allocate memory for accumulated shard data");
426 0 : return false;
427 : }
428 37 : abyData.clear();
429 74 : ZarrByteVectorQuickResize abyChunk;
430 :
431 37 : uint64_t nCurrentOffset = 0;
432 74 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
433 :
434 190 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
435 : {
436 : // Update chunk coordinates (same iteration order as Decode)
437 153 : if (iChunk > 0)
438 : {
439 116 : size_t iDim = m_anInnerBlockSize.size() - 1;
440 164 : while (++anChunkIndices[iDim] ==
441 328 : m_oInputArrayMetadata.anBlockSizes[iDim] /
442 164 : m_anInnerBlockSize[iDim])
443 : {
444 48 : anChunkIndices[iDim] = 0;
445 48 : --iDim;
446 : }
447 : }
448 :
449 : // Extract this inner chunk from the shard buffer
450 : try
451 : {
452 153 : abyChunk.resize(nDecodedChunkSize);
453 : }
454 0 : catch (const std::exception &)
455 : {
456 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
457 : "Cannot allocate memory for inner chunk");
458 0 : return false;
459 : }
460 153 : ExtractSubArrayFromLargerOne(abySrc, m_oInputArrayMetadata.anBlockSizes,
461 153 : m_anInnerBlockSize, anChunkIndices,
462 : abyChunk, nDTSize);
463 :
464 : // Skip empty (all-nodata) chunks
465 153 : if (IsAllNoData(abyChunk, m_oInputArrayMetadata))
466 36 : continue;
467 :
468 : // Encode inner chunk through codec chain (bytes + compressor)
469 117 : if (!m_poCodecSequence->Encode(abyChunk))
470 : {
471 0 : CPLError(CE_Failure, CPLE_AppDefined,
472 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode "
473 : "inner chunk %" PRIu64,
474 : static_cast<uint64_t>(iChunk));
475 0 : return false;
476 : }
477 :
478 117 : anLocations[iChunk] = {nCurrentOffset, abyChunk.size()};
479 117 : nCurrentOffset += abyChunk.size();
480 117 : abyData.insert(abyData.end(), abyChunk.begin(), abyChunk.end());
481 : }
482 :
483 : // All inner chunks are nodata: signal empty shard via zero-length output
484 37 : if (abyData.empty())
485 : {
486 0 : abyDst.clear();
487 0 : return true;
488 : }
489 :
490 : // Build index buffer
491 74 : ZarrByteVectorQuickResize abyIndex;
492 37 : const size_t nIndexRawSize = nInnerChunks * sizeof(Location);
493 :
494 : // If index is at start, data offsets must account for the index prefix.
495 : // Encode the index once to determine its encoded size (includes CRC32C
496 : // overhead), then re-encode below with the adjusted offsets.
497 : // Note: currently unreachable in write mode (creation hardcodes "end"),
498 : // but kept for completeness with the Decode() start-index path.
499 37 : if (!m_bIndexLocationAtEnd)
500 : {
501 : // Encode a temporary copy of the index to determine its encoded size
502 0 : abyIndex.resize(nIndexRawSize);
503 0 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
504 0 : if (!m_poIndexCodecSequence->Encode(abyIndex))
505 : {
506 0 : CPLError(
507 : CE_Failure, CPLE_AppDefined,
508 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
509 0 : return false;
510 : }
511 0 : const uint64_t nIndexEncodedSize = abyIndex.size();
512 : // Adjust offsets
513 0 : for (auto &loc : anLocations)
514 : {
515 0 : if (loc.nOffset != std::numeric_limits<uint64_t>::max())
516 0 : loc.nOffset += nIndexEncodedSize;
517 : }
518 : }
519 :
520 : // (Re-)encode index with final offsets
521 37 : abyIndex.resize(nIndexRawSize);
522 37 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
523 37 : if (!m_poIndexCodecSequence->Encode(abyIndex))
524 : {
525 0 : CPLError(CE_Failure, CPLE_AppDefined,
526 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
527 0 : return false;
528 : }
529 :
530 : // Assemble output: data + index (or index + data)
531 37 : if (m_bIndexLocationAtEnd)
532 : {
533 37 : abyDst = std::move(abyData);
534 37 : abyDst.insert(abyDst.end(), abyIndex.begin(), abyIndex.end());
535 : }
536 : else
537 : {
538 0 : abyDst = std::move(abyIndex);
539 0 : abyDst.insert(abyDst.end(), abyData.begin(), abyData.end());
540 : }
541 :
542 37 : return true;
543 : }
544 :
545 : /************************************************************************/
546 : /* CopySubArrayIntoLargerOne() */
547 : /************************************************************************/
548 :
549 : static void
550 148 : CopySubArrayIntoLargerOne(const ZarrByteVectorQuickResize &abyChunk,
551 : const std::vector<size_t> &anInnerBlockSize,
552 : const std::vector<size_t> &anInnerBlockIndices,
553 : ZarrByteVectorQuickResize &abyDst,
554 : const std::vector<size_t> &anDstBlockSize,
555 : const size_t nDTSize)
556 : {
557 148 : const auto nDims = anInnerBlockSize.size();
558 148 : CPLAssert(nDims > 0);
559 148 : CPLAssert(nDims == anInnerBlockIndices.size());
560 148 : CPLAssert(nDims == anDstBlockSize.size());
561 : // +1 just to make some gcc versions not emit -Wnull-dereference false positives
562 296 : std::vector<GByte *> dstPtrStack(nDims + 1);
563 296 : std::vector<size_t> count(nDims + 1);
564 296 : std::vector<size_t> dstStride(nDims + 1);
565 :
566 148 : size_t nDstStride = nDTSize;
567 444 : for (size_t iDim = nDims; iDim > 0;)
568 : {
569 296 : --iDim;
570 296 : dstStride[iDim] = nDstStride;
571 296 : nDstStride *= anDstBlockSize[iDim];
572 : }
573 :
574 148 : dstPtrStack[0] = abyDst.data();
575 444 : for (size_t iDim = 0; iDim < nDims; ++iDim)
576 : {
577 296 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
578 : anDstBlockSize[iDim]);
579 592 : dstPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
580 296 : dstStride[iDim];
581 : }
582 148 : const GByte *pabySrc = abyChunk.data();
583 :
584 148 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
585 148 : size_t dimIdx = 0;
586 312 : lbl_next_depth:
587 312 : if (dimIdx + 1 == nDims)
588 : {
589 164 : memcpy(dstPtrStack[dimIdx], pabySrc, nLastDimSize);
590 164 : pabySrc += nLastDimSize;
591 : }
592 : else
593 : {
594 148 : count[dimIdx] = anInnerBlockSize[dimIdx];
595 : while (true)
596 : {
597 164 : dimIdx++;
598 164 : dstPtrStack[dimIdx] = dstPtrStack[dimIdx - 1];
599 164 : goto lbl_next_depth;
600 164 : lbl_return_to_caller:
601 164 : dimIdx--;
602 164 : if (--count[dimIdx] == 0)
603 148 : break;
604 16 : dstPtrStack[dimIdx] += dstStride[dimIdx];
605 : }
606 : }
607 312 : if (dimIdx > 0)
608 164 : goto lbl_return_to_caller;
609 148 : }
610 :
611 : /************************************************************************/
612 : /* FillWithNoData() */
613 : /************************************************************************/
614 :
615 152 : static void FillWithNoData(ZarrByteVectorQuickResize &abyDst,
616 : const size_t nCount,
617 : const ZarrArrayMetadata &metadata)
618 : {
619 152 : const size_t nDTSize = metadata.oElt.nativeSize;
620 304 : if (metadata.abyNoData.empty() ||
621 304 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
622 : {
623 150 : memset(abyDst.data(), 0, nDTSize * nCount);
624 : }
625 : else
626 : {
627 2 : CPLAssert(metadata.abyNoData.size() == nDTSize);
628 152 : for (size_t i = 0; i < nCount; ++i)
629 : {
630 150 : memcpy(abyDst.data() + i * nDTSize, metadata.abyNoData.data(),
631 : nDTSize);
632 : }
633 : }
634 152 : }
635 :
636 : /************************************************************************/
637 : /* ZarrV3CodecShardingIndexed::Decode() */
638 : /************************************************************************/
639 :
640 148 : bool ZarrV3CodecShardingIndexed::Decode(const ZarrByteVectorQuickResize &abySrc,
641 : ZarrByteVectorQuickResize &abyDst) const
642 : {
643 148 : size_t nInnerChunks = 1;
644 443 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
645 : {
646 : const size_t nCountInnerChunksThisdim =
647 295 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
648 295 : nInnerChunks *= nCountInnerChunksThisdim;
649 : }
650 :
651 296 : const size_t nIndexEncodedSize = nInnerChunks * sizeof(Location) +
652 148 : (m_bIndexHasCRC32 ? sizeof(uint32_t) : 0);
653 148 : if (abySrc.size() < nIndexEncodedSize)
654 : {
655 1 : CPLError(CE_Failure, CPLE_NotSupported,
656 : "ZarrV3CodecShardingIndexed::Decode(): input buffer is too "
657 : "small to hold the shard index");
658 1 : return false;
659 : }
660 294 : ZarrByteVectorQuickResize abyIndex;
661 147 : if (m_bIndexLocationAtEnd)
662 : {
663 147 : abyIndex.insert(abyIndex.end(),
664 147 : abySrc.begin() + (abySrc.size() - nIndexEncodedSize),
665 441 : abySrc.end());
666 : }
667 : else
668 : {
669 0 : abyIndex.insert(abyIndex.end(), abySrc.begin(),
670 0 : abySrc.end() + nIndexEncodedSize);
671 : }
672 :
673 147 : if (!m_poIndexCodecSequence->Decode(abyIndex))
674 : {
675 0 : CPLError(
676 : CE_Failure, CPLE_NotSupported,
677 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode shard index");
678 0 : return false;
679 : }
680 :
681 147 : if (abyIndex.size() != nInnerChunks * sizeof(Location))
682 : {
683 0 : CPLError(CE_Failure, CPLE_NotSupported,
684 : "ZarrV3CodecShardingIndexed::Decode(): shard index has not "
685 : "expected size");
686 0 : return false;
687 : }
688 :
689 : const Location *panLocations =
690 147 : reinterpret_cast<const Location *>(abyIndex.data());
691 :
692 294 : ZarrByteVectorQuickResize abyChunk;
693 147 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
694 : const size_t nExpectedDecodedChunkSize =
695 147 : nDTSize * MultiplyElements(m_anInnerBlockSize);
696 : const size_t nDstCount =
697 147 : MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
698 :
699 : try
700 : {
701 147 : abyDst.resize(nDstCount * nDTSize);
702 : }
703 0 : catch (const std::exception &)
704 : {
705 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
706 : "Cannot allocate memory for decoded shard");
707 0 : return false;
708 : }
709 :
710 147 : FillWithNoData(abyDst, nDstCount, m_oInputArrayMetadata);
711 :
712 294 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
713 385 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
714 : {
715 368 : if (iChunk > 0)
716 : {
717 : // Update chunk coordinates
718 221 : size_t iDim = m_anInnerBlockSize.size() - 1;
719 303 : while (++anChunkIndices[iDim] ==
720 606 : m_oInputArrayMetadata.anBlockSizes[iDim] /
721 303 : m_anInnerBlockSize[iDim])
722 : {
723 82 : anChunkIndices[iDim] = 0;
724 82 : --iDim;
725 : }
726 : }
727 :
728 : #ifdef DEBUG_VERBOSE
729 : CPLDebug("ZARR", "Chunk %" PRIu64 ": offset %" PRIu64 ", size %" PRIu64,
730 : static_cast<uint64_t>(iChunk), panLocations[iChunk].nOffset,
731 : panLocations[iChunk].nSize);
732 : #endif
733 :
734 826 : if (panLocations[iChunk].nOffset ==
735 478 : std::numeric_limits<uint64_t>::max() &&
736 110 : panLocations[iChunk].nSize == std::numeric_limits<uint64_t>::max())
737 : {
738 : // Empty chunk
739 90 : continue;
740 : }
741 :
742 496 : if (panLocations[iChunk].nOffset >= abySrc.size() ||
743 218 : panLocations[iChunk].nSize >
744 218 : abySrc.size() - panLocations[iChunk].nOffset)
745 : {
746 80 : CPLError(CE_Failure, CPLE_NotSupported,
747 : "ZarrV3CodecShardingIndexed::Decode(): invalid chunk "
748 : "location for chunk %" PRIu64 ": offset=%" PRIu64
749 : ", size=%" PRIu64,
750 : static_cast<uint64_t>(iChunk),
751 80 : panLocations[iChunk].nOffset, panLocations[iChunk].nSize);
752 80 : return false;
753 : }
754 :
755 198 : abyChunk.clear();
756 : abyChunk.insert(
757 198 : abyChunk.end(),
758 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset),
759 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset +
760 396 : panLocations[iChunk].nSize));
761 198 : if (!m_poCodecSequence->Decode(abyChunk))
762 : {
763 50 : CPLError(CE_Failure, CPLE_NotSupported,
764 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode "
765 : "chunk %" PRIu64,
766 : static_cast<uint64_t>(iChunk));
767 50 : return false;
768 : }
769 :
770 148 : if (abyChunk.size() != nExpectedDecodedChunkSize)
771 : {
772 0 : CPLError(CE_Failure, CPLE_NotSupported,
773 : "ZarrV3CodecShardingIndexed::Decode(): decoded size for "
774 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64
775 : " is expected",
776 : static_cast<uint64_t>(iChunk),
777 0 : static_cast<uint64_t>(abyChunk.size()),
778 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
779 0 : return false;
780 : }
781 :
782 148 : CopySubArrayIntoLargerOne(abyChunk, m_anInnerBlockSize, anChunkIndices,
783 148 : abyDst, m_oInputArrayMetadata.anBlockSizes,
784 : nDTSize);
785 : }
786 :
787 17 : return true;
788 : }
789 :
790 : /************************************************************************/
791 : /* ZarrV3CodecShardingIndexed::DecodePartial() */
792 : /************************************************************************/
793 :
794 1028 : bool ZarrV3CodecShardingIndexed::DecodePartial(
795 : VSIVirtualHandle *poFile, const ZarrByteVectorQuickResize & /* abySrc */,
796 : ZarrByteVectorQuickResize &abyDst, std::vector<size_t> &anStartIdx,
797 : std::vector<size_t> &anCount)
798 : {
799 1028 : CPLAssert(anStartIdx.size() == m_oInputArrayMetadata.anBlockSizes.size());
800 1028 : CPLAssert(anStartIdx.size() == anCount.size());
801 :
802 1028 : size_t nInnerChunkCount = 1;
803 1028 : size_t nInnerChunkIdx = 0;
804 3085 : for (size_t i = 0; i < anStartIdx.size(); ++i)
805 : {
806 2057 : CPLAssert(anStartIdx[i] + anCount[i] <=
807 : m_oInputArrayMetadata.anBlockSizes[i]);
808 4114 : if ((anStartIdx[i] % m_anInnerBlockSize[i]) != 0 ||
809 2057 : anCount[i] != m_anInnerBlockSize[i])
810 : {
811 : // Should not happen with the current call sites.
812 0 : CPLError(CE_Failure, CPLE_AppDefined,
813 : "ZarrV3CodecShardingIndexed::DecodePartial() only "
814 : "supported on an exact inner chunk");
815 0 : return false;
816 : }
817 :
818 : const size_t nCountInnerChunksThisDim =
819 2057 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
820 4114 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
821 2057 : anStartIdx[i] / m_anInnerBlockSize[i];
822 2057 : nInnerChunkCount *= nCountInnerChunksThisDim;
823 : }
824 :
825 1028 : abyDst.clear();
826 :
827 1028 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
828 1028 : const auto nExpectedDecodedChunkSize = nDTSize * MultiplyElements(anCount);
829 :
830 1028 : vsi_l_offset nLocationOffset =
831 : static_cast<vsi_l_offset>(nInnerChunkIdx) * sizeof(Location);
832 1028 : if (m_bIndexLocationAtEnd)
833 : {
834 1028 : poFile->Seek(0, SEEK_END);
835 1028 : const auto nFileSize = poFile->Tell();
836 1028 : vsi_l_offset nIndexSize =
837 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
838 1028 : if (m_bIndexHasCRC32)
839 1025 : nIndexSize += sizeof(uint32_t);
840 1028 : if (nFileSize < nIndexSize)
841 : {
842 0 : CPLError(CE_Failure, CPLE_AppDefined,
843 : "ZarrV3CodecShardingIndexed::DecodePartial(): shard file "
844 : "too small");
845 0 : return false;
846 : }
847 1028 : nLocationOffset += nFileSize - nIndexSize;
848 : }
849 :
850 : Location loc;
851 2056 : if (poFile->Seek(nLocationOffset, SEEK_SET) != 0 ||
852 1028 : poFile->Read(&loc, 1, sizeof(loc)) != sizeof(loc))
853 : {
854 :
855 0 : CPLError(CE_Failure, CPLE_AppDefined,
856 : "ZarrV3CodecShardingIndexed::DecodePartial(): "
857 : "cannot read index for chunk %" PRIu64,
858 : static_cast<uint64_t>(nInnerChunkIdx));
859 0 : return false;
860 : }
861 :
862 2053 : if (!m_poIndexCodecSequence->GetCodecs().empty() &&
863 1025 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
864 2053 : ZarrV3CodecBytes::NAME &&
865 0 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp())
866 : {
867 0 : CPL_SWAP64PTR(&(loc.nOffset));
868 0 : CPL_SWAP64PTR(&(loc.nSize));
869 : }
870 :
871 1033 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
872 5 : loc.nSize == std::numeric_limits<uint64_t>::max())
873 : {
874 : // Empty chunk
875 : try
876 : {
877 5 : abyDst.resize(nExpectedDecodedChunkSize);
878 : }
879 0 : catch (const std::exception &)
880 : {
881 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
882 : "Cannot allocate memory for decoded shard");
883 0 : return false;
884 : }
885 5 : FillWithNoData(abyDst, MultiplyElements(anCount),
886 5 : m_oInputArrayMetadata);
887 5 : return true;
888 : }
889 :
890 1023 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
891 1023 : if (loc.nSize > THRESHOLD)
892 : {
893 : // When the chunk size is above a certain threshold, check it against
894 : // the actual file size to avoid excessive memory allocation attempts.
895 :
896 0 : poFile->Seek(0, SEEK_END);
897 0 : const auto nFileSize = poFile->Tell();
898 :
899 0 : if (loc.nOffset >= nFileSize || loc.nSize > nFileSize - loc.nOffset)
900 : {
901 0 : CPLError(
902 : CE_Failure, CPLE_NotSupported,
903 : "ZarrV3CodecShardingIndexed::DecodePartial(): invalid chunk "
904 : "location for chunk %" PRIu64 ": offset=%" PRIu64
905 : ", size=%" PRIu64,
906 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
907 0 : return false;
908 : }
909 : }
910 :
911 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
912 : {
913 : #ifndef __COVERITY__
914 : // coverity[result_independent_of_operands]
915 : if (loc.nSize > std::numeric_limits<size_t>::max())
916 : {
917 : CPLError(
918 : CE_Failure, CPLE_NotSupported,
919 : "ZarrV3CodecShardingIndexed::DecodePartial(): too large chunk "
920 : "size for chunk %" PRIu64 " for this platform: size=%" PRIu64,
921 : static_cast<uint64_t>(nInnerChunkIdx), loc.nSize);
922 : return false;
923 : }
924 : #endif
925 : }
926 :
927 : try
928 : {
929 1023 : abyDst.resize(static_cast<size_t>(loc.nSize));
930 : }
931 0 : catch (const std::exception &)
932 : {
933 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
934 : "Cannot allocate memory for decoded shard");
935 0 : return false;
936 : }
937 :
938 2046 : if (poFile->Seek(loc.nOffset, SEEK_SET) != 0 ||
939 1023 : poFile->Read(abyDst.data(), 1, abyDst.size()) != abyDst.size())
940 : {
941 0 : CPLError(CE_Failure, CPLE_NotSupported,
942 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot read "
943 : "data for chunk %" PRIu64 ": offset=%" PRIu64
944 : ", size=%" PRIu64,
945 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
946 0 : return false;
947 : }
948 :
949 1023 : if (!m_poCodecSequence->Decode(abyDst))
950 : {
951 356 : CPLError(CE_Failure, CPLE_NotSupported,
952 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot decode "
953 : "chunk %" PRIu64,
954 : static_cast<uint64_t>(nInnerChunkIdx));
955 356 : return false;
956 : }
957 :
958 667 : if (abyDst.size() != nExpectedDecodedChunkSize)
959 : {
960 0 : CPLError(
961 : CE_Failure, CPLE_NotSupported,
962 : "ZarrV3CodecShardingIndexed::DecodePartial(): decoded size for "
963 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64 " is expected",
964 : static_cast<uint64_t>(nInnerChunkIdx),
965 0 : static_cast<uint64_t>(abyDst.size()),
966 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
967 0 : return false;
968 : }
969 :
970 667 : return true;
971 : }
972 :
973 : /************************************************************************/
974 : /* ZarrV3CodecShardingIndexed::BatchDecodePartial() */
975 : /************************************************************************/
976 :
977 4023 : bool ZarrV3CodecShardingIndexed::BatchDecodePartial(
978 : VSIVirtualHandle *poFile, const char *pszFilename,
979 : const std::vector<std::pair<std::vector<size_t>, std::vector<size_t>>>
980 : &anRequests,
981 : std::vector<ZarrByteVectorQuickResize> &aResults)
982 : {
983 4023 : if (anRequests.empty())
984 0 : return true;
985 :
986 4023 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
987 :
988 : // --- Compute inner chunk count and per-request inner chunk indices ---
989 4023 : size_t nInnerChunkCount = 1;
990 12076 : for (size_t i = 0; i < m_oInputArrayMetadata.anBlockSizes.size(); ++i)
991 : {
992 8053 : nInnerChunkCount *=
993 8053 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
994 : }
995 :
996 : // Determine whether index codec requires byte-swapping
997 : const bool bSwapIndex =
998 8043 : !m_poIndexCodecSequence->GetCodecs().empty() &&
999 4020 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
1000 8043 : ZarrV3CodecBytes::NAME &&
1001 1 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp();
1002 :
1003 : // Compute index base offset. For index-at-end, we need the file size.
1004 4023 : vsi_l_offset nIndexBaseOffset = 0;
1005 4023 : if (m_bIndexLocationAtEnd)
1006 : {
1007 4023 : poFile->Seek(0, SEEK_END);
1008 4023 : const auto nFileSize = poFile->Tell();
1009 4023 : vsi_l_offset nIndexSize =
1010 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
1011 4023 : if (m_bIndexHasCRC32)
1012 4020 : nIndexSize += sizeof(uint32_t);
1013 4023 : if (nFileSize < nIndexSize)
1014 : {
1015 0 : CPLError(CE_Failure, CPLE_AppDefined,
1016 : "BatchDecodePartial: shard file too small");
1017 0 : return false;
1018 : }
1019 4023 : nIndexBaseOffset = nFileSize - nIndexSize;
1020 : }
1021 :
1022 : // Build per-request inner chunk indices
1023 8046 : std::vector<size_t> anInnerChunkIndices(anRequests.size());
1024 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1025 : {
1026 12092 : const auto &anStartIdx = anRequests[iReq].first;
1027 12092 : CPLAssert(anStartIdx.size() ==
1028 : m_oInputArrayMetadata.anBlockSizes.size());
1029 :
1030 12092 : size_t nInnerChunkIdx = 0;
1031 36322 : for (size_t i = 0; i < anStartIdx.size(); ++i)
1032 : {
1033 : const size_t nCountInnerChunksThisDim =
1034 24230 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
1035 48460 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
1036 24230 : anStartIdx[i] / m_anInnerBlockSize[i];
1037 : }
1038 12092 : anInnerChunkIndices[iReq] = nInnerChunkIdx;
1039 : }
1040 :
1041 : // --- Pass 1: read index entries, using per-shard cache when possible ---
1042 8046 : std::vector<Location> aLocations(anRequests.size());
1043 :
1044 4023 : const size_t nIndexBytes = nInnerChunkCount * sizeof(Location);
1045 : const size_t nMaxIndexBytes =
1046 4023 : static_cast<size_t>(CPLAtoGIntBig(CPLGetConfigOption(
1047 4023 : "GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES", "1048576")));
1048 4023 : if (pszFilename != nullptr && nIndexBytes <= nMaxIndexBytes)
1049 : {
1050 : // Try to serve from the in-process index cache.
1051 : // g_oShardIndexCache serialises its own access via std::mutex.
1052 4023 : std::vector<GByte> abyIndexBuf;
1053 4023 : bool bCacheHit = g_oShardIndexCache.tryGet(pszFilename, abyIndexBuf);
1054 4023 : if (bCacheHit && abyIndexBuf.size() != nIndexBytes)
1055 0 : bCacheHit = false; // stale entry from different layout
1056 :
1057 4023 : if (!bCacheHit)
1058 : {
1059 : // Cache miss: read the full index in one contiguous I/O call.
1060 78 : abyIndexBuf.resize(nIndexBytes);
1061 78 : poFile->Seek(nIndexBaseOffset, SEEK_SET);
1062 78 : if (poFile->Read(abyIndexBuf.data(), nIndexBytes, 1) != 1)
1063 : {
1064 0 : CPLError(CE_Failure, CPLE_AppDefined,
1065 : "BatchDecodePartial: failed to read shard index");
1066 0 : return false;
1067 : }
1068 78 : if (bSwapIndex)
1069 : {
1070 5 : for (size_t j = 0; j < nInnerChunkCount; ++j)
1071 : {
1072 4 : GByte *p = abyIndexBuf.data() + j * sizeof(Location);
1073 4 : CPL_SWAP64PTR(p);
1074 4 : CPL_SWAP64PTR(p + sizeof(uint64_t));
1075 : }
1076 : }
1077 78 : g_oShardIndexCache.insert(pszFilename, abyIndexBuf);
1078 : }
1079 :
1080 16115 : for (size_t i = 0; i < anRequests.size(); ++i)
1081 12092 : memcpy(&aLocations[i],
1082 12092 : abyIndexBuf.data() +
1083 12092 : anInnerChunkIndices[i] * sizeof(Location),
1084 4023 : sizeof(Location));
1085 : }
1086 : else
1087 : {
1088 : // Shard index too large for cache or no filename: fall back to
1089 : // per-entry ReadMultiRange.
1090 0 : std::vector<vsi_l_offset> anIdxOffsets(anRequests.size());
1091 0 : std::vector<size_t> anIdxSizes(anRequests.size(), sizeof(Location));
1092 0 : std::vector<void *> ppIdxData(anRequests.size());
1093 :
1094 0 : for (size_t i = 0; i < anRequests.size(); ++i)
1095 : {
1096 0 : anIdxOffsets[i] = nIndexBaseOffset + static_cast<vsi_l_offset>(
1097 0 : anInnerChunkIndices[i]) *
1098 : sizeof(Location);
1099 : #ifndef __COVERITY__
1100 0 : ppIdxData[i] = &aLocations[i];
1101 : #endif
1102 : }
1103 :
1104 0 : if (poFile->ReadMultiRange(static_cast<int>(anRequests.size()),
1105 0 : ppIdxData.data(), anIdxOffsets.data(),
1106 0 : anIdxSizes.data()) != 0)
1107 : {
1108 0 : CPLError(CE_Failure, CPLE_AppDefined,
1109 : "BatchDecodePartial: ReadMultiRange() failed for index");
1110 0 : return false;
1111 : }
1112 :
1113 0 : if (bSwapIndex)
1114 : {
1115 0 : for (auto &loc : aLocations)
1116 : {
1117 0 : CPL_SWAP64PTR(&(loc.nOffset));
1118 0 : CPL_SWAP64PTR(&(loc.nSize));
1119 : }
1120 : }
1121 : }
1122 :
1123 : // --- Classify requests: empty chunks vs data chunks ---
1124 4023 : aResults.resize(anRequests.size());
1125 :
1126 : struct DataRange
1127 : {
1128 : size_t nReqIdx;
1129 : };
1130 :
1131 8046 : std::vector<DataRange> aDataRanges;
1132 8046 : std::vector<vsi_l_offset> anDataOffsets;
1133 8046 : std::vector<size_t> anDataSizes;
1134 :
1135 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1136 : {
1137 12092 : const auto &anCount = anRequests[iReq].second;
1138 : const auto nExpectedDecodedChunkSize =
1139 12092 : nDTSize * MultiplyElements(anCount);
1140 12092 : const Location &loc = aLocations[iReq];
1141 :
1142 12092 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
1143 0 : loc.nSize == std::numeric_limits<uint64_t>::max())
1144 : {
1145 : // Empty chunk - fill with nodata
1146 : try
1147 : {
1148 0 : aResults[iReq].resize(nExpectedDecodedChunkSize);
1149 : }
1150 0 : catch (const std::exception &)
1151 : {
1152 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1153 : "Cannot allocate memory for decoded chunk");
1154 0 : return false;
1155 : }
1156 0 : FillWithNoData(aResults[iReq], MultiplyElements(anCount),
1157 0 : m_oInputArrayMetadata);
1158 0 : continue;
1159 : }
1160 :
1161 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
1162 : {
1163 : #ifndef __COVERITY__
1164 : // coverity[result_independent_of_operands]
1165 : if (loc.nSize > std::numeric_limits<size_t>::max())
1166 : {
1167 : CPLError(CE_Failure, CPLE_NotSupported,
1168 : "BatchDecodePartial: too large chunk size");
1169 : return false;
1170 : }
1171 : #endif
1172 : }
1173 :
1174 12092 : aDataRanges.push_back({iReq});
1175 12092 : anDataOffsets.push_back(loc.nOffset);
1176 12092 : anDataSizes.push_back(static_cast<size_t>(loc.nSize));
1177 : }
1178 :
1179 4023 : if (aDataRanges.empty())
1180 0 : return true;
1181 :
1182 : // Validate against file size (same threshold as DecodePartial)
1183 4023 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
1184 : {
1185 4023 : uint64_t nAccSize = 0;
1186 16115 : for (const auto &nSize : anDataSizes)
1187 : {
1188 12092 : if (nAccSize > std::numeric_limits<uint64_t>::max() - nSize)
1189 : {
1190 0 : nAccSize = std::numeric_limits<uint64_t>::max();
1191 0 : break;
1192 : }
1193 12092 : nAccSize += nSize;
1194 : }
1195 4023 : if (nAccSize > THRESHOLD)
1196 : {
1197 0 : poFile->Seek(0, SEEK_END);
1198 0 : const auto nFileSize = poFile->Tell();
1199 0 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1200 : {
1201 0 : if (anDataOffsets[i] >= nFileSize ||
1202 0 : anDataSizes[i] > nFileSize - anDataOffsets[i])
1203 : {
1204 0 : CPLError(CE_Failure, CPLE_NotSupported,
1205 : "BatchDecodePartial: invalid chunk location: "
1206 : "offset=%" PRIu64 ", size=%" PRIu64,
1207 0 : static_cast<uint64_t>(anDataOffsets[i]),
1208 0 : static_cast<uint64_t>(anDataSizes[i]));
1209 0 : return false;
1210 : }
1211 : }
1212 : }
1213 : }
1214 :
1215 : // --- Pass 2: ReadMultiRange for data chunks ---
1216 8046 : std::vector<ZarrByteVectorQuickResize> aCompressed(aDataRanges.size());
1217 8046 : std::vector<void *> ppData(aDataRanges.size());
1218 :
1219 16115 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1220 : {
1221 : try
1222 : {
1223 12092 : aCompressed[i].resize(anDataSizes[i]);
1224 : }
1225 0 : catch (const std::exception &)
1226 : {
1227 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1228 : "Cannot allocate memory for compressed chunk");
1229 0 : return false;
1230 : }
1231 12092 : ppData[i] = aCompressed[i].data();
1232 : }
1233 :
1234 4023 : CPLDebugOnly("ZARR",
1235 : "BatchDecodePartial: ReadMultiRange() with %d data ranges",
1236 : static_cast<int>(aDataRanges.size()));
1237 :
1238 4023 : if (poFile->ReadMultiRange(static_cast<int>(aDataRanges.size()),
1239 4023 : ppData.data(), anDataOffsets.data(),
1240 8046 : anDataSizes.data()) != 0)
1241 : {
1242 0 : CPLError(CE_Failure, CPLE_AppDefined,
1243 : "BatchDecodePartial: ReadMultiRange() failed for data");
1244 0 : return false;
1245 : }
1246 :
1247 : // --- Decode compressed chunks (parallel when GDAL_NUM_THREADS > 1) ---
1248 4023 : const int nMaxThreads = GDALGetNumThreads();
1249 4023 : const int nChunks = static_cast<int>(aDataRanges.size());
1250 4023 : const int nThreads = std::min(std::max(1, nMaxThreads), nChunks);
1251 :
1252 : // Try parallel decode when multiple threads are available
1253 0 : CPLWorkerThreadPool *wtp = (nThreads > 1 && nChunks > 1)
1254 4023 : ? GDALGetGlobalThreadPool(nMaxThreads)
1255 4023 : : nullptr;
1256 :
1257 4023 : if (!wtp)
1258 : {
1259 : // Sequential fallback
1260 15182 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1261 : {
1262 11515 : const size_t iReq = aDataRanges[i].nReqIdx;
1263 11515 : const auto &anCount = anRequests[iReq].second;
1264 : const auto nExpectedDecodedChunkSize =
1265 11515 : nDTSize * MultiplyElements(anCount);
1266 :
1267 11515 : if (!m_poCodecSequence->Decode(aCompressed[i]))
1268 : {
1269 356 : CPLError(CE_Failure, CPLE_NotSupported,
1270 : "BatchDecodePartial: cannot decode chunk %" PRIu64,
1271 356 : static_cast<uint64_t>(anInnerChunkIndices[iReq]));
1272 356 : return false;
1273 : }
1274 :
1275 11159 : if (aCompressed[i].size() != nExpectedDecodedChunkSize)
1276 : {
1277 0 : CPLError(CE_Failure, CPLE_NotSupported,
1278 : "BatchDecodePartial: decoded size %" PRIu64
1279 : " != expected %" PRIu64,
1280 0 : static_cast<uint64_t>(aCompressed[i].size()),
1281 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
1282 0 : return false;
1283 : }
1284 :
1285 11159 : aResults[iReq] = std::move(aCompressed[i]);
1286 : }
1287 3667 : return true;
1288 : }
1289 :
1290 0 : CPLDebugOnly("ZARR",
1291 : "BatchDecodePartial: parallel decode with %d threads "
1292 : "for %d chunks",
1293 : nThreads, nChunks);
1294 :
1295 : {
1296 0 : bool bGlobalOK = true;
1297 0 : std::mutex oMutex;
1298 :
1299 : // Clone codecs per thread on the main thread (Clone() is not
1300 : // thread-safe due to JSON object cloning)
1301 0 : std::vector<std::unique_ptr<ZarrV3CodecSequence>> apoCodecs(nThreads);
1302 0 : for (int t = 0; t < nThreads; ++t)
1303 0 : apoCodecs[t] = m_poCodecSequence->Clone();
1304 :
1305 0 : auto poJobQueue = wtp->CreateJobQueue();
1306 0 : for (int t = 0; t < nThreads; ++t)
1307 : {
1308 0 : const int iFirst =
1309 0 : static_cast<int>(static_cast<int64_t>(t) * nChunks / nThreads);
1310 0 : const int iEnd = static_cast<int>(static_cast<int64_t>(t + 1) *
1311 0 : nChunks / nThreads);
1312 :
1313 0 : poJobQueue->SubmitJob(
1314 0 : [iFirst, iEnd, t, &aDataRanges, &anRequests, &aCompressed,
1315 0 : &aResults, &apoCodecs, &bGlobalOK, &oMutex, nDTSize]()
1316 : {
1317 0 : for (int i = iFirst; i < iEnd; ++i)
1318 : {
1319 : {
1320 0 : std::lock_guard<std::mutex> oLock(oMutex);
1321 0 : if (!bGlobalOK)
1322 0 : return;
1323 : }
1324 :
1325 0 : const size_t iReq = aDataRanges[i].nReqIdx;
1326 0 : const auto &anCount = anRequests[iReq].second;
1327 : const auto nExpected =
1328 0 : nDTSize * MultiplyElements(anCount);
1329 :
1330 0 : if (!apoCodecs[t]->Decode(aCompressed[i]) ||
1331 0 : aCompressed[i].size() != nExpected)
1332 : {
1333 0 : std::lock_guard<std::mutex> oLock(oMutex);
1334 0 : bGlobalOK = false;
1335 0 : return;
1336 : }
1337 :
1338 : // Each job writes to a unique iReq slot - no lock
1339 0 : aResults[iReq] = std::move(aCompressed[i]);
1340 : }
1341 : });
1342 : }
1343 0 : poJobQueue->WaitCompletion();
1344 :
1345 0 : if (!bGlobalOK)
1346 : {
1347 0 : CPLError(CE_Failure, CPLE_NotSupported,
1348 : "BatchDecodePartial: parallel decode failed");
1349 0 : return false;
1350 : }
1351 : }
1352 :
1353 0 : return true;
1354 : }
1355 :
1356 : /************************************************************************/
1357 : /* ZarrV3CodecShardingIndexed::GetInnerMostBlockSize() */
1358 : /************************************************************************/
1359 :
1360 552 : std::vector<size_t> ZarrV3CodecShardingIndexed::GetInnerMostBlockSize(
1361 : const std::vector<size_t> &) const
1362 : {
1363 552 : return m_anInnerBlockSize;
1364 : // TODO if we one day properly support nested sharding
1365 : // return m_poCodecSequence->GetInnerMostBlockSize(m_anInnerBlockSize);
1366 : }
|