Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions virtualizarr/manifests/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,7 @@ def __eq__( # type: ignore[override]
)

# do chunk-wise comparison
equal_chunk_paths = self.manifest._paths == other.manifest._paths
equal_chunk_offsets = self.manifest._offsets == other.manifest._offsets
equal_chunk_lengths = self.manifest._lengths == other.manifest._lengths

equal_chunks = (
equal_chunk_paths & equal_chunk_offsets & equal_chunk_lengths
)
equal_chunks = self.manifest.elementwise_eq(other.manifest)

if not equal_chunks.all():
# TODO expand chunk-wise comparison into an element-wise result instead of just returning all False
Expand Down
71 changes: 5 additions & 66 deletions virtualizarr/manifests/array_api.py
Comment thread
maxrjones marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Callable, Union, cast
from typing import TYPE_CHECKING, Callable, Union

import numpy as np

Expand Down Expand Up @@ -97,28 +97,8 @@ def concatenate(
new_shape[axis] = new_length_along_concat_axis

# do concatenation of entries in manifest
concatenated_paths = (
cast( # `np.concatenate` is type hinted as if the output could have Any dtype
np.ndarray[Any, np.dtypes.StringDType],
np.concatenate(
[arr.manifest._paths for arr in arrays],
axis=axis,
),
)
)
concatenated_offsets = np.concatenate(
[arr.manifest._offsets for arr in arrays],
axis=axis,
)
concatenated_lengths = np.concatenate(
[arr.manifest._lengths for arr in arrays],
axis=axis,
)
concatenated_manifest = ChunkManifest.from_arrays(
paths=concatenated_paths,
offsets=concatenated_offsets,
lengths=concatenated_lengths,
validate_paths=False,
concatenated_manifest = ChunkManifest.concat(
[arr.manifest for arr in arrays], axis=axis
)

new_metadata = copy_and_replace_metadata(
Expand Down Expand Up @@ -165,27 +145,7 @@ def stack(
new_shape.insert(axis, length_along_new_stacked_axis)

# do stacking of entries in manifest
stacked_paths = cast( # `np.stack` apparently is type hinted as if the output could have Any dtype
np.ndarray[Any, np.dtypes.StringDType],
np.stack(
[arr.manifest._paths for arr in arrays],
axis=axis,
),
)
stacked_offsets = np.stack(
[arr.manifest._offsets for arr in arrays],
axis=axis,
)
stacked_lengths = np.stack(
[arr.manifest._lengths for arr in arrays],
axis=axis,
)
stacked_manifest = ChunkManifest.from_arrays(
paths=stacked_paths,
offsets=stacked_offsets,
lengths=stacked_lengths,
validate_paths=False,
)
stacked_manifest = ChunkManifest.stack([arr.manifest for arr in arrays], axis=axis)

# chunk shape has changed because a length-1 axis has been inserted
old_chunks = first_arr.chunks
Expand Down Expand Up @@ -235,28 +195,7 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra
new_chunk_grid_shape = determine_chunk_grid_shape(new_shape, new_chunk_shape)

# do broadcasting of entries in manifest
broadcasted_paths = cast( # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype
np.ndarray[Any, np.dtypes.StringDType],
np.broadcast_to(
x.manifest._paths,
shape=new_chunk_grid_shape,
),
)

broadcasted_offsets = np.broadcast_to(
x.manifest._offsets,
shape=new_chunk_grid_shape,
)
broadcasted_lengths = np.broadcast_to(
x.manifest._lengths,
shape=new_chunk_grid_shape,
)
broadcasted_manifest = ChunkManifest.from_arrays(
paths=broadcasted_paths,
offsets=broadcasted_offsets,
lengths=broadcasted_lengths,
validate_paths=False,
)
broadcasted_manifest = x.manifest.broadcast(shape=new_chunk_grid_shape)

new_metadata = copy_and_replace_metadata(
old_metadata=x.metadata,
Expand Down
92 changes: 92 additions & 0 deletions virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,98 @@ def __eq__(self, other: Any) -> bool:
lengths_equal = (self._lengths == other._lengths).all()
return paths_equal and offsets_equal and lengths_equal

@classmethod
def concat(cls, manifests: list["ChunkManifest"], axis: int) -> "ChunkManifest":
    """Join several manifests end-to-end along an existing axis.

    Paths, offsets and lengths are concatenated independently and then
    reassembled into a new manifest. Path validation is skipped because
    every input manifest was already validated on construction.
    """
    path_arrays = [m._paths for m in manifests]
    offset_arrays = [m._offsets for m in manifests]
    length_arrays = [m._lengths for m in manifests]

    # `np.concatenate` is type-hinted as if the output could have Any
    # dtype, so narrow the string array explicitly.
    joined_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.concatenate(path_arrays, axis=axis),
    )
    return cls.from_arrays(
        paths=joined_paths,
        offsets=np.concatenate(offset_arrays, axis=axis),
        lengths=np.concatenate(length_arrays, axis=axis),
        validate_paths=False,
    )

@classmethod
def stack(cls, manifests: list["ChunkManifest"], axis: int) -> "ChunkManifest":
    """Stack manifests along a newly inserted axis.

    Each of the three underlying arrays is stacked separately; path
    validation is skipped because the inputs were already validated.
    """
    path_arrays = [m._paths for m in manifests]
    offset_arrays = [m._offsets for m in manifests]
    length_arrays = [m._lengths for m in manifests]

    # Narrow the `np.stack` result, which is type-hinted as Any dtype.
    piled_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.stack(path_arrays, axis=axis),
    )
    return cls.from_arrays(
        paths=piled_paths,
        offsets=np.stack(offset_arrays, axis=axis),
        lengths=np.stack(length_arrays, axis=axis),
        validate_paths=False,
    )

def broadcast(self, shape: tuple[int, ...]) -> "ChunkManifest":
    """Broadcast this manifest's entries to a new chunk grid shape.

    Delegates to `np.broadcast_to` for each underlying array; path
    validation is skipped because the paths are unchanged.
    """
    # `np.broadcast_to` is type-hinted as if the output could have Any
    # dtype, so narrow the string array before handing it on.
    expanded_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.broadcast_to(self._paths, shape=shape),
    )
    return self.from_arrays(
        paths=expanded_paths,
        offsets=np.broadcast_to(self._offsets, shape=shape),
        lengths=np.broadcast_to(self._lengths, shape=shape),
        validate_paths=False,
    )

def get_entry(self, indices: tuple[int, ...]) -> ChunkEntry | None:
    """Look up a single chunk entry by its chunk-grid indices.

    Parameters
    ----------
    indices
        Position of the chunk within the chunk grid.

    Returns
    -------
    The ``ChunkEntry`` at that position, or ``None`` if the chunk is
    missing (marked by an empty path string).
    """
    path = self._paths[indices]
    if path == "":
        return None
    # Convert numpy scalars to native Python values via `.item()` so the
    # returned entry matches what `iter_refs` yields (str/int rather than
    # np.str_/np.int*).
    return ChunkEntry(
        path=path.item(),
        offset=self._offsets[indices].item(),
        length=self._lengths[indices].item(),
    )

def elementwise_eq(self, other: "ChunkManifest") -> np.ndarray:
    """Compare two manifests chunk-by-chunk.

    Returns a boolean array over the chunk grid; an element is True only
    when the path, offset, and length of that chunk all match.
    """
    paths_match = self._paths == other._paths
    offsets_match = self._offsets == other._offsets
    lengths_match = self._lengths == other._lengths
    return paths_match & offsets_match & lengths_match
Comment on lines +447 to +453
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will have to be updated in #938, to compare the values of inlined chunks.


def iter_nonempty_paths(self) -> Iterator[str]:
    """Yield every path stored in the manifest that is not empty.

    Empty strings mark missing chunks and are skipped.
    """
    for candidate in self._paths.flat:
        if candidate:
            yield candidate

def iter_refs(self) -> Iterator[tuple[tuple[int, ...], ChunkEntry]]:
    """Yield (grid_indices, chunk_entry) for every non-missing chunk.

    Walks the whole chunk grid in lockstep with coordinate arrays
    produced by ``np.mgrid``, skipping entries whose path is the empty
    string (the marker for a missing chunk).
    """
    # One integer coordinate array per grid dimension, each shaped like
    # the chunk grid, so nditer can yield every chunk's grid indices
    # alongside its path/offset/length.
    coord_vectors = np.mgrid[
        tuple(slice(None, length) for length in self.shape_chunk_grid)
    ]
    # "refs_ok" allows nditer to iterate the non-numeric path array.
    for *inds, path, offset, length in np.nditer(
        [*coord_vectors, self._paths, self._offsets, self._lengths],
        flags=("refs_ok",),
    ):
        if path.item() != "":
            # Convert the 0-d coordinate scalars into a plain tuple of ints.
            idx = tuple(int(i) for i in inds)
            yield (
                idx,
                # `.item()` unwraps each 0-d array into a native Python value.
                ChunkEntry(
                    path=path.item(), offset=offset.item(), length=length.item()
                ),
            )

def rename_paths(
self,
new: str | Callable[[str], str],
Expand Down
9 changes: 5 additions & 4 deletions virtualizarr/manifests/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ async def get(
)
chunk_indexes = parse_manifest_index(key, separator, expand_pattern=True)

path = manifest._paths[chunk_indexes]
if path == "":
entry = manifest.get_entry(chunk_indexes)
if entry is None:
return None
offset = manifest._offsets[chunk_indexes]
length = manifest._lengths[chunk_indexes]
path = entry["path"]
offset = entry["offset"]
length = entry["length"]
# Get the configured object store instance that matches the path
store, path_after_prefix = self._registry.resolve(path)
if not store:
Expand Down
28 changes: 7 additions & 21 deletions virtualizarr/writers/icechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Union, cast

import numpy as np
import xarray as xr
from xarray.backends.zarr import ZarrStore as XarrayZarrStore
from xarray.backends.zarr import encode_zarr_attr_value
Expand Down Expand Up @@ -272,9 +271,8 @@ def validate_virtual_chunk_containers(
for marr in manifestarrays:
# TODO this loop over every virtual reference is likely inefficient in python,
# is there a way to push this down to Icechunk? (see https://github.com/earth-mover/icechunk/issues/1167)
for ref in marr.manifest._paths.flat:
if ref:
validate_single_ref(ref, supported_prefixes)
for ref in marr.manifest.iter_nonempty_paths():
validate_single_ref(ref, supported_prefixes)


def validate_single_ref(ref: str, supported_prefixes: set[str]) -> None:
Expand Down Expand Up @@ -562,16 +560,6 @@ def write_manifest_virtual_refs(
# but Icechunk need to expose a suitable API first
# See https://github.com/earth-mover/icechunk/issues/401 for performance benchmark

it = np.nditer(
[manifest._paths, manifest._offsets, manifest._lengths], # type: ignore[arg-type]
flags=[
"refs_ok",
"multi_index",
"c_index",
],
op_flags=[["readonly"]] * 3, # type: ignore
)

if last_updated_at is None:
# Icechunk rounds timestamps to the nearest second, but filesystems have higher precision,
# so we need to add a buffer, so that if you immediately read data back from this icechunk store,
Expand All @@ -583,16 +571,14 @@ def write_manifest_virtual_refs(
virtual_chunk_spec_list = [
VirtualChunkSpec(
index=[
index + offset
for index, offset in zip(it.multi_index, chunk_index_offsets)
index + offset for index, offset in zip(grid_index, chunk_index_offsets)
],
location=path.item(),
offset=offset.item(),
length=length.item(),
location=entry["path"],
offset=entry["offset"],
length=entry["length"],
last_updated_at_checksum=last_updated_at,
)
for path, offset, length in it
if path
for grid_index, entry in manifest.iter_refs()
]

store.set_virtual_refs(
Expand Down
Loading