
Commit f518a07

Authored by TomNicholas and claude
feat: write inlined ChunkManifest entries to icechunk as native chunks (#981)
* Add failing test for writing inlined chunks to icechunk

  The icechunk writer currently sends the INLINED_CHUNK_PATH sentinel ('__inlined__') straight into icechunk's set_virtual_refs_arr, which rejects it as a malformed virtual URL. The new test writes a manifest containing one inlined chunk plus one virtual chunk, commits, then re-opens via xarray and asserts the values match end-to-end.

* Support writing inlined ChunkManifest entries to icechunk

  Icechunk's set_virtual_refs_arr rejects the INLINED_CHUNK_PATH sentinel ('__inlined__') as a malformed URL. write_manifest_to_icechunk now writes inlined chunks first as native chunks via store.set, then rewrites those positions to empty strings in the paths array before calling set_virtual_refs_arr with the cleaned view. A cheap numpy-level check skips the virtual-refs call entirely for all-inlined or all-missing manifests.

* Update broadcast description to reflect bytes replication

  Broadcasting inlined chunks not only prepends singleton dims to their keys, but also replicates the bytes (by reference) across every position of an expanded axis, per the fix in #938.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 08f95c3 commit f518a07

4 files changed

Lines changed: 141 additions & 16 deletions
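The path-cleaning step described in the commit message (replace the inlined sentinel with empty strings, then skip the bulk virtual-refs call if nothing real remains) can be sketched with plain numpy. The sentinel value is from the source; the paths array below is purely illustrative:

```python
import numpy as np

# Sentinel used by virtualizarr for inlined chunk entries (from the source).
INLINED_CHUNK_PATH = "__inlined__"

# Illustrative flat paths array: one real virtual ref, one inlined chunk,
# and one missing chunk (empty path).
paths_flat = np.array(["s3://bucket/data_chunk", INLINED_CHUNK_PATH, ""])

# Erase inlined positions so the bulk virtual-refs call never sees the
# sentinel, which icechunk would reject as a malformed URL.
virtual_paths = np.where(paths_flat == INLINED_CHUNK_PATH, "", paths_flat)

# Cheap numpy-level check: skip the Python->Rust call entirely when no
# position holds a real virtual ref (all-inlined or all-missing manifest).
needs_virtual_write = bool((virtual_paths != "").any())
```

Empty paths already mean "missing chunk" on the Rust side, which is why rewriting the sentinel to `""` is enough to make the bulk call safe.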

File tree

docs/data_structures.md

Lines changed: 1 addition & 1 deletion
@@ -144,7 +144,7 @@ manifest = ChunkManifest.from_arrays(
 )
 ```

-Inlined chunks participate in all manifest operations: concatenation and stacking shift their indices, broadcasting prepends singleton dimensions to their keys, equality compares the inlined bytes, pickling carries the data along (for Dask/multiprocessing), `ManifestStore` reads return them directly from memory, and `nbytes` includes their size.
+Inlined chunks participate in all manifest operations: concatenation and stacking shift their indices, broadcasting both prepends singleton dimensions to their keys and replicates the bytes (by reference) across every position of an expanded axis, equality compares the inlined bytes, pickling carries the data along (for Dask/multiprocessing), `ManifestStore` reads return them directly from memory, and `nbytes` includes their size.

 ## `ManifestArray` class
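The by-reference replication described in the updated sentence can be illustrated with a toy model. This is a hypothetical dict-of-keys layout for demonstration only, not virtualizarr's actual internal representation:

```python
# Toy model of broadcasting one inlined chunk along a new leading axis of
# size 3: each key gains a prepended dimension, and every expanded position
# refers to the *same* bytes object (replication by reference, not by copy).
data = b"\x01\x00\x00\x00\x02\x00\x00\x00"
original = {(0, 0): data}

broadcast = {(i,) + key: value for i in range(3) for key, value in original.items()}

# No bytes were duplicated: all three positions alias the original object.
all_same_object = all(value is data for value in broadcast.values())
```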

docs/releases.md

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,8 @@
 - The kerchunk writer now serializes inlined `ChunkManifest` entries as kerchunk's `base64:`-prefixed inline form, rather than emitting broken `["__inlined__", 0, length]` triples. Together with the read-side support added in #979, this means a virtual dataset with inlined chunks can be round-tripped through both `to_kerchunk(format="json"/"parquet")` and the corresponding `KerchunkJSONParser`/`KerchunkParquetParser`.
   By [Tom Nicholas](https://github.com/TomNicholas).
+- The icechunk writer now handles `ChunkManifest` entries containing inlined chunk data. For arrays with no inlined chunks the existing fast bulk `set_virtual_refs_arr` path is unchanged; otherwise inlined positions are sent to icechunk as empty (missing) virtual refs and the inlined bytes are written separately as managed chunks. A virtual dataset with inlined chunks can now be `to_icechunk`'d and re-opened via `xr.open_zarr` without data loss.
+  By [Tom Nicholas](https://github.com/TomNicholas).
 - `ChunkManifest` can now hold inlined chunks — raw chunk bytes carried directly in memory rather than as references to external files. Intended for parser authors (e.g., loading Kerchunk references with inlined data); not exposed via `loadable_variables`.
   ([#938](https://github.com/zarr-developers/VirtualiZarr/pull/938)).

virtualizarr/tests/test_writers/test_icechunk.py

Lines changed: 58 additions & 0 deletions
@@ -5,13 +5,16 @@

 import numpy as np
 import numpy.testing as npt
+import obstore as obs
 import pytest
 import xarray as xr
 import xarray.testing as xrt
 import zarr
 from obspec_utils.registry import ObjectStoreRegistry
 from obstore.store import LocalStore
+from zarr.codecs import BytesCodec
 from zarr.core.metadata import ArrayV3Metadata
+from zarr.dtype import parse_data_type

 from virtualizarr import open_virtual_dataset
 from virtualizarr.manifests import ChunkManifest, ManifestArray
@@ -170,6 +173,61 @@ def test_set_grid_virtual_refs(icechunk_filestore: "IcechunkStore", synthetic_vd
     npt.assert_equal(observed, arr)


+def test_set_inlined_and_virtual_refs(
+    icechunk_filestore: "IcechunkStore",
+    icechunk_repo: "Repository",
+    tmp_path: Path,
+):
+    # ManifestArray with shape (2, 2), chunks (1, 2): position (0, .) is inlined
+    # with values [1, 2]; position (1, .) is virtual with values [3, 4] read from
+    # a file in the tmp dir (covered by the icechunk_repo virtual chunk container).
+    inlined_arr = np.array([[1, 2]], dtype="<i4")
+    virtual_arr = np.array([[3, 4]], dtype="<i4")
+    inlined_bytes = inlined_arr.tobytes()
+    virtual_bytes = virtual_arr.tobytes()
+
+    filepath = str(tmp_path / "data_chunk")
+    obs.put(obs.store.LocalStore(), filepath, virtual_bytes)
+
+    manifest = ChunkManifest(
+        entries={
+            "0.0": {
+                "path": "",
+                "offset": 0,
+                "length": len(inlined_bytes),
+                "data": inlined_bytes,
+            },
+            "1.0": {
+                "path": filepath,
+                "offset": 0,
+                "length": len(virtual_bytes),
+            },
+        }
+    )
+    metadata = ArrayV3Metadata(
+        shape=(2, 2),
+        data_type=parse_data_type(np.dtype("<i4"), zarr_format=3),
+        chunk_grid={"name": "regular", "configuration": {"chunk_shape": (1, 2)}},
+        chunk_key_encoding={"name": "default"},
+        fill_value=0,
+        codecs=[BytesCodec()],
+        attributes={},
+        dimension_names=("y", "x"),
+        storage_transformers=None,
+    )
+    ma = ManifestArray(chunkmanifest=manifest, metadata=metadata)
+    vds = xr.Dataset({"foo": xr.Variable(data=ma, dims=["y", "x"])})
+
+    vds.vz.to_icechunk(icechunk_filestore)
+    icechunk_filestore.session.commit("test")
+
+    icechunk_readonly_session = icechunk_repo.readonly_session("main")
+    with xr.open_zarr(
+        store=icechunk_readonly_session.store, zarr_format=3, consolidated=False
+    ) as ds:
+        np.testing.assert_equal(ds["foo"].data, np.array([[1, 2], [3, 4]], dtype="<i4"))
+
+
 def test_write_big_endian_value(icechunk_repo: "Repository", big_endian_synthetic_vds):
     vds, arr = big_endian_synthetic_vds
     vds = vds.drop_encoding()

virtualizarr/writers/icechunk.py

Lines changed: 80 additions & 15 deletions
@@ -1,14 +1,20 @@
+import asyncio
 from collections.abc import Mapping
 from datetime import datetime, timedelta, timezone
 from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Union, cast

+import numpy as np
 import xarray as xr
 from xarray.backends.zarr import ZarrStore as XarrayZarrStore
 from xarray.backends.zarr import encode_zarr_attr_value
 from zarr import Array, Group
+from zarr.core.buffer import default_buffer_prototype
+from zarr.core.chunk_key_encodings import ChunkKeyEncoding
+from zarr.core.sync import sync

 from virtualizarr.codecs import extract_codecs, get_codecs
 from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.manifests.manifest import INLINED_CHUNK_PATH
 from virtualizarr.manifests.utils import (
     check_compatible_encodings,
     check_same_chunk_shapes,
@@ -526,25 +532,32 @@ def write_virtual_variable_to_icechunk(
     update_attributes(arr, var.attrs, encoding=var.encoding)

-    write_manifest_virtual_refs(
+    write_manifest_to_icechunk(
         store=store,
         group=group,
         arr_name=name,
         manifest=ma.manifest,
+        chunk_key_encoding=ma.metadata.chunk_key_encoding,
         chunk_index_offsets=tuple(chunk_offsets),
         last_updated_at=last_updated_at,
     )


-def write_manifest_virtual_refs(
+def write_manifest_to_icechunk(
     store: "IcechunkStore",
     group: "Group",
     arr_name: str,
     manifest: ChunkManifest,
+    chunk_key_encoding: ChunkKeyEncoding,
     chunk_index_offsets: tuple[int, ...],
     last_updated_at: Optional[datetime] = None,
 ) -> None:
-    """Write all the virtual references for one array manifest at once."""
+    """
+    Write all the chunks (virtual and/or inlined) for one array manifest at once.
+
+    Virtual chunks are written as virtual chunks, and inlined chunks are written as native
+    (which Icechunk may then choose to inline in its manifests).
+    """

     if group.name == "/":
         key_prefix = arr_name
@@ -559,15 +572,67 @@ def write_manifest_virtual_refs(
     # In practice this should only really come up in synthetic examples, e.g. tests and docs.
     last_updated_at = datetime.now(timezone.utc) + timedelta(seconds=1)

-    # Pass manifest arrays directly to Rust, avoiding per-chunk Python object creation.
-    # Empty paths represent missing chunks and are skipped on the Rust side.
-    store.set_virtual_refs_arr(
-        array_path=key_prefix,
-        chunk_grid_shape=manifest.shape_chunk_grid,
-        locations=manifest._paths.flatten().tolist(),
-        offsets=manifest._offsets.flatten(),
-        lengths=manifest._lengths.flatten(),
-        validate_containers=False,
-        arr_offset=chunk_index_offsets if any(chunk_index_offsets) else None,
-        checksum=last_updated_at,
-    )
+    paths_flat = manifest._paths.flatten()
+
+    if manifest._inlined:
+        # Write inlined chunks first, then erase them from the paths array so the
+        # virtual-refs write below doesn't see the INLINED_CHUNK_PATH sentinel
+        # (which Icechunk's `.set_virtual_refs_arr` would reject as a malformed URL).
+        # Use of zarr's `sync` here is to avoid a serial high-latency loop over chunks.
+        # Would prefer if zarr-python had a public API for setting many chunks at once concurrently.
+        sync(
+            write_inlined_chunks_as_native(
+                store=store,
+                key_prefix=key_prefix,
+                chunk_key_encoding=chunk_key_encoding,
+                inlined=manifest._inlined,
+                chunk_index_offsets=chunk_index_offsets,
+            )
+        )
+        virtual_paths = np.where(paths_flat == INLINED_CHUNK_PATH, "", paths_flat)
+    else:
+        virtual_paths = paths_flat
+
+    # Cheap numpy-level check so we can skip the .tolist() allocation and the
+    # Python->Rust call entirely when no position holds a real virtual ref
+    # (e.g. an all-inlined or all-missing manifest).
+    if (virtual_paths != "").any():
+        # Pass flat per-chunk arrays (or a list) to Rust in one call, avoiding Python-side
+        # per-chunk dict construction. Empty paths are skipped on the Rust side.
+        store.set_virtual_refs_arr(
+            array_path=key_prefix,
+            chunk_grid_shape=manifest.shape_chunk_grid,
+            locations=virtual_paths.tolist(),
+            offsets=manifest._offsets.flatten(),
+            lengths=manifest._lengths.flatten(),
+            validate_containers=False,
+            arr_offset=chunk_index_offsets if any(chunk_index_offsets) else None,
+            checksum=last_updated_at,
+        )
+
+
+async def write_inlined_chunks_as_native(
+    store: "IcechunkStore",
+    key_prefix: str,
+    chunk_key_encoding: ChunkKeyEncoding,
+    inlined: Mapping[tuple[int, ...], bytes],
+    chunk_index_offsets: tuple[int, ...],
+) -> None:
+    """Write each inlined chunk as a native chunk at its zarr chunk key."""
+    prototype = default_buffer_prototype()
+    has_offset = any(chunk_index_offsets)
+    coros = []
+    for chunk_idx, data in inlined.items():
+        shifted_idx = (
+            tuple(c + o for c, o in zip(chunk_idx, chunk_index_offsets))
+            if has_offset
+            else chunk_idx
+        )
+        encoded_chunk_key = chunk_key_encoding.encode_chunk_key(shifted_idx)
+        coros.append(
+            store.set(
+                f"{key_prefix}/{encoded_chunk_key}",
+                prototype.buffer.from_bytes(data),
+            )
+        )
+    await asyncio.gather(*coros)
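The concurrent fan-out used by `write_inlined_chunks_as_native` (build one coroutine per chunk, then `asyncio.gather` them) can be sketched against a dummy async store. `DummyStore` and `write_chunks_concurrently` are hypothetical stand-ins for illustration; the chunk keys assume zarr v3's default `c/`-separated key encoding:

```python
import asyncio


class DummyStore:
    """Hypothetical stand-in for an async zarr-style store."""

    def __init__(self):
        self.data = {}

    async def set(self, key, value):
        await asyncio.sleep(0)  # simulate I/O latency
        self.data[key] = value


async def write_chunks_concurrently(store, key_prefix, inlined):
    # Issue all store.set calls at once rather than awaiting each in a
    # serial, high-latency loop.
    coros = [
        store.set(f"{key_prefix}/c/{'/'.join(map(str, idx))}", data)
        for idx, data in inlined.items()
    ]
    await asyncio.gather(*coros)


store = DummyStore()
asyncio.run(write_chunks_concurrently(store, "foo", {(0, 0): b"\x01", (1, 0): b"\x02"}))
```

With many small inlined chunks, the gather pattern lets the event loop overlap the per-chunk round trips instead of paying their latency one after another.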
