Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .github/workflows/build-wheels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
name: Build Wheels

# Builds the Python wheel that bundles the Rust data-daemon binary and the
# producer cdylib. See docs/rust_data_daemon_development.md#packaging-the-wheel
# for the build pipeline rationale (cargo → build_wheel_artefacts.sh → python -m build).

on:
pull_request:
branches:
- main
paths:
- 'rust/**'
- 'neuracore/data_daemon/**'
- 'setup.py'
- 'MANIFEST.in'
- '.github/workflows/build-wheels.yaml'
push:
branches:
- main
paths:
- 'rust/**'
- 'neuracore/data_daemon/**'
- 'setup.py'
- 'MANIFEST.in'
- '.github/workflows/build-wheels.yaml'
workflow_call:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
name: wheel (${{ matrix.os }}, py${{ matrix.python-version }})
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
# Linux-first per docs/data-daemon-rewrite.md §Open items.
# aarch64 ships when there's demand — the helper script is platform-
# agnostic but cross-compilation isn't wired up here yet.
os: [ubuntu-22.04]
python-version: ["3.10", "3.11"]

steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable

- name: Cache cargo build
uses: Swatinem/rust-cache@v2
with:
workspaces: rust

- name: Set up Python ${{ matrix.python-version }}
id: setup-python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}

- name: Install build tooling
run: python -m pip install --upgrade pip build

- name: Build Rust artefacts
# PYO3_PYTHON pins pyo3-build-config to the matrix interpreter so the
# produced cdylib's ABI matches the wheel's python tag. Ubuntu images
# ship multiple python3 binaries; relying on PATH order would silently
# pick the wrong one and produce a wheel that imports under the wrong
# interpreter.
env:
PYO3_PYTHON: ${{ steps.setup-python.outputs.python-path }}
run: ./rust/scripts/build_wheel_artefacts.sh

- name: Build wheel
run: python -m build --wheel

- name: Smoke-test the built wheel
run: |
python -m pip install dist/neuracore-*.whl
python -c "import neuracore.data_daemon._native_producer as p; print('producer ok:', p)"
python -c "from importlib.resources import files; b = files('neuracore.data_daemon') / 'bin' / 'data-daemon'; assert b.is_file(), b; print('binary ok:', b)"

