From e89f8c6ce7a637a181373abd77c3c3e5e61c1416 Mon Sep 17 00:00:00 2001 From: ypang-neuraco Date: Wed, 24 Jun 2026 15:58:58 +0100 Subject: [PATCH] feat: improve pointcloud logging --- neuracore-dictionary.txt | 1 + neuracore/api/logging.py | 9 +- neuracore/core/data/synced_recording.py | 140 +++++++++++++++--- neuracore/core/streaming/data_stream.py | 37 ++++- .../encoding/point_cloud_trace.py | 63 ++++++++ .../lifecycle/encoder_manager.py | 31 ++-- .../workers/batch_encoder_worker.py | 17 +-- .../registration_manager.py | 6 + .../upload_management/upload_manager.py | 1 + .../test_point_cloud_trace.py | 71 +++++++++ .../test_registration_manager.py | 11 ++ 11 files changed, 342 insertions(+), 45 deletions(-) create mode 100644 neuracore/data_daemon/recording_encoding_disk_manager/encoding/point_cloud_trace.py create mode 100644 tests/unit/data_daemon/recording_disk_manager/test_point_cloud_trace.py diff --git a/neuracore-dictionary.txt b/neuracore-dictionary.txt index bd433f9e3..2dc3d646c 100644 --- a/neuracore-dictionary.txt +++ b/neuracore-dictionary.txt @@ -307,6 +307,7 @@ rotvec roundoff rowwise rtype +savez Safetensors schematypens SCTP diff --git a/neuracore/api/logging.py b/neuracore/api/logging.py index 31c3f0142..532414c8c 100644 --- a/neuracore/api/logging.py +++ b/neuracore/api/logging.py @@ -36,6 +36,7 @@ DataStream, DepthDataStream, JsonDataStream, + PointCloudDataStream, RGBDataStream, VideoDataStream, ) @@ -1345,13 +1346,11 @@ def log_point_cloud( str_id = f"{DataType.POINT_CLOUDS.value}:{name}" stream = robot.get_data_stream(str_id) if stream is None: - stream = JsonDataStream( - data_type=DataType.POINT_CLOUDS, data_type_name=storage_name - ) + stream = PointCloudDataStream(data_type_name=storage_name) robot.add_data_stream(str_id, stream) assert isinstance( - stream, JsonDataStream - ), "Expected stream to be instance of JSONDataStream" + stream, PointCloudDataStream + ), "Expected stream to be instance of PointCloudDataStream" start_stream(robot, stream) point_data = PointCloudData( timestamp=timestamp, diff --git a/neuracore/core/data/synced_recording.py b/neuracore/core/data/synced_recording.py index 417feeb94..a48b8d04a 100644 --- a/neuracore/core/data/synced_recording.py +++ b/neuracore/core/data/synced_recording.py @@ -1,5 +1,6 @@ """Synchronized recording iterator.""" +import json import logging import os import shutil @@ -9,7 +10,7 @@ import time from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast import numpy as np import wget @@ -17,10 +18,15 @@ CameraData, CrossEmbodimentUnion, DataType, + PointCloudData, SynchronizationDetails, ) from neuracore_types import SynchronizedEpisode as SynchronizedEpisodeModel from neuracore_types import SynchronizedPoint, SynchronizeRecordingRequest +from neuracore_types.nc_data.point_cloud_data import ( + POINT_CLOUD_TRACE_BIN_FILENAME, + decode_point_cloud_frame, +) from PIL import Image from neuracore.core.data.cache_manager import CacheManager @@ -122,24 +128,21 @@ def _get_synced_data(self) -> SynchronizedEpisodeModel: response.raise_for_status() return SynchronizedEpisodeModel.model_validate(response.json()) - def _get_video_url(self, camera_type: DataType, camera_id: str) -> str: - """Get streaming URL for a specific camera's video data. + def _get_recording_file_url(self, filepath: str) -> str: + """Get a signed download URL for a file in this recording. Args: - camera_type: Type of camera (e.g., "rgbs", "depths"). - camera_id: Unique identifier for the camera. + filepath: Recording-root-relative path + (e.g. ``rgbs/cam1/lossless.mp4``). Returns: - URL string for downloading the video file. - - Raises: - requests.HTTPError: If the API request fails. + URL string for downloading the file. """ auth = get_auth() session = thread_local_session() response = session.get( f"{API_URL}/org/{self.dataset.org_id}/recording/{self.id}/download_url", - params={"filepath": f"{camera_type.value}/{camera_id}/lossless.mp4"}, + params={"filepath": filepath}, headers=auth.get_headers(), ) response.raise_for_status() @@ -247,7 +250,9 @@ def _download_video_and_cache_frames_to_disk( staging_dir.mkdir() video_location = Path(temp_dir) / f"{camera_id}{camera_type.value}.mp4" wget.download( - self._get_video_url(camera_type, camera_id), + self._get_recording_file_url( + f"{camera_type.value}/{camera_id}/lossless.mp4" + ), str(video_location), bar=None if self._suppress_wget_progress else wget.bar_thermometer, ) @@ -362,17 +367,115 @@ def _get_frame_from_disk_cache( return result - def _insert_camera_data_intro_sync_point( + def _download_bytes(self, url: str) -> bytes: + """Download a remote file and return its contents.""" + with tempfile.TemporaryDirectory() as temp_dir: + destination = Path(temp_dir) / "download.bin" + wget.download( + url, + str(destination), + bar=None if self._suppress_wget_progress else wget.bar_thermometer, + ) + return destination.read_bytes() + + def _cache_point_cloud_frames_to_disk( + self, sensor_id: str, sensor_root: Path + ) -> None: + """Download trace files and cache decoded point cloud frames to disk.""" + trace_prefix = f"{DataType.POINT_CLOUDS.value}/{sensor_id}" + trace_json = json.loads( + self._download_bytes( + self._get_recording_file_url(f"{trace_prefix}/trace.json") + ).decode("utf-8") + ) + if not isinstance(trace_json, list): + raise RuntimeError("Point cloud trace.json must be a JSON array") + + trace_bin_path = sensor_root / POINT_CLOUD_TRACE_BIN_FILENAME + if trace_bin_path.exists(): + trace_bin = trace_bin_path.read_bytes() + else: + trace_bin = self._download_bytes( + self._get_recording_file_url( + f"{trace_prefix}/{POINT_CLOUD_TRACE_BIN_FILENAME}" + ) + ) + + for entry_idx, entry in enumerate(trace_json): + if not isinstance(entry, dict): + raise RuntimeError("Invalid point cloud trace frame metadata") + + frame_idx = entry.get("frame_idx", entry_idx) + frame_file = sensor_root / f"{frame_idx}.npz" + if frame_file.exists(): + continue + + offset = entry.get("offset") + length = entry.get("length") + if not isinstance(offset, int) or not isinstance(length, int): + raise RuntimeError( + f"Invalid point cloud frame offset/length for frame_idx={frame_idx}" + ) + decoded = decode_point_cloud_frame(trace_bin[offset : offset + length]) + + save_kwargs: dict[str, Any] = {"points": decoded.points} + if decoded.rgb_points is not None: + save_kwargs["rgb_points"] = decoded.rgb_points + np.savez_compressed(frame_file, **save_kwargs) + + def _get_point_cloud_from_disk_cache( + self, point_cloud_data: dict[str, PointCloudData] + ) -> dict[str, PointCloudData]: + """Load point cloud arrays from disk cache.""" + result: dict[str, PointCloudData] = {} + for sensor_id, pc_data in point_cloud_data.items(): + sensor_root = ( + self.cache_dir / f"{self.id}" / DataType.POINT_CLOUDS.value / sensor_id + ) + lock_file = sensor_root / ".recording.lock" + self._wait_for_lock_release(lock_file, sensor_root) + + frame_file = sensor_root / f"{pc_data.frame_idx}.npz" + if not sensor_root.exists() or not frame_file.exists(): + sensor_root.mkdir(parents=True, exist_ok=True) + self._download_point_cloud_and_cache_frames_to_disk( + sensor_id, sensor_root + ) + + frame_file = sensor_root / f"{pc_data.frame_idx}.npz" + with np.load(frame_file) as cached: + points = cached["points"] + rgb_points = cached["rgb_points"] if "rgb_points" in cached else None + + result[sensor_id] = pc_data.model_copy( + update={"points": points, "rgb_points": rgb_points} + ) + return result + + def _download_point_cloud_and_cache_frames_to_disk( + self, sensor_id: str, point_cloud_cache_path: Path + ) -> None: + """Download point cloud trace files and cache frames to disk.""" + lock_file = point_cloud_cache_path / ".recording.lock" + lock_acquired = self._create_decoding_lock(lock_file, sensor_id) + + try: + self.cache_manager.ensure_space_available() + self._cache_point_cloud_frames_to_disk(sensor_id, point_cloud_cache_path) + finally: + if lock_acquired: + self._delete_decoding_lock(lock_file) + + def _load_sync_point_payloads( self, sync_point: SynchronizedPoint ) -> SynchronizedPoint: - """Populate video frames for a given sync point. + """Load lazy sensor payloads from disk cache for a sync point. Args: - sync_point: SynchronizedPoint object containing - camera data (without frames). + sync_point: Sync point with metadata-only camera and point cloud entries. Returns: - A new SynchronizedPoint object with populated video frames. + Sync point with camera frames and point cloud arrays populated. """ # Build new data dict with loaded frames new_data = {} @@ -385,6 +488,8 @@ def _insert_camera_data_intro_sync_point( new_data[data_type] = self._get_frame_from_disk_cache( DataType.DEPTH_IMAGES, data_dict, rgb_to_depth_storage ) + elif data_type == DataType.POINT_CLOUDS: + new_data[data_type] = self._get_point_cloud_from_disk_cache(data_dict) else: # create NEW instances to avoid shared references new_data[data_type] = { @@ -408,8 +513,7 @@ def _get_sync_point(self, idx: int) -> SynchronizedPoint: for the specified index. """ sync_point = self._episode_synced.observations[idx] - sync_point = self._insert_camera_data_intro_sync_point(sync_point) - return sync_point + return self._load_sync_point_payloads(sync_point) def __iter__(self) -> "SynchronizedRecording": """Initialize iteration over the episode. diff --git a/neuracore/core/streaming/data_stream.py b/neuracore/core/streaming/data_stream.py index e895d98eb..8ee34cdb7 100644 --- a/neuracore/core/streaming/data_stream.py +++ b/neuracore/core/streaming/data_stream.py @@ -14,7 +14,8 @@ from dataclasses import dataclass import numpy as np -from neuracore_types import CameraData, DataType, NCData +from neuracore_types import CameraData, DataType, NCData, PointCloudData +from neuracore_types.nc_data.point_cloud_data import encode_point_cloud_frame_parts from neuracore.data_daemon.communications_management.producer import ProducerChannel @@ -254,6 +255,40 @@ def log(self, data: NCData, *, send_to_daemon: bool = True) -> None: self._send_to_daemon(json_bytes) +class PointCloudDataStream(DataStream): + """Stream that sends point cloud data with a JSON metadata header and raw arrays.""" + + def __init__(self, data_type_name: str): + """Initialize the point cloud data stream. + + Args: + data_type_name: Name of the point cloud stream + """ + super().__init__(data_type=DataType.POINT_CLOUDS, stream_name=data_type_name) + + def log(self, data: PointCloudData) -> None: + """Log point cloud data using the binary wire format. + + Args: + data: Point cloud data to log + """ + self._latest_data = data + if not self.is_recording(): + return + + header, metadata_json, points_view, rgb_view = encode_point_cloud_frame_parts( + data + ) + total_bytes = len(header) + len(metadata_json) + len(points_view) + parts: tuple[bytes | memoryview, ...] + if rgb_view is not None: + total_bytes += len(rgb_view) + parts = (header, metadata_json, points_view, rgb_view) + else: + parts = (header, metadata_json, points_view) + self._send_to_daemon_parts(parts, total_bytes=total_bytes) + + class VideoDataStream(DataStream): """Stream that sends video frame data to the daemon. diff --git a/neuracore/data_daemon/recording_encoding_disk_manager/encoding/point_cloud_trace.py b/neuracore/data_daemon/recording_encoding_disk_manager/encoding/point_cloud_trace.py new file mode 100644 index 000000000..48206ee4e --- /dev/null +++ b/neuracore/data_daemon/recording_encoding_disk_manager/encoding/point_cloud_trace.py @@ -0,0 +1,63 @@ +"""Persist binary point cloud wire frames and a JSON metadata trace.""" + +from __future__ import annotations + +import json +import pathlib +from typing import Any + +from neuracore_types.nc_data.point_cloud_data import ( + POINT_CLOUD_TRACE_BIN_FILENAME, + decode_point_cloud_wire_metadata, +) + +TRACE_INDEX_FILE = "trace.json" + + +class PointCloudTrace: + """Write point cloud wire frames to trace.bin and metadata to trace.json.""" + + def __init__( + self, + output_dir: pathlib.Path, + *, + bin_filename: str = POINT_CLOUD_TRACE_BIN_FILENAME, + index_filename: str = TRACE_INDEX_FILE, + ) -> None: + """Initialise the point cloud trace writer.""" + self.output_dir = output_dir + self.output_dir.mkdir(parents=True, exist_ok=True) + self.bin_path = self.output_dir / bin_filename + self.index_path = self.output_dir / index_filename + self._frames: list[dict[str, Any]] = [] + self._offset = 0 + self._bin_file = open(self.bin_path, "wb") + + def add_payload(self, payload: bytes) -> None: + """Validate and append one wire frame to trace.bin.""" + if not payload: + return + + metadata, _ = decode_point_cloud_wire_metadata(payload) + self._bin_file.write(payload) + self._frames.append({ + "type": "PointCloudData", + "timestamp": metadata["timestamp"], + "extrinsics": metadata.get("extrinsics"), + "intrinsics": metadata.get("intrinsics"), + "offset": self._offset, + "length": len(payload), + }) + self._offset += len(payload) + + def finish(self) -> None: + """Write the metadata JSON trace file and close trace.bin.""" + for frame_idx, frame_meta in enumerate(self._frames): + frame_meta["frame_idx"] = frame_idx + + self._bin_file.flush() + self._bin_file.close() + self.index_path.write_text( + json.dumps(self._frames, separators=(",", ":"), ensure_ascii=False), + encoding="utf-8", + ) diff --git a/neuracore/data_daemon/recording_encoding_disk_manager/lifecycle/encoder_manager.py b/neuracore/data_daemon/recording_encoding_disk_manager/lifecycle/encoder_manager.py index 27388a795..e320a8586 100644 --- a/neuracore/data_daemon/recording_encoding_disk_manager/lifecycle/encoder_manager.py +++ b/neuracore/data_daemon/recording_encoding_disk_manager/lifecycle/encoder_manager.py @@ -7,15 +7,12 @@ from neuracore.data_daemon.event_emitter import Emitter from neuracore.data_daemon.models import get_content_type -from neuracore.data_daemon.recording_encoding_disk_manager.encoding.json_trace import ( - JsonTrace, -) -from neuracore.data_daemon.recording_encoding_disk_manager.encoding.video_trace import ( - VideoTrace, -) from ..core.trace_filesystem import _TraceFilesystem from ..core.types import TraceKey +from ..encoding.json_trace import JsonTrace +from ..encoding.point_cloud_trace import PointCloudTrace +from ..encoding.video_trace import VideoTrace logger = logging.getLogger(__name__) @@ -48,7 +45,7 @@ def __init__( self._filesystem = filesystem self._abort_trace = abort_trace - self._encoders: dict[TraceKey, JsonTrace | VideoTrace] = {} + self._encoders: dict[TraceKey, JsonTrace | VideoTrace | PointCloudTrace] = {} self._emitter = emitter self._emitter.on(Emitter.TRACE_ABORTED, self._on_trace_aborted) @@ -68,7 +65,9 @@ def _on_trace_aborted(self, trace_key: TraceKey) -> None: "Encoder finish failed during abort for trace %s", trace_key ) - def _get_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace: + def _get_encoder( + self, trace_key: TraceKey + ) -> JsonTrace | VideoTrace | PointCloudTrace: """Get or create the encoder instance for a trace. Args: @@ -83,11 +82,13 @@ def _get_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace: trace_dir = self._filesystem.trace_dir_for(trace_key) content_kind = get_content_type(trace_key.data_type) - created_encoder: JsonTrace | VideoTrace + created_encoder: JsonTrace | VideoTrace | PointCloudTrace try: if content_kind == "RGB": created_encoder = VideoTrace(output_dir=trace_dir) + elif content_kind == "POINT_CLOUD": + created_encoder = PointCloudTrace(output_dir=trace_dir) else: created_encoder = JsonTrace(output_dir=trace_dir) except Exception: @@ -97,7 +98,9 @@ def _get_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace: self._encoders[trace_key] = created_encoder return created_encoder - def safe_get_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace | None: + def safe_get_encoder( + self, trace_key: TraceKey + ) -> JsonTrace | VideoTrace | PointCloudTrace | None: """Get or create an encoder for a trace, converting failures into a trace abort. Args: @@ -112,7 +115,9 @@ def safe_get_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace | None self._abort_trace(trace_key) return None - def pop_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace | None: + def pop_encoder( + self, trace_key: TraceKey + ) -> JsonTrace | VideoTrace | PointCloudTrace | None: """Remove and return an encoder for a trace if present. Args: @@ -123,7 +128,9 @@ def pop_encoder(self, trace_key: TraceKey) -> JsonTrace | VideoTrace | None: """ return self._encoders.pop(trace_key, None) - def clear_all_encoders(self) -> list[tuple[TraceKey, JsonTrace | VideoTrace]]: + def clear_all_encoders( + self, + ) -> list[tuple[TraceKey, JsonTrace | VideoTrace | PointCloudTrace]]: """Remove and return all active encoders. Returns: diff --git a/neuracore/data_daemon/recording_encoding_disk_manager/workers/batch_encoder_worker.py b/neuracore/data_daemon/recording_encoding_disk_manager/workers/batch_encoder_worker.py index 45eae3908..a97526bc9 100644 --- a/neuracore/data_daemon/recording_encoding_disk_manager/workers/batch_encoder_worker.py +++ b/neuracore/data_daemon/recording_encoding_disk_manager/workers/batch_encoder_worker.py @@ -16,15 +16,12 @@ from neuracore.data_daemon.recording_encoding_disk_manager.core.storage_budget import ( StorageBudget, ) -from neuracore.data_daemon.recording_encoding_disk_manager.encoding.json_trace import ( - JsonTrace, -) -from neuracore.data_daemon.recording_encoding_disk_manager.encoding.video_trace import ( - VideoTrace, -) from ..core.trace_filesystem import _TraceFilesystem from ..core.types import BatchJob, RGBSpoolJob, TraceKey +from ..encoding.json_trace import JsonTrace +from ..encoding.point_cloud_trace import PointCloudTrace +from ..encoding.video_trace import VideoTrace from ..lifecycle.encoder_manager import _EncoderManager logger = logging.getLogger(__name__) @@ -380,7 +377,7 @@ def _finalise_remaining_encoders(self) -> None: async def _process_batch_into_encoder( self, batch_job: BatchJob, - encoder: JsonTrace | VideoTrace, + encoder: JsonTrace | VideoTrace | PointCloudTrace, ) -> bool: """Decode one raw batch file and feed its payloads into the provided encoder. @@ -416,6 +413,8 @@ def encoding_work(raw_bytes: bytes) -> None: if isinstance(encoder, VideoTrace): encoder.add_payload(payload) + elif isinstance(encoder, PointCloudTrace): + encoder.add_payload(payload) else: decoded = json.loads(payload.decode("utf-8")) if isinstance(decoded, list): @@ -447,7 +446,7 @@ def encoding_work(raw_bytes: bytes) -> None: async def _process_rgb_spool_into_encoder( self, batch_job: RGBSpoolJob, - encoder: JsonTrace | VideoTrace, + encoder: JsonTrace | VideoTrace | PointCloudTrace, ) -> bool: """Read ordered RGB frame refs from disk and feed them into `VideoTrace`.""" if not isinstance(encoder, VideoTrace): @@ -508,7 +507,7 @@ def encoding_work() -> None: def _finalise_trace_encoder( self, trace_key: TraceKey, - encoder: JsonTrace | VideoTrace, + encoder: JsonTrace | VideoTrace | PointCloudTrace, ) -> None: """Finish a trace encoder, enforce storage limits, and emit TRACE_WRITTEN. diff --git a/neuracore/data_daemon/registration_management/registration_manager.py b/neuracore/data_daemon/registration_management/registration_manager.py index c5862cd14..760adc3c1 100644 --- a/neuracore/data_daemon/registration_management/registration_manager.py +++ b/neuracore/data_daemon/registration_management/registration_manager.py @@ -15,6 +15,7 @@ import aiohttp from neuracore_types import DataType +from neuracore_types.nc_data.point_cloud_data import POINT_CLOUD_TRACE_BIN_FILENAME from neuracore.core.auth import get_auth from neuracore.core.config.get_current_org import get_current_org @@ -57,6 +58,11 @@ def get_cloud_file_list( files.append( {"filepath": f"{prefix}/{LOSSLESS_VIDEO_NAME}", "content_type": "video/mp4"} ) + elif get_content_type(data_type) == "POINT_CLOUD": + files.append({ + "filepath": f"{prefix}/{POINT_CLOUD_TRACE_BIN_FILENAME}", + "content_type": "application/octet-stream", + }) files.append( {"filepath": f"{prefix}/{TRACE_FILE}", "content_type": "application/json"} ) diff --git a/neuracore/data_daemon/upload_management/upload_manager.py b/neuracore/data_daemon/upload_management/upload_manager.py index e8133e16c..328215c94 100644 --- a/neuracore/data_daemon/upload_management/upload_manager.py +++ b/neuracore/data_daemon/upload_management/upload_manager.py @@ -175,6 +175,7 @@ def _get_content_type_for_file(self, file: Path) -> str: content_type_map = { ".mp4": "video/mp4", ".json": "application/json", + ".bin": "application/octet-stream", } return content_type_map.get(file.suffix.lower(), "application/octet-stream") diff --git a/tests/unit/data_daemon/recording_disk_manager/test_point_cloud_trace.py b/tests/unit/data_daemon/recording_disk_manager/test_point_cloud_trace.py new file mode 100644 index 000000000..6d465219f --- /dev/null +++ b/tests/unit/data_daemon/recording_disk_manager/test_point_cloud_trace.py @@ -0,0 +1,71 @@ +"""Tests for PointCloudTrace encoder.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import numpy as np +import pytest +from neuracore_types import PointCloudData +from neuracore_types.nc_data.point_cloud_data import ( + POINT_CLOUD_TRACE_BIN_FILENAME, + decode_point_cloud_frame, + encode_point_cloud_frame_parts, +) + +from neuracore.data_daemon.recording_encoding_disk_manager.encoding import ( + point_cloud_trace, +) + + +def encode_wire_frame(data: PointCloudData) -> bytes: + header, metadata_json, points_view, rgb_view = encode_point_cloud_frame_parts(data) + parts: list[bytes | memoryview] = [header, metadata_json, points_view] + if rgb_view is not None: + parts.append(rgb_view) + return b"".join(parts) + + +def test_point_cloud_trace_writes_bin_and_metadata_array(tmp_path: Path) -> None: + trace = point_cloud_trace.PointCloudTrace(output_dir=tmp_path / "trace") + frame_a = encode_wire_frame( + PointCloudData( + timestamp=1.0, + points=np.array([[1.0, 2.0, 3.0]], dtype=np.float16), + ) + ) + frame_b = encode_wire_frame( + PointCloudData( + timestamp=2.0, + points=np.array([[4.0, 5.0, 6.0]], dtype=np.float16), + ) + ) + trace.add_payload(frame_a) + trace.add_payload(frame_b) + trace.finish() + + bin_bytes = (tmp_path / "trace" / POINT_CLOUD_TRACE_BIN_FILENAME).read_bytes() + index = json.loads((tmp_path / "trace" / "trace.json").read_text(encoding="utf-8")) + + assert isinstance(index, list) + assert len(index) == 2 + assert index[0]["frame_idx"] == 0 + assert index[0]["offset"] == 0 + assert index[0]["length"] == len(frame_a) + assert "points" not in index[0] + assert index[1]["frame_idx"] == 1 + assert index[1]["offset"] == len(frame_a) + assert index[1]["length"] == len(frame_b) + assert bin_bytes == frame_a + frame_b + + decoded_a = decode_point_cloud_frame( + bin_bytes[index[0]["offset"] : index[0]["offset"] + index[0]["length"]] + ) + assert decoded_a.timestamp == 1.0 + + +def test_point_cloud_trace_rejects_invalid_payload(tmp_path: Path) -> None: + trace = point_cloud_trace.PointCloudTrace(output_dir=tmp_path / "trace") + with pytest.raises(ValueError): + trace.add_payload(b'{"type":"PointCloudData"}') diff --git a/tests/unit/data_daemon/registration_management/test_registration_manager.py b/tests/unit/data_daemon/registration_management/test_registration_manager.py index 81d328fa6..35376ef5c 100644 --- a/tests/unit/data_daemon/registration_management/test_registration_manager.py +++ b/tests/unit/data_daemon/registration_management/test_registration_manager.py @@ -94,6 +94,17 @@ def test_non_rgb_type_returns_only_trace(self) -> None: }, ] + def test_point_cloud_type_returns_bin_and_index(self) -> None: + files = get_cloud_file_list(DataType.POINT_CLOUDS, "lidar") + paths = {f["filepath"] for f in files} + assert paths == { + "POINT_CLOUDS/lidar/trace.bin", + "POINT_CLOUDS/lidar/trace.json", + } + by_path = {f["filepath"]: f["content_type"] for f in files} + assert by_path["POINT_CLOUDS/lidar/trace.bin"] == "application/octet-stream" + assert by_path["POINT_CLOUDS/lidar/trace.json"] == "application/json" + class TestBatchRegistration: """Tests for the batch registration HTTP call and outcome handling."""