diff --git a/docs/releases.md b/docs/releases.md index bb80b80ae..081b88d7d 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -41,6 +41,10 @@ This release moves the `ObjectStoreRegistry` to a separate package `obspec_utils ### New Features +- Improved `ZarrParser` performance. + ([#892](https://github.com/zarr-developers/VirtualiZarr/pull/892)). + By [Raphael Hagen](https://github.com/norlandrhagen). + - Added `reader_factory` parameter to `HDFParser` to allow customizing how files are read ([#844](https://github.com/zarr-developers/VirtualiZarr/pull/844)). By [Max Jones](https://github.com/maxrjones). diff --git a/pyproject.toml b/pyproject.toml index f6027fccc..e7d23fdc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,8 @@ hdf = [ "imagecodecs-numcodecs==2024.6.1", ] +zarr = ["arro3-core", "pyarrow"] + # kerchunk-based parsers netcdf3 = [ "virtualizarr[remote]", @@ -76,13 +78,14 @@ all_parsers = [ "virtualizarr[fits]", "virtualizarr[kerchunk_parquet]", "virtualizarr[tiff]", + "virtualizarr[zarr]" ] # writers icechunk = [ "icechunk>=1.1.2", ] -zarr = ["arro3-core", "pyarrow"] + kerchunk = ["fastparquet", "pandas"] @@ -203,14 +206,17 @@ run-tests-html-cov = { cmd = "pytest -n auto --run-network-tests --verbose --cov min-deps = ["dev", "test", "hdf", "hdf5-lib"] # VirtualiZarr/conftest.py using h5py, so the minimum set of dependencies for testing still includes hdf libs # Inherit from min-deps to get all the test commands, along with optional dependencies test = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "py313"] -test-py311 = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "py311"] # test against python 3.11 -test-py312 = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "py312"] # test against python 3.12 +test-py311 = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "zarr", "py311"] # test against python 3.11 +test-py312 = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "zarr", "py312"] # test against python 3.12 minio = ["dev", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "hdf5-lib", "tiff", "py312", "minio"] minimum-versions = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "tiff", "hdf5-lib", "minimum-versions"] upstream = ["dev", "test", "hdf", "hdf5-lib", "netcdf3", "upstream", "icechunk-dev", "py313"] all = ["dev", "test", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "all_parsers", "all_writers", "py313"] docs = ["docs", "dev", "remote", "hdf", "netcdf3", "fits", "icechunk", "kerchunk", "kerchunk_parquet", "hdf5-lib", "tiff", "py313"] +[tool.pixi.dependencies] +pytest = "*" + # Define commands to run within the docs environment [tool.pixi.feature.docs.tasks] serve-docs = { cmd = "mkdocs serve" } diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index a4d93bbf8..d1029fb95 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import re from collections.abc import ( Callable, @@ -8,13 +10,16 @@ ValuesView, ) from pathlib import PosixPath -from typing import Any, NewType, TypedDict, cast +from typing import TYPE_CHECKING, Any, NewType, TypedDict, cast import numpy as np from virtualizarr.manifests.utils import construct_chunk_pattern, parse_manifest_index from virtualizarr.types import ChunkKey +if TYPE_CHECKING: + import pyarrow as pa # type: ignore[import-untyped,import-not-found] + # doesn't guarantee that writers actually handle these VALID_URI_PREFIXES = { "s3://", @@ -322,6 +327,55 @@ def from_arrays( return obj + @classmethod + def _from_arrow( + cls, + *, + paths: "pa.StringArray", + offsets: "pa.UInt64Array", + lengths: "pa.UInt64Array", + shape: tuple[int, ...], + ) -> "ChunkManifest": + """ + Create a ChunkManifest from flat 1D PyArrow arrays. + + Avoids intermediate Python dicts by converting Arrow arrays directly + to the numpy arrays used internally by ChunkManifest. + + Parameters + ---------- + paths + Full paths to chunks, as a PyArrow StringArray. Nulls represent missing chunks. + offsets + Byte offsets of chunks, as a PyArrow UInt64Array. Nulls represent missing chunks. + lengths + Byte lengths of chunks, as a PyArrow UInt64Array. Nulls represent missing chunks. + shape + Shape to reshape the flat arrays into. + """ + import pyarrow as pa # type: ignore[import-untyped,import-not-found] + import pyarrow.compute as pc # type: ignore[import-untyped,import-not-found] + + arrow_paths = pc.if_else(pc.is_null(paths), "", paths) + arrow_offsets = pc.if_else( + pc.is_null(offsets), pa.scalar(0, pa.uint64()), offsets + ) + arrow_lengths = pc.if_else( + pc.is_null(lengths), pa.scalar(0, pa.uint64()), lengths + ) + + np_paths = arrow_paths.to_numpy(zero_copy_only=False).astype( + np.dtypes.StringDType() + ) + np_offsets = arrow_offsets.to_numpy(zero_copy_only=False) + np_lengths = arrow_lengths.to_numpy(zero_copy_only=False) + + return cls.from_arrays( + paths=np_paths.reshape(shape), + offsets=np_offsets.reshape(shape), + lengths=np_lengths.reshape(shape), + ) + @property def ndim_chunk_grid(self) -> int: """ diff --git a/virtualizarr/parsers/zarr.py b/virtualizarr/parsers/zarr.py index 765978271..417855597 100644 --- a/virtualizarr/parsers/zarr.py +++ b/virtualizarr/parsers/zarr.py @@ -2,15 +2,16 @@ import asyncio import concurrent.futures -import math from abc import ABC, abstractmethod from collections.abc import Coroutine, Iterable from pathlib import Path -from typing import Any, TypeVar, cast +from typing import TYPE_CHECKING, Any, TypeVar, cast +import numpy as np import zarr from obspec_utils.registry import ObjectStoreRegistry from zarr.api.asynchronous import open_group as open_group_async +from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding from zarr.core.group import GroupMetadata from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata from zarr.storage import ObjectStore @@ -21,8 +22,13 @@ ManifestGroup, ManifestStore, ) -from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri -from virtualizarr.vendor.zarr.core.common import _concurrent_map +from virtualizarr.manifests.manifest import ( + validate_and_normalize_path_to_uri, +) + +if TYPE_CHECKING: + import pyarrow as pa # type: ignore[import-untyped,import-not-found] + import zarr T = TypeVar("T") ZarrArrayType = zarr.AsyncArray | zarr.Array @@ -90,41 +96,67 @@ async def _handle_scalar_array( async def _build_chunk_mapping( - chunk_keys: list[str], zarr_array: ZarrArrayType, path: str, prefix: str -) -> dict[str, dict[str, Any]]: + zarr_array: ZarrArrayType, path: str, prefix: str +) -> tuple["pa.Array", "pa.Array", "pa.Array"] | None: """ - Build chunk mapping from a list of chunk keys. + Build chunk mapping by listing the object store with obstore. + + Uses obstore's list_async with Arrow output to get chunk paths and sizes + in a single Rust-level call, avoiding per-chunk getsize calls. Parameters ---------- - chunk_keys - List of storage keys for chunks. zarr_array The Zarr array. path Base path for constructing chunk paths. prefix - Prefix to strip from chunk keys. + Prefix to list and strip from chunk keys. Returns ------- - dict - Mapping of normalized chunk coordinates to storage locations. + Tuple of (normalized_keys, full_paths, sizes) as PyArrow arrays, or None if no chunks found. """ - if not chunk_keys: - return {} + import pyarrow as pa # type: ignore[import-untyped,import-not-found] + import pyarrow.compute as pc # type: ignore[import-untyped,import-not-found] - lengths = await _concurrent_map( - [(k,) for k in chunk_keys], zarr_array.store.getsize + path_batches = [] + size_batches = [] + stream = cast(ObjectStore, zarr_array.store).store.list_async( + prefix=prefix, return_arrow=True ) - dict_keys = _normalize_chunk_keys(chunk_keys, prefix) - paths = [join_url(path, k) for k in chunk_keys] - offsets = [0] * len(lengths) + async for batch in stream: + pa_path_col = pa.array(batch.column("path")) + not_metadata = pc.invert( + pc.or_( + pc.match_substring(pa_path_col, pattern="/."), + pc.starts_with(pa_path_col, "."), + ) + ) - return { - key: {"path": p, "offset": offset, "length": length} - for key, p, offset, length in zip(dict_keys, paths, offsets, lengths) - } + filtered_paths = pa_path_col.filter(not_metadata) + filtered_sizes = pa.array(batch.column("size")).filter(not_metadata) + path_batches.append(filtered_paths) + size_batches.append(filtered_sizes) + + if not path_batches: + return None + + all_paths = pa.concat_arrays(path_batches) + all_sizes = pa.concat_arrays(size_batches) + + if len(all_paths) == 0: + return None + stripped_keys = pc.utf8_replace_slice( + all_paths, start=0, stop=len(prefix), replacement="" + ) + + # construct full paths + full_paths = pc.binary_join_element_wise( + pa.scalar(path.rstrip("/")), all_paths, "/" + ) + + return stripped_keys, full_paths, all_sizes class ZarrVersionStrategy(ABC): @@ -142,6 +174,18 @@ def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: """Get V3 metadata for the array (converting if necessary).""" ... + @abstractmethod + def get_prefix(self, zarr_array: ZarrArrayType) -> str: + """Get the storage prefix for chunk listing.""" + ... + + @abstractmethod + def _get_separator(self, zarr_array: ZarrArrayType) -> str: ... + + @abstractmethod + def validate(self, zarr_array: ZarrArrayType) -> None: + """Validate that the array can be virtualized.""" + class ZarrV2Strategy(ZarrVersionStrategy): """Strategy for handling Zarr V2 arrays.""" @@ -158,24 +202,7 @@ async def get_chunk_mapping( scalar_key = f"{prefix}0" return await _handle_scalar_array(zarr_array, path, scalar_key) - # List all keys under the array prefix, filtering out metadata files - prefix_keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)] - if not prefix_keys: - return {} - - metadata_files = {".zarray", ".zattrs", ".zgroup", ".zmetadata"} - chunk_keys = [] - for key_tuple in prefix_keys: - key = key_tuple[0] - file_name = ( - key[len(prefix) :] - if prefix and key.startswith(prefix) - else key.split("/")[-1] - ) - if file_name not in metadata_files: - chunk_keys.append(key) - - return await _build_chunk_mapping(chunk_keys, zarr_array, path, prefix) + return await _build_chunk_mapping(zarr_array, path, prefix) # type: ignore[return-value] def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: """Convert V2 metadata to V3 format.""" @@ -236,6 +263,18 @@ def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: return v3_metadata + def get_prefix(self, zarr_array: ZarrArrayType) -> str: + name = _get_array_name(zarr_array) + return f"{name}/" if name else "" + + def _get_separator(self, zarr_array: ZarrArrayType) -> str: + from typing import cast + + return cast(ArrayV2Metadata, zarr_array.metadata).dimension_separator + + def validate(self, zarr_array: ZarrArrayType) -> None: + pass # no restrictions for V2 + class ZarrV3Strategy(ZarrVersionStrategy): """Strategy for handling Zarr V3 arrays.""" @@ -271,17 +310,37 @@ async def get_chunk_mapping( # List chunk keys under the c/ subdirectory prefix = f"{name}/c/" if name else "c/" - prefix_keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)] - if not prefix_keys: - return {} - - chunk_keys = [x[0] for x in prefix_keys] - return await _build_chunk_mapping(chunk_keys, zarr_array, path, prefix) + return await _build_chunk_mapping(zarr_array, path, prefix) # type: ignore[return-value] def get_metadata(self, zarr_array: ZarrArrayType) -> ArrayV3Metadata: """Return V3 metadata as-is (no conversion needed).""" return zarr_array.metadata # type: ignore[return-value] + def get_prefix(self, zarr_array: ZarrArrayType) -> str: + name = _get_array_name(zarr_array) + return f"{name}/c/" if name else "c/" + + def _get_separator(self, zarr_array: ZarrArrayType) -> str: + from typing import cast + + metadata = cast(ArrayV3Metadata, zarr_array.metadata) + return cast(DefaultChunkKeyEncoding, metadata.chunk_key_encoding).separator + + def validate(self, zarr_array: ZarrArrayType) -> None: + from zarr.codecs import ShardingCodec + + if not isinstance(zarr_array.metadata, ArrayV3Metadata): + return + if any( + isinstance(codec, ShardingCodec) for codec in zarr_array.metadata.codecs + ): + raise NotImplementedError( + "Zarr V3 arrays with sharding are not yet supported. " + "Sharding stores multiple chunks in a single storage object with non-zero offsets, " + "which VirtualiZarr does not currently handle. " + "Reading sharded arrays without proper offset handling would result in corrupted data." + ) + def get_strategy(zarr_array: ZarrArrayType) -> ZarrVersionStrategy: """ @@ -319,17 +378,59 @@ async def build_chunk_manifest(zarr_array: ZarrArrayType, path: str) -> ChunkMan (sparse arrays), and VirtualiZarr manifests preserve this sparsity. When chunks are missing, Zarr will return the fill_value for those regions when the array is read. """ + import pyarrow as pa # type: ignore[import-untyped,import-not-found] + import pyarrow.compute as pc # type: ignore[import-untyped,import-not-found] + strategy = get_strategy(zarr_array) - chunk_map = await strategy.get_chunk_mapping(zarr_array, path) + strategy.validate(zarr_array) + chunk_grid_shape = zarr_array._chunk_grid_shape - if not chunk_map: - if zarr_array.shape and zarr_array.chunks: - chunk_grid_shape = tuple( - math.ceil(s / c) for s, c in zip(zarr_array.shape, zarr_array.chunks) - ) + if zarr_array.shape == (): + chunk_map = await strategy.get_chunk_mapping(zarr_array, path) + if not chunk_map: return ChunkManifest(chunk_map, shape=chunk_grid_shape) + entry = next(iter(chunk_map.values())) + return ChunkManifest._from_arrow( + paths=pa.array([entry["path"]], type=pa.string()), + offsets=pa.array([entry["offset"]], type=pa.uint64()), + lengths=pa.array([entry["length"]], type=pa.uint64()), + shape=chunk_grid_shape, + ) + + prefix = strategy.get_prefix(zarr_array) + + result = await _build_chunk_mapping(zarr_array, path, prefix) + + if result is None: + return ChunkManifest({}, shape=chunk_grid_shape) - return ChunkManifest(chunk_map) + stripped_keys, full_paths, all_lengths = result + + total_size = zarr_array.nchunks + separator = strategy._get_separator(zarr_array) + split_keys = pc.split_pattern(stripped_keys, pattern=separator) + coords = [ + pc.cast(pc.list_element(split_keys, dim), pa.int64()).to_numpy() + for dim in range(zarr_array.ndim) + ] + flat_positions = pa.array(np.ravel_multi_index(coords, chunk_grid_shape)) + + # scatter listed chunks into a dense flat array (nulls = missing chunks) + updates = pa.table( + {"idx": flat_positions, "path": full_paths, "length": all_lengths} + ) + dense = ( + pa.table({"idx": pa.array(np.arange(total_size, dtype=np.int64))}) + .join(updates, "idx", join_type="left outer") + .sort_by("idx") + ) + + return ChunkManifest._from_arrow( + paths=dense["path"].combine_chunks(), + offsets=pa.repeat(pa.scalar(0, type=pa.uint64()), total_size), + lengths=dense["length"].combine_chunks(), + shape=chunk_grid_shape, + ) def get_metadata(zarr_array: ZarrArrayType) -> ArrayV3Metadata: diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index c14f048c5..b1c030b49 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -42,3 +42,4 @@ def _importorskip( has_dask, requires_dask = _importorskip("dask") has_obstore, requires_obstore = _importorskip("obstore") has_tiff, requires_tiff = _importorskip("virtual_tiff") +has_pyarrow, requires_pyarrow = _importorskip("pyarrow") diff --git a/virtualizarr/tests/test_parsers/test_kerchunk.py b/virtualizarr/tests/test_parsers/test_kerchunk.py index 0728ee04b..2a2863e15 100644 --- a/virtualizarr/tests/test_parsers/test_kerchunk.py +++ b/virtualizarr/tests/test_parsers/test_kerchunk.py @@ -164,6 +164,7 @@ def test_kerchunk_parquet_sparse_array(tmp_path, local_registry): This tests reading a kerchunk parquet where not all chunks are present, which is a common case for sparse arrays. """ + import pandas as pd from kerchunk.df import refs_to_dataframe # Create refs with only one chunk defined (sparse array) @@ -179,7 +180,8 @@ def test_kerchunk_parquet_sparse_array(tmp_path, local_registry): } ref_filepath = tmp_path / "sparse.parq" - refs_to_dataframe(fo=refs, url=str(ref_filepath)) + with pd.option_context("future.infer_string", False): + refs_to_dataframe(fo=refs, url=str(ref_filepath)) parser = KerchunkParquetParser() with open_virtual_dataset( @@ -305,10 +307,14 @@ def test_open_virtual_dataset_existing_kerchunk_refs( ujson.dump(example_reference_dict, json_file) parser = KerchunkJSONParser(fs_root="file://") if reference_format == "parquet": + import pandas as pd from kerchunk.df import refs_to_dataframe ref_filepath = tmp_path / "ref.parquet" - refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix()) + with pd.option_context("future.infer_string", False): + refs_to_dataframe( + fo=example_reference_dict, url=ref_filepath.as_posix() + ) parser = KerchunkParquetParser(fs_root="file://") expected_refs = netcdf4_virtual_dataset.vz.to_kerchunk(format="dict") with open_virtual_dataset( @@ -363,10 +369,12 @@ def test_notimplemented_read_inline_refs_parquet( ): # Test that parquet references with inlined data raise NotImplementedError # https://github.com/zarr-developers/VirtualiZarr/issues/489 + import pandas as pd from kerchunk.df import refs_to_dataframe ref_filepath = tmp_path / "ref.parquet" - refs_to_dataframe(fo=netcdf4_inlined_ref, url=ref_filepath.as_posix()) + with pd.option_context("future.infer_string", False): + refs_to_dataframe(fo=netcdf4_inlined_ref, url=ref_filepath.as_posix()) parser = KerchunkParquetParser() with pytest.raises( diff --git a/virtualizarr/tests/test_parsers/test_zarr.py b/virtualizarr/tests/test_parsers/test_zarr.py index 5d83820f6..1330f131b 100644 --- a/virtualizarr/tests/test_parsers/test_zarr.py +++ b/virtualizarr/tests/test_parsers/test_zarr.py @@ -20,6 +20,9 @@ get_strategy, join_url, ) +from virtualizarr.tests import requires_pyarrow + +pytestmark = requires_pyarrow ZarrArrayType = zarr.AsyncArray | zarr.Array @@ -182,7 +185,9 @@ def test_unsupported_zarr_format(): def test_empty_array_chunk_mapping(tmpdir, zarr_format): """Test chunk mapping for arrays with no chunks written yet.""" - # Create an array but don't write any data + from obstore.store import LocalStore as ObsLocalStore + from zarr.storage import ObjectStore + filepath = f"{tmpdir}/empty.zarr" zarr.create( shape=(10, 10), @@ -193,13 +198,14 @@ def test_empty_array_chunk_mapping(tmpdir, zarr_format): ) async def get_chunk_map(): - zarr_array = await open_array(store=filepath, mode="r") - strategy = get_strategy(zarr_array) - return await strategy.get_chunk_mapping(zarr_array, filepath) + obs_store = ObsLocalStore(prefix=filepath) + zarr_store = ObjectStore(store=obs_store) + zarr_array = await open_array(store=zarr_store, mode="r") + manifest = await build_chunk_manifest(zarr_array, filepath) + return manifest.dict() - chunk_map = asyncio.run(get_chunk_map()) - # Empty arrays should return empty chunk map - assert chunk_map == {} + result = asyncio.run(get_chunk_map()) + assert result == {} @SKIP_OLDER_ZARR_PYTHON @@ -306,31 +312,31 @@ async def get_meta(): def test_build_chunk_manifest_empty_with_shape(): """Test build_chunk_manifest when chunk_map is empty but array has shape and chunks.""" - # Create an array but don't write data - store = zarr.storage.MemoryStore() - zarr.create(shape=(10, 10), chunks=(5, 5), dtype="int8", store=store, zarr_format=3) + from obstore.store import MemoryStore as ObsMemoryStore + from zarr.storage import ObjectStore + + obs_store = ObsMemoryStore() + zarr_store = ObjectStore(store=obs_store) + zarr.create( + shape=(10, 10), chunks=(5, 5), dtype="int8", store=zarr_store, zarr_format=3 + ) async def get_manifest(): - zarr_array = await open_array(store=store, mode="r") + zarr_array = await open_array(store=zarr_store, mode="r") return await build_chunk_manifest(zarr_array, "test://path") manifest = asyncio.run(get_manifest()) - # Should create manifest with proper chunk grid shape even if empty - assert manifest.shape_chunk_grid == (2, 2) # 10/5 = 2 chunks per dimension + assert manifest.shape_chunk_grid == (2, 2) @zarr_versions() def test_sparse_array_with_missing_chunks(tmpdir, zarr_format): - """Test that arrays with some missing chunks (sparse arrays) are handled correctly. + """Test that arrays with some missing chunks (sparse arrays) are handled correctly.""" + import asyncio - This test verifies that VirtualiZarr correctly handles the case where some chunks - exist but others are missing. Zarr allows this for sparse data, and when chunks - are missing, Zarr returns the fill_value for those regions. VirtualiZarr should - preserve this sparsity in the manifest rather than generating entries for all - possible chunks based on the chunk grid. - """ + from obstore.store import LocalStore as ObsLocalStore + from zarr.storage import ObjectStore - # Create a zarr array with a 3x3 chunk grid (9 possible chunks) filepath = f"{tmpdir}/sparse.zarr" arr = zarr.create( shape=(30, 30), @@ -341,36 +347,28 @@ def test_sparse_array_with_missing_chunks(tmpdir, zarr_format): fill_value=np.nan, ) - # Only write data to some chunks, leaving others missing (sparse) - # Write to chunks (0,0), (1,1), and (2,2) - a diagonal pattern arr[0:10, 0:10] = 1.0 # chunk 0.0 arr[10:20, 10:20] = 2.0 # chunk 1.1 arr[20:30, 20:30] = 3.0 # chunk 2.2 - # Chunks (0,1), (0,2), (1,0), (1,2), (2,0), (2,1) are intentionally left unwritten async def get_manifest(): - zarr_array = await open_array(store=filepath, mode="r") + obs_store = ObsLocalStore(prefix=filepath) + zarr_store = ObjectStore(store=obs_store) + zarr_array = await open_array(store=zarr_store, mode="r") return await build_chunk_manifest(zarr_array, filepath) manifest = asyncio.run(get_manifest()) - # The manifest should only contain the 3 chunks we actually wrote - assert len(manifest.dict()) == 3, f"Expected 3 chunks, got {len(manifest.dict())}" - - # Verify the expected chunks are present - assert "0.0" in manifest.dict(), "Chunk 0.0 should be present" - assert "1.1" in manifest.dict(), "Chunk 1.1 should be present" - assert "2.2" in manifest.dict(), "Chunk 2.2 should be present" + assert len(manifest.dict()) == 3 + assert "0.0" in manifest.dict() + assert "1.1" in manifest.dict() + assert "2.2" in manifest.dict() - # Verify missing chunks are not in the manifest missing_chunks = ["0.1", "0.2", "1.0", "1.2", "2.0", "2.1"] for chunk_key in missing_chunks: - assert chunk_key not in manifest.dict(), ( - f"Chunk {chunk_key} should not be present (it's missing/sparse)" - ) + assert chunk_key not in manifest.dict() - # The chunk grid shape should still reflect the full array dimensions - assert manifest.shape_chunk_grid == (3, 3), "Chunk grid should be 3x3" + assert manifest.shape_chunk_grid == (3, 3) @zarr_versions() diff --git a/virtualizarr/vendor/__init__.py b/virtualizarr/vendor/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/virtualizarr/vendor/zarr/__init__.py b/virtualizarr/vendor/zarr/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/virtualizarr/vendor/zarr/core/__init__.py b/virtualizarr/vendor/zarr/core/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/virtualizarr/vendor/zarr/core/common.py b/virtualizarr/vendor/zarr/core/common.py deleted file mode 100644 index b6363db76..000000000 --- a/virtualizarr/vendor/zarr/core/common.py +++ /dev/null @@ -1,34 +0,0 @@ -import asyncio -from itertools import starmap -from typing import ( - Any, - Awaitable, - Callable, - Iterable, - TypeVar, -) - -# Vendored directly from Zarr-python V3's private API -# https://github.com/zarr-developers/zarr-python/blob/458299857141a5470ba3956d8a1607f52ac33857/src/zarr/core/common.py#L53 -T = TypeVar("T", bound=tuple[Any, ...]) -V = TypeVar("V") - - -async def _concurrent_map( - items: Iterable[T], - func: Callable[..., Awaitable[V]], - limit: int | None = None, -) -> list[V]: - if limit is None: - return await asyncio.gather(*list(starmap(func, items))) - - else: - sem = asyncio.Semaphore(limit) - - async def run(item: tuple[Any]) -> V: - async with sem: - return await func(*item) - - return await asyncio.gather( - *[asyncio.ensure_future(run(item)) for item in items] - )