Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Zarr driver, "sharding_indexed" codec
5 : * Author: Even Rouault <even dot rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2026, Development Seed
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "zarr_v3_codec.h"
14 :
15 : #include "cpl_mem_cache.h"
16 : #include "cpl_vsi_virtual.h"
17 : #include "cpl_worker_thread_pool.h"
18 : #include "gdal_thread_pool.h"
19 :
20 : #include <algorithm>
21 : #include <cinttypes>
22 : #include <limits>
23 : #include <mutex>
24 :
25 : // Process-wide LRU cache for shard indices.
26 : // Key: shard filepath/URL. Value: flat byte buffer of Location entries
27 : // (each 16 bytes: uint64_t nOffset + uint64_t nSize, host byte order).
28 : // Populated lazily by BatchDecodePartial.
29 : // Shards whose index exceeds GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES
30 : // (default 1 MiB) fall back to per-entry ReadMultiRange.
31 : // Thread safety: lru11::Cache<..., std::mutex> serialises all access.
32 : static constexpr size_t SHARD_INDEX_CACHE_ENTRIES = 64;
33 : static lru11::Cache<std::string, std::vector<GByte>, std::mutex>
34 : g_oShardIndexCache{SHARD_INDEX_CACHE_ENTRIES, 0};
35 :
36 : // Implements https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/index.html
37 :
38 4 : void ZarrClearShardIndexCache()
39 : {
40 4 : g_oShardIndexCache.clear();
41 4 : }
42 :
43 37 : void ZarrEraseShardIndexFromCache(const std::string &osFilename)
44 : {
45 37 : g_oShardIndexCache.remove(osFilename);
46 37 : }
47 :
48 : /************************************************************************/
49 : /* ZarrV3CodecShardingIndexed() */
50 : /************************************************************************/
51 :
52 697 : ZarrV3CodecShardingIndexed::ZarrV3CodecShardingIndexed() : ZarrV3Codec(NAME)
53 : {
54 697 : }
55 :
56 : /************************************************************************/
57 : /* ZarrV3CodecShardingIndexed::Clone() */
58 : /************************************************************************/
59 :
60 8 : std::unique_ptr<ZarrV3Codec> ZarrV3CodecShardingIndexed::Clone() const
61 : {
62 16 : auto psClone = std::make_unique<ZarrV3CodecShardingIndexed>();
63 16 : ZarrArrayMetadata oOutputArrayMetadata;
64 8 : psClone->InitFromConfiguration(m_oConfiguration, m_oInputArrayMetadata,
65 : oOutputArrayMetadata,
66 : /* bEmitWarnings = */ false);
67 16 : return psClone;
68 : }
69 :
70 : /************************************************************************/
71 : /* ZarrV3CodecShardingIndexed::InitFromConfiguration() */
72 : /************************************************************************/
73 :
74 697 : bool ZarrV3CodecShardingIndexed::InitFromConfiguration(
75 : const CPLJSONObject &configuration,
76 : const ZarrArrayMetadata &oInputArrayMetadata,
77 : ZarrArrayMetadata &oOutputArrayMetadata, bool bEmitWarnings)
78 : {
79 697 : if (oInputArrayMetadata.anBlockSizes.empty())
80 : {
81 0 : CPLError(
82 : CE_Failure, CPLE_AppDefined,
83 : "Codec sharding_indexed: sharding not supported for scalar array");
84 0 : return false;
85 : }
86 :
87 697 : m_oConfiguration = configuration.Clone();
88 697 : m_oInputArrayMetadata = oInputArrayMetadata;
89 :
90 1393 : if (!configuration.IsValid() ||
91 696 : configuration.GetType() != CPLJSONObject::Type::Object)
92 : {
93 2 : CPLError(
94 : CE_Failure, CPLE_AppDefined,
95 : "Codec sharding_indexed: configuration missing or not an object");
96 2 : return false;
97 : }
98 :
99 2085 : const auto oChunkShape = configuration["chunk_shape"].ToArray();
100 1388 : if (!oChunkShape.IsValid() ||
101 693 : oChunkShape.GetType() != CPLJSONObject::Type::Array)
102 : {
103 2 : CPLError(CE_Failure, CPLE_AppDefined,
104 : "Codec sharding_indexed: configuration.chunk_shape missing or "
105 : "not an array");
106 2 : return false;
107 : }
108 1386 : if (static_cast<size_t>(oChunkShape.Size()) !=
109 693 : m_oInputArrayMetadata.anBlockSizes.size())
110 : {
111 1 : CPLError(CE_Failure, CPLE_AppDefined,
112 : "Codec sharding_indexed: configuration.chunk_shape should "
113 : "have the same shape as the array");
114 1 : return false;
115 : }
116 1384 : std::vector<size_t> anCountInnerChunks;
117 2071 : for (int i = 0; i < oChunkShape.Size(); ++i)
118 : {
119 2764 : if (oChunkShape[i].GetType() != CPLJSONObject::Type::Integer &&
120 1382 : oChunkShape[i].GetType() != CPLJSONObject::Type::Long)
121 : {
122 0 : CPLError(CE_Failure, CPLE_AppDefined,
123 : "Codec sharding_indexed: configuration.chunk_shape[%d] "
124 : "should be an integer",
125 : i);
126 0 : return false;
127 : }
128 1382 : const int64_t nVal = oChunkShape[i].ToLong();
129 2762 : if (nVal <= 0 ||
130 1380 : static_cast<uint64_t>(nVal) >
131 2762 : m_oInputArrayMetadata.anBlockSizes[i] ||
132 1380 : (m_oInputArrayMetadata.anBlockSizes[i] % nVal) != 0)
133 : {
134 3 : CPLError(
135 : CE_Failure, CPLE_AppDefined,
136 : "Codec sharding_indexed: configuration.chunk_shape[%d]=%" PRId64
137 : " should be a strictly positive value that is a divisor of "
138 : "%" PRIu64,
139 : i, nVal,
140 3 : static_cast<uint64_t>(m_oInputArrayMetadata.anBlockSizes[i]));
141 3 : return false;
142 : }
143 : #ifndef __COVERITY__
144 : // The following cast is safe since ZarrArray::ParseChunkSize() has
145 : // previously validated that m_oInputArrayMetadata.anBlockSizes[i] fits
146 : // on size_t
147 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
148 : {
149 : // coverity[result_independent_of_operands]
150 : CPLAssert(nVal <= std::numeric_limits<size_t>::max());
151 : }
152 : #endif
153 1379 : m_anInnerBlockSize.push_back(static_cast<size_t>(nVal));
154 1379 : anCountInnerChunks.push_back(
155 1379 : static_cast<size_t>(m_oInputArrayMetadata.anBlockSizes[i] / nVal));
156 : }
157 :
158 2067 : const auto oCodecs = configuration["codecs"];
159 689 : if (!oCodecs.IsValid() || oCodecs.GetType() != CPLJSONObject::Type::Array)
160 : {
161 2 : CPLError(CE_Failure, CPLE_AppDefined,
162 : "Codec sharding_indexed: configuration.codecs missing or "
163 : "not an array");
164 2 : return false;
165 : }
166 687 : if (oCodecs.ToArray().Size() == 0)
167 : {
168 1 : CPLError(CE_Failure, CPLE_AppDefined,
169 : "Codec sharding_indexed: configuration.codecs[] is empty");
170 1 : return false;
171 : }
172 1372 : ZarrArrayMetadata inputArrayMetadataCodecs = m_oInputArrayMetadata;
173 686 : inputArrayMetadataCodecs.anBlockSizes = m_anInnerBlockSize;
174 : m_poCodecSequence =
175 686 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataCodecs);
176 686 : if (!m_poCodecSequence->InitFromJson(oCodecs, oOutputArrayMetadata))
177 : {
178 1 : CPLError(CE_Failure, CPLE_AppDefined,
179 : "Codec sharding_indexed: initialization of codecs failed");
180 1 : return false;
181 : }
182 :
183 685 : if (bEmitWarnings && m_poCodecSequence->SupportsPartialDecoding())
184 : {
185 : // Implementation limitation
186 1 : CPLError(CE_Warning, CPLE_AppDefined,
187 : "Nested sharding detected. For now, partial decoding is only "
188 : "implemented on the outer-most shard level");
189 : }
190 :
191 2055 : const auto oIndexCodecs = configuration["index_codecs"];
192 1369 : if (!oIndexCodecs.IsValid() ||
193 684 : oIndexCodecs.GetType() != CPLJSONObject::Type::Array)
194 : {
195 2 : CPLError(
196 : CE_Failure, CPLE_AppDefined,
197 : "Codec sharding_indexed: configuration.index_codecs missing or "
198 : "not an array");
199 2 : return false;
200 : }
201 683 : if (oIndexCodecs.ToArray().Size() == 0)
202 : {
203 1 : CPLError(
204 : CE_Failure, CPLE_AppDefined,
205 : "Codec sharding_indexed: configuration.index_codecs[] is empty");
206 1 : return false;
207 : }
208 1364 : ZarrArrayMetadata inputArrayMetadataIndex;
209 682 : inputArrayMetadataIndex.oElt.nativeType =
210 : DtypeElt::NativeType::UNSIGNED_INT;
211 682 : inputArrayMetadataIndex.oElt.nativeSize = sizeof(uint64_t);
212 : inputArrayMetadataIndex.oElt.gdalType =
213 682 : GDALExtendedDataType::Create(GDT_UInt64);
214 682 : inputArrayMetadataIndex.oElt.gdalSize = sizeof(uint64_t);
215 682 : inputArrayMetadataIndex.anBlockSizes = std::move(anCountInnerChunks);
216 : // 2 for offset and size
217 682 : inputArrayMetadataIndex.anBlockSizes.push_back(2);
218 : m_poIndexCodecSequence =
219 682 : std::make_unique<ZarrV3CodecSequence>(inputArrayMetadataIndex);
220 1364 : ZarrArrayMetadata oOutputArrayMetadataIndex;
221 682 : if (!m_poIndexCodecSequence->InitFromJson(oIndexCodecs,
222 : oOutputArrayMetadataIndex))
223 : {
224 1 : CPLError(
225 : CE_Failure, CPLE_AppDefined,
226 : "Codec sharding_indexed: initialization of index_codecs failed");
227 1 : return false;
228 : }
229 681 : const auto &indexCodecs = m_poIndexCodecSequence->GetCodecs();
230 681 : if (indexCodecs.empty())
231 : {
232 : // ok, there is only a "bytes" codec, optimized away if the order
233 : // is the one of the native architecture
234 : }
235 1074 : else if (indexCodecs[0]->GetName() == ZarrV3CodecBytes::NAME ||
236 536 : indexCodecs[0]->GetName() == ZarrV3CodecCRC32C::NAME)
237 : {
238 : // ok
239 : }
240 1 : else if (indexCodecs.size() == 2 &&
241 0 : indexCodecs[1]->GetName() == ZarrV3CodecCRC32C::NAME)
242 : {
243 : // ok
244 : }
245 : else
246 : {
247 1 : CPLError(CE_Failure, CPLE_NotSupported,
248 : "Codec sharding_indexed: this implementation only supports "
249 : "Bytes, possibly followed by CRC32C, as index_codecs");
250 1 : return false;
251 : }
252 680 : m_bIndexHasCRC32 = (!indexCodecs.empty() && indexCodecs.back()->GetName() ==
253 : ZarrV3CodecCRC32C::NAME);
254 :
255 : const std::string osIndexLocation =
256 2040 : configuration.GetString("index_location", "end");
257 680 : if (osIndexLocation != "start" && osIndexLocation != "end")
258 : {
259 1 : CPLError(CE_Failure, CPLE_AppDefined,
260 : "Codec sharding_indexed: invalid value for index_location");
261 1 : return false;
262 : }
263 679 : m_bIndexLocationAtEnd = (osIndexLocation == "end");
264 :
265 679 : return true;
266 : }
267 :
268 : /************************************************************************/
269 : /* ExtractSubArrayFromLargerOne() */
270 : /************************************************************************/
271 :
272 : // Inverse of CopySubArrayIntoLargerOne(): extract a contiguous inner chunk
273 : // from its position in the larger shard buffer.
274 : static void
275 153 : ExtractSubArrayFromLargerOne(const ZarrByteVectorQuickResize &abySrc,
276 : const std::vector<size_t> &anSrcBlockSize,
277 : const std::vector<size_t> &anInnerBlockSize,
278 : const std::vector<size_t> &anInnerBlockIndices,
279 : ZarrByteVectorQuickResize &abyChunk,
280 : const size_t nDTSize)
281 : {
282 153 : const auto nDims = anInnerBlockSize.size();
283 153 : CPLAssert(nDims > 0);
284 153 : CPLAssert(nDims == anInnerBlockIndices.size());
285 153 : CPLAssert(nDims == anSrcBlockSize.size());
286 : // +1 to avoid gcc -Wnull-dereference false positives
287 306 : std::vector<const GByte *> srcPtrStack(nDims + 1);
288 306 : std::vector<size_t> count(nDims + 1);
289 306 : std::vector<size_t> srcStride(nDims + 1);
290 :
291 153 : size_t nSrcStride = nDTSize;
292 483 : for (size_t iDim = nDims; iDim > 0;)
293 : {
294 330 : --iDim;
295 330 : srcStride[iDim] = nSrcStride;
296 330 : nSrcStride *= anSrcBlockSize[iDim];
297 : }
298 :
299 153 : srcPtrStack[0] = abySrc.data();
300 483 : for (size_t iDim = 0; iDim < nDims; ++iDim)
301 : {
302 330 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
303 : anSrcBlockSize[iDim]);
304 660 : srcPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
305 330 : srcStride[iDim];
306 : }
307 153 : GByte *pabyDst = abyChunk.data();
308 :
309 153 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
310 153 : size_t dimIdx = 0;
311 1025 : lbl_next_depth:
312 1025 : if (dimIdx + 1 == nDims)
313 : {
314 848 : memcpy(pabyDst, srcPtrStack[dimIdx], nLastDimSize);
315 848 : pabyDst += nLastDimSize;
316 : }
317 : else
318 : {
319 177 : count[dimIdx] = anInnerBlockSize[dimIdx];
320 : while (true)
321 : {
322 872 : dimIdx++;
323 872 : srcPtrStack[dimIdx] = srcPtrStack[dimIdx - 1];
324 872 : goto lbl_next_depth;
325 872 : lbl_return_to_caller:
326 872 : dimIdx--;
327 872 : if (--count[dimIdx] == 0)
328 177 : break;
329 695 : srcPtrStack[dimIdx] += srcStride[dimIdx];
330 : }
331 : }
332 1025 : if (dimIdx > 0)
333 872 : goto lbl_return_to_caller;
334 153 : }
335 :
336 : /************************************************************************/
337 : /* IsAllNoData() */
338 : /************************************************************************/
339 :
340 : // Check if a buffer consists entirely of the fill value.
341 153 : static bool IsAllNoData(const ZarrByteVectorQuickResize &abyChunk,
342 : const ZarrArrayMetadata &metadata)
343 : {
344 153 : const size_t nDTSize = metadata.oElt.nativeSize;
345 153 : const size_t nBytes = abyChunk.size();
346 :
347 157 : if (metadata.abyNoData.empty() ||
348 157 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
349 : {
350 : // Zero fill value: byte-by-byte check (compiler auto-vectorizes)
351 149 : const GByte *p = abyChunk.data();
352 4700 : for (size_t i = 0; i < nBytes; ++i)
353 : {
354 4664 : if (p[i] != 0)
355 113 : return false;
356 : }
357 36 : return true;
358 : }
359 : else
360 : {
361 : // Non-zero fill value: element-wise compare
362 4 : CPLAssert(metadata.abyNoData.size() == nDTSize);
363 4 : const GByte *p = abyChunk.data();
364 4 : const size_t nCount = nBytes / nDTSize;
365 4 : for (size_t i = 0; i < nCount; ++i)
366 : {
367 4 : if (memcmp(p + i * nDTSize, metadata.abyNoData.data(), nDTSize) !=
368 : 0)
369 4 : return false;
370 : }
371 0 : return true;
372 : }
373 : }
374 :
375 : /************************************************************************/
376 : /* ZarrV3CodecShardingIndexed::Encode() */
377 : /************************************************************************/
378 :
379 37 : bool ZarrV3CodecShardingIndexed::Encode(const ZarrByteVectorQuickResize &abySrc,
380 : ZarrByteVectorQuickResize &abyDst) const
381 : {
382 : // Compute total number of inner chunks
383 37 : size_t nInnerChunks = 1;
384 115 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
385 : {
386 : const size_t nCountInnerChunksThisDim =
387 78 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
388 78 : nInnerChunks *= nCountInnerChunksThisDim;
389 : }
390 :
391 37 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
392 : const size_t nExpectedSrcSize =
393 37 : nDTSize * MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
394 37 : if (abySrc.size() != nExpectedSrcSize)
395 : {
396 0 : CPLError(CE_Failure, CPLE_AppDefined,
397 : "ZarrV3CodecShardingIndexed::Encode(): input buffer size "
398 : "(%" PRIu64 ") != expected (%" PRIu64 ")",
399 0 : static_cast<uint64_t>(abySrc.size()),
400 : static_cast<uint64_t>(nExpectedSrcSize));
401 0 : return false;
402 : }
403 :
404 : // Index: one Location per inner chunk, initially all empty
405 : std::vector<Location> anLocations(nInnerChunks,
406 : {std::numeric_limits<uint64_t>::max(),
407 74 : std::numeric_limits<uint64_t>::max()});
408 :
409 : // Accumulate encoded inner chunk data
410 74 : ZarrByteVectorQuickResize abyData;
411 : // Reserve approximate capacity: decoded chunk size is close to the upper
412 : // bound per chunk (compression may slightly increase size in pathological
413 : // cases), but avoids most reallocations.
414 : const size_t nDecodedChunkSize =
415 37 : nDTSize * MultiplyElements(m_anInnerBlockSize);
416 : // resize+clear = reserve (ZarrByteVectorQuickResize has no reserve())
417 : try
418 : {
419 37 : abyData.resize(nInnerChunks * nDecodedChunkSize);
420 : }
421 0 : catch (const std::exception &)
422 : {
423 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
424 : "Cannot allocate memory for accumulated shard data");
425 0 : return false;
426 : }
427 37 : abyData.clear();
428 74 : ZarrByteVectorQuickResize abyChunk;
429 :
430 37 : uint64_t nCurrentOffset = 0;
431 74 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
432 :
433 190 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
434 : {
435 : // Update chunk coordinates (same iteration order as Decode)
436 153 : if (iChunk > 0)
437 : {
438 116 : size_t iDim = m_anInnerBlockSize.size() - 1;
439 164 : while (++anChunkIndices[iDim] ==
440 328 : m_oInputArrayMetadata.anBlockSizes[iDim] /
441 164 : m_anInnerBlockSize[iDim])
442 : {
443 48 : anChunkIndices[iDim] = 0;
444 48 : --iDim;
445 : }
446 : }
447 :
448 : // Extract this inner chunk from the shard buffer
449 : try
450 : {
451 153 : abyChunk.resize(nDecodedChunkSize);
452 : }
453 0 : catch (const std::exception &)
454 : {
455 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
456 : "Cannot allocate memory for inner chunk");
457 0 : return false;
458 : }
459 153 : ExtractSubArrayFromLargerOne(abySrc, m_oInputArrayMetadata.anBlockSizes,
460 153 : m_anInnerBlockSize, anChunkIndices,
461 : abyChunk, nDTSize);
462 :
463 : // Skip empty (all-nodata) chunks
464 153 : if (IsAllNoData(abyChunk, m_oInputArrayMetadata))
465 36 : continue;
466 :
467 : // Encode inner chunk through codec chain (bytes + compressor)
468 117 : if (!m_poCodecSequence->Encode(abyChunk))
469 : {
470 0 : CPLError(CE_Failure, CPLE_AppDefined,
471 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode "
472 : "inner chunk %" PRIu64,
473 : static_cast<uint64_t>(iChunk));
474 0 : return false;
475 : }
476 :
477 117 : anLocations[iChunk] = {nCurrentOffset, abyChunk.size()};
478 117 : nCurrentOffset += abyChunk.size();
479 117 : abyData.insert(abyData.end(), abyChunk.begin(), abyChunk.end());
480 : }
481 :
482 : // All inner chunks are nodata: signal empty shard via zero-length output
483 37 : if (abyData.empty())
484 : {
485 0 : abyDst.clear();
486 0 : return true;
487 : }
488 :
489 : // Build index buffer
490 74 : ZarrByteVectorQuickResize abyIndex;
491 37 : const size_t nIndexRawSize = nInnerChunks * sizeof(Location);
492 :
493 : // If index is at start, data offsets must account for the index prefix.
494 : // Encode the index once to determine its encoded size (includes CRC32C
495 : // overhead), then re-encode below with the adjusted offsets.
496 : // Note: currently unreachable in write mode (creation hardcodes "end"),
497 : // but kept for completeness with the Decode() start-index path.
498 37 : if (!m_bIndexLocationAtEnd)
499 : {
500 : // Encode a temporary copy of the index to determine its encoded size
501 0 : abyIndex.resize(nIndexRawSize);
502 0 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
503 0 : if (!m_poIndexCodecSequence->Encode(abyIndex))
504 : {
505 0 : CPLError(
506 : CE_Failure, CPLE_AppDefined,
507 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
508 0 : return false;
509 : }
510 0 : const uint64_t nIndexEncodedSize = abyIndex.size();
511 : // Adjust offsets
512 0 : for (auto &loc : anLocations)
513 : {
514 0 : if (loc.nOffset != std::numeric_limits<uint64_t>::max())
515 0 : loc.nOffset += nIndexEncodedSize;
516 : }
517 : }
518 :
519 : // (Re-)encode index with final offsets
520 37 : abyIndex.resize(nIndexRawSize);
521 37 : memcpy(abyIndex.data(), anLocations.data(), nIndexRawSize);
522 37 : if (!m_poIndexCodecSequence->Encode(abyIndex))
523 : {
524 0 : CPLError(CE_Failure, CPLE_AppDefined,
525 : "ZarrV3CodecShardingIndexed::Encode(): cannot encode index");
526 0 : return false;
527 : }
528 :
529 : // Assemble output: data + index (or index + data)
530 37 : if (m_bIndexLocationAtEnd)
531 : {
532 37 : abyDst = std::move(abyData);
533 37 : abyDst.insert(abyDst.end(), abyIndex.begin(), abyIndex.end());
534 : }
535 : else
536 : {
537 0 : abyDst = std::move(abyIndex);
538 0 : abyDst.insert(abyDst.end(), abyData.begin(), abyData.end());
539 : }
540 :
541 37 : return true;
542 : }
543 :
544 : /************************************************************************/
545 : /* CopySubArrayIntoLargerOne() */
546 : /************************************************************************/
547 :
548 : static void
549 148 : CopySubArrayIntoLargerOne(const ZarrByteVectorQuickResize &abyChunk,
550 : const std::vector<size_t> &anInnerBlockSize,
551 : const std::vector<size_t> &anInnerBlockIndices,
552 : ZarrByteVectorQuickResize &abyDst,
553 : const std::vector<size_t> &anDstBlockSize,
554 : const size_t nDTSize)
555 : {
556 148 : const auto nDims = anInnerBlockSize.size();
557 148 : CPLAssert(nDims > 0);
558 148 : CPLAssert(nDims == anInnerBlockIndices.size());
559 148 : CPLAssert(nDims == anDstBlockSize.size());
560 : // +1 just to make some gcc versions not emit -Wnull-dereference false positives
561 296 : std::vector<GByte *> dstPtrStack(nDims + 1);
562 296 : std::vector<size_t> count(nDims + 1);
563 296 : std::vector<size_t> dstStride(nDims + 1);
564 :
565 148 : size_t nDstStride = nDTSize;
566 444 : for (size_t iDim = nDims; iDim > 0;)
567 : {
568 296 : --iDim;
569 296 : dstStride[iDim] = nDstStride;
570 296 : nDstStride *= anDstBlockSize[iDim];
571 : }
572 :
573 148 : dstPtrStack[0] = abyDst.data();
574 444 : for (size_t iDim = 0; iDim < nDims; ++iDim)
575 : {
576 296 : CPLAssert((anInnerBlockIndices[iDim] + 1) * anInnerBlockSize[iDim] <=
577 : anDstBlockSize[iDim]);
578 592 : dstPtrStack[0] += anInnerBlockIndices[iDim] * anInnerBlockSize[iDim] *
579 296 : dstStride[iDim];
580 : }
581 148 : const GByte *pabySrc = abyChunk.data();
582 :
583 148 : const size_t nLastDimSize = anInnerBlockSize.back() * nDTSize;
584 148 : size_t dimIdx = 0;
585 312 : lbl_next_depth:
586 312 : if (dimIdx + 1 == nDims)
587 : {
588 164 : memcpy(dstPtrStack[dimIdx], pabySrc, nLastDimSize);
589 164 : pabySrc += nLastDimSize;
590 : }
591 : else
592 : {
593 148 : count[dimIdx] = anInnerBlockSize[dimIdx];
594 : while (true)
595 : {
596 164 : dimIdx++;
597 164 : dstPtrStack[dimIdx] = dstPtrStack[dimIdx - 1];
598 164 : goto lbl_next_depth;
599 164 : lbl_return_to_caller:
600 164 : dimIdx--;
601 164 : if (--count[dimIdx] == 0)
602 148 : break;
603 16 : dstPtrStack[dimIdx] += dstStride[dimIdx];
604 : }
605 : }
606 312 : if (dimIdx > 0)
607 164 : goto lbl_return_to_caller;
608 148 : }
609 :
610 : /************************************************************************/
611 : /* FillWithNoData() */
612 : /************************************************************************/
613 :
614 152 : static void FillWithNoData(ZarrByteVectorQuickResize &abyDst,
615 : const size_t nCount,
616 : const ZarrArrayMetadata &metadata)
617 : {
618 152 : const size_t nDTSize = metadata.oElt.nativeSize;
619 304 : if (metadata.abyNoData.empty() ||
620 304 : metadata.abyNoData == std::vector<GByte>(nDTSize, 0))
621 : {
622 150 : memset(abyDst.data(), 0, nDTSize * nCount);
623 : }
624 : else
625 : {
626 2 : CPLAssert(metadata.abyNoData.size() == nDTSize);
627 152 : for (size_t i = 0; i < nCount; ++i)
628 : {
629 150 : memcpy(abyDst.data() + i * nDTSize, metadata.abyNoData.data(),
630 : nDTSize);
631 : }
632 : }
633 152 : }
634 :
635 : /************************************************************************/
636 : /* ZarrV3CodecShardingIndexed::Decode() */
637 : /************************************************************************/
638 :
639 148 : bool ZarrV3CodecShardingIndexed::Decode(const ZarrByteVectorQuickResize &abySrc,
640 : ZarrByteVectorQuickResize &abyDst) const
641 : {
642 148 : size_t nInnerChunks = 1;
643 443 : for (size_t i = 0; i < m_anInnerBlockSize.size(); ++i)
644 : {
645 : const size_t nCountInnerChunksThisdim =
646 295 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
647 295 : nInnerChunks *= nCountInnerChunksThisdim;
648 : }
649 :
650 296 : const size_t nIndexEncodedSize = nInnerChunks * sizeof(Location) +
651 148 : (m_bIndexHasCRC32 ? sizeof(uint32_t) : 0);
652 148 : if (abySrc.size() < nIndexEncodedSize)
653 : {
654 1 : CPLError(CE_Failure, CPLE_NotSupported,
655 : "ZarrV3CodecShardingIndexed::Decode(): input buffer is too "
656 : "small to hold the shard index");
657 1 : return false;
658 : }
659 294 : ZarrByteVectorQuickResize abyIndex;
660 147 : if (m_bIndexLocationAtEnd)
661 : {
662 147 : abyIndex.insert(abyIndex.end(),
663 147 : abySrc.begin() + (abySrc.size() - nIndexEncodedSize),
664 441 : abySrc.end());
665 : }
666 : else
667 : {
668 0 : abyIndex.insert(abyIndex.end(), abySrc.begin(),
669 0 : abySrc.end() + nIndexEncodedSize);
670 : }
671 :
672 147 : if (!m_poIndexCodecSequence->Decode(abyIndex))
673 : {
674 0 : CPLError(
675 : CE_Failure, CPLE_NotSupported,
676 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode shard index");
677 0 : return false;
678 : }
679 :
680 147 : if (abyIndex.size() != nInnerChunks * sizeof(Location))
681 : {
682 0 : CPLError(CE_Failure, CPLE_NotSupported,
683 : "ZarrV3CodecShardingIndexed::Decode(): shard index has not "
684 : "expected size");
685 0 : return false;
686 : }
687 :
688 : const Location *panLocations =
689 147 : reinterpret_cast<const Location *>(abyIndex.data());
690 :
691 294 : ZarrByteVectorQuickResize abyChunk;
692 147 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
693 : const size_t nExpectedDecodedChunkSize =
694 147 : nDTSize * MultiplyElements(m_anInnerBlockSize);
695 : const size_t nDstCount =
696 147 : MultiplyElements(m_oInputArrayMetadata.anBlockSizes);
697 :
698 : try
699 : {
700 147 : abyDst.resize(nDstCount * nDTSize);
701 : }
702 0 : catch (const std::exception &)
703 : {
704 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
705 : "Cannot allocate memory for decoded shard");
706 0 : return false;
707 : }
708 :
709 147 : FillWithNoData(abyDst, nDstCount, m_oInputArrayMetadata);
710 :
711 294 : std::vector<size_t> anChunkIndices(m_anInnerBlockSize.size(), 0);
712 385 : for (size_t iChunk = 0; iChunk < nInnerChunks; ++iChunk)
713 : {
714 368 : if (iChunk > 0)
715 : {
716 : // Update chunk coordinates
717 221 : size_t iDim = m_anInnerBlockSize.size() - 1;
718 303 : while (++anChunkIndices[iDim] ==
719 606 : m_oInputArrayMetadata.anBlockSizes[iDim] /
720 303 : m_anInnerBlockSize[iDim])
721 : {
722 82 : anChunkIndices[iDim] = 0;
723 82 : --iDim;
724 : }
725 : }
726 :
727 : #ifdef DEBUG_VERBOSE
728 : CPLDebug("ZARR", "Chunk %" PRIu64 ": offset %" PRIu64 ", size %" PRIu64,
729 : static_cast<uint64_t>(iChunk), panLocations[iChunk].nOffset,
730 : panLocations[iChunk].nSize);
731 : #endif
732 :
733 826 : if (panLocations[iChunk].nOffset ==
734 478 : std::numeric_limits<uint64_t>::max() &&
735 110 : panLocations[iChunk].nSize == std::numeric_limits<uint64_t>::max())
736 : {
737 : // Empty chunk
738 90 : continue;
739 : }
740 :
741 496 : if (panLocations[iChunk].nOffset >= abySrc.size() ||
742 218 : panLocations[iChunk].nSize >
743 218 : abySrc.size() - panLocations[iChunk].nOffset)
744 : {
745 80 : CPLError(CE_Failure, CPLE_NotSupported,
746 : "ZarrV3CodecShardingIndexed::Decode(): invalid chunk "
747 : "location for chunk %" PRIu64 ": offset=%" PRIu64
748 : ", size=%" PRIu64,
749 : static_cast<uint64_t>(iChunk),
750 80 : panLocations[iChunk].nOffset, panLocations[iChunk].nSize);
751 80 : return false;
752 : }
753 :
754 198 : abyChunk.clear();
755 : abyChunk.insert(
756 198 : abyChunk.end(),
757 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset),
758 0 : abySrc.begin() + static_cast<size_t>(panLocations[iChunk].nOffset +
759 396 : panLocations[iChunk].nSize));
760 198 : if (!m_poCodecSequence->Decode(abyChunk))
761 : {
762 50 : CPLError(CE_Failure, CPLE_NotSupported,
763 : "ZarrV3CodecShardingIndexed::Decode(): cannot decode "
764 : "chunk %" PRIu64,
765 : static_cast<uint64_t>(iChunk));
766 50 : return false;
767 : }
768 :
769 148 : if (abyChunk.size() != nExpectedDecodedChunkSize)
770 : {
771 0 : CPLError(CE_Failure, CPLE_NotSupported,
772 : "ZarrV3CodecShardingIndexed::Decode(): decoded size for "
773 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64
774 : " is expected",
775 : static_cast<uint64_t>(iChunk),
776 0 : static_cast<uint64_t>(abyChunk.size()),
777 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
778 0 : return false;
779 : }
780 :
781 148 : CopySubArrayIntoLargerOne(abyChunk, m_anInnerBlockSize, anChunkIndices,
782 148 : abyDst, m_oInputArrayMetadata.anBlockSizes,
783 : nDTSize);
784 : }
785 :
786 17 : return true;
787 : }
788 :
789 : /************************************************************************/
790 : /* ZarrV3CodecShardingIndexed::DecodePartial() */
791 : /************************************************************************/
792 :
793 1027 : bool ZarrV3CodecShardingIndexed::DecodePartial(
794 : VSIVirtualHandle *poFile, const ZarrByteVectorQuickResize & /* abySrc */,
795 : ZarrByteVectorQuickResize &abyDst, std::vector<size_t> &anStartIdx,
796 : std::vector<size_t> &anCount)
797 : {
798 1027 : CPLAssert(anStartIdx.size() == m_oInputArrayMetadata.anBlockSizes.size());
799 1027 : CPLAssert(anStartIdx.size() == anCount.size());
800 :
801 1027 : size_t nInnerChunkCount = 1;
802 1027 : size_t nInnerChunkIdx = 0;
803 3081 : for (size_t i = 0; i < anStartIdx.size(); ++i)
804 : {
805 2054 : CPLAssert(anStartIdx[i] + anCount[i] <=
806 : m_oInputArrayMetadata.anBlockSizes[i]);
807 4108 : if ((anStartIdx[i] % m_anInnerBlockSize[i]) != 0 ||
808 2054 : anCount[i] != m_anInnerBlockSize[i])
809 : {
810 : // Should not happen with the current call sites.
811 0 : CPLError(CE_Failure, CPLE_AppDefined,
812 : "ZarrV3CodecShardingIndexed::DecodePartial() only "
813 : "supported on an exact inner chunk");
814 0 : return false;
815 : }
816 :
817 : const size_t nCountInnerChunksThisDim =
818 2054 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
819 4108 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
820 2054 : anStartIdx[i] / m_anInnerBlockSize[i];
821 2054 : nInnerChunkCount *= nCountInnerChunksThisDim;
822 : }
823 :
824 1027 : abyDst.clear();
825 :
826 1027 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
827 1027 : const auto nExpectedDecodedChunkSize = nDTSize * MultiplyElements(anCount);
828 :
829 1027 : vsi_l_offset nLocationOffset =
830 : static_cast<vsi_l_offset>(nInnerChunkIdx) * sizeof(Location);
831 1027 : if (m_bIndexLocationAtEnd)
832 : {
833 1027 : poFile->Seek(0, SEEK_END);
834 1027 : const auto nFileSize = poFile->Tell();
835 1027 : vsi_l_offset nIndexSize =
836 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
837 1027 : if (m_bIndexHasCRC32)
838 1024 : nIndexSize += sizeof(uint32_t);
839 1027 : if (nFileSize < nIndexSize)
840 : {
841 0 : CPLError(CE_Failure, CPLE_AppDefined,
842 : "ZarrV3CodecShardingIndexed::DecodePartial(): shard file "
843 : "too small");
844 0 : return false;
845 : }
846 1027 : nLocationOffset += nFileSize - nIndexSize;
847 : }
848 :
849 : Location loc;
850 2054 : if (poFile->Seek(nLocationOffset, SEEK_SET) != 0 ||
851 1027 : poFile->Read(&loc, 1, sizeof(loc)) != sizeof(loc))
852 : {
853 :
854 0 : CPLError(CE_Failure, CPLE_AppDefined,
855 : "ZarrV3CodecShardingIndexed::DecodePartial(): "
856 : "cannot read index for chunk %" PRIu64,
857 : static_cast<uint64_t>(nInnerChunkIdx));
858 0 : return false;
859 : }
860 :
861 2051 : if (!m_poIndexCodecSequence->GetCodecs().empty() &&
862 1024 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
863 2051 : ZarrV3CodecBytes::NAME &&
864 0 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp())
865 : {
866 0 : CPL_SWAP64PTR(&(loc.nOffset));
867 0 : CPL_SWAP64PTR(&(loc.nSize));
868 : }
869 :
870 1032 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
871 5 : loc.nSize == std::numeric_limits<uint64_t>::max())
872 : {
873 : // Empty chunk
874 : try
875 : {
876 5 : abyDst.resize(nExpectedDecodedChunkSize);
877 : }
878 0 : catch (const std::exception &)
879 : {
880 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
881 : "Cannot allocate memory for decoded shard");
882 0 : return false;
883 : }
884 5 : FillWithNoData(abyDst, MultiplyElements(anCount),
885 5 : m_oInputArrayMetadata);
886 5 : return true;
887 : }
888 :
889 1022 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
890 1022 : if (loc.nSize > THRESHOLD)
891 : {
892 : // When the chunk size is above a certain threshold, check it against
893 : // the actual file size to avoid excessive memory allocation attempts.
894 :
895 0 : poFile->Seek(0, SEEK_END);
896 0 : const auto nFileSize = poFile->Tell();
897 :
898 0 : if (loc.nOffset >= nFileSize || loc.nSize > nFileSize - loc.nOffset)
899 : {
900 0 : CPLError(
901 : CE_Failure, CPLE_NotSupported,
902 : "ZarrV3CodecShardingIndexed::DecodePartial(): invalid chunk "
903 : "location for chunk %" PRIu64 ": offset=%" PRIu64
904 : ", size=%" PRIu64,
905 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
906 0 : return false;
907 : }
908 : }
909 :
910 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
911 : {
912 : #ifndef __COVERITY__
913 : // coverity[result_independent_of_operands]
914 : if (loc.nSize > std::numeric_limits<size_t>::max())
915 : {
916 : CPLError(
917 : CE_Failure, CPLE_NotSupported,
918 : "ZarrV3CodecShardingIndexed::DecodePartial(): too large chunk "
919 : "size for chunk %" PRIu64 " for this platform: size=%" PRIu64,
920 : static_cast<uint64_t>(nInnerChunkIdx), loc.nSize);
921 : return false;
922 : }
923 : #endif
924 : }
925 :
926 : try
927 : {
928 1022 : abyDst.resize(static_cast<size_t>(loc.nSize));
929 : }
930 0 : catch (const std::exception &)
931 : {
932 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
933 : "Cannot allocate memory for decoded shard");
934 0 : return false;
935 : }
936 :
937 2044 : if (poFile->Seek(loc.nOffset, SEEK_SET) != 0 ||
938 1022 : poFile->Read(abyDst.data(), 1, abyDst.size()) != abyDst.size())
939 : {
940 0 : CPLError(CE_Failure, CPLE_NotSupported,
941 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot read "
942 : "data for chunk %" PRIu64 ": offset=%" PRIu64
943 : ", size=%" PRIu64,
944 : static_cast<uint64_t>(nInnerChunkIdx), loc.nOffset, loc.nSize);
945 0 : return false;
946 : }
947 :
948 1022 : if (!m_poCodecSequence->Decode(abyDst))
949 : {
950 356 : CPLError(CE_Failure, CPLE_NotSupported,
951 : "ZarrV3CodecShardingIndexed::DecodePartial(): cannot decode "
952 : "chunk %" PRIu64,
953 : static_cast<uint64_t>(nInnerChunkIdx));
954 356 : return false;
955 : }
956 :
957 666 : if (abyDst.size() != nExpectedDecodedChunkSize)
958 : {
959 0 : CPLError(
960 : CE_Failure, CPLE_NotSupported,
961 : "ZarrV3CodecShardingIndexed::DecodePartial(): decoded size for "
962 : "chunk %" PRIu64 " is %" PRIu64 " whereas %" PRIu64 " is expected",
963 : static_cast<uint64_t>(nInnerChunkIdx),
964 0 : static_cast<uint64_t>(abyDst.size()),
965 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
966 0 : return false;
967 : }
968 :
969 666 : return true;
970 : }
971 :
972 : /************************************************************************/
973 : /* ZarrV3CodecShardingIndexed::BatchDecodePartial() */
974 : /************************************************************************/
975 :
976 4023 : bool ZarrV3CodecShardingIndexed::BatchDecodePartial(
977 : VSIVirtualHandle *poFile, const char *pszFilename,
978 : const std::vector<std::pair<std::vector<size_t>, std::vector<size_t>>>
979 : &anRequests,
980 : std::vector<ZarrByteVectorQuickResize> &aResults)
981 : {
982 4023 : if (anRequests.empty())
983 0 : return true;
984 :
985 4023 : const auto nDTSize = m_oInputArrayMetadata.oElt.nativeSize;
986 :
987 : // --- Compute inner chunk count and per-request inner chunk indices ---
988 4023 : size_t nInnerChunkCount = 1;
989 12076 : for (size_t i = 0; i < m_oInputArrayMetadata.anBlockSizes.size(); ++i)
990 : {
991 8053 : nInnerChunkCount *=
992 8053 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
993 : }
994 :
995 : // Determine whether index codec requires byte-swapping
996 : const bool bSwapIndex =
997 8043 : !m_poIndexCodecSequence->GetCodecs().empty() &&
998 4020 : m_poIndexCodecSequence->GetCodecs().front()->GetName() ==
999 8043 : ZarrV3CodecBytes::NAME &&
1000 1 : !m_poIndexCodecSequence->GetCodecs().front()->IsNoOp();
1001 :
1002 : // Compute index base offset. For index-at-end, we need the file size.
1003 4023 : vsi_l_offset nIndexBaseOffset = 0;
1004 4023 : if (m_bIndexLocationAtEnd)
1005 : {
1006 4023 : poFile->Seek(0, SEEK_END);
1007 4023 : const auto nFileSize = poFile->Tell();
1008 4023 : vsi_l_offset nIndexSize =
1009 : static_cast<vsi_l_offset>(nInnerChunkCount) * sizeof(Location);
1010 4023 : if (m_bIndexHasCRC32)
1011 4020 : nIndexSize += sizeof(uint32_t);
1012 4023 : if (nFileSize < nIndexSize)
1013 : {
1014 0 : CPLError(CE_Failure, CPLE_AppDefined,
1015 : "BatchDecodePartial: shard file too small");
1016 0 : return false;
1017 : }
1018 4023 : nIndexBaseOffset = nFileSize - nIndexSize;
1019 : }
1020 :
1021 : // Build per-request inner chunk indices
1022 8046 : std::vector<size_t> anInnerChunkIndices(anRequests.size());
1023 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1024 : {
1025 12092 : const auto &anStartIdx = anRequests[iReq].first;
1026 12092 : CPLAssert(anStartIdx.size() ==
1027 : m_oInputArrayMetadata.anBlockSizes.size());
1028 :
1029 12092 : size_t nInnerChunkIdx = 0;
1030 36322 : for (size_t i = 0; i < anStartIdx.size(); ++i)
1031 : {
1032 : const size_t nCountInnerChunksThisDim =
1033 24230 : m_oInputArrayMetadata.anBlockSizes[i] / m_anInnerBlockSize[i];
1034 48460 : nInnerChunkIdx = nInnerChunkIdx * nCountInnerChunksThisDim +
1035 24230 : anStartIdx[i] / m_anInnerBlockSize[i];
1036 : }
1037 12092 : anInnerChunkIndices[iReq] = nInnerChunkIdx;
1038 : }
1039 :
1040 : // --- Pass 1: read index entries, using per-shard cache when possible ---
1041 8046 : std::vector<Location> aLocations(anRequests.size());
1042 :
1043 4023 : const size_t nIndexBytes = nInnerChunkCount * sizeof(Location);
1044 : const size_t nMaxIndexBytes =
1045 4023 : static_cast<size_t>(CPLAtoGIntBig(CPLGetConfigOption(
1046 4023 : "GDAL_ZARR_SHARD_INDEX_CACHE_MAX_BYTES", "1048576")));
1047 4023 : if (pszFilename != nullptr && nIndexBytes <= nMaxIndexBytes)
1048 : {
1049 : // Try to serve from the in-process index cache.
1050 : // g_oShardIndexCache serialises its own access via std::mutex.
1051 4023 : std::vector<GByte> abyIndexBuf;
1052 4023 : bool bCacheHit = g_oShardIndexCache.tryGet(pszFilename, abyIndexBuf);
1053 4023 : if (bCacheHit && abyIndexBuf.size() != nIndexBytes)
1054 0 : bCacheHit = false; // stale entry from different layout
1055 :
1056 4023 : if (!bCacheHit)
1057 : {
1058 : // Cache miss: read the full index in one contiguous I/O call.
1059 78 : abyIndexBuf.resize(nIndexBytes);
1060 78 : poFile->Seek(nIndexBaseOffset, SEEK_SET);
1061 78 : if (poFile->Read(abyIndexBuf.data(), nIndexBytes, 1) != 1)
1062 : {
1063 0 : CPLError(CE_Failure, CPLE_AppDefined,
1064 : "BatchDecodePartial: failed to read shard index");
1065 0 : return false;
1066 : }
1067 78 : if (bSwapIndex)
1068 : {
1069 5 : for (size_t j = 0; j < nInnerChunkCount; ++j)
1070 : {
1071 4 : GByte *p = abyIndexBuf.data() + j * sizeof(Location);
1072 4 : CPL_SWAP64PTR(p);
1073 4 : CPL_SWAP64PTR(p + sizeof(uint64_t));
1074 : }
1075 : }
1076 78 : g_oShardIndexCache.insert(pszFilename, abyIndexBuf);
1077 : }
1078 :
1079 16115 : for (size_t i = 0; i < anRequests.size(); ++i)
1080 12092 : memcpy(&aLocations[i],
1081 12092 : abyIndexBuf.data() +
1082 12092 : anInnerChunkIndices[i] * sizeof(Location),
1083 4023 : sizeof(Location));
1084 : }
1085 : else
1086 : {
1087 : // Shard index too large for cache or no filename: fall back to
1088 : // per-entry ReadMultiRange.
1089 0 : std::vector<vsi_l_offset> anIdxOffsets(anRequests.size());
1090 0 : std::vector<size_t> anIdxSizes(anRequests.size(), sizeof(Location));
1091 0 : std::vector<void *> ppIdxData(anRequests.size());
1092 :
1093 0 : for (size_t i = 0; i < anRequests.size(); ++i)
1094 : {
1095 0 : anIdxOffsets[i] = nIndexBaseOffset + static_cast<vsi_l_offset>(
1096 0 : anInnerChunkIndices[i]) *
1097 : sizeof(Location);
1098 : #ifndef __COVERITY__
1099 0 : ppIdxData[i] = &aLocations[i];
1100 : #endif
1101 : }
1102 :
1103 0 : if (poFile->ReadMultiRange(static_cast<int>(anRequests.size()),
1104 0 : ppIdxData.data(), anIdxOffsets.data(),
1105 0 : anIdxSizes.data()) != 0)
1106 : {
1107 0 : CPLError(CE_Failure, CPLE_AppDefined,
1108 : "BatchDecodePartial: ReadMultiRange() failed for index");
1109 0 : return false;
1110 : }
1111 :
1112 0 : if (bSwapIndex)
1113 : {
1114 0 : for (auto &loc : aLocations)
1115 : {
1116 0 : CPL_SWAP64PTR(&(loc.nOffset));
1117 0 : CPL_SWAP64PTR(&(loc.nSize));
1118 : }
1119 : }
1120 : }
1121 :
1122 : // --- Classify requests: empty chunks vs data chunks ---
1123 4023 : aResults.resize(anRequests.size());
1124 :
1125 : struct DataRange
1126 : {
1127 : size_t nReqIdx;
1128 : };
1129 :
1130 8046 : std::vector<DataRange> aDataRanges;
1131 8046 : std::vector<vsi_l_offset> anDataOffsets;
1132 8046 : std::vector<size_t> anDataSizes;
1133 :
1134 16115 : for (size_t iReq = 0; iReq < anRequests.size(); ++iReq)
1135 : {
1136 12092 : const auto &anCount = anRequests[iReq].second;
1137 : const auto nExpectedDecodedChunkSize =
1138 12092 : nDTSize * MultiplyElements(anCount);
1139 12092 : const Location &loc = aLocations[iReq];
1140 :
1141 12092 : if (loc.nOffset == std::numeric_limits<uint64_t>::max() &&
1142 0 : loc.nSize == std::numeric_limits<uint64_t>::max())
1143 : {
1144 : // Empty chunk - fill with nodata
1145 : try
1146 : {
1147 0 : aResults[iReq].resize(nExpectedDecodedChunkSize);
1148 : }
1149 0 : catch (const std::exception &)
1150 : {
1151 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1152 : "Cannot allocate memory for decoded chunk");
1153 0 : return false;
1154 : }
1155 0 : FillWithNoData(aResults[iReq], MultiplyElements(anCount),
1156 0 : m_oInputArrayMetadata);
1157 0 : continue;
1158 : }
1159 :
1160 : if constexpr (sizeof(size_t) < sizeof(uint64_t))
1161 : {
1162 : #ifndef __COVERITY__
1163 : // coverity[result_independent_of_operands]
1164 : if (loc.nSize > std::numeric_limits<size_t>::max())
1165 : {
1166 : CPLError(CE_Failure, CPLE_NotSupported,
1167 : "BatchDecodePartial: too large chunk size");
1168 : return false;
1169 : }
1170 : #endif
1171 : }
1172 :
1173 12092 : aDataRanges.push_back({iReq});
1174 12092 : anDataOffsets.push_back(loc.nOffset);
1175 12092 : anDataSizes.push_back(static_cast<size_t>(loc.nSize));
1176 : }
1177 :
1178 4023 : if (aDataRanges.empty())
1179 0 : return true;
1180 :
1181 : // Validate against file size (same threshold as DecodePartial)
1182 4023 : constexpr size_t THRESHOLD = 10 * 1024 * 1024;
1183 : {
1184 4023 : uint64_t nAccSize = 0;
1185 16115 : for (const auto &nSize : anDataSizes)
1186 : {
1187 12092 : if (nAccSize > std::numeric_limits<uint64_t>::max() - nSize)
1188 : {
1189 0 : nAccSize = std::numeric_limits<uint64_t>::max();
1190 0 : break;
1191 : }
1192 12092 : nAccSize += nSize;
1193 : }
1194 4023 : if (nAccSize > THRESHOLD)
1195 : {
1196 0 : poFile->Seek(0, SEEK_END);
1197 0 : const auto nFileSize = poFile->Tell();
1198 0 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1199 : {
1200 0 : if (anDataOffsets[i] >= nFileSize ||
1201 0 : anDataSizes[i] > nFileSize - anDataOffsets[i])
1202 : {
1203 0 : CPLError(CE_Failure, CPLE_NotSupported,
1204 : "BatchDecodePartial: invalid chunk location: "
1205 : "offset=%" PRIu64 ", size=%" PRIu64,
1206 0 : static_cast<uint64_t>(anDataOffsets[i]),
1207 0 : static_cast<uint64_t>(anDataSizes[i]));
1208 0 : return false;
1209 : }
1210 : }
1211 : }
1212 : }
1213 :
1214 : // --- Pass 2: ReadMultiRange for data chunks ---
1215 8046 : std::vector<ZarrByteVectorQuickResize> aCompressed(aDataRanges.size());
1216 8046 : std::vector<void *> ppData(aDataRanges.size());
1217 :
1218 16115 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1219 : {
1220 : try
1221 : {
1222 12092 : aCompressed[i].resize(anDataSizes[i]);
1223 : }
1224 0 : catch (const std::exception &)
1225 : {
1226 0 : CPLError(CE_Failure, CPLE_OutOfMemory,
1227 : "Cannot allocate memory for compressed chunk");
1228 0 : return false;
1229 : }
1230 12092 : ppData[i] = aCompressed[i].data();
1231 : }
1232 :
1233 4023 : CPLDebugOnly("ZARR",
1234 : "BatchDecodePartial: ReadMultiRange() with %d data ranges",
1235 : static_cast<int>(aDataRanges.size()));
1236 :
1237 4023 : if (poFile->ReadMultiRange(static_cast<int>(aDataRanges.size()),
1238 4023 : ppData.data(), anDataOffsets.data(),
1239 8046 : anDataSizes.data()) != 0)
1240 : {
1241 0 : CPLError(CE_Failure, CPLE_AppDefined,
1242 : "BatchDecodePartial: ReadMultiRange() failed for data");
1243 0 : return false;
1244 : }
1245 :
1246 : // --- Decode compressed chunks (parallel when GDAL_NUM_THREADS > 1) ---
1247 4023 : const int nMaxThreads = GDALGetNumThreads();
1248 4023 : const int nChunks = static_cast<int>(aDataRanges.size());
1249 4023 : const int nThreads = std::min(std::max(1, nMaxThreads), nChunks);
1250 :
1251 : // Try parallel decode when multiple threads are available
1252 0 : CPLWorkerThreadPool *wtp = (nThreads > 1 && nChunks > 1)
1253 4023 : ? GDALGetGlobalThreadPool(nMaxThreads)
1254 4023 : : nullptr;
1255 :
1256 4023 : if (!wtp)
1257 : {
1258 : // Sequential fallback
1259 15182 : for (size_t i = 0; i < aDataRanges.size(); ++i)
1260 : {
1261 11515 : const size_t iReq = aDataRanges[i].nReqIdx;
1262 11515 : const auto &anCount = anRequests[iReq].second;
1263 : const auto nExpectedDecodedChunkSize =
1264 11515 : nDTSize * MultiplyElements(anCount);
1265 :
1266 11515 : if (!m_poCodecSequence->Decode(aCompressed[i]))
1267 : {
1268 356 : CPLError(CE_Failure, CPLE_NotSupported,
1269 : "BatchDecodePartial: cannot decode chunk %" PRIu64,
1270 356 : static_cast<uint64_t>(anInnerChunkIndices[iReq]));
1271 356 : return false;
1272 : }
1273 :
1274 11159 : if (aCompressed[i].size() != nExpectedDecodedChunkSize)
1275 : {
1276 0 : CPLError(CE_Failure, CPLE_NotSupported,
1277 : "BatchDecodePartial: decoded size %" PRIu64
1278 : " != expected %" PRIu64,
1279 0 : static_cast<uint64_t>(aCompressed[i].size()),
1280 : static_cast<uint64_t>(nExpectedDecodedChunkSize));
1281 0 : return false;
1282 : }
1283 :
1284 11159 : aResults[iReq] = std::move(aCompressed[i]);
1285 : }
1286 3667 : return true;
1287 : }
1288 :
1289 0 : CPLDebugOnly("ZARR",
1290 : "BatchDecodePartial: parallel decode with %d threads "
1291 : "for %d chunks",
1292 : nThreads, nChunks);
1293 :
1294 : {
1295 0 : bool bGlobalOK = true;
1296 0 : std::mutex oMutex;
1297 :
1298 : // Clone codecs per thread on the main thread (Clone() is not
1299 : // thread-safe due to JSON object cloning)
1300 0 : std::vector<std::unique_ptr<ZarrV3CodecSequence>> apoCodecs(nThreads);
1301 0 : for (int t = 0; t < nThreads; ++t)
1302 0 : apoCodecs[t] = m_poCodecSequence->Clone();
1303 :
1304 0 : auto poJobQueue = wtp->CreateJobQueue();
1305 0 : for (int t = 0; t < nThreads; ++t)
1306 : {
1307 0 : const int iFirst =
1308 0 : static_cast<int>(static_cast<int64_t>(t) * nChunks / nThreads);
1309 0 : const int iEnd = static_cast<int>(static_cast<int64_t>(t + 1) *
1310 0 : nChunks / nThreads);
1311 :
1312 0 : poJobQueue->SubmitJob(
1313 0 : [iFirst, iEnd, t, &aDataRanges, &anRequests, &aCompressed,
1314 0 : &aResults, &apoCodecs, &bGlobalOK, &oMutex, nDTSize]()
1315 : {
1316 0 : for (int i = iFirst; i < iEnd; ++i)
1317 : {
1318 : {
1319 0 : std::lock_guard<std::mutex> oLock(oMutex);
1320 0 : if (!bGlobalOK)
1321 0 : return;
1322 : }
1323 :
1324 0 : const size_t iReq = aDataRanges[i].nReqIdx;
1325 0 : const auto &anCount = anRequests[iReq].second;
1326 : const auto nExpected =
1327 0 : nDTSize * MultiplyElements(anCount);
1328 :
1329 0 : if (!apoCodecs[t]->Decode(aCompressed[i]) ||
1330 0 : aCompressed[i].size() != nExpected)
1331 : {
1332 0 : std::lock_guard<std::mutex> oLock(oMutex);
1333 0 : bGlobalOK = false;
1334 0 : return;
1335 : }
1336 :
1337 : // Each job writes to a unique iReq slot - no lock
1338 0 : aResults[iReq] = std::move(aCompressed[i]);
1339 : }
1340 : });
1341 : }
1342 0 : poJobQueue->WaitCompletion();
1343 :
1344 0 : if (!bGlobalOK)
1345 : {
1346 0 : CPLError(CE_Failure, CPLE_NotSupported,
1347 : "BatchDecodePartial: parallel decode failed");
1348 0 : return false;
1349 : }
1350 : }
1351 :
1352 0 : return true;
1353 : }
1354 :
1355 : /************************************************************************/
1356 : /* ZarrV3CodecShardingIndexed::GetInnerMostBlockSize() */
1357 : /************************************************************************/
1358 :
1359 531 : std::vector<size_t> ZarrV3CodecShardingIndexed::GetInnerMostBlockSize(
1360 : const std::vector<size_t> &) const
1361 : {
1362 531 : return m_anInnerBlockSize;
1363 : // TODO if we one day properly support nested sharding
1364 : // return m_poCodecSequence->GetInnerMostBlockSize(m_anInnerBlockSize);
1365 : }
|