diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py
index c314a1b9..cc7eb539 100644
--- a/virtualizarr/manifests/array.py
+++ b/virtualizarr/manifests/array.py
@@ -194,13 +194,7 @@ def __eq__(  # type: ignore[override]
         )
 
         # do chunk-wise comparison
-        equal_chunk_paths = self.manifest._paths == other.manifest._paths
-        equal_chunk_offsets = self.manifest._offsets == other.manifest._offsets
-        equal_chunk_lengths = self.manifest._lengths == other.manifest._lengths
-
-        equal_chunks = (
-            equal_chunk_paths & equal_chunk_offsets & equal_chunk_lengths
-        )
+        equal_chunks = self.manifest.elementwise_eq(other.manifest)
 
         if not equal_chunks.all():
             # TODO expand chunk-wise comparison into an element-wise result instead of just returning all False
diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py
index 16bff6bb..65e0524a 100644
--- a/virtualizarr/manifests/array_api.py
+++ b/virtualizarr/manifests/array_api.py
@@ -97,28 +97,8 @@ def concatenate(
     new_shape[axis] = new_length_along_concat_axis
 
     # do concatenation of entries in manifest
-    concatenated_paths = (
-        cast(  # `np.concatenate` is type hinted as if the output could have Any dtype
-            np.ndarray[Any, np.dtypes.StringDType],
-            np.concatenate(
-                [arr.manifest._paths for arr in arrays],
-                axis=axis,
-            ),
-        )
-    )
-    concatenated_offsets = np.concatenate(
-        [arr.manifest._offsets for arr in arrays],
-        axis=axis,
-    )
-    concatenated_lengths = np.concatenate(
-        [arr.manifest._lengths for arr in arrays],
-        axis=axis,
-    )
-    concatenated_manifest = ChunkManifest.from_arrays(
-        paths=concatenated_paths,
-        offsets=concatenated_offsets,
-        lengths=concatenated_lengths,
-        validate_paths=False,
+    concatenated_manifest = _concat_manifests(
+        [arr.manifest for arr in arrays], axis=axis
     )
 
     new_metadata = copy_and_replace_metadata(
@@ -165,27 +145,7 @@ def stack(
     new_shape.insert(axis, length_along_new_stacked_axis)
 
     # do stacking of entries in manifest
-    stacked_paths = cast(  # `np.stack` apparently is type hinted as if the output could have Any dtype
-        np.ndarray[Any, np.dtypes.StringDType],
-        np.stack(
-            [arr.manifest._paths for arr in arrays],
-            axis=axis,
-        ),
-    )
-    stacked_offsets = np.stack(
-        [arr.manifest._offsets for arr in arrays],
-        axis=axis,
-    )
-    stacked_lengths = np.stack(
-        [arr.manifest._lengths for arr in arrays],
-        axis=axis,
-    )
-    stacked_manifest = ChunkManifest.from_arrays(
-        paths=stacked_paths,
-        offsets=stacked_offsets,
-        lengths=stacked_lengths,
-        validate_paths=False,
-    )
+    stacked_manifest = _stack_manifests([arr.manifest for arr in arrays], axis=axis)
 
     # chunk shape has changed because a length-1 axis has been inserted
     old_chunks = first_arr.chunks
@@ -235,37 +195,66 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra
     new_chunk_grid_shape = determine_chunk_grid_shape(new_shape, new_chunk_shape)
 
     # do broadcasting of entries in manifest
-    broadcasted_paths = cast(  # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype
+    broadcasted_manifest = _broadcast_manifest(x.manifest, shape=new_chunk_grid_shape)
+
+    new_metadata = copy_and_replace_metadata(
+        old_metadata=x.metadata,
+        new_shape=list(new_shape),
+        new_chunks=list(new_chunk_shape),
+    )
+
+    return ManifestArray(chunkmanifest=broadcasted_manifest, metadata=new_metadata)
+
+
+def _concat_manifests(manifests: list[ChunkManifest], axis: int) -> ChunkManifest:
+    """Concatenate manifests along an existing axis."""
+    concatenated_paths = cast(
         np.ndarray[Any, np.dtypes.StringDType],
-        np.broadcast_to(
-            x.manifest._paths,
-            shape=new_chunk_grid_shape,
-        ),
+        np.concatenate([m._paths for m in manifests], axis=axis),
+    )
+    concatenated_offsets = np.concatenate([m._offsets for m in manifests], axis=axis)
+    concatenated_lengths = np.concatenate([m._lengths for m in manifests], axis=axis)
+    return ChunkManifest.from_arrays(
+        paths=concatenated_paths,
+        offsets=concatenated_offsets,
+        lengths=concatenated_lengths,
+        validate_paths=False,
    )
-    broadcasted_offsets = np.broadcast_to(
-        x.manifest._offsets,
-        shape=new_chunk_grid_shape,
+
+
+def _stack_manifests(manifests: list[ChunkManifest], axis: int) -> ChunkManifest:
+    """Stack manifests along a new axis."""
+    stacked_paths = cast(
+        np.ndarray[Any, np.dtypes.StringDType],
+        np.stack([m._paths for m in manifests], axis=axis),
    )
-    broadcasted_lengths = np.broadcast_to(
-        x.manifest._lengths,
-        shape=new_chunk_grid_shape,
+    stacked_offsets = np.stack([m._offsets for m in manifests], axis=axis)
+    stacked_lengths = np.stack([m._lengths for m in manifests], axis=axis)
+    return ChunkManifest.from_arrays(
+        paths=stacked_paths,
+        offsets=stacked_offsets,
+        lengths=stacked_lengths,
+        validate_paths=False,
    )
-    broadcasted_manifest = ChunkManifest.from_arrays(
+
+
+def _broadcast_manifest(
+    manifest: ChunkManifest, shape: tuple[int, ...]
+) -> ChunkManifest:
+    """Broadcast manifest to a new chunk grid shape."""
+    broadcasted_paths = cast(
+        np.ndarray[Any, np.dtypes.StringDType],
+        np.broadcast_to(manifest._paths, shape=shape),
+    )
+    broadcasted_offsets = np.broadcast_to(manifest._offsets, shape=shape)
+    broadcasted_lengths = np.broadcast_to(manifest._lengths, shape=shape)
+    return ChunkManifest.from_arrays(
         paths=broadcasted_paths,
         offsets=broadcasted_offsets,
         lengths=broadcasted_lengths,
         validate_paths=False,
     )
 
-    new_metadata = copy_and_replace_metadata(
-        old_metadata=x.metadata,
-        new_shape=list(new_shape),
-        new_chunks=list(new_chunk_shape),
-    )
-
-    return ManifestArray(chunkmanifest=broadcasted_manifest, metadata=new_metadata)
-
 
 def _prepend_singleton_dimensions(shape: tuple[int, ...], ndim: int) -> tuple[int, ...]:
     """Prepend as many new length-1 axes to shape as necessary such that the result has ndim number of axes."""
diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py
index c6339218..ff200b11 100644
--- a/virtualizarr/manifests/manifest.py
+++ b/virtualizarr/manifests/manifest.py
@@ -435,6 +435,47 @@ def __eq__(self, other: Any) -> bool:
         lengths_equal = (self._lengths == other._lengths).all()
         return paths_equal and offsets_equal and lengths_equal
 
+    def get_entry(self, indices: tuple[int, ...]) -> ChunkEntry | None:
+        """Look up a chunk entry by grid indices. Returns None for missing chunks (empty path)."""
+        path = self._paths[indices]
+        if path == "":
+            return None
+        offset = self._offsets[indices]
+        length = self._lengths[indices]
+        return ChunkEntry(path=path, offset=offset, length=length)
+
+    def elementwise_eq(self, other: "ChunkManifest") -> np.ndarray:
+        """Return boolean array where True means that chunk entry matches."""
+        return (
+            (self._paths == other._paths)
+            & (self._offsets == other._offsets)
+            & (self._lengths == other._lengths)
+        )
+
+    def iter_nonempty_paths(self) -> Iterator[str]:
+        """Yield all non-empty paths in the manifest."""
+        for path in self._paths.flat:
+            if path:
+                yield path
+
+    def iter_refs(self) -> Iterator[tuple[tuple[int, ...], ChunkEntry]]:
+        """Yield (grid_indices, chunk_entry) for every non-missing chunk."""
+        coord_vectors = np.mgrid[
+            tuple(slice(None, length) for length in self.shape_chunk_grid)
+        ]
+        for *inds, path, offset, length in np.nditer(
+            [*coord_vectors, self._paths, self._offsets, self._lengths],
+            flags=("refs_ok",),
+        ):
+            if path.item() != "":
+                idx = tuple(int(i) for i in inds)
+                yield (
+                    idx,
+                    ChunkEntry(
+                        path=path.item(), offset=offset.item(), length=length.item()
+                    ),
+                )
+
     def rename_paths(
         self,
         new: str | Callable[[str], str],
diff --git a/virtualizarr/manifests/store.py b/virtualizarr/manifests/store.py
index 21141564..3671037f 100644
--- a/virtualizarr/manifests/store.py
+++ b/virtualizarr/manifests/store.py
@@ -164,11 +164,12 @@ async def get(
         )
         chunk_indexes = parse_manifest_index(key, separator, expand_pattern=True)
-        path = manifest._paths[chunk_indexes]
-        if path == "":
+        entry = manifest.get_entry(chunk_indexes)
+        if entry is None:
             return None
-        offset = manifest._offsets[chunk_indexes]
-        length = manifest._lengths[chunk_indexes]
+        path = entry["path"]
+        offset = entry["offset"]
+        length = entry["length"]
 
         # Get the configured object store instance that matches the path
         store, path_after_prefix = self._registry.resolve(path)
         if not store:
diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py
index f94eb1db..b789d1de 100644
--- a/virtualizarr/writers/icechunk.py
+++ b/virtualizarr/writers/icechunk.py
@@ -2,7 +2,6 @@
 from datetime import datetime, timedelta, timezone
 from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Union, cast
 
-import numpy as np
 import xarray as xr
 from xarray.backends.zarr import ZarrStore as XarrayZarrStore
 from xarray.backends.zarr import encode_zarr_attr_value
@@ -272,9 +271,8 @@ def validate_virtual_chunk_containers(
     for marr in manifestarrays:
         # TODO this loop over every virtual reference is likely inefficient in python,
         # is there a way to push this down to Icechunk? (see https://github.com/earth-mover/icechunk/issues/1167)
-        for ref in marr.manifest._paths.flat:
-            if ref:
-                validate_single_ref(ref, supported_prefixes)
+        for ref in marr.manifest.iter_nonempty_paths():
+            validate_single_ref(ref, supported_prefixes)
 
 
 def validate_single_ref(ref: str, supported_prefixes: set[str]) -> None:
@@ -562,16 +560,6 @@ def write_manifest_virtual_refs(
     # but Icechunk need to expose a suitable API first
     # See https://github.com/earth-mover/icechunk/issues/401 for performance benchmark
 
-    it = np.nditer(
-        [manifest._paths, manifest._offsets, manifest._lengths],  # type: ignore[arg-type]
-        flags=[
-            "refs_ok",
-            "multi_index",
-            "c_index",
-        ],
-        op_flags=[["readonly"]] * 3,  # type: ignore
-    )
-
     if last_updated_at is None:
         # Icechunk rounds timestamps to the nearest second, but filesystems have higher precision,
         # so we need to add a buffer, so that if you immediately read data back from this icechunk store,
@@ -583,16 +571,14 @@ def write_manifest_virtual_refs(
     virtual_chunk_spec_list = [
         VirtualChunkSpec(
             index=[
-                index + offset
-                for index, offset in zip(it.multi_index, chunk_index_offsets)
+                index + offset for index, offset in zip(grid_index, chunk_index_offsets)
             ],
-            location=path.item(),
-            offset=offset.item(),
-            length=length.item(),
+            location=entry["path"],
+            offset=entry["offset"],
+            length=entry["length"],
             last_updated_at_checksum=last_updated_at,
         )
-        for path, offset, length in it
-        if path
+        for grid_index, entry in manifest.iter_refs()
     ]
 
     store.set_virtual_refs(