- name: Upload wheel artefact
uses: actions/upload-artifact@v4
with:
name: neuracore-wheel-${{ matrix.os }}-py${{ matrix.python-version }}
path: dist/*.whl
if-no-files-found: error
retention-days: 7
16 changes: 16 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Sources required to rebuild the bundled Rust artefacts from an sdist.
# The artefacts themselves (neuracore/data_daemon/bin/data-daemon and
# neuracore/data_daemon/_native_producer.so) are gitignored and only land
# in the *wheel* — the sdist ships the Rust sources so a downstream
# packager can run rust/scripts/build_wheel_artefacts.sh themselves.
#
# See docs/rust_data_daemon_development.md#packaging-the-wheel.

include README.md
include LICENSE

recursive-include rust *.rs *.toml *.lock *.sql *.md *.sh

prune rust/target
global-exclude __pycache__
global-exclude *.py[cod]
6 changes: 4 additions & 2 deletions docs/data_daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ These are the supported settings:
|---|---|
| `storage_limit` | Maximum local disk space the daemon should use for recordings (bytes). |
| `bandwidth_limit` | Maximum upload speed the daemon should use (bytes per second). |
| `spool_limit` | Cap on the producer's on-disk video spool backlog (bytes). When the un-encoded backlog reaches this size the producer applies backpressure to video logging instead of letting the spool fill the disk. `0` disables the bound. Defaults to 2 GiB. |
| `path_to_store_record` | Folder where recordings are stored. |
| `num_threads` | Number of worker threads used by the daemon. |
| `keep_wakelock_while_upload` | Whether to keep the machine awake during uploads (where supported). |
Expand Down Expand Up @@ -253,6 +254,7 @@ Supported environment variables:
|---|---|
| `storage_limit` | `NCD_STORAGE_LIMIT` |
| `bandwidth_limit` | `NCD_BANDWIDTH_LIMIT` |
| `spool_limit` | `NCD_SPOOL_LIMIT` |
| `path_to_store_record` | `NCD_PATH_TO_STORE_RECORD` |
| `num_threads` | `NCD_NUM_THREADS` |
| `keep_wakelock_while_upload` | `NCD_KEEP_WAKELOCK_WHILE_UPLOAD` |
Expand Down Expand Up @@ -320,13 +322,13 @@ neuracore data-daemon profile create laptop
Update a named profile:

```bash
neuracore data-daemon profile update <name> [--storage-limit <bytes|unit>] [--bandwidth-limit <bytes|unit>] [--storage-path <path>] [--num-threads <n>] [--max-concurrent-uploads <n>] [--wakelock|--no-wakelock] [--offline|--online] [--api-key <key>] [--current-org-id <org_id>]
neuracore data-daemon profile update <name> [--storage-limit <bytes|unit>] [--bandwidth-limit <bytes|unit>] [--spool-limit <bytes|unit>] [--storage-path <path>] [--num-threads <n>] [--wakelock|--no-wakelock] [--offline|--online] [--api-key <key>] [--current-org-id <org_id>]
```

Update the default profile:

```bash
neuracore data-daemon profile update [--storage-limit <bytes|unit>] [--bandwidth-limit <bytes|unit>] [--storage-path <path>] [--num-threads <n>] [--max-concurrent-uploads <n>] [--wakelock|--no-wakelock] [--offline|--online] [--api-key <key>] [--current-org-id <org_id>]
neuracore data-daemon profile update [--storage-limit <bytes|unit>] [--bandwidth-limit <bytes|unit>] [--spool-limit <bytes|unit>] [--storage-path <path>] [--num-threads <n>] [--wakelock|--no-wakelock] [--offline|--online] [--api-key <key>] [--current-org-id <org_id>]
```

Example:
Expand Down
1 change: 1 addition & 0 deletions neuracore-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ elems
embs
Emika
ENOENT
enoexec
EPERM
EPIPE
erfinv
Expand Down
88 changes: 75 additions & 13 deletions neuracore/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from neuracore.core.streaming.recording_state_manager import get_recording_state_manager
from neuracore.core.utils import backend_utils
from neuracore.data_daemon.rust_selection import rust_daemon_enabled

from ..core.auth import get_auth
from ..core.data.dataset import Dataset
Expand Down Expand Up @@ -103,6 +104,7 @@ def logout() -> None:
get_auth().logout()
GlobalSingleton()._active_robot = None
GlobalSingleton()._active_dataset_id = None
GlobalSingleton()._active_dataset = None
GlobalSingleton()._has_validated_version = False


Expand Down Expand Up @@ -245,7 +247,11 @@ def is_recording(robot_name: str | None = None, instance: int = 0) -> bool:
return robot.is_recording()


def start_recording(robot_name: str | None = None, instance: int = 0) -> None:
def start_recording(
robot_name: str | None = None,
instance: int = 0,
timestamp: float | None = None,
) -> None:
"""Start recording data for a specific robot.

Begins a new recording session for the specified robot, capturing all
Expand All @@ -256,6 +262,9 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None:
robot_name: Robot identifier. If not provided, uses the currently
active robot from the global state.
instance: Instance number of the robot for multi-instance scenarios.
timestamp: Optional capture time (Unix seconds) for the recording's
start, matching the ``log_*`` methods. When omitted, the current
time is captured.

Raises:
RobotError: If no robot is active and no robot_name is provided,
Expand All @@ -266,10 +275,12 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None:
active_dataset_id = GlobalSingleton()._active_dataset_id
if active_dataset_id is None:
raise RobotError("No active dataset. Call create_dataset() first.")
try:
active_dataset = Dataset.get_by_id(active_dataset_id)
except DatasetError:
active_dataset = None
active_dataset = GlobalSingleton()._active_dataset
if active_dataset is None or active_dataset.id != active_dataset_id:
try:
active_dataset = Dataset.get_by_id(active_dataset_id)
except DatasetError:
active_dataset = None
if active_dataset is not None:
if robot.shared and not active_dataset.is_shared:
raise RobotError(
Expand All @@ -286,11 +297,14 @@ def start_recording(robot_name: str | None = None, instance: int = 0) -> None:
"shared robot, ensure connect_robot(shared=True) succeeded. "
f"Active dataset: '{active_dataset.name}' ({active_dataset.id})."
)
robot.start_recording(active_dataset_id)
robot.start_recording(active_dataset_id, timestamp=timestamp)


def stop_recording(
robot_name: str | None = None, instance: int = 0, wait: bool = False
robot_name: str | None = None,
instance: int = 0,
wait: bool = False,
timestamp: float | None = None,
) -> None:
"""Stop recording data for a specific robot.

Expand All @@ -303,6 +317,9 @@ def stop_recording(
instance: Instance number of the robot for multi-instance scenarios.
wait: Whether to block until all data streams have finished uploading
to the backend storage.
timestamp: Optional capture time (Unix seconds) for the recording's
stop, matching the ``log_*`` methods. When omitted, the current
time is captured.

Raises:
RobotError: If no robot is active and no robot_name is provided.
Expand All @@ -317,10 +334,20 @@ def stop_recording(
recording_id = robot.get_current_recording_id()
if not recording_id:
raise ValueError("Recording_id is None, no current recording")
robot.stop_recording(recording_id, wait_for_producer_drain=wait)

if not wait:
return
if rust_daemon_enabled():
cloud_recording_id = robot.get_cloud_recording_id() if wait else None
robot.stop_recording(
recording_id, wait_for_producer_drain=wait, timestamp=timestamp
)
if not wait or not cloud_recording_id:
return
recording_id = cloud_recording_id
else:
robot.stop_recording(
recording_id, wait_for_producer_drain=wait, timestamp=timestamp
)
if not wait:
return

# TODO: We need to instead check that the specific recording is complete
is_traces_registered = False
Expand All @@ -333,6 +360,34 @@ def stop_recording(
time.sleep(0.2)


def get_cloud_recording_id(
robot_name: str | None = None,
instance: int = 0,
timestamp_ns: int | None = None,
timeout_s: float = 30.0,
) -> str | None:
"""Resolve the daemon-owned cloud recording id for a robot's recording.

Under the Rust daemon the cloud recording id is assigned asynchronously by
the daemon. This asks the daemon (it may block up to ``timeout_s``) for the
id of the recording whose window brackets ``timestamp_ns`` for this source
(defaulting to the most recently started recording). For
non-performance-critical use only (tests, ``stop_recording(wait=True)``).

Args:
robot_name: Robot identifier. Defaults to the active robot.
instance: Robot instance number.
timestamp_ns: A wall-clock instant inside the target recording window;
defaults to the most recent recording for the source.
timeout_s: Maximum time to wait for the daemon to mint the id.

Returns:
The cloud recording id, or ``None`` on timeout / legacy daemon.
"""
robot = _get_robot(robot_name, instance)
return robot.get_cloud_recording_id(timestamp_ns=timestamp_ns, timeout_s=timeout_s)


def stop_live_data(robot_name: str | None = None, instance: int = 0) -> None:
"""Stop sharing live data for active monitoring from the Neuracore platform.

Expand All @@ -355,12 +410,19 @@ def stop_live_data(robot_name: str | None = None, instance: int = 0) -> None:
StreamManagerOrchestrator().remove_manager(robot.id, robot.instance)


def cancel_recording(robot_name: str | None = None, instance: int = 0) -> None:
def cancel_recording(
robot_name: str | None = None,
instance: int = 0,
timestamp: float | None = None,
) -> None:
"""Cancel the current recording for a specific robot without saving any data.

Args:
robot_name: Robot identifier.
instance: Instance number of the robot for multi-instance scenarios.
timestamp: Optional capture time (Unix seconds) for the cancel,
mirroring ``stop_recording``. When omitted, the current time is
captured.

"""
robot = _get_robot(robot_name, instance)
Expand All @@ -369,4 +431,4 @@ def cancel_recording(robot_name: str | None = None, instance: int = 0) -> None:
recording_id = robot.get_current_recording_id()
if not recording_id:
return
robot.cancel_recording(recording_id)
robot.cancel_recording(recording_id, timestamp=timestamp)
5 changes: 5 additions & 0 deletions neuracore/api/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def get_dataset(name: str | None = None, id: str | None = None) -> Dataset:
if _active_dataset is None:
raise ValueError(f"No Dataset found with the given name: {name} or ID: {id}")
GlobalSingleton()._active_dataset_id = _active_dataset.id
GlobalSingleton()._active_dataset = _active_dataset
return _active_dataset


Expand Down Expand Up @@ -95,6 +96,7 @@ def merge_datasets(name: str, dataset_names: list[str]) -> Dataset:
data_types=list(dataset_model.all_data_types.keys()),
)
GlobalSingleton()._active_dataset_id = merged.id
GlobalSingleton()._active_dataset = merged
return merged


Expand Down Expand Up @@ -169,6 +171,7 @@ def clone_dataset(
"available immediately."
)
GlobalSingleton()._active_dataset_id = cloned.id
GlobalSingleton()._active_dataset = cloned
return cloned

# resolve for source_dataset if not provided, to get the total number of recordings
Expand Down Expand Up @@ -197,6 +200,7 @@ def clone_dataset(
pbar.close()

GlobalSingleton()._active_dataset_id = cloned.id
GlobalSingleton()._active_dataset = cloned
return cloned


Expand Down Expand Up @@ -224,4 +228,5 @@ def create_dataset(
"""
_active_dataset = Dataset.create(name, description, tags, shared)
GlobalSingleton()._active_dataset_id = _active_dataset.id
GlobalSingleton()._active_dataset = _active_dataset
return _active_dataset
9 changes: 9 additions & 0 deletions neuracore/api/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
and validation status.
"""

from typing import TYPE_CHECKING

from neuracore.core.robot import Robot
from neuracore.core.utils.singleton_metaclass import SingletonMetaclass

if TYPE_CHECKING:
from neuracore.core.data.dataset import Dataset


class GlobalSingleton(metaclass=SingletonMetaclass):
"""Singleton class for managing global Neuracore session state.
Expand All @@ -24,8 +29,12 @@ class GlobalSingleton(metaclass=SingletonMetaclass):
for operations when no specific robot is specified.
_active_dataset_id: ID of the currently active dataset that new
recordings will be associated with.
_active_dataset: The active dataset object cached alongside its id, so
hot-path callers (e.g. ``start_recording``) can read its metadata
without a synchronous backend fetch.
"""

_has_validated_version = False
_active_robot: Robot | None = None
_active_dataset_id: str | None = None
_active_dataset: "Dataset | None" = None
Comment thread
CougarTasker marked this conversation as resolved.
Loading
Loading