Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions neuracore-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ axvline
backpressure
bfloat
bgra
efgh
bibtex
bigendian
bigym
Startable
blit
blowaway
Brawner
Expand Down
14 changes: 6 additions & 8 deletions neuracore/api/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def start_stream(robot: Robot, data_stream: DataStream) -> None:
dataset_id=dataset_id,
dataset_name=None,
)
data_stream.start_recording(context)
data_stream.start_recording(context, robot.get_producer_coordinator())


def _log_single_joint_data(
Expand Down Expand Up @@ -245,11 +245,10 @@ def _log_joint_data_point(

_publish_json_to_p2p(robot, joint_stream_binding.stream_id, data_type, data)

producer_channel = joint_stream.get_producer_channel()
session = joint_stream.get_stream_session()
trace_id = (
producer_channel.trace_id
if producer_channel is not None
and joint_stream.get_recording_context() is not None
session.trace_id
if session is not None and joint_stream.get_recording_context() is not None
else None
)
return LoggedJointData(
Expand Down Expand Up @@ -324,11 +323,10 @@ def _log_group_of_joint_data(
return

batch_context = batch_transport_stream.get_recording_context()
batch_transport_channel = batch_transport_stream.get_producer_channel()
if batch_context is None or batch_transport_channel is None:
if batch_context is None:
return

batch_transport_channel.send_batched_joint_data(
robot.get_producer_coordinator().enqueue_batched_joint(
BatchedJointDataPayload(
recording_id=batch_context.recording_id,
timestamp=timestamp,
Expand Down
89 changes: 64 additions & 25 deletions neuracore/core/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import logging
import os
import tempfile
import threading
import time
import xml.etree.ElementTree as ET
import zipfile
from collections import defaultdict
Expand All @@ -22,9 +24,12 @@
from neuracore_types import DataType, RobotInstanceIdentifier

from neuracore.core.config.get_current_org import get_current_org
from neuracore.core.streaming.data_stream import DataStream, MissingProducerChannelError
from neuracore.core.streaming.data_stream import DataStream
from neuracore.core.streaming.recording_state_manager import get_recording_state_manager
from neuracore.core.utils.http_session import thread_local_session
from neuracore.data_daemon.communications_management.producer import (
RobotProducerCoordinator,
)
from neuracore.data_daemon.communications_management.shared_transport import (
recording_context,
)
Expand Down Expand Up @@ -113,6 +118,8 @@ def __init__(
self._data_streams: dict[str, DataStream] = dict()
self._data_stream_counts: dict[DataType, int] = defaultdict(int)
self._daemon_recording_context: DaemonRecordingContext | None = None
self._producer_coordinator: RobotProducerCoordinator | None = None
self._producer_coordinator_lock = threading.Lock()

self.org_id = org_id or get_current_org()

Expand Down Expand Up @@ -280,6 +287,7 @@ def start_recording(self, dataset_id: str) -> str:
raise RobotError("Robot not initialized. Call init() first.")

try:
start_time = time.time()
session = thread_local_session()
response = session.post(
f"{API_URL}/org/{self.org_id}/recording/start",
Expand All @@ -291,7 +299,8 @@ def start_recording(self, dataset_id: str) -> str:
},
)
response.raise_for_status()

now = time.time()
logger.info(f"Starting recording took {now - start_time:.2f}s")
recording_details = response.json()
recording_id = recording_details["id"]
assert isinstance(recording_id, str)
Expand Down Expand Up @@ -344,14 +353,16 @@ def stop_recording(
)

try:
stop_time = time.time()
session = thread_local_session()
response = session.post(
f"{API_URL}/org/{self.org_id}/recording/stop?recording_id={recording_id}",
headers=self._auth.get_headers(),
)

response.raise_for_status()

now = time.time()
logger.info(f"Stopping recording took {now - stop_time:.2f}s")
if response.json() == "WrongUser":
raise RobotError("Cannot stop recording initiated by another user")

Expand Down Expand Up @@ -399,32 +410,48 @@ def _stop_all_streams(
self,
wait_for_producer_drain: bool = True,
) -> dict[str, int]:
"""Stop recording on all data streams for this robot instance."""
producer_stop_sequence_numbers: dict[str, int] = {}

for stream_id, stream in list(self._data_streams.items()):
try:
"""Stop recording across all streams via the robot coordinator.

(producer_channel, stop_cutoff_sequence_number) = (
stream.prepare_recording_stopped()
)

producer_stop_sequence_numbers[producer_channel.channel_id] = (
stop_cutoff_sequence_number
)
Freezes coordinator intake, drains accepted work, and ends every active
stream trace, then marks the local streams stopped. Returns the single
``{coordinator_id: stop_cutoff_sequence_number}`` mapping the daemon uses
to finalize the recording.
"""
coordinator = self._producer_coordinator
if coordinator is None:
return {}

stream.stop_recording(
stop_cutoff_sequence_number=stop_cutoff_sequence_number,
wait_for_producer_drain=wait_for_producer_drain,
)
stop_cutoff_sequence_number: int | None = None
try:
stop_cutoff_sequence_number = coordinator.stop_recording(
wait_for_drain=wait_for_producer_drain
)
except Exception:
logger.exception("Failed to stop robot producer coordinator")

except MissingProducerChannelError as exc:
logger.info("Removing stale data stream %s: %s", stream_id, exc)
self._remove_data_stream(stream_id, stream)
for stream_id, stream in list(self._data_streams.items()):
try:
stream.mark_recording_stopped()
except Exception:
logger.exception("Failed to stop data stream %s", stream_id)

return producer_stop_sequence_numbers
logger.exception("Failed to mark data stream %s stopped", stream_id)

if stop_cutoff_sequence_number is None:
return {}
return {coordinator.producer_id: stop_cutoff_sequence_number}

def get_producer_coordinator(self) -> RobotProducerCoordinator:
"""Return the robot-scoped producer coordinator, creating it lazily."""
coordinator = self._producer_coordinator
if coordinator is not None:
return coordinator

with self._producer_coordinator_lock:
if self._producer_coordinator is None:
producer_id = f"robot:{self.id or self.name}:{self.instance}"
self._producer_coordinator = RobotProducerCoordinator(
producer_id=producer_id
)
return self._producer_coordinator

def is_recording(self) -> bool:
"""Check if the robot is currently recording data.
Expand Down Expand Up @@ -755,8 +782,20 @@ def _cleanup_daemon_recording_context(self) -> None:
finally:
self._daemon_recording_context = None

def _cleanup_producer_coordinator(self) -> None:
"""Tear down the robot-scoped producer coordinator, if created."""
if self._producer_coordinator is None:
return
try:
self._producer_coordinator.close()
except Exception:
logger.exception("Failed to cleanup producer coordinator")
finally:
self._producer_coordinator = None

def close(self) -> None:
"""Release local resources owned by this Robot instance."""
self._cleanup_producer_coordinator()
self._cleanup_daemon_recording_context()
if self.id is not None:
get_recording_state_manager().deregister_remote_stop_handler(
Expand Down
Loading
Loading