Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions neuracore-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ rotvec
roundoff
rowwise
rtype
savez
Safetensors
schematypens
SCTP
Expand Down
9 changes: 4 additions & 5 deletions neuracore/api/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
DataStream,
DepthDataStream,
JsonDataStream,
PointCloudDataStream,
RGBDataStream,
VideoDataStream,
)
Expand Down Expand Up @@ -1345,13 +1346,11 @@ def log_point_cloud(
str_id = f"{DataType.POINT_CLOUDS.value}:{name}"
stream = robot.get_data_stream(str_id)
if stream is None:
stream = JsonDataStream(
data_type=DataType.POINT_CLOUDS, data_type_name=storage_name
)
stream = PointCloudDataStream(data_type_name=storage_name)
robot.add_data_stream(str_id, stream)
assert isinstance(
stream, JsonDataStream
), "Expected stream to be instance of JSONDataStream"
stream, PointCloudDataStream
), "Expected stream to be instance of PointCloudDataStream"
start_stream(robot, stream)
point_data = PointCloudData(
timestamp=timestamp,
Expand Down
140 changes: 122 additions & 18 deletions neuracore/core/data/synced_recording.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Synchronized recording iterator."""

import json
import logging
import os
import shutil
Expand All @@ -9,18 +10,23 @@
import time
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast

import numpy as np
import wget
from neuracore_types import (
CameraData,
CrossEmbodimentUnion,
DataType,
PointCloudData,
SynchronizationDetails,
)
from neuracore_types import SynchronizedEpisode as SynchronizedEpisodeModel
from neuracore_types import SynchronizedPoint, SynchronizeRecordingRequest
from neuracore_types.nc_data.point_cloud_data import (
POINT_CLOUD_TRACE_BIN_FILENAME,
decode_point_cloud_frame,
)
from PIL import Image

from neuracore.core.data.cache_manager import CacheManager
Expand Down Expand Up @@ -122,24 +128,21 @@ def _get_synced_data(self) -> SynchronizedEpisodeModel:
response.raise_for_status()
return SynchronizedEpisodeModel.model_validate(response.json())

def _get_video_url(self, camera_type: DataType, camera_id: str) -> str:
"""Get streaming URL for a specific camera's video data.
def _get_recording_file_url(self, filepath: str) -> str:
"""Get a signed download URL for a file in this recording.

Args:
camera_type: Type of camera (e.g., "rgbs", "depths").
camera_id: Unique identifier for the camera.
filepath: Recording-root-relative path
(e.g. ``rgbs/cam1/lossless.mp4``).

Returns:
URL string for downloading the video file.

Raises:
requests.HTTPError: If the API request fails.
URL string for downloading the file.
"""
auth = get_auth()
session = thread_local_session()
response = session.get(
f"{API_URL}/org/{self.dataset.org_id}/recording/{self.id}/download_url",
params={"filepath": f"{camera_type.value}/{camera_id}/lossless.mp4"},
params={"filepath": filepath},
headers=auth.get_headers(),
)
response.raise_for_status()
Expand Down Expand Up @@ -247,7 +250,9 @@ def _download_video_and_cache_frames_to_disk(
staging_dir.mkdir()
video_location = Path(temp_dir) / f"{camera_id}{camera_type.value}.mp4"
wget.download(
self._get_video_url(camera_type, camera_id),
self._get_recording_file_url(
f"{camera_type.value}/{camera_id}/lossless.mp4"
),
str(video_location),
bar=None if self._suppress_wget_progress else wget.bar_thermometer,
)
Expand Down Expand Up @@ -362,17 +367,115 @@ def _get_frame_from_disk_cache(

return result

def _insert_camera_data_intro_sync_point(
def _download_bytes(self, url: str) -> bytes:
    """Fetch ``url`` into a scratch file and return the raw bytes.

    Args:
        url: Location of the remote file to download.

    Returns:
        The complete contents of the downloaded file.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        target = Path(scratch_dir) / "download.bin"
        # Honour the instance-level switch that silences wget's progress bar.
        if self._suppress_wget_progress:
            progress_bar = None
        else:
            progress_bar = wget.bar_thermometer
        wget.download(url, str(target), bar=progress_bar)
        # Read before leaving the ``with`` block: the directory is removed
        # on exit.
        return target.read_bytes()

def _cache_point_cloud_frames_to_disk(
    self, sensor_id: str, sensor_root: Path
) -> None:
    """Download trace files and cache decoded point cloud frames to disk.

    Fetches ``trace.json`` for the sensor, obtains the binary trace (from
    ``sensor_root`` when a copy already exists there, otherwise by
    download), decodes every frame that is not cached yet and stores it as
    ``<frame_idx>.npz`` inside ``sensor_root``.

    Args:
        sensor_id: Identifier of the point cloud sensor.
        sensor_root: Existing cache directory for this sensor's frames.

    Raises:
        RuntimeError: If ``trace.json`` is not a JSON array, an entry is not
            a JSON object, or an entry's offset/length is missing, is not an
            integer, or does not fit inside the binary trace.
    """
    trace_prefix = f"{DataType.POINT_CLOUDS.value}/{sensor_id}"
    trace_json = json.loads(
        self._download_bytes(
            self._get_recording_file_url(f"{trace_prefix}/trace.json")
        ).decode("utf-8")
    )
    if not isinstance(trace_json, list):
        raise RuntimeError("Point cloud trace.json must be a JSON array")

    trace_bin_path = sensor_root / POINT_CLOUD_TRACE_BIN_FILENAME
    if trace_bin_path.exists():
        trace_bin = trace_bin_path.read_bytes()
    else:
        trace_bin = self._download_bytes(
            self._get_recording_file_url(
                f"{trace_prefix}/{POINT_CLOUD_TRACE_BIN_FILENAME}"
            )
        )
    trace_size = len(trace_bin)

    for entry_idx, entry in enumerate(trace_json):
        if not isinstance(entry, dict):
            raise RuntimeError("Invalid point cloud trace frame metadata")

        # Fall back to the position in the index when no explicit frame
        # index is stored in the entry.
        frame_idx = entry.get("frame_idx", entry_idx)
        frame_file = sensor_root / f"{frame_idx}.npz"
        if frame_file.exists():
            continue

        offset = entry.get("offset")
        length = entry.get("length")
        if not isinstance(offset, int) or not isinstance(length, int):
            raise RuntimeError(
                f"Invalid point cloud frame offset/length for frame_idx={frame_idx}"
            )
        # Slicing clamps oversized ranges and wraps negative ones, which
        # would silently hand the decoder the wrong bytes.
        if offset < 0 or length < 0 or offset + length > trace_size:
            raise RuntimeError(
                f"Point cloud frame out of trace bounds for frame_idx={frame_idx}"
            )
        decoded = decode_point_cloud_frame(trace_bin[offset : offset + length])

        save_kwargs: dict[str, Any] = {"points": decoded.points}
        if decoded.rgb_points is not None:
            save_kwargs["rgb_points"] = decoded.rgb_points

        # Write to a scratch file and rename it into place so an interrupted
        # write never leaves a truncated ``.npz`` that the ``exists()``
        # shortcut above would later trust. A file handle is passed because
        # numpy appends ``.npz`` to bare filenames lacking that suffix.
        partial_file = sensor_root / f"{frame_idx}.npz.partial"
        try:
            with open(partial_file, "wb") as handle:
                np.savez_compressed(handle, **save_kwargs)
            partial_file.replace(frame_file)
        finally:
            # No-op after a successful rename; cleans up after a failure.
            partial_file.unlink(missing_ok=True)

def _get_point_cloud_from_disk_cache(
    self, point_cloud_data: dict[str, PointCloudData]
) -> dict[str, PointCloudData]:
    """Fill in point cloud arrays for each sensor from the on-disk cache.

    For every sensor the cached ``<frame_idx>.npz`` file is read; when it is
    absent the sensor's frames are downloaded and cached first.

    Args:
        point_cloud_data: Mapping of sensor id to the point cloud entry whose
            ``frame_idx`` selects the cached frame.

    Returns:
        Mapping of sensor id to a copy of the entry with ``points`` and
        ``rgb_points`` taken from the cache (``rgb_points`` is ``None`` when
        the cached frame does not contain it).
    """
    loaded: dict[str, PointCloudData] = {}
    for sensor_id, entry in point_cloud_data.items():
        recording_root = self.cache_dir / f"{self.id}"
        sensor_root = recording_root / DataType.POINT_CLOUDS.value / sensor_id
        self._wait_for_lock_release(sensor_root / ".recording.lock", sensor_root)

        frame_file = sensor_root / f"{entry.frame_idx}.npz"
        if not (sensor_root.exists() and frame_file.exists()):
            sensor_root.mkdir(parents=True, exist_ok=True)
            self._download_point_cloud_and_cache_frames_to_disk(
                sensor_id, sensor_root
            )

        with np.load(frame_file) as cached:
            points = cached["points"]
            if "rgb_points" in cached:
                rgb_points = cached["rgb_points"]
            else:
                rgb_points = None

        loaded[sensor_id] = entry.model_copy(
            update={"points": points, "rgb_points": rgb_points}
        )
    return loaded

def _download_point_cloud_and_cache_frames_to_disk(
    self, sensor_id: str, point_cloud_cache_path: Path
) -> None:
    """Fetch a sensor's point cloud trace and populate its frame cache.

    Args:
        sensor_id: Identifier of the point cloud sensor to cache.
        point_cloud_cache_path: Cache directory for this sensor's frames;
            the ``.recording.lock`` file is placed inside it.
    """
    decoding_lock = point_cloud_cache_path / ".recording.lock"
    owns_lock = self._create_decoding_lock(decoding_lock, sensor_id)

    try:
        self.cache_manager.ensure_space_available()
        self._cache_point_cloud_frames_to_disk(sensor_id, point_cloud_cache_path)
    finally:
        # Remove the lock only when ``_create_decoding_lock`` reported success.
        if owns_lock:
            self._delete_decoding_lock(decoding_lock)

def _load_sync_point_payloads(
self, sync_point: SynchronizedPoint
) -> SynchronizedPoint:
"""Populate video frames for a given sync point.
"""Load lazy sensor payloads from disk cache for a sync point.

Args:
sync_point: SynchronizedPoint object containing
camera data (without frames).
sync_point: Sync point with metadata-only camera and point cloud entries.

Returns:
A new SynchronizedPoint object with populated video frames.
Sync point with camera frames and point cloud arrays populated.
"""
# Build new data dict with loaded frames
new_data = {}
Expand All @@ -385,6 +488,8 @@ def _insert_camera_data_intro_sync_point(
new_data[data_type] = self._get_frame_from_disk_cache(
DataType.DEPTH_IMAGES, data_dict, rgb_to_depth_storage
)
elif data_type == DataType.POINT_CLOUDS:
new_data[data_type] = self._get_point_cloud_from_disk_cache(data_dict)
else:
# create NEW instances to avoid shared references
new_data[data_type] = {
Expand All @@ -408,8 +513,7 @@ def _get_sync_point(self, idx: int) -> SynchronizedPoint:
for the specified index.
"""
sync_point = self._episode_synced.observations[idx]
sync_point = self._insert_camera_data_intro_sync_point(sync_point)
return sync_point
return self._load_sync_point_payloads(sync_point)

def __iter__(self) -> "SynchronizedRecording":
"""Initialize iteration over the episode.
Expand Down
37 changes: 36 additions & 1 deletion neuracore/core/streaming/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from dataclasses import dataclass

import numpy as np
from neuracore_types import CameraData, DataType, NCData
from neuracore_types import CameraData, DataType, NCData, PointCloudData
from neuracore_types.nc_data.point_cloud_data import encode_point_cloud_frame_parts

from neuracore.data_daemon.communications_management.producer import ProducerChannel

Expand Down Expand Up @@ -254,6 +255,40 @@ def log(self, data: NCData, *, send_to_daemon: bool = True) -> None:
self._send_to_daemon(json_bytes)


class PointCloudDataStream(DataStream):
    """Stream that sends point cloud data with a JSON metadata header and raw arrays."""

    def __init__(self, data_type_name: str):
        """Initialize the point cloud data stream.

        Args:
            data_type_name: Name of the point cloud stream
        """
        super().__init__(data_type=DataType.POINT_CLOUDS, stream_name=data_type_name)

    def log(self, data: PointCloudData) -> None:
        """Log point cloud data using the binary wire format.

        The frame is always remembered as the latest data; it is encoded and
        forwarded to the daemon only while the stream is recording.

        Args:
            data: Point cloud data to log
        """
        self._latest_data = data
        if not self.is_recording():
            return

        header, metadata_json, points_view, rgb_view = encode_point_cloud_frame_parts(
            data
        )
        parts: tuple[bytes | memoryview, ...] = (header, metadata_json, points_view)
        # The colour block is optional and only appended when present.
        if rgb_view is not None:
            parts += (rgb_view,)

        # ``len()`` of a memoryview counts elements of its first dimension,
        # not bytes, so a view that is not byte-formatted would be
        # under-counted. ``nbytes`` is exact for bytes and any memoryview.
        total_bytes = sum(memoryview(part).nbytes for part in parts)
        self._send_to_daemon_parts(parts, total_bytes=total_bytes)


class VideoDataStream(DataStream):
"""Stream that sends video frame data to the daemon.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Persist binary point cloud wire frames and a JSON metadata trace."""

from __future__ import annotations

import json
import pathlib
from typing import Any

from neuracore_types.nc_data.point_cloud_data import (
POINT_CLOUD_TRACE_BIN_FILENAME,
decode_point_cloud_wire_metadata,
)

# Default filename of the JSON index written alongside the binary trace.
TRACE_INDEX_FILE = "trace.json"


class PointCloudTrace:
    """Write point cloud wire frames to trace.bin and metadata to trace.json.

    Frames are appended to the binary file as they arrive; the JSON index,
    which records each frame's byte offset and length, is written by
    ``finish``. The writer may be used as a context manager so that the
    binary file is closed and the index written even when an error occurs.
    """

    def __init__(
        self,
        output_dir: pathlib.Path,
        *,
        bin_filename: str = POINT_CLOUD_TRACE_BIN_FILENAME,
        index_filename: str = TRACE_INDEX_FILE,
    ) -> None:
        """Initialise the point cloud trace writer.

        Args:
            output_dir: Directory receiving both files; created if missing.
            bin_filename: Name of the binary frame file inside ``output_dir``.
            index_filename: Name of the JSON index file inside ``output_dir``.
        """
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.bin_path = self.output_dir / bin_filename
        self.index_path = self.output_dir / index_filename
        self._frames: list[dict[str, Any]] = []
        self._offset = 0
        self._finished = False
        self._bin_file = open(self.bin_path, "wb")

    def __enter__(self) -> PointCloudTrace:
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type: object, exc: object, traceback: object) -> None:
        """Finalise the trace on leaving the ``with`` block."""
        self.finish()

    def add_payload(self, payload: bytes) -> None:
        """Validate and append one wire frame to trace.bin.

        Empty payloads are ignored.

        Args:
            payload: One encoded point cloud wire frame.

        Raises:
            ValueError: If the trace has already been finished.
        """
        if not payload:
            return
        if self._finished:
            raise ValueError("Cannot add a payload to a finished point cloud trace")

        # Decode the metadata first so an invalid payload is rejected before
        # anything is appended to the binary file.
        metadata, _ = decode_point_cloud_wire_metadata(payload)
        self._bin_file.write(payload)
        self._frames.append({
            "type": "PointCloudData",
            "timestamp": metadata["timestamp"],
            "extrinsics": metadata.get("extrinsics"),
            "intrinsics": metadata.get("intrinsics"),
            "offset": self._offset,
            "length": len(payload),
        })
        self._offset += len(payload)

    def finish(self) -> None:
        """Write the metadata JSON trace file and close trace.bin.

        Calling this more than once is harmless; later calls do nothing.
        """
        if self._finished:
            return

        for frame_idx, frame_meta in enumerate(self._frames):
            frame_meta["frame_idx"] = frame_idx

        # ``close()`` flushes buffered data and is a no-op on a closed file.
        self._bin_file.close()
        self.index_path.write_text(
            json.dumps(self._frames, separators=(",", ":"), ensure_ascii=False),
            encoding="utf-8",
        )
        self._finished = True
Loading
Loading