Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/data_structures.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ manifest = ChunkManifest.from_arrays(
)
```

Inlined chunks participate in all manifest operations: concatenation and stacking shift their indices, broadcasting prepends singleton dimensions to their keys, equality compares the inlined bytes, pickling carries the data along (for Dask/multiprocessing), `ManifestStore` reads return them directly from memory, and `nbytes` includes their size.
Inlined chunks participate in all manifest operations: concatenation and stacking shift their indices, broadcasting both prepends singleton dimensions to their keys and replicates the bytes (by reference) across every position of an expanded axis, equality compares the inlined bytes, pickling carries the data along (for Dask/multiprocessing), `ManifestStore` reads return them directly from memory, and `nbytes` includes their size.

## `ManifestArray` class

Expand Down
2 changes: 2 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

- The kerchunk writer now serializes inlined `ChunkManifest` entries as kerchunk's `base64:`-prefixed inline form, rather than emitting broken `["__inlined__", 0, length]` triples. Together with the read-side support added in #979, this means a virtual dataset with inlined chunks can be round-tripped through both `to_kerchunk(format="json"/"parquet")` and the corresponding `KerchunkJSONParser`/`KerchunkParquetParser`.
By [Tom Nicholas](https://github.com/TomNicholas).
- The icechunk writer now handles `ChunkManifest` entries containing inlined chunk data. For arrays with no inlined chunks the existing fast bulk `set_virtual_refs_arr` path is unchanged; otherwise inlined positions are sent to icechunk as empty (missing) virtual refs and the inlined bytes are written separately as managed chunks. A virtual dataset with inlined chunks can now be `to_icechunk`'d and re-opened via `xr.open_zarr` without data loss.
By [Tom Nicholas](https://github.com/TomNicholas).

- `ChunkManifest` can now hold inlined chunks — raw chunk bytes carried directly in memory rather than as references to external files. Intended for parser authors (e.g., loading Kerchunk references with inlined data); not exposed via `loadable_variables`.
([#938](https://github.com/zarr-developers/VirtualiZarr/pull/938)).
Expand Down
58 changes: 58 additions & 0 deletions virtualizarr/tests/test_writers/test_icechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

import numpy as np
import numpy.testing as npt
import obstore as obs
import pytest
import xarray as xr
import xarray.testing as xrt
import zarr
from obspec_utils.registry import ObjectStoreRegistry
from obstore.store import LocalStore
from zarr.codecs import BytesCodec
from zarr.core.metadata import ArrayV3Metadata
from zarr.dtype import parse_data_type

from virtualizarr import open_virtual_dataset
from virtualizarr.manifests import ChunkManifest, ManifestArray
Expand Down Expand Up @@ -170,6 +173,61 @@ def test_set_grid_virtual_refs(icechunk_filestore: "IcechunkStore", synthetic_vd
npt.assert_equal(observed, arr)


def test_set_inlined_and_virtual_refs(
    icechunk_filestore: "IcechunkStore",
    icechunk_repo: "Repository",
    tmp_path: Path,
):
    """Round-trip a mixed manifest (one inlined + one virtual chunk) through icechunk."""
    # Array shape (2, 2) with chunk shape (1, 2): chunk (0, 0) carries its
    # bytes inline (values [1, 2]); chunk (1, 0) is a virtual reference to a
    # file under tmp_path (values [3, 4]), which the icechunk_repo fixture's
    # virtual chunk container covers.
    dtype = np.dtype("<i4")
    inline_payload = np.array([[1, 2]], dtype=dtype).tobytes()
    on_disk_payload = np.array([[3, 4]], dtype=dtype).tobytes()

    filepath = str(tmp_path / "data_chunk")
    obs.put(obs.store.LocalStore(), filepath, on_disk_payload)

    # An empty path plus a "data" field marks the inlined entry; the other
    # entry is an ordinary virtual reference into the file written above.
    entries = {
        "0.0": {
            "path": "",
            "offset": 0,
            "length": len(inline_payload),
            "data": inline_payload,
        },
        "1.0": {
            "path": filepath,
            "offset": 0,
            "length": len(on_disk_payload),
        },
    }
    metadata = ArrayV3Metadata(
        shape=(2, 2),
        data_type=parse_data_type(dtype, zarr_format=3),
        chunk_grid={"name": "regular", "configuration": {"chunk_shape": (1, 2)}},
        chunk_key_encoding={"name": "default"},
        fill_value=0,
        codecs=[BytesCodec()],
        attributes={},
        dimension_names=("y", "x"),
        storage_transformers=None,
    )
    marr = ManifestArray(
        chunkmanifest=ChunkManifest(entries=entries), metadata=metadata
    )
    vds = xr.Dataset({"foo": xr.Variable(data=marr, dims=["y", "x"])})

    vds.vz.to_icechunk(icechunk_filestore)
    icechunk_filestore.session.commit("test")

    # Re-open read-only and confirm both the inlined and the virtual chunk
    # come back with the expected values.
    readonly_session = icechunk_repo.readonly_session("main")
    with xr.open_zarr(
        store=readonly_session.store, zarr_format=3, consolidated=False
    ) as ds:
        expected = np.array([[1, 2], [3, 4]], dtype=dtype)
        np.testing.assert_equal(ds["foo"].data, expected)


def test_write_big_endian_value(icechunk_repo: "Repository", big_endian_synthetic_vds):
vds, arr = big_endian_synthetic_vds
vds = vds.drop_encoding()
Expand Down
95 changes: 80 additions & 15 deletions virtualizarr/writers/icechunk.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import asyncio
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Union, cast

import numpy as np
import xarray as xr
from xarray.backends.zarr import ZarrStore as XarrayZarrStore
from xarray.backends.zarr import encode_zarr_attr_value
from zarr import Array, Group
from zarr.core.buffer import default_buffer_prototype
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
from zarr.core.sync import sync

from virtualizarr.codecs import extract_codecs, get_codecs
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.manifests.manifest import INLINED_CHUNK_PATH
from virtualizarr.manifests.utils import (
check_compatible_encodings,
check_same_chunk_shapes,
Expand Down Expand Up @@ -526,25 +532,32 @@ def write_virtual_variable_to_icechunk(

update_attributes(arr, var.attrs, encoding=var.encoding)

write_manifest_virtual_refs(
write_manifest_to_icechunk(
store=store,
group=group,
arr_name=name,
manifest=ma.manifest,
chunk_key_encoding=ma.metadata.chunk_key_encoding,
chunk_index_offsets=tuple(chunk_offsets),
last_updated_at=last_updated_at,
)


def write_manifest_virtual_refs(
def write_manifest_to_icechunk(
store: "IcechunkStore",
group: "Group",
arr_name: str,
manifest: ChunkManifest,
chunk_key_encoding: ChunkKeyEncoding,
chunk_index_offsets: tuple[int, ...],
last_updated_at: Optional[datetime] = None,
) -> None:
"""Write all the virtual references for one array manifest at once."""
"""
Write all the chunks (virtual and/or inlined) for one array manifest at once.

Virtual chunks are written as virtual chunks, and inlined chunks are written as native
(which Icechunk may then choose to inline in its manifests).
"""

if group.name == "/":
key_prefix = arr_name
Expand All @@ -559,15 +572,67 @@ def write_manifest_virtual_refs(
# In practice this should only really come up in synthetic examples, e.g. tests and docs.
last_updated_at = datetime.now(timezone.utc) + timedelta(seconds=1)

# Pass manifest arrays directly to Rust, avoiding per-chunk Python object creation.
# Empty paths represent missing chunks and are skipped on the Rust side.
store.set_virtual_refs_arr(
array_path=key_prefix,
chunk_grid_shape=manifest.shape_chunk_grid,
locations=manifest._paths.flatten().tolist(),
offsets=manifest._offsets.flatten(),
lengths=manifest._lengths.flatten(),
validate_containers=False,
arr_offset=chunk_index_offsets if any(chunk_index_offsets) else None,
checksum=last_updated_at,
)
paths_flat = manifest._paths.flatten()

if manifest._inlined:
# Write inlined chunks first, then erase them from the paths array so the
# virtual-refs write below doesn't see the INLINED_CHUNK_PATH sentinel
# (which Icechunk's `.set_virtual_refs_arr` would reject as a malformed URL).
# Use of zarr's `sync` here is to avoid a serial high-latency loop over chunks.
# Would prefer if zarr-python had a public API for setting many chunks at once concurrently.
sync(
write_inlined_chunks_as_native(
store=store,
key_prefix=key_prefix,
chunk_key_encoding=chunk_key_encoding,
inlined=manifest._inlined,
chunk_index_offsets=chunk_index_offsets,
)
)
virtual_paths = np.where(paths_flat == INLINED_CHUNK_PATH, "", paths_flat)
else:
virtual_paths = paths_flat

# Cheap numpy-level check so we can skip the .tolist() allocation and the
# Python->Rust call entirely when no position holds a real virtual ref
# (e.g. an all-inlined or all-missing manifest).
if (virtual_paths != "").any():
# Pass flat per-chunk arrays (or a list) to Rust in one call, avoiding Python-side
# per-chunk dict construction. Empty paths are skipped on the Rust side.
store.set_virtual_refs_arr(
array_path=key_prefix,
chunk_grid_shape=manifest.shape_chunk_grid,
locations=virtual_paths.tolist(),
offsets=manifest._offsets.flatten(),
lengths=manifest._lengths.flatten(),
validate_containers=False,
arr_offset=chunk_index_offsets if any(chunk_index_offsets) else None,
checksum=last_updated_at,
)


async def write_inlined_chunks_as_native(
    store: "IcechunkStore",
    key_prefix: str,
    chunk_key_encoding: ChunkKeyEncoding,
    inlined: Mapping[tuple[int, ...], bytes],
    chunk_index_offsets: tuple[int, ...],
) -> None:
    """Write each inlined chunk as a native chunk at its zarr chunk key."""
    prototype = default_buffer_prototype()
    # Offsets are only non-trivial when the array is being written at a
    # shifted position in the chunk grid; skip the per-chunk shift otherwise.
    apply_shift = any(chunk_index_offsets)

    def shifted(idx: tuple[int, ...]) -> tuple[int, ...]:
        # Translate a manifest-local chunk index into the store-level index.
        return tuple(i + o for i, o in zip(idx, chunk_index_offsets))

    # Build all the set() coroutines first, then run them concurrently.
    pending = [
        store.set(
            f"{key_prefix}/"
            f"{chunk_key_encoding.encode_chunk_key(shifted(idx) if apply_shift else idx)}",
            prototype.buffer.from_bytes(payload),
        )
        for idx, payload in inlined.items()
    ]
    await asyncio.gather(*pending)
Loading