Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver, "sharding_indexed" codec
5 : * Author: Even Rouault <even dot rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2026, Development Seed
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "zarr_v3_codec.h"
14 :
15 : #include "cpl_mem_cache.h"
16 : #include "cpl_vsi_virtual.h"
17 : #include "cpl_worker_thread_pool.h"
18 : #include "gdal_thread_pool.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <limits>
23 : #include <mutex>
24 :
25 : // Process-wide LRU cache for shard indices.
26 : // Key: shard filepath/URL. Value: flat byte buffer of Location entries
27 : // (each 16 bytes: uint64_t nOffset + uint64_t nSize, host byte order).
28 : // Populated lazily by BatchDecodePartial.
29 : // Shards whose index exceeds GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES
30 : // (default 1 MiB) fall back to per-entry ReadMultiRange.
31 : // Thread safety: lru11::Cache<..., std::mutex> serialises all access.
32 : static constexpr size_t SHARD_INDEX_CACHE_ENTRIES = 64;
33 : static lru11::Cache<std::string, std::vector<GByte>, std::mutex>
34 : g_oShardIndexCache{SHARD_INDEX_CACHE_ENTRIES, 0};
35 :
36 : // Implements https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/index.html
37 :
38 4 : void ZarrClearShardIndexCache()
39 : {
40 4 : g_oShardIndexCache.clear();
41 4 : }
42 :
43 37 : void ZarrEraseShardIndexFromCache(const std::string &osFilename)
44 : {
45 37 : g_oShardIndexCache.remove(osFilename);
46 37 : }
47 :
48 : /************************************************************************/
49 : /* ZarrV3CodecShardingIndexed() */
50 : /************************************************************************/
51 :
52 696 : ZarrV3CodecShardingIndexed::ZarrV3CodecShardingIndexed() : ZarrV3Codec(NAME)
53 : {
54 696 : }
55 :
56 : /************************************************************************/
57 : /* ZarrV3CodecShardingIndexed::Clone() */
58 : /************************************************************************/
59 :
60 8 : std::unique_ptr<ZarrV3Codec> ZarrV3CodecShardingIndexed::Clone() const
61 : {
62 16 : auto psClone = std::make_unique<ZarrV3CodecShardingIndexed>();
63 16 : ZarrArrayMetadata oOutputArrayMetadata;
64 8 : psClone->InitFromConfiguration(m_oConfiguration, m_oInputArrayMetadata,
65 : oOutputArrayMetadata,
66 : /* bEmitWarnings = */ false);
67 16 : return psClone;
68 : }
69 :
70 : /************************************************************************/
71 : /* ZarrV3CodecShardingIndexed::InitFromConfiguration() */
72 : /************************************************************************/
73 :
74 696 : bool ZarrV3CodecShardingIndexed::InitFromConfiguration(
75 : const CPLJSONObject &configuration,
76 : const ZarrArrayMetadata &oInputArrayMetadata,
77 : ZarrArrayMetadata &oOutputArrayMetadata, bool bEmitWarnings)
78 : {
79 696 : if (oInputArrayMetadata.anBlockSizes.empty())
80 : {
81 0 : CPLError(
82 : CE_Failure, CPLE_AppDefined,
83 : "Codec sharding_indexed: sharding not supported for scalar array");
84 0 : return false;
85 : }
86 :
87 696 : m_oConfiguration = configuration.Clone();
88 696 : m_oInputArrayMetadata = oInputArrayMetadata;
89 :
90 1391 : if (!configuration.IsValid() ||
91 695 : configuration.GetType() != CPLJSONObject::Type::Object)
92 : {
93 2 : CPLError(
94 : CE_Failure, CPLE_AppDefined,
95 : "Codec sharding_indexed: configuration missing or not an object");
96 2 : return false;
97 : }
98 :
99 2082 : const auto oChunkShape = configuration["chunk_shape"].ToArray();
100 1386 : if (!oChunkShape.IsValid() ||
101 692 : oChunkShape.GetType() != CPLJSONObject::Type::Array)
102 : {
103 2 : CPLError(CE_Failure, CPLE_AppDefined,
104 : "Codec sharding_indexed: configuration.chunk_shape missing or "
105 : "not an array");
106 2 : return false;
107 : }
108 1384 : if (static_cast<size_t>(oChunkShape.Size()) !=
109 692 : m_oInputArrayMetadata.anBlockSizes.size())
110 : {
111 1 : CPLError(CE_Failure, CPLE_AppDefined,
112 : "Codec sharding_indexed: configuration.chunk_shape should "
113 : "have the same shape as the array");
114 1 : return false;
115 : }
116 1382 : std::vector<size_t> anCountInnerChunks;
117 2069 : for (int i = 0; i < oChunkShape.Size(); ++i)
118 : {
119 2762 : if (oChunkShape[i].GetType() != CPLJSONObject::Type::Integer &&
120 1381 : oChunkShape[i].GetType() != CPLJSONObject::Type::Long)
121 : {
122 0 : CPLError(CE_Failure, CPLE_AppDefined,
123 : "Codec sharding_indexed: configuration.chunk_shape[%d] "
124 : "should be an integer",
125 : i);
126 0 : return false;
127 : }
128 1381 : const int64_t nVal = oChunkShape[i].ToLong();
129 2760 : if (nVal <= 0 ||
130 1379 : static_cast<uint64_t>(nVal) >
131 2760 : m_oInputArrayMetadata.anBlockSizes[i] ||
132 1379 : (m_oInputArrayMetadata.anBlockSizes[i] % nVal) != 0)
133 : {
134 3 : CPLError(
135 : CE_Failure, CPLE_AppDefined,
136 : "Codec sharding_indexed: configuration.chunk_shape[%d]=%" PRId64
137 : " should be a strictly positive value that is a divisor of "
138 : "%" PRIu64,
139 : i, nVal,
140 3 : static_cast<uint64_t>(m_oInputArrayMetadata.anBlockSizes[i]));
141 3 : return false;
142 : }
143 : #ifndef __COVERITY__
144 : // The following cast is safe since ZarrArray::ParseChunkSize() has
145 : // previously validated that m_oInputArrayMetadata.anBlockSizes[i] fits
146 : // on size_t
147 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
148 : {
149 : // coverity[result_independent_of_operands]
150 : CPLAssert(nVal <= std::numeric_limits<size_t>::max());
151 : }
152 : #endif
153 1378 : m_anInnerBlockSize.push_back(static_cast<size_t>(nVal));
154 1378 : anCountInnerChunks.push_back(
155 1378 : static_cast<size_t>(m_oInputArrayMetadata.anBlockSizes[i] / nVal));
156 : }
157 :
158 2064 : const auto oCodecs = configuration["codecs"];
159 688 : if (!oCodecs.IsValid() || oCodecs.GetType() != CPLJSONObject::Type::Array)
160 : {
161 2 : CPLError(CE_Failure, CPLE_AppDefined,
162 : "Codec sharding_indexed: configuration.codecs missing or "
163 : "not an array");
164 2 : return false;
165 : }
166 686 : if (oCodecs.ToArray().Size() == 0)
167 : {
168 1 : CPLError(CE_Failure, CPLE_AppDefined,
169 : "Codec sharding_indexed: configuration.codecs[] is empty");
170 1 : return false;
171 : }
172 1370 : ZarrArrayMetadata inputArrayMetadataCodecs = m_oInputArrayMetadata;
173 685 : inputArrayMetadataCodecs.anBlockSizes = m_anInnerBlockSize;
174 : m_poCodecSequence =
175 685 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataCodecs);
176 685 : if (!m_poCodecSequence->InitFromJson(oCodecs, oOutputArrayMetadata))
177 : {
178 1 : CPLError(CE_Failure, CPLE_AppDefined,
179 : "Codec sharding_indexed: initialization of codecs failed");
180 1 : return false;
181 : }
182 :
183 684 : if (bEmitWarnings && m_poCodecSequence->SupportsPartialDecoding())
184 : {
185 : // Implementation limitation
186 1 : CPLError(CE_Warning, CPLE_AppDefined,
187 : "Nested sharding detected. For now, partial decoding is only "
188 : "implemented on the outer-most shard level");
189 : }
190 :
191 2052 : const auto oIndexCodecs = configuration["index_codecs"];
192 1367 : if (!oIndexCodecs.IsValid() ||
193 683 : oIndexCodecs.GetType() != CPLJSONObject::Type::Array)
194 : {
195 2 : CPLError(
196 : CE_Failure, CPLE_AppDefined,
197 : "Codec sharding_indexed: configuration.index_codecs missing or "
198 : "not an array");
199 2 : return false;
200 : }
201 682 : if (oIndexCodecs.ToArray().Size() == 0)
202 : {
203 1 : CPLError(
204 : CE_Failure, CPLE_AppDefined,
205 : "Codec sharding_indexed: configuration.index_codecs[] is empty");
206 1 : return false;
207 : }
208 1362 : ZarrArrayMetadata inputArrayMetadataIndex;
209 681 : inputArrayMetadataIndex.oElt.nativeType =
210 : DtypeElt::NativeType::UNSIGNED_INT;
211 681 : inputArrayMetadataIndex.oElt.nativeSize = sizeof(uint64_t);
212 : inputArrayMetadataIndex.oElt.gdalType =
213 681 : GDALExtendedDataType::Create(GDT_UInt64);
214 681 : inputArrayMetadataIndex.oElt.gdalSize = sizeof(uint64_t);
215 681 : inputArrayMetadataIndex.anBlockSizes = std::move(anCountInnerChunks);
216 : // 2 for offset and size
217 681 : inputArrayMetadataIndex.anBlockSizes.push_back(2);
218 : m_poIndexCodecSequence =
219 681 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataIndex);
220 1362 : ZarrArrayMetadata oOutputArrayMetadataIndex;
221 681 : if (!m_poIndexCodecSequence->InitFromJson(oIndexCodecs,
222 : oOutputArrayMetadataIndex))
223 : {
224 1 : CPLError(
225 : CE_Failure, CPLE_AppDefined,
226 : "Codec sharding_indexed: initialization of index_codecs failed");
227 1 : return false;
228 : }
229 680 : const auto &indexCodecs = m_poIndexCodecSequence->GetCodecs();
230 680 : if (indexCodecs.empty())
231 : {
232 : // ok, there is only a "bytes" codec, optimized away if the order
233 : // is the one of the native architecture
234 : }
235 1074 : else if (indexCodecs[0]->GetName() == ZarrV3CodecBytes::NAME ||
236 536 : indexCodecs[0]->GetName() == ZarrV3CodecCRC32C::NAME)
237 : {
238 : // ok
239 : }
240 1 : else if (indexCodecs.size() == 2 &&
241 0 : indexCodecs[1]->GetName() == ZarrV3CodecCRC32C::NAME)
242 : {
243 : // ok
244 : }
245 : else
246 : {
247 1 : CPLError(CE_Failure, CPLE_NotSupported,
248 : "Codec sharding_indexed: this implementation only supports "
249 : "Bytes, possibly followed by CRC32C, as index_codecs");
250 1 : return false;
251 : }
252 679 : m_bIndexHasCRC32 = (!indexCodecs.empty() && indexCodecs.back()->GetName() ==
253 : ZarrV3CodecCRC32C::NAME);
254 :
255 : const std::string osIndexLocation =
256 2037 : configuration.GetString("index_location", "end");
257 679 : if (osIndexLocation != "start" && osIndexLocation != "end")
258 : {
259 1 : CPLError(CE_Failure, CPLE_AppDefined,
260 : "Codec sharding_indexed: invalid value for index_location");
261 1 : return false;
262 : }
263 678 : m_bIndexLocationAtEnd = (osIndexLocation == "end");
264 :
265 678 : return true;
266 : }
267 :
268 : /************************************************************************/
269 : /* ExtractSubArrayFromLargerOne() */
270 : /************************************************************************/
271 :
272 : // Inverse of CopySubArrayIntoLargerOne(): extract a contiguous inner chunk
273 : // from its position in the larger shard buffer.
274 : static void
275 153 : ExtractSubArrayFromLargerOne(const ZarrByteVectorQuickResize &abySrc,
276 : const std::vector<size_t> &anSrcBlockSize,
277 : const std::vector<size_t> &anInnerBlockSize,
278 : const std::vector<size_t> &anInnerBlockIndices,
279 : ZarrByteVectorQuickResize &abyChunk,
280 : const size_t nDTSize)
281 : {
282 153 : const auto nDims = anInnerBlockSize.size();
283 153 : CPLAssert(nDims > 0);
284 153 : CPLAssert(nDims == anInnerBlockIndices.size());
285 153 : CPLAssert(nDims == anSrcBlockSize.size());
286 : // +1 to avoid gcc -Wnull-dereference false positives
287 306 : std::vector<const GByte *> srcPtrStack(nDims + 1);
288 306 : std::vector<size_t> count(nDims + 1);
289 306 : std::vector<size_t> srcStride(nDims + 1);
290 :
291 153 : size_t nSrcStride = nDTSize;
292 483 : for (size_t iDim = nDims; iDim > 0;)
293 : {
294 330 : --iDim;
295 330 : srcStride[iDim] = nSrcStride;
296 330 : nSrcStride *= anSrcBlockSize[iDim];
297 : }
298 :
299 153 : srcPtrStack[0] = abySrc.data();
300 483 : for (size_t iDim = 0; iDim < nDims; ++iDim)
301 : {
302 330 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
303 : anSrcBlockSize[iDim]);
304 660 : srcPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
305 330 : srcStride[iDim];
306 : }
307 153 : GByte *pabyDst = abyChunk.data();
308 :
309 153 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
310 153 : size_t dimIdx = 0;
311 1025 : lbl_next_depth:
312 1025 : if (dimIdx + 1 == nDims)
313 : {
314 848 : memcpy(pabyDst, srcPtrStack[dimIdx], nLastDimSize);
315 848 : pabyDst += nLastDimSize;
316 : }
317 : else
318 : {
319 177 : count[dimIdx] = anInnerBlockSize[dimIdx];
320 : while (true)
321 : {
322 872 : dimIdx++;
323 872 : srcPtrStack[dimIdx] = srcPtrStack[dimIdx - 1];
324 872 : goto lbl_next_depth;
325 872 : lbl_return_to_caller:
326 872 : dimIdx--;
327 872 : if (--count[dimIdx] == 0)
328 177 : break;
329 695 : srcPtrStack[dimIdx] += srcStride[dimIdx];
330 : }
331 : }
332 1025 : if (dimIdx > 0)
333 872 : goto lbl_return_to_caller;
334 153 : }
335 :
336 : /************************************************************************/
337 : /* IsAllNoData() */
338 : /************************************************************************/
339 :
340 : // Check if a buffer consists entirely of the fill value.
341 153 : static bool IsAllNoData(const ZarrByteVectorQuickResize &abyChunk,
342 : const ZarrArrayMetadata &metadata)
343 : {
344 153 : const size_t nDTSize = metadata.oElt.nativeSize;
345 153 : const size_t nBytes = abyChunk.size();
346 :
347 157 : if (metadata.abyNoData.empty() ||
348 157 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
349 : {
350 : // Zero fill value: byte-by-byte check (compiler auto-vectorizes)
351 149 : const GByte *p = abyChunk.data();
352 4700 : for (size_t i = 0; i < nBytes; ++i)
353 : {
354 4664 : if (p[i] != 0)
355 113 : return false;
356 : }
357 36 : return true;
358 : }
359 : else
360 : {
361 : // Non-zero fill value: element-wise compare
362 4 : CPLAssert(metadata.abyNoData.size() == nDTSize);
363 4 : const GByte *p = abyChunk.data();
364 4 : const size_t nCount = nBytes / nDTSize;
365 4 : for (size_t i = 0; i < nCount; ++i)
366 : {
367 4 : if (memcmp(p + i * nDTSize, metadata.abyNoData.data(), nDTSize) !=
368 : 0)
369 4 : return false;
370 : }
371 0 : return true;
372 : }
373 : }
374 :
375 : /************************************************************************/
376 : /* ZarrV3CodecShardingIndexed::Encode() */
377 : /************************************************************************/
378 :
379 37 : bool ZarrV3CodecShardingIndexed::Encode(const ZarrByteVectorQuickResize &abySrc,
380 : ZarrByteVectorQuickResize &abyDst) const
381 : {
382 : // Compute total number of inner chunks
383 37 : size_t nInnerChunks = 1;
384 115 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
385 : {
386 : const size_t nCountInnerChunksThisDim =
387 78 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
388 78 : nInnerChunks *= nCountInnerChunksThisDim;
389 : }
390 :
391 37 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
392 : const size_t nExpectedSrcSize =
393 37 : nDTSize * MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
394 37 : if (abySrc.size() != nExpectedSrcSize)
395 : {
396 0 : CPLError(CE_Failure, CPLE_AppDefined,
397 : "ZarrV3CodecShardingIndexed::Encode(): input buffer size "
398 : "(%" PRIu64 ") != expected (%" PRIu64 ")",
399 0 : static_cast<uint64_t>(abySrc.size()),
400 : static_cast<uint64_t>(nExpectedSrcSize));
401 0 : return false;
402 : }
403 :
404 : // Index: one Location per inner chunk, initially all empty
405 : std::vector<Location> anLocations(nInnerChunks,
406 : {std::numeric_limits<uint64_t>::max(),
407 74 : std::numeric_limits<uint64_t>::max()});
408 :
409 : // Accumulate encoded inner chunk data
410 74 : ZarrByteVectorQuickResize abyData;
411 : // Reserve approximate capacity: decoded chunk size is close to the upper
412 : // bound per chunk (compression may slightly increase size in pathological
413 : // cases), but avoids most reallocations.
414 : const size_t nDecodedChunkSize =
415 37 : nDTSize * MultiplyElements(m_anInnerBlockSize);
416 : // resize+clear = reserve (ZarrByteVectorQuickResize has no reserve())
417 : try
418 : {
419 37 : abyData.resize(nInnerChunks * nDecodedChunkSize);
420 : }
421 0 : catch (const std::exception &)
422 : {
423 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
424 : "Cannot allocate memory for accumulated shard data");
425 0 : return false;
426 : }
427 37 : abyData.clear();
428 74 : ZarrByteVectorQuickResize abyChunk;
429 :
430 37 : uint64_t nCurrentOffset = 0;
431 74 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
432 :
433 190 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
434 : {
435 : // Update chunk coordinates (same iteration order as Decode)
436 153 : if (iChunk > 0)
437 : {
438 116 : size_t iDim = m_anInnerBlockSize.size() - 1;
439 164 : while (++anChunkIndices[iDim] ==
440 328 : m_oInputArrayMetadata.anBlockSizes[iDim] /
441 164 : m_anInnerBlockSize[iDim])
442 : {
443 48 : anChunkIndices[iDim] = 0;
444 48 : --iDim;
445 : }
446 : }
447 :
448 : // Extract this inner chunk from the shard buffer
449 : try
450 : {
451 153 : abyChunk.resize(nDecodedChunkSize);
452 : }
453 0 : catch (const std::exception &)
454 : {
455 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
456 : "Cannot allocate memory for inner chunk");
457 0 : return false;
458 : }
459 153 : ExtractSubArrayFromLargerOne(abySrc, m_oInputArrayMetadata.anBlockSizes,
460 153 : m_anInnerBlockSize, anChunkIndices,
461 : abyChunk, nDTSize);
462 :
463 : // Skip empty (all-nodata) chunks
464 153 : if (IsAllNoData(abyChunk, m_oInputArrayMetadata))
465 36 : continue;
466 :
467 : // Encode inner chunk through codec chain (bytes + compressor)
468 117 : if (!m_poCodecSequence->Encode(abyChunk))
469 : {
470 0 : CPLError(CE_Failure, CPLE_AppDefined,
471 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode "
472 : "inner chunk %" PRIu64,
473 : static_cast<uint64_t>(iChunk));
474 0 : return false;
475 : }
476 :
477 117 : anLocations[iChunk] = {nCurrentOffset, abyChunk.size()};
478 117 : nCurrentOffset += abyChunk.size();
479 117 : abyData.insert(abyData.end(), abyChunk.begin(), abyChunk.end());
480 : }
481 :
482 : // All inner chunks are nodata: signal empty shard via zero-length output
483 37 : if (abyData.empty())
484 : {
485 0 : abyDst.clear();
486 0 : return true;
487 : }
488 :
489 : // Build index buffer
490 74 : ZarrByteVectorQuickResize abyIndex;
491 37 : const size_t nIndexRawSize = nInnerChunks * sizeof(Location);
492 :
493 : // If index is at start, data offsets must account for the index prefix.
494 : // Encode the index once to determine its encoded size (includes CRC32C
495 : // overhead), then re-encode below with the adjusted offsets.
496 : // Note: currently unreachable in write mode (creation hardcodes "end"),
497 : // but kept for completeness with the Decode() start-index path.
498 37 : if (!m_bIndexLocationAtEnd)
499 : {
500 : // Encode a temporary copy of the index to determine its encoded size
501 0 : abyIndex.resize(nIndexRawSize);
502 0 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
503 0 : if (!m_poIndexCodecSequence->Encode(abyIndex))
504 : {
505 0 : CPLError(
506 : CE_Failure, CPLE_AppDefined,
507 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
508 0 : return false;
509 : }
510 0 : const uint64_t nIndexEncodedSize = abyIndex.size();
511 : // Adjust offsets
512 0 : for (auto &loc : anLocations)
513 : {
514 0 : if (loc.nOffset != std::numeric_limits<uint64_t>::max())
515 0 : loc.nOffset += nIndexEncodedSize;
516 : }
517 : }
518 :
519 : // (Re-)encode index with final offsets
520 37 : abyIndex.resize(nIndexRawSize);
521 37 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
522 37 : if (!m_poIndexCodecSequence->Encode(abyIndex))
523 : {
524 0 : CPLError(CE_Failure, CPLE_AppDefined,
525 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
526 0 : return false;
527 : }
528 :
529 : // Assemble output: data + index (or index + data)
530 37 : if (m_bIndexLocationAtEnd)
531 : {
532 37 : abyDst = std::move(abyData);
533 37 : abyDst.insert(abyDst.end(), abyIndex.begin(), abyIndex.end());
534 : }
535 : else
536 : {
537 0 : abyDst = std::move(abyIndex);
538 0 : abyDst.insert(abyDst.end(), abyData.begin(), abyData.end());
539 : }
540 :
541 37 : return true;
542 : }
543 :
544 : /************************************************************************/
545 : /* CopySubArrayIntoLargerOne() */
546 : /************************************************************************/
547 :
548 : static void
549 148 : CopySubArrayIntoLargerOne(const ZarrByteVectorQuickResize &abyChunk,
550 : const std::vector<size_t> &anInnerBlockSize,
551 : const std::vector<size_t> &anInnerBlockIndices,
552 : ZarrByteVectorQuickResize &abyDst,
553 : const std::vector<size_t> &anDstBlockSize,
554 : const size_t nDTSize)
555 : {
556 148 : const auto nDims = anInnerBlockSize.size();
557 148 : CPLAssert(nDims > 0);
558 148 : CPLAssert(nDims == anInnerBlockIndices.size());
559 148 : CPLAssert(nDims == anDstBlockSize.size());
560 : // +1 just to make some gcc versions not emit -Wnull-dereference false positives
561 296 : std::vector<GByte *> dstPtrStack(nDims + 1);
562 296 : std::vector<size_t> count(nDims + 1);
563 296 : std::vector<size_t> dstStride(nDims + 1);
564 :
565 148 : size_t nDstStride = nDTSize;
566 444 : for (size_t iDim = nDims; iDim > 0;)
567 : {
568 296 : --iDim;
569 296 : dstStride[iDim] = nDstStride;
570 296 : nDstStride *= anDstBlockSize[iDim];
571 : }
572 :
573 148 : dstPtrStack[0] = abyDst.data();
574 444 : for (size_t iDim = 0; iDim < nDims; ++iDim)
575 : {
576 296 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
577 : anDstBlockSize[iDim]);
578 592 : dstPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
579 296 : dstStride[iDim];
580 : }
581 148 : const GByte *pabySrc = abyChunk.data();
582 :
583 148 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
584 148 : size_t dimIdx = 0;
585 312 : lbl_next_depth:
586 312 : if (dimIdx + 1 == nDims)
587 : {
588 164 : memcpy(dstPtrStack[dimIdx], pabySrc, nLastDimSize);
589 164 : pabySrc += nLastDimSize;
590 : }
591 : else
592 : {
593 148 : count[dimIdx] = anInnerBlockSize[dimIdx];
594 : while (true)
595 : {
596 164 : dimIdx++;
597 164 : dstPtrStack[dimIdx] = dstPtrStack[dimIdx - 1];
598 164 : goto lbl_next_depth;
599 164 : lbl_return_to_caller:
600 164 : dimIdx--;
601 164 : if (--count[dimIdx] == 0)
602 148 : break;
603 16 : dstPtrStack[dimIdx] += dstStride[dimIdx];
604 : }
605 : }
606 312 : if (dimIdx > 0)
607 164 : goto lbl_return_to_caller;
608 148 : }
609 :
610 : /************************************************************************/
611 : /* FillWithNoData() */
612 : /************************************************************************/
613 :
614 152 : static void FillWithNoData(ZarrByteVectorQuickResize &abyDst,
615 : const size_t nCount,
616 : const ZarrArrayMetadata &metadata)
617 : {
618 152 : const size_t nDTSize = metadata.oElt.nativeSize;
619 304 : if (metadata.abyNoData.empty() ||
620 304 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
621 : {
622 150 : memset(abyDst.data(), 0, nDTSize * nCount);
623 : }
624 : else
625 : {
626 2 : CPLAssert(metadata.abyNoData.size() == nDTSize);
627 152 : for (size_t i = 0; i < nCount; ++i)
628 : {
629 150 : memcpy(abyDst.data() + i * nDTSize, metadata.abyNoData.data(),
630 : nDTSize);
631 : }
632 : }
633 152 : }
634 :
635 : /************************************************************************/
636 : /* ZarrV3CodecShardingIndexed::Decode() */
637 : /************************************************************************/
638 :
639 147 : bool ZarrV3CodecShardingIndexed::Decode(const ZarrByteVectorQuickResize &abySrc,
640 : ZarrByteVectorQuickResize &abyDst) const
641 : {
642 147 : size_t nInnerChunks = 1;
643 441 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
644 : {
645 : const size_t nCountInnerChunksThisdim =
646 294 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
647 294 : nInnerChunks *= nCountInnerChunksThisdim;
648 : }
649 :
650 294 : const size_t nIndexEncodedSize = nInnerChunks * sizeof(Location) +
651 147 : (m_bIndexHasCRC32 ? sizeof(uint32_t) : 0);
652 294 : ZarrByteVectorQuickResize abyIndex;
653 147 : if (m_bIndexLocationAtEnd)
654 : {
655 147 : abyIndex.insert(abyIndex.end(),
656 147 : abySrc.begin() + (abySrc.size() - nIndexEncodedSize),
657 441 : abySrc.end());
658 : }
659 : else
660 : {
661 0 : abyIndex.insert(abyIndex.end(), abySrc.begin(),
662 0 : abySrc.end() + nIndexEncodedSize);
663 : }
664 :
665 147 : if (!m_poIndexCodecSequence->Decode(abyIndex))
666 : {
667 0 : CPLError(
668 : CE_Failure, CPLE_NotSupported,
669 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode shard index");
670 0 : return false;
671 : }
672 :
673 147 : if (abyIndex.size() != nInnerChunks * sizeof(Location))
674 : {
675 0 : CPLError(CE_Failure, CPLE_NotSupported,
676 : "ZarrV3CodecShardingIndexed::Decode(): shard index has not "
677 : "expected size");
678 0 : return false;
679 : }
680 :
681 : const Location *panLocations =
682 147 : reinterpret_cast<const Location *>(abyIndex.data());
683 :
684 294 : ZarrByteVectorQuickResize abyChunk;
685 147 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
686 : const size_t nExpectedDecodedChunkSize =
687 147 : nDTSize * MultiplyElements(m_anInnerBlockSize);
688 : const size_t nDstCount =
689 147 : MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
690 :
691 : try
692 : {
693 147 : abyDst.resize(nDstCount * nDTSize);
694 : }
695 0 : catch (const std::exception &)
696 : {
697 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
698 : "Cannot allocate memory for decoded shard");
699 0 : return false;
700 : }
701 :
702 147 : FillWithNoData(abyDst, nDstCount, m_oInputArrayMetadata);
703 :
704 294 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
705 385 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
706 : {
707 368 : if (iChunk > 0)
708 : {
709 : // Update chunk coordinates
710 221 : size_t iDim = m_anInnerBlockSize.size() - 1;
711 303 : while (++anChunkIndices[iDim] ==
712 606 : m_oInputArrayMetadata.anBlockSizes[iDim] /
713 303 : m_anInnerBlockSize[iDim])
714 : {
715 82 : anChunkIndices[iDim] = 0;
716 82 : --iDim;
717 : }
718 : }
719 :
720 : #ifdef DEBUG_VERBOSE
721 : CPLDebug("ZARR", "Chunk %" PRIu64 ": offset %" PRIu64 ", size %" PRIu64,
722 : static_cast<uint64_t>(iChunk), panLocations[iChunk].nOffset,
723 : panLocations[iChunk].nSize);
724 : #endif
725 :
726 826 : if (panLocations[iChunk].nOffset ==
727 478 : std::numeric_limits<uint64_t>::max() &&
728 110 : panLocations[iChunk].nSize == std::numeric_limits<uint64_t>::max())
729 : {
730 : // Empty chunk
731 90 : continue;
732 : }
733 :
734 496 : if (panLocations[iChunk].nOffset >= abySrc.size() ||
735 218 : panLocations[iChunk].nSize >
736 218 : abySrc.size() - panLocations[iChunk].nOffset)
737 : {
738 80 : CPLError(CE_Failure, CPLE_NotSupported,
739 : "ZarrV3CodecShardingIndexed::Decode(): invalid chunk "
740 : "location for chunk %" PRIu64 ": offset=%" PRIu64
741 : ", size=%" PRIu64,
742 : static_cast<uint64_t>(iChunk),
743 80 : panLocations[iChunk].nOffset, panLocations[iChunk].nSize);
744 80 : return false;
745 : }
746 :
747 198 : abyChunk.clear();
748 : abyChunk.insert(
749 198 : abyChunk.end(),
750 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset),
751 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset +
752 396 : panLocations[iChunk].nSize));
753 198 : if (!m_poCodecSequence->Decode(abyChunk))
754 : {
755 50 : CPLError(CE_Failure, CPLE_NotSupported,
756 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode "
757 : "chunk %" PRIu64,
758 : static_cast<uint64_t>(iChunk));
759 50 : return false;
760 : }
761 :
762 148 : if (abyChunk.size() != nExpectedDecodedChunkSize)
763 : {
764 0 : CPLError(CE_Failure, CPLE_NotSupported,
765 : "ZarrV3CodecShardingIndexed::Decode(): decoded size for "
766 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64
767 : " is expected",
768 : static_cast<uint64_t>(iChunk),
769 0 : static_cast<uint64_t>(abyChunk.size()),
770 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
771 0 : return false;
772 : }
773 :
774 148 : CopySubArrayIntoLargerOne(abyChunk, m_anInnerBlockSize, anChunkIndices,
775 148 : abyDst, m_oInputArrayMetadata.anBlockSizes,
776 : nDTSize);
777 : }
778 :
779 17 : return true;
780 : }
781 :
782 : /************************************************************************/
783 : /* ZarrV3CodecShardingIndexed::DecodePartial() */
784 : /************************************************************************/
785 :
786 1027 : bool ZarrV3CodecShardingIndexed::DecodePartial(
787 : VSIVirtualHandle *poFile, const ZarrByteVectorQuickResize & /* abySrc */,
788 : ZarrByteVectorQuickResize &abyDst, std::vector<size_t> &anStartIdx,
789 : std::vector<size_t> &anCount)
790 : {
791 1027 : CPLAssert(anStartIdx.size() == m_oInputArrayMetadata.anBlockSizes.size());
792 1027 : CPLAssert(anStartIdx.size() == anCount.size());
793 :
794 1027 : size_t nInnerChunkCount = 1;
795 1027 : size_t nInnerChunkIdx = 0;
796 3081 : for (size_t i = 0; i < anStartIdx.size(); ++i)
797 : {
798 2054 : CPLAssert(anStartIdx[i] + anCount[i] <=
799 : m_oInputArrayMetadata.anBlockSizes[i]);
800 4108 : if ((anStartIdx[i] % m_anInnerBlockSize[i]) != 0 ||
801 2054 : anCount[i] != m_anInnerBlockSize[i])
802 : {
803 : // Should not happen with the current call sites.
804 0 : CPLError(CE_Failure, CPLE_AppDefined,
805 : "ZarrV3CodecShardingIndexed::DecodePartial() only "
806 : "supported on an exact inner chunk");
807 0 : return false;
808 : }
809 :
810 : const size_t nCountInnerChunksThisDim =
811 2054 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
812 4108 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
813 2054 : anStartIdx[i] / m_anInnerBlockSize[i];
814 2054 : nInnerChunkCount *= nCountInnerChunksThisDim;
815 : }
816 :
817 1027 : abyDst.clear();
818 :
819 1027 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
820 1027 : const auto nExpectedDecodedChunkSize = nDTSize * MultiplyElements(anCount);
821 :
822 1027 : vsi_l_offset nLocationOffset =
823 : static_cast<vsi_l_offset>(nInnerChunkIdx) * sizeof(Location);
824 1027 : if (m_bIndexLocationAtEnd)
825 : {
826 1027 : poFile->Seek(0, SEEK_END);
827 1027 : const auto nFileSize = poFile->Tell();
828 1027 : vsi_l_offset nIndexSize =
829 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
830 1027 : if (m_bIndexHasCRC32)
831 1024 : nIndexSize += sizeof(uint32_t);
832 1027 : if (nFileSize < nIndexSize)
833 : {
834 0 : CPLError(CE_Failure, CPLE_AppDefined,
835 : "ZarrV3CodecShardingIndexed::DecodePartial(): shard file "
836 : "too small");
837 0 : return false;
838 : }
839 1027 : nLocationOffset += nFileSize - nIndexSize;
840 : }
841 :
842 : Location loc;
843 2054 : if (poFile->Seek(nLocationOffset, SEEK_SET) != 0 ||
844 1027 : poFile->Read(&loc, 1, sizeof(loc)) != sizeof(loc))
845 : {
846 :
847 0 : CPLError(CE_Failure, CPLE_AppDefined,
848 : "ZarrV3CodecShardingIndexed::DecodePartial(): "
849 : "cannot read index for chunk %" PRIu64,
850 : static_cast<uint64_t>(nInnerChunkIdx));
851 0 : return false;
852 : }
853 :
854 2051 : if (!m_poIndexCodecSequence->GetCodecs().empty() &&
855 1024 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
856 2051 : ZarrV3CodecBytes::NAME &&
857 0 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp())
858 : {
859 0 : CPL_SWAP64PTR(&(loc.nOffset));
860 0 : CPL_SWAP64PTR(&(loc.nSize));
861 : }
862 :
863 1032 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
864 5 : loc.nSize == std::numeric_limits<uint64_t>::max())
865 : {
866 : // Empty chunk
867 : try
868 : {
869 5 : abyDst.resize(nExpectedDecodedChunkSize);
870 : }
871 0 : catch (const std::exception &)
872 : {
873 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
874 : "Cannot allocate memory for decoded shard");
875 0 : return false;
876 : }
877 5 : FillWithNoData(abyDst, MultiplyElements(anCount),
878 5 : m_oInputArrayMetadata);
879 5 : return true;
880 : }
881 :
882 1022 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
883 1022 : if (loc.nSize > THRESHOLD)
884 : {
885 : // When the chunk size is above a certain threshold, check it against
886 : // the actual file size to avoid excessive memory allocation attempts.
887 :
888 0 : poFile->Seek(0, SEEK_END);
889 0 : const auto nFileSize = poFile->Tell();
890 :
891 0 : if (loc.nOffset >= nFileSize || loc.nSize > nFileSize - loc.nOffset)
892 : {
893 0 : CPLError(
894 : CE_Failure, CPLE_NotSupported,
895 : "ZarrV3CodecShardingIndexed::DecodePartial(): invalid chunk "
896 : "location for chunk %" PRIu64 ": offset=%" PRIu64
897 : ", size=%" PRIu64,
898 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
899 0 : return false;
900 : }
901 : }
902 :
903 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
904 : {
905 : #ifndef __COVERITY__
906 : // coverity[result_independent_of_operands]
907 : if (loc.nSize > std::numeric_limits<size_t>::max())
908 : {
909 : CPLError(
910 : CE_Failure, CPLE_NotSupported,
911 : "ZarrV3CodecShardingIndexed::DecodePartial(): too large chunk "
912 : "size for chunk %" PRIu64 " for this platform: size=%" PRIu64,
913 : static_cast<uint64_t>(nInnerChunkIdx), loc.nSize);
914 : return false;
915 : }
916 : #endif
917 : }
918 :
919 : try
920 : {
921 1022 : abyDst.resize(static_cast<size_t>(loc.nSize));
922 : }
923 0 : catch (const std::exception &)
924 : {
925 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
926 : "Cannot allocate memory for decoded shard");
927 0 : return false;
928 : }
929 :
930 2044 : if (poFile->Seek(loc.nOffset, SEEK_SET) != 0 ||
931 1022 : poFile->Read(abyDst.data(), 1, abyDst.size()) != abyDst.size())
932 : {
933 0 : CPLError(CE_Failure, CPLE_NotSupported,
934 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot read "
935 : "data for chunk %" PRIu64 ": offset=%" PRIu64
936 : ", size=%" PRIu64,
937 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
938 0 : return false;
939 : }
940 :
941 1022 : if (!m_poCodecSequence->Decode(abyDst))
942 : {
943 356 : CPLError(CE_Failure, CPLE_NotSupported,
944 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot decode "
945 : "chunk %" PRIu64,
946 : static_cast<uint64_t>(nInnerChunkIdx));
947 356 : return false;
948 : }
949 :
950 666 : if (abyDst.size() != nExpectedDecodedChunkSize)
951 : {
952 0 : CPLError(
953 : CE_Failure, CPLE_NotSupported,
954 : "ZarrV3CodecShardingIndexed::DecodePartial(): decoded size for "
955 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64 " is expected",
956 : static_cast<uint64_t>(nInnerChunkIdx),
957 0 : static_cast<uint64_t>(abyDst.size()),
958 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
959 0 : return false;
960 : }
961 :
962 666 : return true;
963 : }
964 :
965 : /************************************************************************/
966 : /* ZarrV3CodecShardingIndexed::BatchDecodePartial() */
967 : /************************************************************************/
968 :
969 4023 : bool ZarrV3CodecShardingIndexed::BatchDecodePartial(
970 : VSIVirtualHandle *poFile, const char *pszFilename,
971 : const std::vector<std::pair<std::vector<size_t>, std::vector<size_t>>>
972 : &anRequests,
973 : std::vector<ZarrByteVectorQuickResize> &aResults)
974 : {
975 4023 : if (anRequests.empty())
976 0 : return true;
977 :
978 4023 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
979 :
980 : // --- Compute inner chunk count and per-request inner chunk indices ---
981 4023 : size_t nInnerChunkCount = 1;
982 12076 : for (size_t i = 0; i < m_oInputArrayMetadata.anBlockSizes.size(); ++i)
983 : {
984 8053 : nInnerChunkCount *=
985 8053 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
986 : }
987 :
988 : // Determine whether index codec requires byte-swapping
989 : const bool bSwapIndex =
990 8043 : !m_poIndexCodecSequence->GetCodecs().empty() &&
991 4020 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
992 8043 : ZarrV3CodecBytes::NAME &&
993 1 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp();
994 :
995 : // Compute index base offset. For index-at-end, we need the file size.
996 4023 : vsi_l_offset nIndexBaseOffset = 0;
997 4023 : if (m_bIndexLocationAtEnd)
998 : {
999 4023 : poFile->Seek(0, SEEK_END);
1000 4023 : const auto nFileSize = poFile->Tell();
1001 4023 : vsi_l_offset nIndexSize =
1002 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
1003 4023 : if (m_bIndexHasCRC32)
1004 4020 : nIndexSize += sizeof(uint32_t);
1005 4023 : if (nFileSize < nIndexSize)
1006 : {
1007 0 : CPLError(CE_Failure, CPLE_AppDefined,
1008 : "BatchDecodePartial: shard file too small");
1009 0 : return false;
1010 : }
1011 4023 : nIndexBaseOffset = nFileSize - nIndexSize;
1012 : }
1013 :
1014 : // Build per-request inner chunk indices
1015 8046 : std::vector<size_t> anInnerChunkIndices(anRequests.size());
1016 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1017 : {
1018 12092 : const auto &anStartIdx = anRequests[iReq].first;
1019 12092 : CPLAssert(anStartIdx.size() ==
1020 : m_oInputArrayMetadata.anBlockSizes.size());
1021 :
1022 12092 : size_t nInnerChunkIdx = 0;
1023 36322 : for (size_t i = 0; i < anStartIdx.size(); ++i)
1024 : {
1025 : const size_t nCountInnerChunksThisDim =
1026 24230 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
1027 48460 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
1028 24230 : anStartIdx[i] / m_anInnerBlockSize[i];
1029 : }
1030 12092 : anInnerChunkIndices[iReq] = nInnerChunkIdx;
1031 : }
1032 :
1033 : // --- Pass 1: read index entries, using per-shard cache when possible ---
1034 8046 : std::vector<Location> aLocations(anRequests.size());
1035 :
1036 4023 : const size_t nIndexBytes = nInnerChunkCount * sizeof(Location);
1037 : const size_t nMaxIndexBytes =
1038 4023 : static_cast<size_t>(CPLAtoGIntBig(CPLGetConfigOption(
1039 4023 : "GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES", "1048576")));
1040 4023 : if (pszFilename != nullptr && nIndexBytes <= nMaxIndexBytes)
1041 : {
1042 : // Try to serve from the in-process index cache.
1043 : // g_oShardIndexCache serialises its own access via std::mutex.
1044 4023 : std::vector<GByte> abyIndexBuf;
1045 4023 : bool bCacheHit = g_oShardIndexCache.tryGet(pszFilename, abyIndexBuf);
1046 4023 : if (bCacheHit && abyIndexBuf.size() != nIndexBytes)
1047 0 : bCacheHit = false; // stale entry from different layout
1048 :
1049 4023 : if (!bCacheHit)
1050 : {
1051 : // Cache miss: read the full index in one contiguous I/O call.
1052 78 : abyIndexBuf.resize(nIndexBytes);
1053 78 : poFile->Seek(nIndexBaseOffset, SEEK_SET);
1054 78 : if (poFile->Read(abyIndexBuf.data(), nIndexBytes, 1) != 1)
1055 : {
1056 0 : CPLError(CE_Failure, CPLE_AppDefined,
1057 : "BatchDecodePartial: failed to read shard index");
1058 0 : return false;
1059 : }
1060 78 : if (bSwapIndex)
1061 : {
1062 5 : for (size_t j = 0; j < nInnerChunkCount; ++j)
1063 : {
1064 4 : GByte *p = abyIndexBuf.data() + j * sizeof(Location);
1065 4 : CPL_SWAP64PTR(p);
1066 4 : CPL_SWAP64PTR(p + sizeof(uint64_t));
1067 : }
1068 : }
1069 78 : g_oShardIndexCache.insert(pszFilename, abyIndexBuf);
1070 : }
1071 :
1072 16115 : for (size_t i = 0; i < anRequests.size(); ++i)
1073 12092 : memcpy(&aLocations[i],
1074 12092 : abyIndexBuf.data() +
1075 12092 : anInnerChunkIndices[i] * sizeof(Location),
1076 4023 : sizeof(Location));
1077 : }
1078 : else
1079 : {
1080 : // Shard index too large for cache or no filename: fall back to
1081 : // per-entry ReadMultiRange.
1082 0 : std::vector<vsi_l_offset> anIdxOffsets(anRequests.size());
1083 0 : std::vector<size_t> anIdxSizes(anRequests.size(), sizeof(Location));
1084 0 : std::vector<void *> ppIdxData(anRequests.size());
1085 :
1086 0 : for (size_t i = 0; i < anRequests.size(); ++i)
1087 : {
1088 0 : anIdxOffsets[i] = nIndexBaseOffset + static_cast<vsi_l_offset>(
1089 0 : anInnerChunkIndices[i]) *
1090 : sizeof(Location);
1091 : #ifndef __COVERITY__
1092 0 : ppIdxData[i] = &aLocations[i];
1093 : #endif
1094 : }
1095 :
1096 0 : if (poFile->ReadMultiRange(static_cast<int>(anRequests.size()),
1097 0 : ppIdxData.data(), anIdxOffsets.data(),
1098 0 : anIdxSizes.data()) != 0)
1099 : {
1100 0 : CPLError(CE_Failure, CPLE_AppDefined,
1101 : "BatchDecodePartial: ReadMultiRange() failed for index");
1102 0 : return false;
1103 : }
1104 :
1105 0 : if (bSwapIndex)
1106 : {
1107 0 : for (auto &loc : aLocations)
1108 : {
1109 0 : CPL_SWAP64PTR(&(loc.nOffset));
1110 0 : CPL_SWAP64PTR(&(loc.nSize));
1111 : }
1112 : }
1113 : }
1114 :
1115 : // --- Classify requests: empty chunks vs data chunks ---
1116 4023 : aResults.resize(anRequests.size());
1117 :
1118 : struct DataRange
1119 : {
1120 : size_t nReqIdx;
1121 : };
1122 :
1123 8046 : std::vector<DataRange> aDataRanges;
1124 8046 : std::vector<vsi_l_offset> anDataOffsets;
1125 8046 : std::vector<size_t> anDataSizes;
1126 :
1127 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1128 : {
1129 12092 : const auto &anCount = anRequests[iReq].second;
1130 : const auto nExpectedDecodedChunkSize =
1131 12092 : nDTSize * MultiplyElements(anCount);
1132 12092 : const Location &loc = aLocations[iReq];
1133 :
1134 12092 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
1135 0 : loc.nSize == std::numeric_limits<uint64_t>::max())
1136 : {
1137 : // Empty chunk — fill with nodata
1138 : try
1139 : {
1140 0 : aResults[iReq].resize(nExpectedDecodedChunkSize);
1141 : }
1142 0 : catch (const std::exception &)
1143 : {
1144 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1145 : "Cannot allocate memory for decoded chunk");
1146 0 : return false;
1147 : }
1148 0 : FillWithNoData(aResults[iReq], MultiplyElements(anCount),
1149 0 : m_oInputArrayMetadata);
1150 0 : continue;
1151 : }
1152 :
1153 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
1154 : {
1155 : #ifndef __COVERITY__
1156 : // coverity[result_independent_of_operands]
1157 : if (loc.nSize > std::numeric_limits<size_t>::max())
1158 : {
1159 : CPLError(CE_Failure, CPLE_NotSupported,
1160 : "BatchDecodePartial: too large chunk size");
1161 : return false;
1162 : }
1163 : #endif
1164 : }
1165 :
1166 12092 : aDataRanges.push_back({iReq});
1167 12092 : anDataOffsets.push_back(loc.nOffset);
1168 12092 : anDataSizes.push_back(static_cast<size_t>(loc.nSize));
1169 : }
1170 :
1171 4023 : if (aDataRanges.empty())
1172 0 : return true;
1173 :
1174 : // Validate against file size (same threshold as DecodePartial)
1175 4023 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
1176 : {
1177 4023 : size_t nMaxSize = 0;
1178 16115 : for (const auto &s : anDataSizes)
1179 12092 : nMaxSize = std::max(nMaxSize, s);
1180 4023 : if (nMaxSize > THRESHOLD)
1181 : {
1182 0 : poFile->Seek(0, SEEK_END);
1183 0 : const auto nFileSize = poFile->Tell();
1184 0 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1185 : {
1186 0 : if (anDataOffsets[i] >= nFileSize ||
1187 0 : anDataSizes[i] > nFileSize - anDataOffsets[i])
1188 : {
1189 0 : CPLError(CE_Failure, CPLE_NotSupported,
1190 : "BatchDecodePartial: invalid chunk location: "
1191 : "offset=%" PRIu64 ", size=%" PRIu64,
1192 0 : static_cast<uint64_t>(anDataOffsets[i]),
1193 0 : static_cast<uint64_t>(anDataSizes[i]));
1194 0 : return false;
1195 : }
1196 : }
1197 : }
1198 : }
1199 :
1200 : // --- Pass 2: ReadMultiRange for data chunks ---
1201 8046 : std::vector<ZarrByteVectorQuickResize> aCompressed(aDataRanges.size());
1202 8046 : std::vector<void *> ppData(aDataRanges.size());
1203 :
1204 16115 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1205 : {
1206 : try
1207 : {
1208 12092 : aCompressed[i].resize(anDataSizes[i]);
1209 : }
1210 0 : catch (const std::exception &)
1211 : {
1212 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1213 : "Cannot allocate memory for compressed chunk");
1214 0 : return false;
1215 : }
1216 12092 : ppData[i] = aCompressed[i].data();
1217 : }
1218 :
1219 4023 : CPLDebugOnly("ZARR",
1220 : "BatchDecodePartial: ReadMultiRange() with %d data ranges",
1221 : static_cast<int>(aDataRanges.size()));
1222 :
1223 4023 : if (poFile->ReadMultiRange(static_cast<int>(aDataRanges.size()),
1224 4023 : ppData.data(), anDataOffsets.data(),
1225 8046 : anDataSizes.data()) != 0)
1226 : {
1227 0 : CPLError(CE_Failure, CPLE_AppDefined,
1228 : "BatchDecodePartial: ReadMultiRange() failed for data");
1229 0 : return false;
1230 : }
1231 :
1232 : // --- Decode compressed chunks (parallel when GDAL_NUM_THREADS > 1) ---
1233 4023 : const int nMaxThreads = GDALGetNumThreads();
1234 4023 : const int nChunks = static_cast<int>(aDataRanges.size());
1235 4023 : const int nThreads = std::min(std::max(1, nMaxThreads), nChunks);
1236 :
1237 : // Try parallel decode when multiple threads are available
1238 0 : CPLWorkerThreadPool *wtp = (nThreads > 1 && nChunks > 1)
1239 4023 : ? GDALGetGlobalThreadPool(nMaxThreads)
1240 4023 : : nullptr;
1241 :
1242 4023 : if (!wtp)
1243 : {
1244 : // Sequential fallback
1245 15182 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1246 : {
1247 11515 : const size_t iReq = aDataRanges[i].nReqIdx;
1248 11515 : const auto &anCount = anRequests[iReq].second;
1249 : const auto nExpectedDecodedChunkSize =
1250 11515 : nDTSize * MultiplyElements(anCount);
1251 :
1252 11515 : if (!m_poCodecSequence->Decode(aCompressed[i]))
1253 : {
1254 356 : CPLError(CE_Failure, CPLE_NotSupported,
1255 : "BatchDecodePartial: cannot decode chunk %" PRIu64,
1256 356 : static_cast<uint64_t>(anInnerChunkIndices[iReq]));
1257 356 : return false;
1258 : }
1259 :
1260 11159 : if (aCompressed[i].size() != nExpectedDecodedChunkSize)
1261 : {
1262 0 : CPLError(CE_Failure, CPLE_NotSupported,
1263 : "BatchDecodePartial: decoded size %" PRIu64
1264 : " != expected %" PRIu64,
1265 0 : static_cast<uint64_t>(aCompressed[i].size()),
1266 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
1267 0 : return false;
1268 : }
1269 :
1270 11159 : aResults[iReq] = std::move(aCompressed[i]);
1271 : }
1272 3667 : return true;
1273 : }
1274 :
1275 0 : CPLDebugOnly("ZARR",
1276 : "BatchDecodePartial: parallel decode with %d threads "
1277 : "for %d chunks",
1278 : nThreads, nChunks);
1279 :
1280 : {
1281 0 : bool bGlobalOK = true;
1282 0 : std::mutex oMutex;
1283 :
1284 : // Clone codecs per thread on the main thread (Clone() is not
1285 : // thread-safe due to JSON object cloning)
1286 0 : std::vector<std::unique_ptr<ZarrV3CodecSequence>> apoCodecs(nThreads);
1287 0 : for (int t = 0; t < nThreads; ++t)
1288 0 : apoCodecs[t] = m_poCodecSequence->Clone();
1289 :
1290 0 : auto poJobQueue = wtp->CreateJobQueue();
1291 0 : for (int t = 0; t < nThreads; ++t)
1292 : {
1293 0 : const int iFirst =
1294 0 : static_cast<int>(static_cast<int64_t>(t) * nChunks / nThreads);
1295 0 : const int iEnd = static_cast<int>(static_cast<int64_t>(t + 1) *
1296 0 : nChunks / nThreads);
1297 :
1298 0 : poJobQueue->SubmitJob(
1299 0 : [iFirst, iEnd, t, &aDataRanges, &anRequests, &aCompressed,
1300 0 : &aResults, &apoCodecs, &bGlobalOK, &oMutex, nDTSize]()
1301 : {
1302 0 : for (int i = iFirst; i < iEnd; ++i)
1303 : {
1304 : {
1305 0 : std::lock_guard<std::mutex> oLock(oMutex);
1306 0 : if (!bGlobalOK)
1307 0 : return;
1308 : }
1309 :
1310 0 : const size_t iReq = aDataRanges[i].nReqIdx;
1311 0 : const auto &anCount = anRequests[iReq].second;
1312 : const auto nExpected =
1313 0 : nDTSize * MultiplyElements(anCount);
1314 :
1315 0 : if (!apoCodecs[t]->Decode(aCompressed[i]) ||
1316 0 : aCompressed[i].size() != nExpected)
1317 : {
1318 0 : std::lock_guard<std::mutex> oLock(oMutex);
1319 0 : bGlobalOK = false;
1320 0 : return;
1321 : }
1322 :
1323 : // Each job writes to a unique iReq slot - no lock
1324 0 : aResults[iReq] = std::move(aCompressed[i]);
1325 : }
1326 : });
1327 : }
1328 0 : poJobQueue->WaitCompletion();
1329 :
1330 0 : if (!bGlobalOK)
1331 : {
1332 0 : CPLError(CE_Failure, CPLE_NotSupported,
1333 : "BatchDecodePartial: parallel decode failed");
1334 0 : return false;
1335 : }
1336 : }
1337 :
1338 0 : return true;
1339 : }
1340 :
1341 : /************************************************************************/
1342 : /* ZarrV3CodecShardingIndexed::GetInnerMostBlockSize() */
1343 : /************************************************************************/
1344 :
1345 531 : std::vector<size_t> ZarrV3CodecShardingIndexed::GetInnerMostBlockSize(
1346 : const std::vector<size_t> &) const
1347 : {
1348 531 : return m_anInnerBlockSize;
1349 : // TODO if we one day properly support nested sharding
1350 : // return m_poCodecSequence->GetInnerMostBlockSize(m_anInnerBlockSize);
1351 : }
|