409 changes: 402 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion icechunk-python/Cargo.toml
@@ -21,8 +21,10 @@ crate-type = ["cdylib"]
 bytes = "1.11.0"
 chrono = { version = "0.4.42" }
 futures = "0.3.31"
-icechunk = { path = "../icechunk", version = "0.3.14", features = ["logs"] }
+icechunk = { path = "../icechunk", version = "0.3.14", features = ["logs", "arrow"] }
 itertools = "0.14.0"
+pyo3-arrow = "0.15"
+arrow-array = "57"
 pyo3 = { version = "0.27.2", features = [
     "chrono",
     "experimental-async",
24 changes: 24 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
@@ -11,6 +11,8 @@ from collections.abc import (
from enum import Enum
from typing import Any, TypeAlias

import pyarrow as pa

class S3Options:
"""Options for accessing an S3-compatible storage backend"""
def __init__(
@@ -1992,6 +1994,28 @@ class PyStore:
chunks: list[VirtualChunkSpec],
validate_containers: bool,
) -> list[tuple[int, ...]] | None: ...
def set_virtual_refs_arr(
self,
array_path: str,
chunk_grid_shape: list[int],
locations: pa.StringArray,
offsets: pa.UInt64Array,
lengths: pa.UInt64Array,
arr_offset: tuple[int, ...] | None = None,
checksum: datetime.datetime | str | None = None,
validate_containers: bool = True,
) -> list[tuple[int, ...]] | None: ...
async def set_virtual_refs_arr_async(
self,
array_path: str,
chunk_grid_shape: list[int],
locations: pa.StringArray,
offsets: pa.UInt64Array,
lengths: pa.UInt64Array,
arr_offset: tuple[int, ...] | None = None,
checksum: datetime.datetime | str | None = None,
validate_containers: bool = True,
) -> list[tuple[int, ...]] | None: ...
async def delete(self, key: str) -> None: ...
async def delete_dir(self, prefix: str) -> None: ...
@property
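Before the store.py diff below, a minimal sketch (not part of this PR) of building arguments that satisfy the new stub signatures. The bucket URL pattern is hypothetical; the constraint, per the docstrings in store.py, is that each array's length equals the product of chunk_grid_shape, with chunks flattened in C (row-major) order, so grid index (i, j) on a 2x3 grid maps to flat position i * 3 + j.

import pyarrow as pa

# 2 x 3 chunk grid, flattened row-major: (i, j) -> i * 3 + j
chunk_grid_shape = [2, 3]
locations = pa.array(
    [f"s3://bucket/chunk-{i}-{j}.bin" for i in range(2) for j in range(3)]
)  # pa.StringArray, length 6
offsets = pa.array([0] * 6, type=pa.uint64())     # pa.UInt64Array
lengths = pa.array([4096] * 6, type=pa.uint64())  # pa.UInt64Array
assert len(locations) == len(offsets) == len(lengths) == 2 * 3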
162 changes: 162 additions & 0 deletions icechunk-python/python/icechunk/store.py
@@ -15,6 +15,8 @@
from zarr.core.sync import SyncMixin

if TYPE_CHECKING:
import pyarrow as pa

from icechunk import Session


@@ -352,6 +354,166 @@ async def set_virtual_refs_async(
array_path, chunks, validate_containers
)

def set_virtual_refs_arr(
self,
array_path: str,
chunk_grid_shape: tuple[int, ...],
locations: "pa.StringArray",
offsets: "pa.UInt64Array",
lengths: "pa.UInt64Array",
*,
arr_offset: tuple[int, ...] | None = None,
checksum: datetime | str | None = None,
validate_containers: bool = True,
) -> list[tuple[int, ...]] | None:
"""Store multiple virtual references using Arrow arrays.

This method is significantly faster than set_virtual_refs for large
numbers of references because it passes the data through Arrow's
zero-copy FFI instead of creating one Python object per chunk.

Parameters
----------
array_path : str
The path to the array inside the Zarr store.
Example: "/groupA/groupB/outputs/my-array"
chunk_grid_shape : tuple[int, ...]
Shape of the chunk grid. The product must equal the length of the arrays.
Arrays are assumed to be flattened in C (row-major) order.
locations : pa.StringArray
PyArrow StringArray of URLs to external files containing chunk data
offsets : pa.UInt64Array
PyArrow UInt64Array of byte offsets within each file
lengths : pa.UInt64Array
PyArrow UInt64Array of byte lengths of each chunk
arr_offset : tuple[int, ...] | None
Optional offset to add to computed chunk indices. Useful for append
operations where new chunks should be written at an offset from (0,0,...).
Must have the same length as chunk_grid_shape. Default is None.
checksum : datetime | str | None
Optional checksum for all chunks. Can be a datetime (last modified time)
or a string (ETag). Default is None.
validate_containers : bool
If True, validate that locations match registered virtual chunk containers.
Default is True.

Returns
-------
list[tuple[int, ...]] | None
If all virtual references were successfully updated, returns None.
If there were validation errors, returns the chunk indices of all failed references.

Notes
-----
This method requires PyArrow to be installed. The arrays are passed to
Rust via Arrow's zero-copy FFI, making this much more efficient than
creating millions of VirtualChunkSpec Python objects.

Example
-------
>>> import pyarrow as pa
>>> locations = pa.array(["s3://bucket/file1.nc", "s3://bucket/file2.nc"])
>>> offsets = pa.array([0, 1000], type=pa.uint64())
>>> lengths = pa.array([1000, 1000], type=pa.uint64())
>>> store.set_virtual_refs_arr(
... "/data",
... chunk_grid_shape=(2,),
... locations=locations,
... offsets=offsets,
... lengths=lengths,
... )
"""
return self._store.set_virtual_refs_arr(
array_path,
list(chunk_grid_shape),
locations,
offsets,
lengths,
list(arr_offset) if arr_offset is not None else None,
checksum,
validate_containers,
)

async def set_virtual_refs_arr_async(
self,
array_path: str,
chunk_grid_shape: tuple[int, ...],
locations: "pa.StringArray",
offsets: "pa.UInt64Array",
lengths: "pa.UInt64Array",
*,
arr_offset: tuple[int, ...] | None = None,
checksum: datetime | str | None = None,
validate_containers: bool = True,
) -> list[tuple[int, ...]] | None:
"""Store multiple virtual references using Arrow arrays (async version).

This method is significantly faster than set_virtual_refs for large
numbers of references because it passes the data through Arrow's
zero-copy FFI instead of creating one Python object per chunk.

Parameters
----------
array_path : str
The path to the array inside the Zarr store.
Example: "/groupA/groupB/outputs/my-array"
chunk_grid_shape : tuple[int, ...]
Shape of the chunk grid. The product must equal the length of the arrays.
Arrays are assumed to be flattened in C (row-major) order.
locations : pa.StringArray
PyArrow StringArray of URLs to external files containing chunk data
offsets : pa.UInt64Array
PyArrow UInt64Array of byte offsets within each file
lengths : pa.UInt64Array
PyArrow UInt64Array of byte lengths of each chunk
arr_offset : tuple[int, ...] | None
Optional offset to add to computed chunk indices. Useful for append
operations where new chunks should be written at an offset from (0,0,...).
Must have the same length as chunk_grid_shape. Default is None.
checksum : datetime | str | None
Optional checksum for all chunks. Can be a datetime (last modified time)
or a string (ETag). Default is None.
validate_containers : bool
If True, validate that locations match registered virtual chunk containers.
Default is True.

Returns
-------
list[tuple[int, ...]] | None
If all virtual references were successfully updated, returns None.
If there were validation errors, returns the chunk indices of all failed references.

Notes
-----
This method requires PyArrow to be installed. The arrays are passed to
Rust via Arrow's zero-copy FFI, making this much more efficient than
creating millions of VirtualChunkSpec Python objects.

Example
-------
>>> import pyarrow as pa
>>> locations = pa.array(["s3://bucket/file1.nc", "s3://bucket/file2.nc"])
>>> offsets = pa.array([0, 1000], type=pa.uint64())
>>> lengths = pa.array([1000, 1000], type=pa.uint64())
>>> await store.set_virtual_refs_arr_async(
... "/data",
... chunk_grid_shape=(2,),
... locations=locations,
... offsets=offsets,
... lengths=lengths,
... )
"""
return await self._store.set_virtual_refs_arr_async(
array_path,
list(chunk_grid_shape),
locations,
offsets,
lengths,
list(arr_offset) if arr_offset is not None else None,
checksum,
validate_containers,
)

async def delete(self, key: str) -> None:
"""Remove a key from the store

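To close, a hedged usage sketch, not part of this diff, exercising the keyword-only arguments the docstring example leaves out: arr_offset to append past existing chunks, and a string checksum (treated as an ETag). The store variable, array path, and URL are assumed for illustration.

import pyarrow as pa

# One new chunk to append after existing chunks (0,) and (1,)
locations = pa.array(["s3://bucket/file3.nc"])
offsets = pa.array([0], type=pa.uint64())
lengths = pa.array([1000], type=pa.uint64())

failed = store.set_virtual_refs_arr(
    "/data",
    chunk_grid_shape=(1,),    # grid shape of the refs being written
    locations=locations,
    offsets=offsets,
    lengths=lengths,
    arr_offset=(2,),          # shift computed index (0,) to (2,)
    checksum="5d41402abc4b",  # a plain string is treated as an ETag
)
if failed is not None:
    print("validation failed for chunk indices:", failed)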