From b70c963ffa97c99b3e733a12593a79a50a24d4a5 Mon Sep 17 00:00:00 2001 From: Cougar Tasker Date: Wed, 17 Jun 2026 11:41:38 +0000 Subject: [PATCH] feat: integrate rust daemon behind a flag --- .github/workflows/build-wheels.yaml | 91 +++++ MANIFEST.in | 16 + docs/data_daemon.md | 6 +- neuracore-dictionary.txt | 1 + neuracore/api/core.py | 88 ++++- neuracore/api/datasets.py | 5 + neuracore/api/globals.py | 9 + neuracore/api/logging.py | 365 +++++++++++++++--- neuracore/core/data/synced_recording.py | 2 + neuracore/core/robot.py | 144 ++++++- neuracore/core/streaming/data_stream.py | 140 +++++-- .../core/streaming/recording_state_manager.py | 48 ++- neuracore/data_daemon/__main__.py | 50 ++- .../shared_transport/recording_context.py | 281 +++++++++++++- .../config_manager/args_handler.py | 24 ++ .../data_daemon/config_manager/cli_options.py | 21 + .../data_daemon/config_manager/config.py | 3 +- .../config_manager/daemon_config.py | 3 + .../data_daemon/config_manager/helpers.py | 4 + .../data_daemon/config_manager/profiles.py | 2 +- neuracore/data_daemon/const.py | 1 + .../lifecycle/daemon_os_control.py | 200 ++++++++-- neuracore/data_daemon/main.py | 2 + neuracore/data_daemon/rust_selection.py | 31 ++ rust/scripts/build_wheel_artefacts.sh | 76 ++++ setup.py | 47 +++ tests/unit/api/test_core.py | 1 + tests/unit/api/test_logging.py | 84 ++++ tests/unit/core/test_data_stream.py | 33 ++ tests/unit/core/test_robot_stop_streams.py | 1 + .../data_daemon/cli/test_stop_status_cli.py | 72 ++++ .../config_manager/test_cli_handlers.py | 4 +- .../config_manager/test_profiles_api.py | 4 + .../lifecycle/test_daemon_os_control.py | 30 +- 34 files changed, 1711 insertions(+), 178 deletions(-) create mode 100644 .github/workflows/build-wheels.yaml create mode 100644 MANIFEST.in create mode 100644 neuracore/data_daemon/rust_selection.py create mode 100755 rust/scripts/build_wheel_artefacts.sh diff --git a/.github/workflows/build-wheels.yaml b/.github/workflows/build-wheels.yaml new file mode 100644 index 000000000..4f0e118ba --- /dev/null +++ b/.github/workflows/build-wheels.yaml @@ -0,0 +1,91 @@ +name: Build Wheels + +# Builds the Python wheel that bundles the Rust data-daemon binary and the +# producer cdylib. See docs/rust_data_daemon_development.md#packaging-the-wheel +# for the build pipeline rationale (cargo → build_wheel_artefacts.sh → python -m build). + +on: + pull_request: + branches: + - main + paths: + - 'rust/**' + - 'neuracore/data_daemon/**' + - 'setup.py' + - 'MANIFEST.in' + - '.github/workflows/build-wheels.yaml' + push: + branches: + - main + paths: + - 'rust/**' + - 'neuracore/data_daemon/**' + - 'setup.py' + - 'MANIFEST.in' + - '.github/workflows/build-wheels.yaml' + workflow_call: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build: + name: wheel (${{ matrix.os }}, py${{ matrix.python-version }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + # Linux-first per docs/data-daemon-rewrite.md §Open items. + # aarch64 ships when there's demand — the helper script is platform- + # agnostic but cross-compilation isn't wired up here yet. + os: [ubuntu-22.04] + python-version: ["3.10", "3.11"] + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo build + uses: Swatinem/rust-cache@v2 + with: + workspaces: rust + + - name: Set up Python ${{ matrix.python-version }} + id: setup-python + uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + + - name: Install build tooling + run: python -m pip install --upgrade pip build + + - name: Build Rust artefacts + # PYO3_PYTHON pins pyo3-build-config to the matrix interpreter so the + # produced cdylib's ABI matches the wheel's python tag. Ubuntu images + # ship multiple python3 binaries; relying on PATH order would silently + # pick the wrong one and produce a wheel that imports under the wrong + # interpreter. + env: + PYO3_PYTHON: ${{ steps.setup-python.outputs.python-path }} + run: ./rust/scripts/build_wheel_artefacts.sh + + - name: Build wheel + run: python -m build --wheel + + - name: Smoke-test the built wheel + run: | + python -m pip install dist/neuracore-*.whl + python -c "import neuracore.data_daemon._native_producer as p; print('producer ok:', p)" + python -c "from importlib.resources import files; b = files('neuracore.data_daemon') / 'bin' / 'data-daemon'; assert b.is_file(), b; print('binary ok:', b)" + + - name: Upload wheel artefact + uses: actions/upload-artifact@v4 + with: + name: neuracore-wheel-${{ matrix.os }}-py${{ matrix.python-version }} + path: dist/*.whl + if-no-files-found: error + retention-days: 7 diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..430ced192 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,16 @@ +# Sources required to rebuild the bundled Rust artefacts from an sdist. +# The artefacts themselves (neuracore/data_daemon/bin/data-daemon and +# neuracore/data_daemon/_native_producer.so) are gitignored and only land +# in the *wheel* — the sdist ships the Rust sources so a downstream +# packager can run rust/scripts/build_wheel_artefacts.sh themselves. +# +# See docs/rust_data_daemon_development.md#packaging-the-wheel. + +include README.md +include LICENSE + +recursive-include rust *.rs *.toml *.lock *.sql *.md *.sh + +prune rust/target +global-exclude __pycache__ +global-exclude *.py[cod] diff --git a/docs/data_daemon.md b/docs/data_daemon.md index f5599fdfe..b87a43867 100644 --- a/docs/data_daemon.md +++ b/docs/data_daemon.md @@ -203,6 +203,7 @@ These are the supported settings: |---|---| | `storage_limit` | Maximum local disk space the daemon should use for recordings (bytes). | | `bandwidth_limit` | Maximum upload speed the daemon should use (bytes per second). | +| `spool_limit` | Cap on the producer's on-disk video spool backlog (bytes). When the un-encoded backlog reaches this size the producer applies backpressure to video logging instead of letting the spool fill the disk. `0` disables the bound. Defaults to 2 GiB. | | `path_to_store_record` | Folder where recordings are stored. | | `num_threads` | Number of worker threads used by the daemon. | | `keep_wakelock_while_upload` | Whether to keep the machine awake during uploads (where supported). | @@ -253,6 +254,7 @@ Supported environment variables: |---|---| | `storage_limit` | `NCD_STORAGE_LIMIT` | | `bandwidth_limit` | `NCD_BANDWIDTH_LIMIT` | +| `spool_limit` | `NCD_SPOOL_LIMIT` | | `path_to_store_record` | `NCD_PATH_TO_STORE_RECORD` | | `num_threads` | `NCD_NUM_THREADS` | | `keep_wakelock_while_upload` | `NCD_KEEP_WAKELOCK_WHILE_UPLOAD` | @@ -320,13 +322,13 @@ neuracore data-daemon profile create laptop Update a named profile: ```bash -neuracore data-daemon profile update [--storage-limit ] [--bandwidth-limit ] [--storage-path ] [--num-threads ] [--max-concurrent-uploads ] [--wakelock|--no-wakelock] [--offline|--online] [--api-key ] [--current-org-id ] +neuracore data-daemon profile update [--storage-limit ] [--bandwidth-limit ] [--spool-limit ] [--storage-path ] [--num-threads ] [--wakelock|--no-wakelock] [--offline|--online] [--api-key ] [--current-org-id ] ``` Update the default profile: ```bash -neuracore data-daemon profile update [--storage-limit ] [--bandwidth-limit ] [--storage-path ] [--num-threads ] [--max-concurrent-uploads ] [--wakelock|--no-wakelock] [--offline|--online] [--api-key ] [--current-org-id ] +neuracore data-daemon profile update [--storage-limit ] [--bandwidth-limit ] [--spool-limit ] [--storage-path ] [--num-threads ] [--wakelock|--no-wakelock] [--offline|--online] [--api-key ] [--current-org-id ] ``` Example: diff --git a/neuracore-dictionary.txt b/neuracore-dictionary.txt index 9178b380d..bfb437d5a 100644 --- a/neuracore-dictionary.txt +++ b/neuracore-dictionary.txt @@ -126,6 +126,7 @@ elems embs Emika ENOENT +enoexec EPERM EPIPE erfinv diff --git a/neuracore/api/core.py b/neuracore/api/core.py index e6ec85c1b..f66eb5d5e 100644 --- a/neuracore/api/core.py +++ b/neuracore/api/core.py @@ -20,6 +20,7 @@ ) from neuracore.core.streaming.recording_state_manager import get_recording_state_manager from neuracore.core.utils import backend_utils +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from ..core.auth import get_auth from ..core.data.dataset import Dataset @@ -103,6 +104,7 @@ def logout() -> None: get_auth().logout() GlobalSingleton()._active_robot = None GlobalSingleton()._active_dataset_id = None + GlobalSingleton()._active_dataset = None GlobalSingleton()._has_validated_version = False @@ -245,7 +247,11 @@ def is_recording(robot_name: str | None = None, instance: int = 0) -> bool: return robot.is_recording() -def start_recording(robot_name: str | None = None, instance: int = 0) -> None: +def start_recording( + robot_name: str | None = None, + instance: int = 0, + timestamp: float | None = None, +) -> None: """Start recording data for a specific robot. Begins a new recording session for the specified robot, capturing all @@ -256,6 +262,9 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None: robot_name: Robot identifier. If not provided, uses the currently active robot from the global state. instance: Instance number of the robot for multi-instance scenarios. + timestamp: Optional capture time (Unix seconds) for the recording's + start, matching the ``log_*`` methods. When omitted, the current + time is captured. Raises: RobotError: If no robot is active and no robot_name is provided, @@ -266,10 +275,12 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None: active_dataset_id = GlobalSingleton()._active_dataset_id if active_dataset_id is None: raise RobotError("No active dataset. Call create_dataset() first.") - try: - active_dataset = Dataset.get_by_id(active_dataset_id) - except DatasetError: - active_dataset = None + active_dataset = GlobalSingleton()._active_dataset + if active_dataset is None or active_dataset.id != active_dataset_id: + try: + active_dataset = Dataset.get_by_id(active_dataset_id) + except DatasetError: + active_dataset = None if active_dataset is not None: if robot.shared and not active_dataset.is_shared: raise RobotError( @@ -286,11 +297,14 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None: "shared robot, ensure connect_robot(shared=True) succeeded. " f"Active dataset: '{active_dataset.name}' ({active_dataset.id})." ) - robot.start_recording(active_dataset_id) + robot.start_recording(active_dataset_id, timestamp=timestamp) def stop_recording( - robot_name: str | None = None, instance: int = 0, wait: bool = False + robot_name: str | None = None, + instance: int = 0, + wait: bool = False, + timestamp: float | None = None, ) -> None: """Stop recording data for a specific robot. @@ -303,6 +317,9 @@ def stop_recording( instance: Instance number of the robot for multi-instance scenarios. wait: Whether to block until all data streams have finished uploading to the backend storage. + timestamp: Optional capture time (Unix seconds) for the recording's + stop, matching the ``log_*`` methods. When omitted, the current + time is captured. Raises: RobotError: If no robot is active and no robot_name is provided. @@ -317,10 +334,20 @@ def stop_recording( recording_id = robot.get_current_recording_id() if not recording_id: raise ValueError("Recording_id is None, no current recording") - robot.stop_recording(recording_id, wait_for_producer_drain=wait) - - if not wait: - return + if rust_daemon_enabled(): + cloud_recording_id = robot.get_cloud_recording_id() if wait else None + robot.stop_recording( + recording_id, wait_for_producer_drain=wait, timestamp=timestamp + ) + if not wait or not cloud_recording_id: + return + recording_id = cloud_recording_id + else: + robot.stop_recording( + recording_id, wait_for_producer_drain=wait, timestamp=timestamp + ) + if not wait: + return # TODO: We need to instead check that the specific recording is complete is_traces_registered = False @@ -333,6 +360,34 @@ def stop_recording( time.sleep(0.2) +def get_cloud_recording_id( + robot_name: str | None = None, + instance: int = 0, + timestamp_ns: int | None = None, + timeout_s: float = 30.0, +) -> str | None: + """Resolve the daemon-owned cloud recording id for a robot's recording. + + Under the Rust daemon the cloud recording id is assigned asynchronously by + the daemon. This asks the daemon (it may block up to ``timeout_s``) for the + id of the recording whose window brackets ``timestamp_ns`` for this source + (defaulting to the most recently started recording). For + non-performance-critical use only (tests, ``stop_recording(wait=True)``). + + Args: + robot_name: Robot identifier. Defaults to the active robot. + instance: Robot instance number. + timestamp_ns: A wall-clock instant inside the target recording window; + defaults to the most recent recording for the source. + timeout_s: Maximum time to wait for the daemon to mint the id. + + Returns: + The cloud recording id, or ``None`` on timeout / legacy daemon. + """ + robot = _get_robot(robot_name, instance) + return robot.get_cloud_recording_id(timestamp_ns=timestamp_ns, timeout_s=timeout_s) + + def stop_live_data(robot_name: str | None = None, instance: int = 0) -> None: """Stop sharing live data for active monitoring from the Neuracore platform. @@ -355,12 +410,19 @@ def stop_live_data(robot_name: str | None = None, instance: int = 0) -> None: StreamManagerOrchestrator().remove_manager(robot.id, robot.instance) -def cancel_recording(robot_name: str | None = None, instance: int = 0) -> None: +def cancel_recording( + robot_name: str | None = None, + instance: int = 0, + timestamp: float | None = None, +) -> None: """Cancel the current recording for a specific robot without saving any data. Args: robot_name: Robot identifier. instance: Instance number of the robot for multi-instance scenarios. + timestamp: Optional capture time (Unix seconds) for the cancel, + mirroring ``stop_recording``. When omitted, the current time is + captured. """ robot = _get_robot(robot_name, instance) @@ -369,4 +431,4 @@ def cancel_recording(robot_name: str | None = None, instance: int = 0) -> None: recording_id = robot.get_current_recording_id() if not recording_id: return - robot.cancel_recording(recording_id) + robot.cancel_recording(recording_id, timestamp=timestamp) diff --git a/neuracore/api/datasets.py b/neuracore/api/datasets.py index 30b607373..755800776 100644 --- a/neuracore/api/datasets.py +++ b/neuracore/api/datasets.py @@ -48,6 +48,7 @@ def get_dataset(name: str | None = None, id: str | None = None) -> Dataset: if _active_dataset is None: raise ValueError(f"No Dataset found with the given name: {name} or ID: {id}") GlobalSingleton()._active_dataset_id = _active_dataset.id + GlobalSingleton()._active_dataset = _active_dataset return _active_dataset @@ -95,6 +96,7 @@ def merge_datasets(name: str, dataset_names: list[str]) -> Dataset: data_types=list(dataset_model.all_data_types.keys()), ) GlobalSingleton()._active_dataset_id = merged.id + GlobalSingleton()._active_dataset = merged return merged @@ -169,6 +171,7 @@ def clone_dataset( "available immediately." ) GlobalSingleton()._active_dataset_id = cloned.id + GlobalSingleton()._active_dataset = cloned return cloned # resolve for source_dataset if not provided, to get the total number of recordings @@ -197,6 +200,7 @@ def clone_dataset( pbar.close() GlobalSingleton()._active_dataset_id = cloned.id + GlobalSingleton()._active_dataset = cloned return cloned @@ -224,4 +228,5 @@ def create_dataset( """ _active_dataset = Dataset.create(name, description, tags, shared) GlobalSingleton()._active_dataset_id = _active_dataset.id + GlobalSingleton()._active_dataset = _active_dataset return _active_dataset diff --git a/neuracore/api/globals.py b/neuracore/api/globals.py index 64d1d11ee..bf15ad13c 100644 --- a/neuracore/api/globals.py +++ b/neuracore/api/globals.py @@ -5,9 +5,14 @@ and validation status. """ +from typing import TYPE_CHECKING + from neuracore.core.robot import Robot from neuracore.core.utils.singleton_metaclass import SingletonMetaclass +if TYPE_CHECKING: + from neuracore.core.data.dataset import Dataset + class GlobalSingleton(metaclass=SingletonMetaclass): """Singleton class for managing global Neuracore session state. @@ -24,8 +29,12 @@ class GlobalSingleton(metaclass=SingletonMetaclass): for operations when no specific robot is specified. _active_dataset_id: ID of the currently active dataset that new recordings will be associated with. + _active_dataset: The active dataset object cached alongside its id, so + hot-path callers (e.g. ``start_recording``) can read its metadata + without a synchronous backend fetch. """ _has_validated_version = False _active_robot: Robot | None = None _active_dataset_id: str | None = None + _active_dataset: "Dataset | None" = None diff --git a/neuracore/api/logging.py b/neuracore/api/logging.py index 31c3f0142..55d58fc00 100644 --- a/neuracore/api/logging.py +++ b/neuracore/api/logging.py @@ -5,9 +5,11 @@ All logging functions support optional robot identification and timestamping. """ +import json import logging import time from dataclasses import dataclass +from itertools import islice from warnings import filterwarnings, warn import numpy as np @@ -35,10 +37,14 @@ DataRecordingContext, DataStream, DepthDataStream, + JointDataStream, JsonDataStream, RGBDataStream, VideoDataStream, ) +from neuracore.core.streaming.p2p.provider.global_live_data_enabled import ( + get_provide_live_data_enabled_manager, +) from neuracore.core.streaming.p2p.stream_manager_orchestrator import ( StreamManagerOrchestrator, ) @@ -48,6 +54,7 @@ BatchedJointDataItemPayload, BatchedJointDataPayload, ) +from neuracore.data_daemon.rust_selection import rust_daemon_enabled logger = logging.getLogger(__name__) @@ -67,7 +74,7 @@ class JointStreamBinding: stream_id: str storage_name: str - stream: JsonDataStream + stream: JointDataStream @dataclass(frozen=True) @@ -79,6 +86,100 @@ class LoggedJointData: trace_id: str | None +_JOINT_SAMPLE_SIZE = 8 + + +def _smoke_validate_joint_values(joint_data: dict[str, float]) -> None: + """Smoke-test that a sample of joint values are floats. + + Only test a fixed number of joints to avoid the cost scaling with the number + of joints. + + Args: + joint_data: The joint frame being logged (non-empty). + + Raises: + ValueError: If a sampled value is not a float. + """ + head = list(islice(joint_data.items(), _JOINT_SAMPLE_SIZE)) + tail = list(islice(reversed(joint_data.items()), _JOINT_SAMPLE_SIZE)) + for joint_name, joint_value in head: + if not isinstance(joint_value, float): + raise ValueError(f"Joint data must be floats. {joint_name} is not a float.") + for joint_name, joint_value in tail: + if not isinstance(joint_value, float): + raise ValueError(f"Joint data must be floats. {joint_name} is not a float.") + + +@dataclass(frozen=True) +class ResolvedJointGroup: + r"""Per-(robot, data_type) resolution of a joint frame, reused across frames. + + Attributes: + joint_names: The frame's joint names in order (``tuple(joint_data)``). + bindings: The frame's bindings in order (``list(JointStreamBinding)``). + joined_names: ``\0``-joined storage names for the batched rust + ``log_joints`` call. + started_recording_id: The recording the bindings' streams were started + for (``None`` when not recording). + """ + + joint_names: tuple[str, ...] + bindings: list[JointStreamBinding] + joined_names: str + started_recording_id: str | None + + +def _resolve_joint_group( + robot: Robot, + data_type: DataType, + joint_names: tuple[str, ...], + bindings_for_type: dict[str, JointStreamBinding], + current_recording_id: str | None, +) -> ResolvedJointGroup: + """Return the cached resolution for ``joint_names``, rebuilding if it changed. + + Args: + robot: Robot instance. + data_type: Joint data type being logged. + joint_names: The frame's joint names in order (``tuple(joint_data)``). + bindings_for_type: The robot's per-joint binding cache for this type. + current_recording_id: The active recording id, or ``None``. + + Returns: + The resolved, cached group for this frame. + """ + cached = robot._joint_group_cache.get(data_type) + if ( + cached is not None + and cached.started_recording_id == current_recording_id + and cached.joint_names == joint_names + ): + return cached + + record_context: DataRecordingContext | None = None + bindings: list[JointStreamBinding] = [] + for joint_name in joint_names: + binding = bindings_for_type.get(joint_name) + if binding is None: + binding = _get_or_create_joint_stream(data_type, joint_name, robot) + bindings_for_type[joint_name] = binding + if current_recording_id is not None and not binding.stream.is_recording(): + if record_context is None: + record_context = _build_recording_context(robot, current_recording_id) + binding.stream.start_recording(record_context) + bindings.append(binding) + + group = ResolvedJointGroup( + joint_names=joint_names, + bindings=bindings, + joined_names="\0".join(binding.storage_name for binding in bindings), + started_recording_id=current_recording_id, + ) + robot._joint_group_cache[data_type] = group + return group + + def _publish_json_to_p2p( robot: Robot, str_id: str, @@ -103,6 +204,8 @@ def _publish_json_to_p2p( """ if robot.id is None: raise RobotError("Robot not initialized. Call init() first.") + if get_provide_live_data_enabled_manager().is_disabled(): + return StreamManagerOrchestrator().get_provider_manager( robot.id, robot.instance ).get_json_source(str_id, data_type, sensor_key=str_id).publish( @@ -110,6 +213,47 @@ def _publish_json_to_p2p( ) +def _record_json_to_daemon( + robot: Robot, + data_type: DataType, + storage_name: str, + data: ( + JointData + | Custom1DData + | PoseData + | EndEffectorPoseData + | ParallelGripperOpenAmountData + | LanguageData + | PointCloudData + ), + timestamp: float, +) -> None: + """Forward one JSON sample to the Rust daemon's recording pipeline. + + The generic counterpart to :func:`_publish_json_to_p2p`: where that feeds + live consumers, this persists the sample into the active recording. Every + non-joint, non-video data type shares this single path — the daemon's + ``log_json`` entry point is datatype-agnostic, so adding a new JSON type + needs no daemon-side change. + + A no-op unless the Rust daemon is active and a recording is in progress + (the legacy daemon is fed by :meth:`JsonDataStream.log` instead). + + Args: + robot: Robot instance owning the daemon recording context. + data_type: Wire label for the sample's trace. + storage_name: Sensor name the trace is stored under. + data: Data object to serialize and persist. + timestamp: Capture timestamp in seconds. + """ + if not (rust_daemon_enabled() and robot.get_current_recording_id() is not None): + return + payload = json.dumps(data.model_dump(mode="json")).encode("utf-8") + robot._get_daemon_recording_context().log_json( + data_type.value, storage_name, payload, timestamp + ) + + def _publish_video_to_p2p( robot: Robot, name: str, @@ -128,6 +272,8 @@ def _publish_video_to_p2p( """ if robot.id is None: raise RobotError("Robot not initialized. Call init() first.") + if get_provide_live_data_enabled_manager().is_disabled(): + return StreamManagerOrchestrator().get_provider_manager( robot.id, robot.instance ).get_video_source(name, camera_type, f"{name}_{camera_type}").add_frame( @@ -135,6 +281,44 @@ def _publish_video_to_p2p( ) +def _build_recording_context(robot: Robot, recording_id: str) -> DataRecordingContext: + """Build the recording context a robot's streams start with for one recording. + + Every field is invariant across the streams logged in a single call (they + share one recording, robot, and dataset), so this can be resolved once and + reused — see :func:`_log_group_of_joint_data`, which hoists it out of the + per-joint loop rather than rebuilding it (and re-querying the dataset id) for + each of a robot's joints on a recording's first frame. + + Args: + robot: Robot instance. + recording_id: The active recording id the streams are being started for. + + Returns: + The shared :class:`DataRecordingContext` for the streams to start with. + """ + instance_key = RobotInstanceIdentifier( + robot_id=robot.id, + robot_instance=robot.instance, + ) + dataset_id = get_recording_state_manager().active_dataset_ids.get(instance_key) + if dataset_id is None: + dataset_id = GlobalSingleton()._active_dataset_id + logger.debug( + "start_stream: falling back to global dataset_id=%s recording_id=%s", + dataset_id, + recording_id, + ) + return DataRecordingContext( + recording_id=recording_id, + robot_id=robot.id, + robot_name=robot.name, + robot_instance=robot.instance, + dataset_id=dataset_id, + dataset_name=None, + ) + + def start_stream(robot: Robot, data_stream: DataStream) -> None: """Start recording on a data stream if robot is currently recording. @@ -144,27 +328,7 @@ def start_stream(robot: Robot, data_stream: DataStream) -> None: """ current_recording = robot.get_current_recording_id() if current_recording is not None and not data_stream.is_recording(): - instance_key = RobotInstanceIdentifier( - robot_id=robot.id, - robot_instance=robot.instance, - ) - dataset_id = get_recording_state_manager().active_dataset_ids.get(instance_key) - if dataset_id is None: - dataset_id = GlobalSingleton()._active_dataset_id - logger.debug( - "start_stream: falling back to global dataset_id=%s recording_id=%s", - dataset_id, - current_recording, - ) - context = DataRecordingContext( - recording_id=current_recording, - robot_id=robot.id, - robot_name=robot.name, - robot_instance=robot.instance, - dataset_id=dataset_id, - dataset_name=None, - ) - data_stream.start_recording(context) + data_stream.start_recording(_build_recording_context(robot, current_recording)) def _log_single_joint_data( @@ -203,16 +367,16 @@ def _get_or_create_joint_stream( name: str, robot: Robot, ) -> JointStreamBinding: - """Return the stream id, storage name, and JsonDataStream for one joint.""" + """Return the stream id, storage name, and JointDataStream for one joint.""" storage_name = validate_safe_name(name) str_id = f"{data_type.value}:{name}" joint_stream = robot.get_data_stream(str_id) if joint_stream is None: - joint_stream = JsonDataStream(data_type=data_type, data_type_name=storage_name) + joint_stream = JointDataStream(data_type=data_type, data_type_name=storage_name) robot.add_data_stream(str_id, joint_stream) assert isinstance( - joint_stream, JsonDataStream - ), "Expected stream to be instance of JSONDataStream" + joint_stream, JointDataStream + ), "Expected stream to be instance of JointDataStream" return JointStreamBinding( stream_id=str_id, storage_name=storage_name, @@ -286,39 +450,101 @@ def _log_group_of_joint_data( timestamp = time.time() if not isinstance(joint_data, dict): raise ValueError("Joint data must be a dictionary of floats") - for key, value in joint_data.items(): - if not isinstance(value, float): - raise ValueError(f"Joint data must be floats. {key} is not a float.") - if dry_run: + for key, value in joint_data.items(): + if not isinstance(value, float): + raise ValueError(f"Joint data must be floats. {key} is not a float.") + return + if not joint_data: return + _smoke_validate_joint_values(joint_data) + robot = _get_robot(robot_name, instance) + rust_mode = rust_daemon_enabled() + + binding_cache = robot._joint_stream_bindings + bindings_for_type = binding_cache.get(data_type) + if bindings_for_type is None: + bindings_for_type = {} + binding_cache[data_type] = bindings_for_type + current_recording_id = robot.get_current_recording_id() + live_data_disabled = get_provide_live_data_enabled_manager().is_disabled() + robot_id = robot.id + robot_instance = robot.instance + live_data_orchestrator = ( + None if live_data_disabled or robot_id is None else StreamManagerOrchestrator() + ) + + group = _resolve_joint_group( + robot, + data_type, + tuple(joint_data), + bindings_for_type, + current_recording_id, + ) + + if rust_mode and live_data_orchestrator is None: + native_values = list(joint_data.values()) + for binding, joint_value in zip(group.bindings, native_values): + binding.stream.record_scalar(timestamp, joint_value) + if current_recording_id is not None: + robot._get_daemon_recording_context().log_joints( + data_type.value, timestamp, group.joined_names, native_values + ) + return + + native_values = [] batched_items: list[BatchedJointDataItemPayload] = [] batch_transport_stream: JsonDataStream | None = None - for key, value in joint_data.items(): - logged_joint_data = _log_joint_data_point( - data_type=data_type, - name=key, - value=value, - robot=robot, - timestamp=timestamp, - send_to_daemon=False, + for (joint_name, joint_value), binding in zip(joint_data.items(), group.bindings): + joint_stream = binding.stream + if live_data_orchestrator is not None and robot_id is not None: + # A live consumer needs the materialised sample now, so build it and + # publish it; the stream keeps it as its latest data. + data = JointData(timestamp=timestamp, value=joint_value) + joint_stream.log(data=data, send_to_daemon=False) + live_data_orchestrator.get_provider_manager( + robot_id, robot_instance + ).get_json_source( + binding.stream_id, data_type, sensor_key=binding.stream_id + ).publish( + data.model_dump(mode="json") + ) + else: + # No live consumer: stash the raw scalar and defer building the + # JointData to get_latest_data(), keeping the per-joint hot path + # allocation-free (see JointDataStream). + joint_stream.record_scalar(timestamp, joint_value) + + if rust_mode: + if current_recording_id is not None: + native_values.append(joint_value) + continue + + producer_channel = joint_stream.get_producer_channel() + if producer_channel is None or joint_stream.get_recording_context() is None: + continue + trace_id = producer_channel.trace_id + if trace_id is None: + continue + if batch_transport_stream is None: + batch_transport_stream = joint_stream + batched_items.append( + BatchedJointDataItemPayload( + trace_id=trace_id, + data_type_name=binding.storage_name, + value=joint_value, + ) ) - joint_stream_binding = logged_joint_data.binding - joint_stream = joint_stream_binding.stream - - if logged_joint_data.trace_id is not None: - if batch_transport_stream is None: - batch_transport_stream = joint_stream - batched_items.append( - BatchedJointDataItemPayload( - trace_id=logged_joint_data.trace_id, - data_type_name=joint_stream_binding.storage_name, - value=value, - ) + + if rust_mode: + if native_values: + robot._get_daemon_recording_context().log_joints( + data_type.value, timestamp, group.joined_names, native_values ) + return if batch_transport_stream is None or not batched_items: return @@ -432,6 +658,18 @@ def _log_camera_data( # camera_data_without_frame object to avoid serializing the frame to JSON # or having to make two copies for streaming and bucket storage. stream.log(camera_data_without_frame, frame=image) + + if rust_daemon_enabled() and robot.get_current_recording_id() is not None: + contiguous = image if image.flags.c_contiguous else np.ascontiguousarray(image) + robot._get_daemon_recording_context().log_frame( + camera_type.value, + storage_name, + int(image.shape[1]), + int(image.shape[0]), + memoryview(contiguous).cast("B"), + camera_data_without_frame.timestamp, + ) + _publish_video_to_p2p(robot, name, camera_type, camera_data_without_frame, image) @@ -485,6 +723,9 @@ def log_custom_1d( custom_data = Custom1DData(timestamp=timestamp, data=data) stream.log(custom_data) + _record_json_to_daemon( + robot, DataType.CUSTOM_1D, storage_name, custom_data, timestamp + ) _publish_json_to_p2p(robot, str_id, DataType.CUSTOM_1D, custom_data) @@ -869,6 +1110,7 @@ def log_pose( pose_data = PoseData(timestamp=timestamp, pose=pose.tolist()) stream.log(pose_data) + _record_json_to_daemon(robot, DataType.POSES, storage_name, pose_data, timestamp) _publish_json_to_p2p(robot, str_id, DataType.POSES, pose_data) @@ -932,6 +1174,9 @@ def log_end_effector_pose( ee_pose_data = EndEffectorPoseData(timestamp=timestamp, pose=pose.tolist()) stream.log(ee_pose_data) + _record_json_to_daemon( + robot, DataType.END_EFFECTOR_POSES, storage_name, ee_pose_data, timestamp + ) _publish_json_to_p2p(robot, str_id, DataType.END_EFFECTOR_POSES, ee_pose_data) @@ -987,6 +1232,13 @@ def log_parallel_gripper_open_amount( timestamp=timestamp, open_amount=value ) stream.log(parallel_gripper_open_amount_data) + _record_json_to_daemon( + robot, + DataType.PARALLEL_GRIPPER_OPEN_AMOUNTS, + storage_name, + parallel_gripper_open_amount_data, + timestamp, + ) _publish_json_to_p2p( robot, str_id, @@ -1083,6 +1335,13 @@ def log_parallel_gripper_target_open_amount( timestamp=timestamp, open_amount=value ) stream.log(parallel_gripper_target_open_amount_data) + _record_json_to_daemon( + robot, + DataType.PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS, + storage_name, + parallel_gripper_target_open_amount_data, + timestamp, + ) _publish_json_to_p2p( robot, str_id, @@ -1170,6 +1429,9 @@ def log_language( language_data = LanguageData(timestamp=timestamp, text=language) stream.log(language_data) + _record_json_to_daemon( + robot, DataType.LANGUAGE, storage_name, language_data, timestamp + ) _publish_json_to_p2p(robot, str_id, DataType.LANGUAGE, language_data) @@ -1361,4 +1623,7 @@ def log_point_cloud( intrinsics=intrinsics, ) stream.log(point_data) + _record_json_to_daemon( + robot, DataType.POINT_CLOUDS, storage_name, point_data, timestamp + ) _publish_json_to_p2p(robot, str_id, DataType.POINT_CLOUDS, point_data) diff --git a/neuracore/core/data/synced_recording.py b/neuracore/core/data/synced_recording.py index 417feeb94..dc7aa6eee 100644 --- a/neuracore/core/data/synced_recording.py +++ b/neuracore/core/data/synced_recording.py @@ -182,6 +182,8 @@ def _decode_video(self, video_location: Path, video_frame_cache_path: Path) -> N str(video_location), "-vsync", "0", + "-pix_fmt", + "rgb24", "-q:v", "1", "-start_number", diff --git a/neuracore/core/robot.py b/neuracore/core/robot.py index c64abc445..c59f4b983 100644 --- a/neuracore/core/robot.py +++ b/neuracore/core/robot.py @@ -12,11 +12,13 @@ import os import tempfile import time +import uuid import xml.etree.ElementTree as ET import zipfile from collections import defaultdict from dataclasses import dataclass, field from pathlib import Path +from typing import TYPE_CHECKING from warnings import warn import requests @@ -29,12 +31,16 @@ from neuracore.data_daemon.communications_management.shared_transport import ( recording_context, ) +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from .auth import Auth, get_auth from .const import API_URL, MAX_DATA_STREAMS from .exceptions import RobotError, ValidationError from .utils.http_errors import extract_error_detail +if TYPE_CHECKING: + from neuracore.api.logging import JointStreamBinding, ResolvedJointGroup + logger = logging.getLogger(__name__) DaemonRecordingContext = recording_context.RecordingContext @@ -113,6 +119,10 @@ def __init__( self._temp_dir = None self._data_streams: dict[str, DataStream] = dict() self._data_stream_counts: dict[DataType, int] = defaultdict(int) + self._joint_stream_bindings: "dict[DataType, dict[str, JointStreamBinding]]" = ( + dict() + ) + self._joint_group_cache: "dict[DataType, ResolvedJointGroup]" = dict() self._daemon_recording_context: DaemonRecordingContext | None = None self.org_id = org_id or get_current_org() @@ -260,7 +270,9 @@ def list_all_streams(self) -> dict[str, DataStream]: """ return self._data_streams - def start_recording(self, dataset_id: str) -> str: + def start_recording( + self, dataset_id: str, timestamp: float | None = None + ) -> str | None: """Start recording data from all active streams to a dataset. Initiates a recording session that will capture data from all registered @@ -268,9 +280,17 @@ def start_recording(self, dataset_id: str) -> str: Args: dataset_id: Unique identifier of the dataset to record into. + timestamp: Optional capture time (Unix seconds) for the recording's + start, matching the ``log_*`` methods. Under the Rust daemon it + pins the window's lower bound (the producer stamps wall-clock + now when omitted); under the legacy daemon it is the + ``start_time`` sent to the backend (defaults to ``time.time()``). Returns: - The unique recording ID for this recording session. + Under the legacy Python daemon, the backend recording ID for the + session. Under the Rust daemon the daemon owns recording identity + and assigns it asynchronously, so there is nothing to return at + call time — a local correlation handle is returned for convenience. Raises: RobotError: If the robot is not initialized or if @@ -280,8 +300,23 @@ def start_recording(self, dataset_id: str) -> str: if not self.id: raise RobotError("Robot not initialized. Call init() first.") + if rust_daemon_enabled(): + local_handle = str(uuid.uuid4()) + self._get_daemon_recording_context().start_recording( + robot_id=self.id, + robot_instance=self.instance, + robot_name=self.name, + dataset_id=dataset_id, + dataset_name=None, + timestamp=timestamp, + ) + get_recording_state_manager().recording_started( + robot_id=self.id, instance=self.instance, recording_id=local_handle + ) + return local_handle + try: - start_time = time.time() + start_time = timestamp if timestamp is not None else time.time() session = thread_local_session() response = session.post( f"{API_URL}/org/{self.org_id}/recording/start", @@ -326,17 +361,24 @@ def start_recording(self, dataset_id: str) -> str: def stop_recording( self, - recording_id: str, + recording_id: str | None = None, wait_for_producer_drain: bool = True, + timestamp: float | None = None, ) -> None: """Stop an active recording session. - Ends the specified recording session and stops data collection from - all streams. The recorded data will be processed and stored in the - associated dataset. + Ends the active recording session for this robot instance and stops + data collection from all streams. Args: - recording_id: Unique identifier of the recording session to stop. + recording_id: Unused under the Rust daemon (the daemon stops the + active recording for this source); the legacy daemon uses it to + address the backend stop POST. + timestamp: Optional capture time (Unix seconds) for the recording's + stop, matching the ``log_*`` methods. Under the Rust daemon it + pins the window's upper bound (the producer stamps wall-clock + now when omitted); under the legacy daemon it is the + ``end_time`` sent to the backend (defaults to ``time.time()``). Raises: RobotError: If the robot is not initialized, if the recording cannot @@ -347,11 +389,27 @@ def stop_recording( raise RobotError("Robot not initialized. Call init() first.") end_time = time.time() + if rust_daemon_enabled(): + active_handle = get_recording_state_manager().get_current_recording_id( + self.id, self.instance + ) + get_recording_state_manager().recording_stopped( + robot_id=self.id, instance=self.instance, recording_id=active_handle + ) + self._drain_streams_and_notify_daemon( + recording_id, + wait_for_producer_drain=wait_for_producer_drain, + timestamp=timestamp, + ) + return + get_recording_state_manager().recording_stopped( robot_id=self.id, instance=self.instance, recording_id=recording_id ) self._drain_streams_and_notify_daemon( - recording_id, wait_for_producer_drain=wait_for_producer_drain + recording_id, + wait_for_producer_drain=wait_for_producer_drain, + timestamp=timestamp, ) try: @@ -361,7 +419,7 @@ def stop_recording( headers=self._auth.get_headers(), json={ "recording_id": recording_id, - "end_time": end_time, + "end_time": timestamp if timestamp is not None else end_time, }, ) @@ -382,7 +440,11 @@ def stop_recording( raise RobotError(f"Failed to stop recording: {str(e)}") def _drain_streams_and_notify_daemon( - self, recording_id: str, *, wait_for_producer_drain: bool + self, + recording_id: str | None, + *, + wait_for_producer_drain: bool, + timestamp: float | None = None, ) -> None: """Stop all streams and send the recording-stopped IPC message to the daemon.""" try: @@ -392,6 +454,7 @@ def _drain_streams_and_notify_daemon( self._get_daemon_recording_context().stop_recording( recording_id=recording_id, producer_stop_sequence_numbers=producer_stop_sequence_numbers, + timestamp=timestamp, ) except Exception: logger.exception( @@ -424,9 +487,10 @@ def _stop_all_streams( stream.prepare_recording_stopped() ) - producer_stop_sequence_numbers[producer_channel.channel_id] = ( - stop_cutoff_sequence_number - ) + if producer_channel is not None: + producer_stop_sequence_numbers[producer_channel.channel_id] = ( + stop_cutoff_sequence_number + ) stream.stop_recording( stop_cutoff_sequence_number=stop_cutoff_sequence_number, @@ -458,6 +522,8 @@ def get_current_recording_id(self) -> str | None: Returns: The current recording ID if the robot is recording, None otherwise. + Under the Rust daemon this is a local correlation handle, not the + cloud recording id — use :meth:`get_cloud_recording_id` for that. """ if not self.id: raise RobotError("Robot not initialized. Call init() first.") @@ -465,6 +531,27 @@ def get_current_recording_id(self) -> str | None: robot_id=self.id, instance=self.instance ) + def get_cloud_recording_id( + self, timestamp_ns: int | None = None, timeout_s: float = 30.0 + ) -> str | None: + """Resolve the daemon-owned cloud recording id for a recording window. + + Under the Rust daemon the daemon allocates the cloud recording id + asynchronously, so this asks the daemon (it may block) for the id of the + recording whose window brackets ``timestamp_ns`` for this source + (defaulting to the most recently started recording). For + non-performance-critical use only (tests, ``stop_recording(wait=True)``). + Returns ``None`` under the legacy daemon (use + :meth:`get_current_recording_id`). + """ + if not self.id: + raise RobotError("Robot not initialized. Call init() first.") + if not rust_daemon_enabled(): + return self.get_current_recording_id() + return self._get_daemon_recording_context().get_recording_id( + timestamp_ns=timestamp_ns, timeout_s=timeout_s + ) + def _package_urdf(self) -> dict: """Package URDF file and associated meshes into a ZIP archive. @@ -719,18 +806,39 @@ def _get_joint_info(self) -> dict[str, JointInfo]: return joint_info - def cancel_recording(self, recording_id: str) -> None: + def cancel_recording( + self, recording_id: str | None = None, timestamp: float | None = None + ) -> None: """Cancel an active recording without saving any data. Args: - recording_id: the ID of the recording to cancel. + recording_id: Unused under the Rust daemon (the daemon cancels the + active recording for this source); the legacy daemon uses it to + address the backend cancel POST. + timestamp: Optional capture time (Unix seconds) for the cancel, + mirroring ``stop_recording``. Under the Rust daemon it is the + recording's captured stop time (producer stamps now when + omitted); under the legacy daemon it is the ``end_time`` sent to + the backend (defaults to ``time.time()``). """ if not self.id: raise RobotError("Robot not initialized. Call init() first.") end_time = time.time() self._stop_all_streams() - self._get_daemon_recording_context().stop_recording(recording_id=recording_id) + daemon_context = self._get_daemon_recording_context() + + if rust_daemon_enabled(): + daemon_context.cancel_recording(timestamp=timestamp) + active_handle = get_recording_state_manager().get_current_recording_id( + self.id, self.instance + ) + get_recording_state_manager().recording_stopped( + robot_id=self.id, instance=self.instance, recording_id=active_handle + ) + return + + daemon_context.stop_recording(recording_id=recording_id) try: session = thread_local_session() @@ -739,7 +847,7 @@ def cancel_recording(self, recording_id: str) -> None: headers=self._auth.get_headers(), json={ "recording_id": recording_id, - "end_time": end_time, + "end_time": timestamp if timestamp is not None else end_time, }, ) response.raise_for_status() diff --git a/neuracore/core/streaming/data_stream.py b/neuracore/core/streaming/data_stream.py index e895d98eb..99cb155c2 100644 --- a/neuracore/core/streaming/data_stream.py +++ b/neuracore/core/streaming/data_stream.py @@ -14,9 +14,10 @@ from dataclasses import dataclass import numpy as np -from neuracore_types import CameraData, DataType, NCData +from neuracore_types import CameraData, DataType, JointData, NCData from neuracore.data_daemon.communications_management.producer import ProducerChannel +from neuracore.data_daemon.rust_selection import rust_daemon_enabled logger = logging.getLogger(__name__) @@ -45,8 +46,13 @@ class DataStream(ABC): """Base class for data streams. Provides common functionality for managing recording state and data - storage across different types of sensor data streams. Each stream - has its own ProducerChannel for sending data to the daemon. + storage across different types of sensor data streams. + + Under the legacy daemon each stream owns a :class:`ProducerChannel` that + forwards its data over ZMQ. Under the Rust daemon the stream owns *no* + channel — the daemon interface lives at the logging-function layer + (``RecordingContext``) — so the stream only tracks recording state and the + latest sample for live-data consumers. """ def __init__(self, data_type: DataType, stream_name: str) -> None: @@ -65,6 +71,7 @@ def __init__(self, data_type: DataType, stream_name: str) -> None: self._data_type = data_type self._stream_name = stream_name self._producer_channel: ProducerChannel | None = None + self._use_native_producer = rust_daemon_enabled() @property def data_type(self) -> DataType: @@ -74,16 +81,12 @@ def data_type(self) -> DataType: def start_recording(self, context: DataRecordingContext) -> None: """Start recording data for this stream. - If the stream is already recording, stop it first. Then, set the - recording state to True and store the recording context. Finally, - ensure a producer is available for this stream and start a new trace. + If the stream is already recording, stop it first. Then set the + recording state to True and store the recording context. Args: context: Recording context containing identifiers for the recording session, robot, and dataset. - - Returns: - None """ if self.is_recording(): _, stop_cutoff_sequence_number = self.prepare_recording_stopped() @@ -96,16 +99,18 @@ def start_recording(self, context: DataRecordingContext) -> None: self._handle_ensure_producer_channel(context) def _handle_ensure_producer_channel(self, context: DataRecordingContext) -> None: - """Ensures a producer is available for this data stream. + """Ensure a legacy producer channel exists for this data stream. - If the producer does not exist, it is created with the given context. - Afterwards, the producer channel starts a fresh recording session for - the supplied recording context. + Under the Rust daemon the stream owns no channel — lifecycle and data + envelopes are published by ``RecordingContext`` from the logging layer + — so this is a no-op and ``_producer_channel`` stays ``None``. Args: context: Recording context containing identifiers for the recording session, robot, and dataset. """ + if self._use_native_producer: + return if self._producer_channel is None: channel_id = f"{self._data_type.value}:\ {self._stream_name}:{uuid.uuid4().hex[:8]}" @@ -119,14 +124,20 @@ def _handle_ensure_producer_channel(self, context: DataRecordingContext) -> None recording_id=context.recording_id ) - def prepare_recording_stopped(self) -> tuple[ProducerChannel, int]: - """Mark the producer channel as stopping and return it.""" - producer_channel = self.get_producer_channel() + def prepare_recording_stopped(self) -> tuple[ProducerChannel | None, int]: + """Mark the producer channel as stopping and return it. - if not isinstance(producer_channel, ProducerChannel): - raise MissingProducerChannelError( - f"Stream {self._stream_name} has no ProducerChannel" - ) + Under the Rust daemon there is no channel, so this returns ``(None, 0)``. + Under the legacy daemon a missing channel means the stream is stale, so + it raises :class:`MissingProducerChannelError` to have it pruned. + """ + producer_channel = self.get_producer_channel() + if producer_channel is None: + if not rust_daemon_enabled(): + raise MissingProducerChannelError( + "stream has no active producer channel" + ) + return None, 0 stop_cutoff_sequence_number = producer_channel.mark_recording_stop_requested() @@ -143,8 +154,15 @@ def stop_recording( producer_channel = self._producer_channel self._producer_channel = None - if not isinstance(producer_channel, ProducerChannel): - raise MissingProducerChannelError("Stream has no ProducerChannel") + if producer_channel is None: + if not rust_daemon_enabled(): + # Legacy daemon: a stream with no producer channel is stale — + # raise so the caller prunes it. + raise MissingProducerChannelError( + "stream has no active producer channel" + ) + # Rust daemon: the stream never owned a channel — nothing to drain. + return try: if producer_channel.trace_id: @@ -182,7 +200,11 @@ def get_recording_context(self) -> DataRecordingContext | None: return self._context def _send_to_daemon(self, data: bytes) -> None: - """Send data to the daemon via the producer. + """Send data to the daemon via the legacy producer channel. + + A no-op when there is no channel — which is always the case under the + Rust daemon, where the logging layer delivers data via + ``RecordingContext`` instead. Args: data: Serialized data bytes to send. @@ -248,12 +270,68 @@ def log(self, data: NCData, *, send_to_daemon: bool = True) -> None: self._latest_data = data if not self.is_recording() or not send_to_daemon: return + if self._use_native_producer: + # Rust daemon: the logging layer delivers the sample to the daemon + # via RecordingContext; the stream only keeps `_latest_data`. + return # Serialize to JSON bytes and send to daemon json_bytes = json.dumps(data.model_dump(mode="json")).encode("utf-8") self._send_to_daemon(json_bytes) +class JointDataStream(JsonDataStream): + """JSON stream for scalar joint samples with deferred latest-data builds. + + Joint logging is the hottest path in the SDK: during a recording one + :class:`JointData` was materialised per joint per frame purely to keep + ``_latest_data`` current for live-data / endpoint consumers, which read it + at serving rate — far below the logging rate. At high joint counts that + per-sample Pydantic construction, and the GC churn it drove, dominated the + ``log_joint_*`` calls. + + This stream lets the logging layer hand over the raw ``(timestamp, value)`` + cheaply via :meth:`record_scalar` and defers building the ``JointData`` + until :meth:`get_latest_data` is actually called, so the hot path performs + two attribute writes instead of a model construction. + """ + + def __init__(self, data_type: DataType, data_type_name: str) -> None: + """Initialize the joint data stream.""" + super().__init__(data_type=data_type, data_type_name=data_type_name) + self._pending_timestamp: float = 0.0 + self._pending_value: float = 0.0 + self._has_pending_latest = False + + def record_scalar(self, timestamp: float, value: float) -> None: + """Stash the latest scalar sample without building a ``JointData``. + + The model is materialised lazily in :meth:`get_latest_data`. The + latest-data read is best-effort: under concurrency a reader may observe + a timestamp/value drawn from adjacent samples (the pair is not written + atomically), but it never raises and never returns a partially + constructed ``JointData``. + """ + self._pending_timestamp = timestamp + self._pending_value = value + self._has_pending_latest = True + + def log(self, data: NCData, *, send_to_daemon: bool = True) -> None: + """Log a materialised sample, superseding any deferred scalar.""" + self._has_pending_latest = False + super().log(data=data, send_to_daemon=send_to_daemon) + + def get_latest_data(self) -> NCData | None: + """Return the latest sample, materialising a deferred scalar on demand.""" + if self._has_pending_latest: + self._latest_data = JointData( + timestamp=self._pending_timestamp, + value=self._pending_value, + ) + self._has_pending_latest = False + return self._latest_data + + class VideoDataStream(DataStream): """Stream that sends video frame data to the daemon. @@ -289,21 +367,21 @@ def log(self, metadata: CameraData, frame: np.ndarray) -> None: self._latest_data = metadata if not self.is_recording(): return + if self._use_native_producer: + # Rust daemon: the frame is delivered to the daemon by the logging + # layer (RecordingContext.log_frame); the stream only keeps + # `_latest_data` for live-data consumers. + return - # Serialize metadata and frame to bytes - # Frame is sent as raw numpy bytes with metadata as JSON header + frame_view = memoryview(frame).cast("B") + + # Legacy daemon: pack [metadata_len (4 bytes)] [metadata_json] [frame_bytes] metadata_dict = metadata.model_dump(mode="json", exclude={"frame"}) metadata_dict["width"] = self.width metadata_dict["height"] = self.height metadata_dict["frame_nbytes"] = int(frame.size * frame.itemsize) metadata_json = json.dumps(metadata_dict).encode("utf-8") - - # Pack: [metadata_len (4 bytes)] [metadata_json] [frame_bytes] header = struct.pack(" None: + """Tell the Rust producer a source's recording has been locally auto-expired. + + Calls the native ``stop_recording`` for the source so the producer flushes + any in-progress NUT chunk and publishes ``StopRecording``. The daemon is + idempotent, so a later explicit ``nc.stop_recording`` that races this is a + no-op. The stop boundary is wall-clock here (the expiry path has no access + to the recording context's tracked data-clock timestamp) — acceptable for a + forced 5-minute timeout. Failures are logged and swallowed: the local + expiry must always succeed. + """ + try: + _recording_context._load_native().stop_recording( + robot_id, instance, time.time_ns() + ) + except Exception: + logger.exception( + "Failed to notify native producer of recording expiry for %s:%s", + robot_id, + instance, + ) + + class RecordingStateManager(BaseSSEConsumer): """Manages recording state across robot instances with real-time notifications. @@ -134,8 +162,11 @@ def recording_started( ) -> None: """Handle recording start for a robot instance. - Updates internal state. If the robot was already recording with a different - ID, stops the previous recording first. + Updates internal state. If the robot was already recording under a + different id (e.g. the local handle being replaced by the backend cloud + id), the handle is replaced in place and the previous recording's timers + are retired — the instance is never transiently cleared, so a concurrent + ``log_*`` cannot observe a ``None`` recording id and drop a frame. Args: robot_id: Robot ID @@ -149,8 +180,6 @@ def recording_started( if previous_recording_id == recording_id: return - if previous_recording_id is not None: - self.recording_stopped(robot_id, instance, previous_recording_id) try: ensure_daemon_running() @@ -159,6 +188,8 @@ def recording_started( return self.recording_robot_instances[instance_key] = recording_id + if previous_recording_id is not None: + self._cancel_recording_timers(previous_recording_id) self._schedule_recording_timers( robot_id=robot_id, instance=instance, @@ -192,6 +223,8 @@ def expire_if_still_active() -> None: ) self._expired_recording_ids.add(recording_id) self.recording_stopped(robot_id, instance, recording_id) + if rust_daemon_enabled(): + _notify_native_producer_of_expiry(robot_id, instance) loop = get_running_loop() @@ -220,7 +253,7 @@ def _cancel() -> None: loop.call_soon_threadsafe(_cancel) def recording_stopped( - self, robot_id: str, instance: int, recording_id: str + self, robot_id: str, instance: int, recording_id: str | None ) -> None: """Handle recording stop for a robot instance. @@ -239,7 +272,8 @@ def recording_stopped( if current_recording != recording_id: return self.recording_robot_instances.pop(instance_key, None) - self._cancel_recording_timers(recording_id) + if recording_id is not None: + self._cancel_recording_timers(recording_id) def updated_recording_state( self, is_recording: bool, details: BaseRecodingUpdatePayload @@ -298,6 +332,8 @@ def updated_recording_state( instance_key = RobotInstanceIdentifier( robot_id=robot_id, robot_instance=instance ) + if previous_recording_id != recording_id: + return callback = self._drain_callbacks.get(instance_key) if callback and was_recording: threading.Thread( diff --git a/neuracore/data_daemon/__main__.py b/neuracore/data_daemon/__main__.py index 74cbd0578..8bd4f8568 100644 --- a/neuracore/data_daemon/__main__.py +++ b/neuracore/data_daemon/__main__.py @@ -1,6 +1,52 @@ -"""Neuracore Data Damon to manage data recording and uploading locally.""" +"""Entry point for ``python -m neuracore.data_daemon``. + +By default this runs the Python data daemon CLI. When the +``NCD_RUST_DAEMON`` environment variable is truthy and the bundled Rust +data-daemon binary is present, execution is handed off to it instead. The +flag keeps the Python daemon available throughout the rollout window. +""" + +from __future__ import annotations + +import os +import sys + +from neuracore.data_daemon.rust_selection import ( + rust_daemon_binary_path, + rust_daemon_enabled, +) + + +def main() -> None: + """Dispatch to the Rust data daemon when enabled, else the Python CLI.""" + if rust_daemon_enabled(): + binary = rust_daemon_binary_path() + if binary is None: + print( + "NCD_RUST_DAEMON is set but the bundled Rust data-daemon binary " + "was not found; falling back to the Python daemon.", + file=sys.stderr, + ) + else: + try: + os.execv(str(binary), [str(binary), *sys.argv[1:]]) + except OSError as error: + # The binary is present but couldn't be executed (e.g. not + # executable, ENOEXEC); fall back to the Python daemon rather + # than crashing the rollout. + print( + f"NCD_RUST_DAEMON is set but the bundled Rust data-daemon " + f"binary could not be executed ({error}); falling back to " + "the Python daemon.", + file=sys.stderr, + ) + + # Imported lazily so that handing off to the Rust binary above does not + # pay the cost of importing the full Python daemon stack. + from neuracore.data_daemon.main import main as run_python_cli + + run_python_cli() -from neuracore.data_daemon.main import main if __name__ == "__main__": main() diff --git a/neuracore/data_daemon/communications_management/shared_transport/recording_context.py b/neuracore/data_daemon/communications_management/shared_transport/recording_context.py index 85b1a5644..d5a394c2e 100644 --- a/neuracore/data_daemon/communications_management/shared_transport/recording_context.py +++ b/neuracore/data_daemon/communications_management/shared_transport/recording_context.py @@ -1,39 +1,269 @@ -"""Recording-scoped context for sending recording control messages to the daemon.""" +"""Recording-scoped context for driving the data daemon.""" from __future__ import annotations +import logging +from importlib import import_module +from types import ModuleType + from neuracore.data_daemon.models import CommandType +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from .communications_manager import CommunicationsManager, MessageEnvelope +logger = logging.getLogger(__name__) + +_NATIVE_MODULE: ModuleType | None = None + +_NATIVE_IMPORT_HINT = ( + "neuracore.data_daemon._native_producer is not available. Build the Rust " + "data_daemon_producer crate with maturin and ensure the resulting " + "extension is on sys.path, or unset NCD_RUST_DAEMON to fall back to the " + "legacy Python producer." +) + + +def _load_native() -> ModuleType: + """Lazily import and cache the PyO3 producer module for the process.""" + global _NATIVE_MODULE + if _NATIVE_MODULE is None: + try: + _NATIVE_MODULE = import_module("neuracore.data_daemon._native_producer") + except ImportError as error: + raise RuntimeError(_NATIVE_IMPORT_HINT) from error + return _NATIVE_MODULE + class RecordingContext: - """Recording-scoped context for sending recording control messages.""" + """Recording-scoped interface to the data daemon. + + Under the Rust daemon this is a *thin shipper* bridge: ``start_recording`` + / ``log_joints`` / ``log_frame`` / ``log_json`` / ``stop_recording`` / + ``cancel_recording`` forward straight through to ``_native_producer`` over + iceoryx2, tagged only with the **source** ``(robot_id, robot_instance)``. + The daemon owns all recording identity — there is no recording id on the + wire. Routing is by a producer-stamped *publish* timestamp (wall clock), + decoupled from the data's own capture timestamp, so the daemon partitions + recordings by when data was published rather than what clock it carries. + + Under the legacy daemon it keeps its original role: sending recording + control messages over the ZMQ producer socket. That path is unchanged. + """ def __init__( self, recording_id: str | None = None, comm_manager: CommunicationsManager | None = None, ) -> None: - """Initialize the recording context.""" + """Initialize the recording context. + + Under the Rust daemon the ZMQ producer socket is unused — every + envelope flows through ``_native_producer`` over iceoryx2 — so we skip + creating it. + """ self.recording_id = recording_id - self._comm = comm_manager or CommunicationsManager() - self._comm.create_producer_socket() + self._rust_mode = rust_daemon_enabled() + self._robot_id: str | None = None + self._robot_instance: int = 0 + self._recording_marker_ns: int = 0 + if self._rust_mode: + self._comm = None + else: + self._comm = comm_manager or CommunicationsManager() + self._comm.create_producer_socket() def set_recording_id(self, recording_id: str | None) -> None: """Set or clear the recording identifier for this context.""" self.recording_id = recording_id + # -- Native (Rust daemon) interface ------------------------------------- + + def start_recording( + self, + robot_id: str, + robot_instance: int = 0, + robot_name: str | None = None, + dataset_id: str | None = None, + dataset_name: str | None = None, + timestamp: float | None = None, + ) -> None: + """Announce a recording to the Rust daemon for a source. + + Publishes exactly one ``StartRecording`` envelope tagged with the + source ``(robot_id, robot_instance)``. No recording id is on the wire — + the daemon allocates and owns recording identity. + + ``timestamp`` optionally pins the recording window's lower bound (Unix + seconds), matching the ``log_*`` methods; when ``None`` the producer + stamps the publish clock now. + """ + if not self._rust_mode: + return + if not robot_id: + raise ValueError("robot_id is required to start a recording.") + self._robot_id = robot_id + self._robot_instance = robot_instance + timestamp_ns = int(timestamp * 1_000_000_000) if timestamp is not None else None + + self._recording_marker_ns = _load_native().start_recording( + robot_id, + robot_instance, + robot_name, + dataset_id, + dataset_name, + timestamp_ns, + ) + + def log_joints( + self, + data_type: str, + timestamp: float, + joined_names: str, + values: list[float], + ) -> None: + r"""Forward a batch of joint scalar samples to the daemon. + + Args: + data_type: Type of joint data e.g. DataType.JOINT_POSITIONS. + timestamp: the Unix timestamp of the sample. + joined_names: a single ``\0``-joined string of joint names. + values: a flat list of joint values. + """ + if not values: + return + robot_id = self._require_source("log_joints") + timestamp_ns = int(timestamp * 1_000_000_000) + _load_native().log_joints( + robot_id, + self._robot_instance, + data_type, + joined_names, + values, + timestamp_ns, + timestamp, + ) + + def log_frame( + self, + data_type: str, + name: str, + width: int, + height: int, + payload: bytes | memoryview, + timestamp: float, + ) -> None: + """Forward one video frame to the daemon. + + Args: + data_type: Type of video data e.g. DataType.RGB_IMAGES. + name: the camera/sensor name e.g. left_wrist_camera. + width: Video frame width. + height: Video frame height. + payload: Raw video frame bytes. + timestamp: the Unix timestamp of the sample. + """ + robot_id = self._require_source("log_frame") + timestamp_ns = int(timestamp * 1_000_000_000) + _load_native().log_frame( + robot_id, + self._robot_instance, + data_type, + name, + int(width), + int(height), + payload, + timestamp_ns, + timestamp, + ) + + def log_json( + self, + data_type: str, + name: str, + payload: bytes, + timestamp: float, + ) -> None: + """Forward one JSON sample to the daemon. + + The generic single-sample path for any non-joint, non-video data type + (scalars, poses, gripper amounts, language, point clouds, ...). The + ``data_type`` is an opaque wire label and ``payload`` is already + serialized, so the daemon stores it verbatim as a per-trace JSON sample. + """ + robot_id = self._require_source("log_json") + timestamp_ns = int(timestamp * 1_000_000_000) + _load_native().log_json( + robot_id, + self._robot_instance, + data_type, + name, + payload, + timestamp_ns, + timestamp, + ) + + def cancel_recording( + self, + recording_id: str | None = None, + timestamp: float | None = None, + ) -> None: + """Cancel the source's active recording — the daemon discards it. + + A cancel is a recording stop that discards data, so ``timestamp`` + behaves exactly like ``stop_recording``'s: it optionally pins the + recording's capture stop time (Unix seconds); when ``None`` the producer + stamps wall-clock now. + """ + if not self._rust_mode: + return + if not self._robot_id: + return + timestamp_ns = int(timestamp * 1_000_000_000) if timestamp is not None else None + _load_native().cancel_recording( + self._robot_id, self._robot_instance, timestamp_ns + ) + + def _require_source(self, operation: str) -> str: + """Return the active source's robot id or raise if logging before start.""" + if not self._rust_mode: + raise RuntimeError(f"{operation} is only available under the rust daemon.") + if not self._robot_id: + raise RuntimeError( + f"{operation} called before start_recording set a source." + ) + return self._robot_id + + # -- Lifecycle ---------------------------------------------------------- + def stop_recording( self, recording_id: str | None = None, producer_stop_sequence_numbers: dict[str, int] | None = None, + timestamp: float | None = None, ) -> None: - """Send a recording-stopped control message.""" + """Send a recording-stopped control message. + + Under the Rust daemon this publishes one ``StopRecording`` tagged with + the source and the publish-clock stop boundary, which the daemon uses to + close the recording window. ``timestamp`` optionally pins that boundary + (Unix seconds), matching the ``log_*`` methods; when ``None`` the + producer stamps wall-clock now. ``recording_id`` / + ``producer_stop_sequence_numbers`` are only used by the legacy path. + """ + if self._rust_mode: + if not self._robot_id: + return + timestamp_ns = ( + int(timestamp * 1_000_000_000) if timestamp is not None else None + ) + _load_native().stop_recording( + self._robot_id, self._robot_instance, timestamp_ns + ) + return + effective_recording_id = recording_id or self.recording_id if not effective_recording_id: raise ValueError("recording_id is required to stop a recording.") - recording_stopped_payload: dict[str, object] = { "recording_id": effective_recording_id } @@ -47,9 +277,39 @@ def stop_recording( ) self.recording_id = effective_recording_id + def get_recording_id( + self, + timestamp_ns: int | None = None, + timeout_s: float = 30.0, + ) -> str | None: + """Resolve the daemon-owned cloud recording id for this source. + + The producer never sees the cloud recording id — the + daemon allocates it and POSTs ``/recording/start`` asynchronously. This + asks the daemon over the native ``queries`` request-response service for + the id of the recording identified by this source and the capture + ``timestamp_ns`` marker (defaulting to the marker captured at + ``start_recording``). The daemon answers authoritatively from its own + state; the native call blocks (with the GIL released) until the id is + minted or ``timeout_s`` elapses. + + It MAY block and is for non-performance-critical paths only (tests, + ``nc.stop_recording(wait=True)``). Returns ``None`` on timeout or in the + legacy daemon mode. + """ + if not self._rust_mode or not self._robot_id: + return None + marker_ns = ( + timestamp_ns if timestamp_ns is not None else self._recording_marker_ns + ) + return _load_native().get_recording_id( + self._robot_id, self._robot_instance, marker_ns, timeout_s + ) + def close(self) -> None: """Close sockets and cleanup context resources owned by this instance.""" - self._comm.cleanup_producer() + if self._comm is not None: + self._comm.cleanup_producer() def _send(self, command: CommandType, payload: dict | None = None) -> None: """Send a management message to the daemon. @@ -62,6 +322,11 @@ def _send(self, command: CommandType, payload: dict | None = None) -> None: Returns: None """ + if self._comm is None: + raise RuntimeError( + "Cannot send a control message: no CommunicationsManager is " + "available (rust daemon mode bypasses the ZMQ producer socket)." + ) envelope = MessageEnvelope( producer_id=None, command=command, diff --git a/neuracore/data_daemon/config_manager/args_handler.py b/neuracore/data_daemon/config_manager/args_handler.py index 11d23838c..ccaac9d50 100644 --- a/neuracore/data_daemon/config_manager/args_handler.py +++ b/neuracore/data_daemon/config_manager/args_handler.py @@ -10,6 +10,7 @@ from neuracore.core.exceptions import AuthenticationError from neuracore.data_daemon.config_manager.cli_options import ( ApiKeyOption, + AssumeYesOption, BackgroundOption, BandwidthLimitOption, CurrentOrgIdOption, @@ -22,6 +23,7 @@ ProfileNameDeleteArgument, ProfileNameGetArgument, ProfileNameUpdateArgument, + SpoolLimitOption, StorageLimitOption, StoragePathOption, ) @@ -41,6 +43,7 @@ launch_new_daemon_subprocess, pid_is_running, read_pid_from_file, + reset_daemon_state, terminate_pid, wait_for_exit, ) @@ -56,6 +59,7 @@ def _update_profile( create_if_missing: bool, storage_limit: int | None, bandwidth_limit: int | None, + spool_limit: int | None, path_to_store_record: str | None, num_threads: int | None, keep_wakelock_while_upload: bool | None, @@ -66,6 +70,7 @@ def _update_profile( updates = collect_config_updates( storage_limit=storage_limit, bandwidth_limit=bandwidth_limit, + spool_limit=spool_limit, path_to_store_record=path_to_store_record, num_threads=num_threads, keep_wakelock_while_upload=keep_wakelock_while_upload, @@ -113,6 +118,7 @@ def run_profile_update( name: ProfileNameUpdateArgument = None, storage_limit: StorageLimitOption = None, bandwidth_limit: BandwidthLimitOption = None, + spool_limit: SpoolLimitOption = None, path_to_store_record: StoragePathOption = None, num_threads: NumThreadsOption = None, keep_wakelock_while_upload: KeepWakelockOption = None, @@ -129,6 +135,7 @@ def run_profile_update( create_if_missing=False, storage_limit=storage_limit, bandwidth_limit=bandwidth_limit, + spool_limit=spool_limit, path_to_store_record=path_to_store_record, num_threads=num_threads, keep_wakelock_while_upload=keep_wakelock_while_upload, @@ -294,6 +301,23 @@ def run_stop() -> None: raise typer.Exit(code=1) +def run_reset(yes: AssumeYesOption = False) -> None: + """Remove all daemon state: recordings, database, and IPC artefacts.""" + pid_path = get_daemon_pid_path() + db_path = get_daemon_db_path() + + try: + exit_code = reset_daemon_state( + pid_path=pid_path, db_path=db_path, assume_yes=yes + ) + except DaemonLifecycleError as exc: + typer.echo(str(exc), err=True) + raise typer.Exit(code=1) + + if exit_code != 0: + raise typer.Exit(code=exit_code) + + def run_status() -> None: """Show daemon status.""" pid_path = get_daemon_pid_path() diff --git a/neuracore/data_daemon/config_manager/cli_options.py b/neuracore/data_daemon/config_manager/cli_options.py index 73075eb78..3ee2dc615 100644 --- a/neuracore/data_daemon/config_manager/cli_options.py +++ b/neuracore/data_daemon/config_manager/cli_options.py @@ -49,6 +49,16 @@ ), ] +SpoolLimitOption = Annotated[ + int | None, + typer.Option( + "--spool-limit", + "--spool_limit", + parser=parse_bytes, + help="Producer video spool-backlog cap in bytes (0 disables the bound).", + ), +] + StoragePathOption = Annotated[ str | None, typer.Option( @@ -127,3 +137,14 @@ is_flag=True, ), ] + +AssumeYesOption = Annotated[ + bool, + typer.Option( + "--yes", + "-y", + "--force", + help="Skip the confirmation prompt — required for non-interactive use.", + is_flag=True, + ), +] diff --git a/neuracore/data_daemon/config_manager/config.py b/neuracore/data_daemon/config_manager/config.py index 24f9062b5..c395b6ce9 100644 --- a/neuracore/data_daemon/config_manager/config.py +++ b/neuracore/data_daemon/config_manager/config.py @@ -12,6 +12,7 @@ _ENV_MAP: dict[str, str] = { "storage_limit": "NCD_STORAGE_LIMIT", "bandwidth_limit": "NCD_BANDWIDTH_LIMIT", + "spool_limit": "NCD_SPOOL_LIMIT", "path_to_store_record": "NCD_PATH_TO_STORE_RECORD", "num_threads": "NCD_NUM_THREADS", "keep_wakelock_while_upload": "NCD_KEEP_WAKELOCK_WHILE_UPLOAD", @@ -51,7 +52,7 @@ def _read_env_overrides(self) -> dict[str, Any]: if env_value is None: continue - if field_name in {"storage_limit", "bandwidth_limit"}: + if field_name in {"storage_limit", "bandwidth_limit", "spool_limit"}: try: overrides[field_name] = parse_bytes(env_value) except ValueError: diff --git a/neuracore/data_daemon/config_manager/daemon_config.py b/neuracore/data_daemon/config_manager/daemon_config.py index 01a8b366c..7c58d3747 100644 --- a/neuracore/data_daemon/config_manager/daemon_config.py +++ b/neuracore/data_daemon/config_manager/daemon_config.py @@ -9,6 +9,8 @@ class DaemonConfig(BaseModel): Attributes: storage_limit: maximum storage the daemon may use locally, in bytes. bandwidth_limit: maximum upload bandwidth, in bytes per second. + spool_limit: cap on the producer's on-disk video spool backlog, in + bytes. 0 disables the bound. path_to_store_record: directory where the daemon writes recording files. num_threads: number of worker threads used by the daemon. keep_wakelock_while_upload: whether to keep a wakelock while uploading data. @@ -19,6 +21,7 @@ class DaemonConfig(BaseModel): storage_limit: int | None = None bandwidth_limit: int | None = None + spool_limit: int | None = None path_to_store_record: str | None = None num_threads: int | None = None keep_wakelock_while_upload: bool | None = None diff --git a/neuracore/data_daemon/config_manager/helpers.py b/neuracore/data_daemon/config_manager/helpers.py index f619834a9..e81378cb5 100644 --- a/neuracore/data_daemon/config_manager/helpers.py +++ b/neuracore/data_daemon/config_manager/helpers.py @@ -9,6 +9,7 @@ BYTES_PER_MIB, DEFAULT_MAX_BANDWIDTH_MIB_S, DEFAULT_MIN_BANDWIDTH_MIB_S, + DEFAULT_SPOOL_LIMIT_BYTES, DEFAULT_STORAGE_FREE_FRACTION, DEFAULT_TARGET_DRAIN_HOURS, SECONDS_PER_HOUR, @@ -81,6 +82,7 @@ def collect_config_updates( *, storage_limit: int | None, bandwidth_limit: int | None, + spool_limit: int | None, path_to_store_record: str | None, num_threads: int | None, keep_wakelock_while_upload: bool | None, @@ -92,6 +94,7 @@ def collect_config_updates( raw_config_values = { "storage_limit": storage_limit, "bandwidth_limit": bandwidth_limit, + "spool_limit": spool_limit, "path_to_store_record": path_to_store_record, "num_threads": num_threads, "keep_wakelock_while_upload": keep_wakelock_while_upload, @@ -147,6 +150,7 @@ def build_default_daemon_config( return DaemonConfig( storage_limit=storage_limit, bandwidth_limit=bandwidth_limit, + spool_limit=DEFAULT_SPOOL_LIMIT_BYTES, path_to_store_record=str(record_dir), num_threads=num_threads, keep_wakelock_while_upload=False, diff --git a/neuracore/data_daemon/config_manager/profiles.py b/neuracore/data_daemon/config_manager/profiles.py index da3f8996a..eadf53a6d 100644 --- a/neuracore/data_daemon/config_manager/profiles.py +++ b/neuracore/data_daemon/config_manager/profiles.py @@ -103,7 +103,7 @@ def get_profile(self, profile: str | None = None) -> DaemonConfig: except FileNotFoundError as exc: raise ProfileNotFound(f"Profile {profile!r} not found.") from exc - for field_name in ("storage_limit", "bandwidth_limit"): + for field_name in ("storage_limit", "bandwidth_limit", "spool_limit"): raw_value = profile_data.get(field_name) if raw_value is not None: try: diff --git a/neuracore/data_daemon/const.py b/neuracore/data_daemon/const.py index 9b5bf7dfb..23d48048a 100644 --- a/neuracore/data_daemon/const.py +++ b/neuracore/data_daemon/const.py @@ -71,6 +71,7 @@ DEFAULT_TARGET_DRAIN_HOURS = 12.0 # Aim to drain stored data within ~12 hours DEFAULT_MIN_BANDWIDTH_MIB_S = 1.0 # Avoid too-slow uploads even on large disks DEFAULT_MAX_BANDWIDTH_MIB_S = 20.0 # Cap upload bandwidth to avoid saturating links +DEFAULT_SPOOL_LIMIT_BYTES = 2 * 1024 * 1024 * 1024 # 2 GiB # Backend API retry configuration BACKEND_API_MAX_RETRIES = 3 diff --git a/neuracore/data_daemon/lifecycle/daemon_os_control.py b/neuracore/data_daemon/lifecycle/daemon_os_control.py index 25f27d9c3..2dee2015f 100644 --- a/neuracore/data_daemon/lifecycle/daemon_os_control.py +++ b/neuracore/data_daemon/lifecycle/daemon_os_control.py @@ -10,7 +10,7 @@ from collections.abc import Callable, Sequence from pathlib import Path from types import FrameType -from typing import cast +from typing import IO, cast import filelock @@ -20,6 +20,10 @@ ) from neuracore.data_daemon.helpers import get_daemon_db_path, get_daemon_pid_path from neuracore.data_daemon.lifecycle.auth_preflight import ensure_daemon_auth_ready +from neuracore.data_daemon.rust_selection import ( + rust_daemon_binary_path, + rust_daemon_enabled, +) # cspell:ignore WNOHANG waitpid @@ -106,7 +110,18 @@ def cleanup_stale_client_state( def _build_daemon_runner_command() -> list[str]: - """Build the command used to launch the daemon runner entrypoint.""" + """Build the command used to launch the daemon runner entrypoint. + + Hands off to the bundled Rust daemon binary when ``NCD_RUST_DAEMON`` is + truthy and the binary ships in the package; otherwise launches the Python + runner as before. The Rust binary's ``launch`` subcommand stays in the + foreground so it inherits the same process semantics the Python runner + relies on (signal handling, parent-side ``Popen.wait``). + """ + if rust_daemon_enabled(): + binary = rust_daemon_binary_path() + if binary is not None: + return [str(binary), "launch"] return [ sys.executable, "-m", @@ -120,16 +135,62 @@ def _build_daemon_launch_env( db_path: Path, env_overrides: dict[str, str] | None = None, ) -> dict[str, str]: - """Build the environment for launching the daemon subprocess.""" + """Build the environment for launching the daemon subprocess. + + The Rust daemon manages its own PID file (see + [rust/data_daemon/src/cli/launch.rs](../../rust/data_daemon/src/cli/launch.rs)), + so ``NEURACORE_DAEMON_MANAGE_PID=0`` is suppressed in that mode. The Python + runner keeps the override so its parent can write the PID itself after the + socket appears. + """ environment = os.environ.copy() environment["NEURACORE_DAEMON_PID_PATH"] = str(pid_path) environment["NEURACORE_DAEMON_DB_PATH"] = str(db_path) - environment["NEURACORE_DAEMON_MANAGE_PID"] = "0" + if not rust_daemon_enabled(): + environment["NEURACORE_DAEMON_MANAGE_PID"] = "0" if env_overrides: environment.update(env_overrides) return cast(dict[str, str], environment) +def reset_daemon_state(*, pid_path: Path, db_path: Path, assume_yes: bool) -> int: + """Remove all daemon state via the bundled Rust binary's ``reset`` command. + + The Rust ``reset`` subcommand owns the full wipe — it stops a running + daemon, then removes the recordings tree, the SQLite state database, the + PID file, and the iceoryx2 discovery files together with their + ``/dev/shm`` segments. Delegating keeps path resolution and IPC cleanup in + one place instead of duplicating it in Python. Returns the subprocess exit + code. + + ``assume_yes`` forwards ``--yes`` so the binary skips its confirmation + prompt; otherwise the prompt is shown on the inherited terminal. + + Raises: + DaemonLifecycleError: when the bundled Rust binary is unavailable. + """ + binary = rust_daemon_binary_path() + if binary is None: + raise DaemonLifecycleError( + "Reset requires the bundled Rust data-daemon binary, which is not " + "available in this installation." + ) + + command = [str(binary), "reset"] + if assume_yes: + command.append("--yes") + + environment = os.environ.copy() + environment["NEURACORE_DAEMON_PID_PATH"] = str(pid_path) + environment["NEURACORE_DAEMON_DB_PATH"] = str(db_path) + completed = subprocess.run( # noqa: S603 - bundled binary, fixed argv + command, + env=environment, + check=False, + ) + return completed.returncode + + def _start_daemon_subprocess( pid_path: Path, db_path: Path, @@ -137,8 +198,17 @@ def _start_daemon_subprocess( env_overrides: dict[str, str] | None = None, stdout: int | None = None, stderr: int | None = None, -) -> subprocess.Popen: - """Start the daemon runner subprocess with the requested terminal mode.""" +) -> tuple[subprocess.Popen, Path | None]: + """Start the daemon runner subprocess with the requested terminal mode. + + Returns the process together with the log path its stderr was routed to + in background mode (``None`` in the foreground). A long-lived background + daemon must not inherit an undrained ``subprocess.PIPE`` — once the pipe + buffer fills, the daemon blocks on its next stderr write and hangs. Sending + stderr to ``DEVNULL`` avoids that, but throws away the reason for a startup + failure. Routing to a file gets both: writes never block, and the caller + can read the daemon's own error output back if it exits prematurely. + """ environment = _build_daemon_launch_env( pid_path=pid_path, db_path=db_path, @@ -146,9 +216,31 @@ def _start_daemon_subprocess( ) current_working_directory = str(Path.cwd()) + daemon_log_path: Path | None = None + daemon_log_handle: IO[bytes] | None = None + if background: + candidate_log_path = db_path.parent / "daemon.log" + try: + candidate_log_path.parent.mkdir(parents=True, exist_ok=True) + # Truncate so the log reflects this run only; the daemon's own + # stderr (tracing output / early eprintln failures) lands here. + daemon_log_handle = open( + candidate_log_path, "wb", buffering=0 + ) # noqa: SIM115 + except OSError: + # Fall back to discarding stderr rather than failing the launch. + daemon_log_handle = None + else: + daemon_log_path = candidate_log_path + try: if background: - return subprocess.Popen( + stderr_target: int | IO[bytes] = ( + daemon_log_handle + if daemon_log_handle is not None + else subprocess.DEVNULL + ) + process = subprocess.Popen( _build_daemon_runner_command(), close_fds=True, cwd=current_working_directory, @@ -156,21 +248,55 @@ def _start_daemon_subprocess( start_new_session=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE, + stderr=stderr_target, + ) + else: + process = subprocess.Popen( + _build_daemon_runner_command(), + close_fds=True, + cwd=current_working_directory, + env=environment, + start_new_session=False, + stdout=stdout, + stderr=stderr, ) - - return subprocess.Popen( - _build_daemon_runner_command(), - close_fds=True, - cwd=current_working_directory, - env=environment, - start_new_session=False, - stdout=stdout, - stderr=stderr, - ) except OSError as error: + if daemon_log_handle is not None: + daemon_log_handle.close() raise RuntimeError(f"Failed to start daemon: {error}") from error + if daemon_log_handle is not None: + daemon_log_handle.close() + return process, daemon_log_path + + +# Cap on how much of the daemon log we fold into a premature-exit error, so a +# verbose-but-then-crashing daemon can't produce a multi-megabyte exception. +_DAEMON_FAILURE_DETAIL_TAIL_BYTES = 8192 + + +def _read_daemon_failure_detail( + process: subprocess.Popen, daemon_log_path: Path | None +) -> str: + """Return the trailing daemon output to append to a premature-exit error. + + Background launches route the daemon's stderr to ``daemon_log_path``; + foreground launches may instead expose a readable ``process.stderr`` pipe. + Returns a newline-prefixed snippet, or an empty string when no output is + available. + """ + output = "" + if daemon_log_path is not None: + try: + log_bytes = daemon_log_path.read_bytes() + except OSError: + log_bytes = b"" + tail = log_bytes[-_DAEMON_FAILURE_DETAIL_TAIL_BYTES:] + output = tail.decode(errors="replace").strip() + elif process.stderr is not None: + output = process.stderr.read().decode(errors="replace").strip() + return f"\n{output}" if output else "" + def launch_daemon_subprocess( pid_path: Path, @@ -181,10 +307,17 @@ def launch_daemon_subprocess( stdout: int | None = None, stderr: int | None = None, ) -> subprocess.Popen: - """Launch the daemon runner subprocess and poll until it is ready.""" + """Launch the daemon runner subprocess and poll until it is ready. + + Readiness signal differs by backend: the Python daemon publishes a Unix + socket the SDK connects to, while the Rust daemon never opens one (its IPC + is iceoryx2 shared memory) — so under ``NCD_RUST_DAEMON`` we instead wait + for the daemon to write its PID file. The Rust binary also acquires the + PID file itself, so the parent must not overwrite it. + """ pid_path.parent.mkdir(parents=True, exist_ok=True) - process = _start_daemon_subprocess( + process, daemon_log_path = _start_daemon_subprocess( pid_path=pid_path, db_path=db_path, background=background, @@ -192,30 +325,39 @@ def launch_daemon_subprocess( stdout=stdout, stderr=stderr, ) - socket_poll_interval_s = 0.05 + poll_interval_s = 0.05 daemon_startup_timeout_s = time.monotonic() + timeout_s + rust_mode = rust_daemon_enabled() and rust_daemon_binary_path() is not None + + def _ready() -> bool: + if rust_mode: + existing = read_pid_from_file(pid_path) + return existing is not None and pid_is_running(existing) + return SOCKET_PATH.exists() + + readiness_target = ( + "pid file " + str(pid_path) if rust_mode else "socket " + str(SOCKET_PATH) + ) while time.monotonic() < daemon_startup_timeout_s: if process.poll() is not None: - stderr_output = "" - if process.stderr is not None: - stderr_output = process.stderr.read().decode(errors="replace").strip() - detail = f"\n{stderr_output}" if stderr_output else "" + detail = _read_daemon_failure_detail(process, daemon_log_path) raise RuntimeError( f"Daemon process exited unexpectedly during startup " f"(exit code {process.returncode}).{detail}" ) - if SOCKET_PATH.exists(): + if _ready(): break - time.sleep(socket_poll_interval_s) + time.sleep(poll_interval_s) else: process.terminate() raise RuntimeError( f"Daemon did not become ready within {timeout_s}s: " - f"socket {SOCKET_PATH} never appeared." + f"{readiness_target} never appeared." ) - pid_path.write_text(str(process.pid), encoding="utf-8") + if not rust_mode: + pid_path.write_text(str(process.pid), encoding="utf-8") return process diff --git a/neuracore/data_daemon/main.py b/neuracore/data_daemon/main.py index a171314f3..e61038c1c 100644 --- a/neuracore/data_daemon/main.py +++ b/neuracore/data_daemon/main.py @@ -8,6 +8,7 @@ profile_app, run_install, run_launch, + run_reset, run_status, run_stop, run_uninstall, @@ -20,6 +21,7 @@ app.command("launch")(run_launch) app.command("stop")(run_stop) +app.command("reset")(run_reset) app.command("status")(run_status) app.command("install")(run_install) app.command("uninstall")(run_uninstall) diff --git a/neuracore/data_daemon/rust_selection.py b/neuracore/data_daemon/rust_selection.py new file mode 100644 index 000000000..349015be0 --- /dev/null +++ b/neuracore/data_daemon/rust_selection.py @@ -0,0 +1,31 @@ +"""Runtime selection between the legacy Python daemon and the Rust rewrite. + +Centralises the ``NCD_RUST_DAEMON`` environment-variable check used by both +the CLI entry point ([__main__.py](neuracore/data_daemon/__main__.py)) and +SDK-side producer routing in +[neuracore/core/streaming/data_stream.py](neuracore/core/streaming/data_stream.py) +so both surfaces agree on which daemon is in play for a given process. + +Kept dependency-free so the SDK can import it without pulling in the daemon's +heavyweight runtime modules. +""" + +from __future__ import annotations + +import os +from importlib.resources import files +from pathlib import Path + +_TRUTHY_VALUES = frozenset({"1", "true", "yes", "y"}) + + +def rust_daemon_enabled() -> bool: + """Return True when ``NCD_RUST_DAEMON`` selects the Rust data daemon.""" + return os.environ.get("NCD_RUST_DAEMON", "").strip().lower() in _TRUTHY_VALUES + + +def rust_daemon_binary_path() -> Path | None: + """Return the path to the bundled Rust data-daemon binary, if present.""" + candidate = files("neuracore.data_daemon") / "bin" / "data-daemon" + path = Path(str(candidate)) + return path if path.is_file() else None diff --git a/rust/scripts/build_wheel_artefacts.sh b/rust/scripts/build_wheel_artefacts.sh new file mode 100755 index 000000000..3a4e8c126 --- /dev/null +++ b/rust/scripts/build_wheel_artefacts.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash +# Build the Rust artefacts shipped in the neuracore wheel and place them +# inside the Python package tree where setup.py's package_data expects them. +# +# Two artefacts are produced: +# 1. The data-daemon binary -> neuracore/data_daemon/bin/data-daemon +# Re-exec'd by neuracore/data_daemon/__main__.py when NCD_RUST_DAEMON +# is truthy. +# 2. The data_daemon_producer cdylib -> neuracore/data_daemon/_native_producer.so +# Renamed from libdata_daemon_producer.so so PyO3's PyInit__native_producer +# is discoverable by the Python import machinery. +# +# See docs/rust_data_daemon_development.md#packaging-the-wheel for the rationale. + +set -euo pipefail + +script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +workspace_root="$(cd "$script_dir/.." && pwd)" +repo_root="$(cd "$workspace_root/.." && pwd)" + +package_dir="$repo_root/neuracore/data_daemon" +bin_dst="$package_dir/bin/data-daemon" +cdylib_dst="$package_dir/_native_producer.so" + +# PyO3's build-config probes (in order) PYO3_PYTHON, VIRTUAL_ENV/bin/python, +# CONDA_PREFIX/bin/python, then /usr/bin/python. On minimal Debian/Ubuntu +# images only python3 is on PATH and some dev environments set VIRTUAL_ENV to +# a host (e.g. /usr) where neither python nor python3 lives — both cases +# leave pyo3 with no interpreter and the build fails. +# +# When PYO3_PYTHON isn't set explicitly, walk the same probe chain ourselves +# and fall back to whatever `python3` resolves to on PATH if none of the +# usual candidates exist. Caller-supplied PYO3_PYTHON always wins. +if [[ -z "${PYO3_PYTHON:-}" ]]; then + pyo3_candidates=() + [[ -n "${VIRTUAL_ENV:-}" ]] && pyo3_candidates+=("$VIRTUAL_ENV/bin/python") + [[ -n "${CONDA_PREFIX:-}" ]] && pyo3_candidates+=("$CONDA_PREFIX/bin/python") + pyo3_candidates+=("/usr/bin/python") + for candidate in "${pyo3_candidates[@]}"; do + if [[ -x "$candidate" ]]; then + export PYO3_PYTHON="$candidate" + break + fi + done + if [[ -z "${PYO3_PYTHON:-}" ]]; then + if command -v python3 >/dev/null 2>&1; then + export PYO3_PYTHON + PYO3_PYTHON="$(command -v python3)" + echo "==> using PYO3_PYTHON=$PYO3_PYTHON (no python found in VIRTUAL_ENV/CONDA_PREFIX/system)" + else + echo "error: no python interpreter found; set PYO3_PYTHON or install python3" >&2 + exit 1 + fi + fi +fi + +echo "==> cargo build --release -p data-daemon" +cargo build --release --manifest-path "$workspace_root/Cargo.toml" -p data-daemon + +echo "==> cargo build --release -p data_daemon_producer" +cargo build --release --manifest-path "$workspace_root/Cargo.toml" -p data_daemon_producer + +mkdir -p "$(dirname "$bin_dst")" +install -m 0755 "$workspace_root/target/release/data-daemon" "$bin_dst" +echo " wrote $bin_dst" + +# cdylib filename varies by platform: libfoo.so on Linux, libfoo.dylib on macOS, +# foo.dll on Windows. Linux-only support per data-daemon-rewrite.md §Open items. +cdylib_src="$workspace_root/target/release/libdata_daemon_producer.so" +if [[ ! -f "$cdylib_src" ]]; then + echo "error: cdylib not found at $cdylib_src" >&2 + echo " (data-daemon-rewrite.md is Linux-first; macOS/Windows are not supported)" >&2 + exit 1 +fi +install -m 0755 "$cdylib_src" "$cdylib_dst" +echo " wrote $cdylib_dst" diff --git a/setup.py b/setup.py index ee1abc962..f1c681bff 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,7 @@ +import os + from setuptools import find_packages, setup +from setuptools.dist import Distribution version = None with open("neuracore/__init__.py", encoding="utf-8") as f: @@ -11,8 +14,52 @@ with open("README.md", encoding="utf-8") as fh: long_description = fh.read() + +# The prebuilt Rust artefacts are placed under ``neuracore/data_daemon/`` by +# rust/scripts/build_wheel_artefacts.sh before ``python -m build``. They are +# gitignored and absent from a plain checkout and from the standard release +# build (which does not run that script), so their presence is what decides +# whether this is a binary wheel. +_DATA_DAEMON_DIR = os.path.join(os.path.dirname(__file__), "neuracore", "data_daemon") +_RUST_ARTEFACTS = ( + os.path.join(_DATA_DAEMON_DIR, "bin", "data-daemon"), + os.path.join(_DATA_DAEMON_DIR, "_native_producer.so"), +) +_HAS_RUST_ARTEFACTS = all(os.path.exists(path) for path in _RUST_ARTEFACTS) + + +class BinaryDistribution(Distribution): + """Force a platform-specific wheel tag when the Rust artefacts are bundled. + + The Rust data-daemon binary and producer cdylib shipped under + ``neuracore/data_daemon/`` are platform-specific, so a wheel that contains + them must not be tagged ``py3-none-any`` (pip would install a Linux ``.so`` + onto macOS). When the artefacts are absent — e.g. the standard + ``python -m build`` release, which does not run + ``build_wheel_artefacts.sh`` — this returns ``False`` so the usual pure + ``py3-none-any`` wheel is produced and the release pipeline is unchanged. + See ``docs/rust_data_daemon_development.md#packaging-the-wheel``. + """ + + def has_ext_modules(self) -> bool: + return _HAS_RUST_ARTEFACTS + + setup( name="neuracore", + distclass=BinaryDistribution, + include_package_data=True, + package_data={ + "neuracore.data_daemon": [ + # Pre-built Rust artefacts. Generated by + # rust/scripts/build_wheel_artefacts.sh before `python -m build`. + # Both paths are gitignored; the wheel build is responsible for + # placing them. Empty trees just produce an any-platform wheel + # without these payloads (the Python daemon still works). + "bin/data-daemon", + "_native_producer.so", + ], + }, version=version, author="Stephen James", author_email="support@neuracore.com", diff --git a/tests/unit/api/test_core.py b/tests/unit/api/test_core.py index 6baa82a33..9431d6311 100644 --- a/tests/unit/api/test_core.py +++ b/tests/unit/api/test_core.py @@ -180,6 +180,7 @@ def stop_recording( recording_id: str, *, wait_for_producer_drain: bool = True, + timestamp: float | None = None, ) -> None: calls.append((recording_id, wait_for_producer_drain)) diff --git a/tests/unit/api/test_logging.py b/tests/unit/api/test_logging.py index 03d4fd843..6fce55440 100644 --- a/tests/unit/api/test_logging.py +++ b/tests/unit/api/test_logging.py @@ -1,5 +1,6 @@ import numpy as np import pytest +from neuracore_types import DataType import neuracore as nc from neuracore.core.const import API_URL @@ -104,6 +105,89 @@ def test_log_joint_velocities_and_torques( nc.log_joint_torque(name="joint2", torque=2.3) +def test_log_joint_group_cache_reuse_and_rebuild( + temp_config_dir, mock_auth_requests, reset_neuracore, mock_urdf, mocked_org_id +): + """A stable joint set reuses the resolved group; a changed set rebuilds it.""" + nc.login("test_api_key") + mock_auth_requests.post( + f"{API_URL}/org/{mocked_org_id}/robots", + json={"robot_id": "mock_robot_id", "has_urdf": True}, + status_code=200, + ) + robot = nc.connect_robot("test_robot", urdf_path=mock_urdf) + + # Two frames with the same joints reuse the same cached group object — the + # steady-state hot path does no per-joint re-resolution. + nc.log_joint_positions({"joint1": 0.5, "joint2": -0.3}) + first_group = robot._joint_group_cache[DataType.JOINT_POSITIONS] + nc.log_joint_positions({"joint1": 0.6, "joint2": -0.4}) + assert robot._joint_group_cache[DataType.JOINT_POSITIONS] is first_group + + # A changed joint set is rebuilt — the caller may change which joints they + # log and we must honour the live set, never a stale cached one. + nc.log_joint_positions({"joint1": 0.6, "joint2": -0.4, "joint3": 0.1}) + rebuilt_group = robot._joint_group_cache[DataType.JOINT_POSITIONS] + assert rebuilt_group is not first_group + assert len(rebuilt_group.bindings) == 3 + + +def test_log_joint_group_rebuilds_on_middle_joint_change( + temp_config_dir, mock_auth_requests, reset_neuracore, mock_urdf, mocked_org_id +): + """A changed *middle* joint rebuilds the group even with stable boundaries. + + The group cache key is the exact joint-name tuple, not a sampled one, so a + rename far from either end — which a boundary-only check would miss — is + still detected and the data is logged against the correct joints. + """ + nc.login("test_api_key") + mock_auth_requests.post( + f"{API_URL}/org/{mocked_org_id}/robots", + json={"robot_id": "mock_robot_id", "has_urdf": True}, + status_code=200, + ) + robot = nc.connect_robot("test_robot", urdf_path=mock_urdf) + + # 20 joints — wider than the value-validation sample on each end, so the + # changed joint below sits in the unsampled middle. + frame = {f"joint{index}": float(index) for index in range(20)} + nc.log_joint_positions(frame) + first_group = robot._joint_group_cache[DataType.JOINT_POSITIONS] + + # Rebuild preserving order so only the middle entry differs; the first and + # last entries (the only ones a boundary check would inspect) are unchanged. + changed = { + ("renamed_middle" if name == "joint10" else name): value + for name, value in frame.items() + } + nc.log_joint_positions(changed) + rebuilt_group = robot._joint_group_cache[DataType.JOINT_POSITIONS] + + assert rebuilt_group is not first_group + assert "renamed_middle" in rebuilt_group.joint_names + assert "joint10" not in rebuilt_group.joint_names + + +def test_log_joints_large_frame( + temp_config_dir, mock_auth_requests, reset_neuracore, mock_urdf, mocked_org_id +): + """A frame larger than the smoke sample logs every joint without error.""" + nc.login("test_api_key") + mock_auth_requests.post( + f"{API_URL}/org/{mocked_org_id}/robots", + json={"robot_id": "mock_robot_id", "has_urdf": True}, + status_code=200, + ) + nc.connect_robot("test_robot", urdf_path=mock_urdf) + + positions = {f"joint{index}": float(index) for index in range(100)} + nc.log_joint_positions(positions) + sync_point = nc.get_latest_sync_point(include_remote=False) + logged = sync_point.data[DataType.JOINT_POSITIONS] + assert len(logged) == 100 + + def test_log_visual_joint_positions( temp_config_dir, mock_auth_requests, reset_neuracore, mock_urdf, mocked_org_id ): diff --git a/tests/unit/core/test_data_stream.py b/tests/unit/core/test_data_stream.py index 87c332c1b..63a5e58d6 100644 --- a/tests/unit/core/test_data_stream.py +++ b/tests/unit/core/test_data_stream.py @@ -222,3 +222,36 @@ def test_stream_stop_recording_wait_false_skips_slot_drain(monkeypatch) -> None: assert stream.get_recording_context() is None assert stream.get_producer_channel() is None assert stream.is_recording() is False + + +def test_rgb_stream_owns_no_channel_under_rust_daemon(monkeypatch) -> None: + """Under the rust daemon a stream owns no producer channel — the daemon + interface lives at the logging layer (RecordingContext), not the stream.""" + _FakeProducerChannel.instances.clear() + monkeypatch.setattr( + "neuracore.core.streaming.data_stream.ProducerChannel", + _FakeProducerChannel, + ) + monkeypatch.setattr( + "neuracore.core.streaming.data_stream.rust_daemon_enabled", + lambda: True, + ) + + width, height = 4, 3 + stream = RGBDataStream("front_camera", width=width, height=height) + stream.start_recording(_context()) + + # No legacy channel is created, yet the stream tracks recording state. + assert _FakeProducerChannel.instances == [] + assert stream.get_producer_channel() is None + assert stream.is_recording() is True + + # Logging only updates the local latest-data cache; no channel I/O. + frame = np.arange(width * height * 3, dtype=np.uint8).reshape((height, width, 3)) + stream.log(_DummyCameraData(timestamp=1.0), frame) + assert stream.get_producer_channel() is None + + # Stop is clean even though there was never a channel to tear down. + stream.stop_recording(stop_cutoff_sequence_number=0) + assert stream.is_recording() is False + assert stream.get_recording_context() is None diff --git a/tests/unit/core/test_robot_stop_streams.py b/tests/unit/core/test_robot_stop_streams.py index 7b59de56e..134c98bbc 100644 --- a/tests/unit/core/test_robot_stop_streams.py +++ b/tests/unit/core/test_robot_stop_streams.py @@ -122,6 +122,7 @@ def deregister_remote_stop_handler(self, robot_id: str, instance: int) -> None: fake_daemon.stop_recording.assert_called_once_with( recording_id="rec-abc", producer_stop_sequence_numbers={"active-channel": 42}, + timestamp=None, ) diff --git a/tests/unit/data_daemon/cli/test_stop_status_cli.py b/tests/unit/data_daemon/cli/test_stop_status_cli.py index 9e7c1deb6..9367a682e 100644 --- a/tests/unit/data_daemon/cli/test_stop_status_cli.py +++ b/tests/unit/data_daemon/cli/test_stop_status_cli.py @@ -178,3 +178,75 @@ def test_status_prints_running_when_pid_is_running( ah.run_status() assert capsys.readouterr().out.strip() == "Daemon running (pid=456)." + + +def test_reset_delegates_to_binary_and_passes_resolved_paths( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + pid_path = tmp_path / "daemon.pid" + db_path = tmp_path / "state.db" + + monkeypatch.setattr(ah, "get_daemon_pid_path", lambda: pid_path) + monkeypatch.setattr(ah, "get_daemon_db_path", lambda: db_path) + + seen: dict[str, object] = {} + + def fake_reset_daemon_state( + *, pid_path: Path, db_path: Path, assume_yes: bool + ) -> int: + seen["pid_path"] = pid_path + seen["db_path"] = db_path + seen["assume_yes"] = assume_yes + return 0 + + monkeypatch.setattr(ah, "reset_daemon_state", fake_reset_daemon_state) + + ah.run_reset(yes=True) + assert seen == {"pid_path": pid_path, "db_path": db_path, "assume_yes": True} + + +def test_reset_defaults_to_prompting( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setattr(ah, "get_daemon_pid_path", lambda: tmp_path / "daemon.pid") + monkeypatch.setattr(ah, "get_daemon_db_path", lambda: tmp_path / "state.db") + + seen: dict[str, object] = {} + + def fake_reset_daemon_state(*, assume_yes: bool, **kwargs: object) -> int: + seen["assume_yes"] = assume_yes + return 0 + + monkeypatch.setattr(ah, "reset_daemon_state", fake_reset_daemon_state) + + ah.run_reset() + assert seen == {"assume_yes": False} + + +def test_reset_propagates_nonzero_exit_code( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setattr(ah, "get_daemon_pid_path", lambda: tmp_path / "daemon.pid") + monkeypatch.setattr(ah, "get_daemon_db_path", lambda: tmp_path / "state.db") + monkeypatch.setattr(ah, "reset_daemon_state", lambda **kwargs: 3) + + with pytest.raises(typer.Exit) as exit_info: + ah.run_reset() + assert exit_info.value.exit_code == 3 + + +def test_reset_reports_missing_binary( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + monkeypatch.setattr(ah, "get_daemon_pid_path", lambda: tmp_path / "daemon.pid") + monkeypatch.setattr(ah, "get_daemon_db_path", lambda: tmp_path / "state.db") + + def fake_reset_daemon_state(**kwargs): + raise ah.DaemonLifecycleError("no binary") + + monkeypatch.setattr(ah, "reset_daemon_state", fake_reset_daemon_state) + + with pytest.raises(typer.Exit) as exit_info: + ah.run_reset() + assert exit_info.value.exit_code == 1 + assert capsys.readouterr().err.strip() == "no binary" diff --git a/tests/unit/data_daemon/config_manager/test_cli_handlers.py b/tests/unit/data_daemon/config_manager/test_cli_handlers.py index 2a99407de..9a96d9dda 100644 --- a/tests/unit/data_daemon/config_manager/test_cli_handlers.py +++ b/tests/unit/data_daemon/config_manager/test_cli_handlers.py @@ -48,6 +48,7 @@ def fake_update_profile(name: str, updates: dict[str, Any]) -> DaemonConfig: name="recording", storage_limit=None, bandwidth_limit=None, + spool_limit=2_048, path_to_store_record=None, num_threads=2, keep_wakelock_while_upload=None, @@ -58,7 +59,8 @@ def fake_update_profile(name: str, updates: dict[str, Any]) -> DaemonConfig: out = capsys.readouterr().out.strip() assert captured["name"] == "recording" - assert captured["updates"] == {"num_threads": 2} + # Only the provided fields are forwarded; the `None` arguments are dropped. + assert captured["updates"] == {"num_threads": 2, "spool_limit": 2_048} assert out == "Updated profile 'recording'." diff --git a/tests/unit/data_daemon/config_manager/test_profiles_api.py b/tests/unit/data_daemon/config_manager/test_profiles_api.py index 991973844..b74723707 100644 --- a/tests/unit/data_daemon/config_manager/test_profiles_api.py +++ b/tests/unit/data_daemon/config_manager/test_profiles_api.py @@ -122,6 +122,7 @@ def test_get_profile_parses_unit_suffixed_byte_fields( stored_content: dict[str, Any] = { "storage_limit": "2gb", "bandwidth_limit": "300m", + "spool_limit": "1gb", } with profile_path.open("w") as profile_file: @@ -131,6 +132,7 @@ def test_get_profile_parses_unit_suffixed_byte_fields( assert loaded_config.storage_limit == 2 * 1024**3 assert loaded_config.bandwidth_limit == 300 * 1024**2 + assert loaded_config.spool_limit == 1024**3 def test_get_profile_raises_when_missing( @@ -244,6 +246,7 @@ def test_resolve_effective_config_env_supports_unit_suffixed_values( monkeypatch.setenv("NCD_STORAGE_LIMIT", "2gb") monkeypatch.setenv("NCD_BANDWIDTH_LIMIT", "300m") + monkeypatch.setenv("NCD_SPOOL_LIMIT", "1gb") config_manager = ConfigManager( profile_manager=profile_manager, profile=profile_name @@ -252,6 +255,7 @@ def test_resolve_effective_config_env_supports_unit_suffixed_values( assert effective_config.storage_limit == 2 * 1024**3 assert effective_config.bandwidth_limit == 300 * 1024**2 + assert effective_config.spool_limit == 1024**3 def test_resolve_effective_config_env_boolean_values_are_case_insensitive( diff --git a/tests/unit/data_daemon/lifecycle/test_daemon_os_control.py b/tests/unit/data_daemon/lifecycle/test_daemon_os_control.py index adbd5a9f9..18a8e6846 100644 --- a/tests/unit/data_daemon/lifecycle/test_daemon_os_control.py +++ b/tests/unit/data_daemon/lifecycle/test_daemon_os_control.py @@ -3,6 +3,7 @@ import os import signal from pathlib import Path +from typing import IO, cast import pytest @@ -16,25 +17,16 @@ ) -class _FakeStderr: - def __init__(self, content: bytes = b"") -> None: - self._content = content - - def read(self) -> bytes: - return self._content - - class _FakePopen: def __init__( self, pid: int = 12345, poll_value: int | None = None, - stderr: _FakeStderr | None = None, ) -> None: self.pid = pid self._poll_value = poll_value self.returncode = poll_value - self.stderr = stderr + self.stderr = None def poll(self) -> int | None: return self._poll_value @@ -93,7 +85,13 @@ def fake_popen(command: list[str], **kwargs: object) -> _FakePopen: assert captured["start_new_session"] is True assert captured["stdin"] is daemon_os_control.subprocess.DEVNULL assert captured["stdout"] is daemon_os_control.subprocess.DEVNULL - assert captured["stderr"] is daemon_os_control.subprocess.PIPE + # Background stderr is routed to a sibling log file (not an undrained PIPE + # that would deadlock the daemon, nor DEVNULL that would hide failures). + stderr_target = captured["stderr"] + assert stderr_target is not daemon_os_control.subprocess.PIPE + assert stderr_target is not daemon_os_control.subprocess.DEVNULL + assert Path(stderr_target.name) == db_path.parent / "daemon.log" + assert (db_path.parent / "daemon.log").exists() assert captured["close_fds"] is True assert captured["cwd"] == str(Path.cwd()) @@ -150,11 +148,11 @@ def test_launch_daemon_subprocess_premature_exit_includes_stderr( fake_socket_path = tmp_path / "management.sock" def fake_popen(command: list[str], **kwargs: object) -> _FakePopen: - return _FakePopen( - pid=99999, - poll_value=1, - stderr=_FakeStderr(b"ImportError: No module named 'foo'"), - ) + # The real daemon writes its failure to the stderr target before it + # exits; emulate that so the parent can read it back from the log file. + stderr_target = cast(IO[bytes], kwargs["stderr"]) + stderr_target.write(b"ImportError: No module named 'foo'") + return _FakePopen(pid=99999, poll_value=1) monkeypatch.setattr(daemon_os_control.subprocess, "Popen", fake_popen) monkeypatch.setattr(daemon_os_control.time, "sleep", lambda _: None)