diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index 9a3d90e164..507d6dbe9c 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -35,7 +35,6 @@ from dimos.memory2.transform import QualityWindow from dimos.memory2.type.observation import EmbeddedObservation, Observation from dimos.models.embedding.base import EmbeddingModel -from dimos.models.embedding.clip import CLIPModel from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.sensor_msgs.Image import Image from dimos.utils.data import backup_file @@ -192,7 +191,7 @@ def store(self) -> SqliteStore: class SemanticSearchConfig(MemoryModuleConfig): - embedding_model: type[EmbeddingModel] = CLIPModel + embedding_model: type[EmbeddingModel] | None = None class SemanticSearch(MemoryModule): @@ -204,7 +203,13 @@ class SemanticSearch(MemoryModule): def start(self) -> None: super().start() - self.model = self.register_disposable(self.config.embedding_model()) + embedding_cls = self.config.embedding_model + if embedding_cls is None: + from dimos.models.embedding.clip import CLIPModel + + embedding_cls = CLIPModel + + self.model = self.register_disposable(embedding_cls()) self.model.start() self.embeddings = self.store.stream("color_image_embedded", Image) @@ -320,21 +325,25 @@ def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any]) default_frame_id = self.config.default_frame_id tf_tolerance = self.config.tf_tolerance + # Frames already warned about — a stream with no tf (e.g. a cmd_vel + # tagged 'keyboard') would otherwise log on every message. + warned_frames: set[str] = set() def on_msg(msg: Any) -> None: - ts = getattr(msg, "ts", None) or time.time() + recv_ts = time.time() + ts = getattr(msg, "ts", None) or recv_ts frame_id = getattr(msg, "frame_id", None) or default_frame_id transform = self.tf.get("world", frame_id, time_point=ts, time_tolerance=tf_tolerance) pose = transform.to_pose() if transform is not None else None - if not pose: + if not pose and frame_id not in warned_frames: + warned_frames.add(frame_id) logger.warning( - "[%s] No tf available for frame '%s' at time %s (msg ts: %s), storing without pose", + "[%s] No tf for frame '%s' — recording without pose " + "(further such messages on this stream are not logged)", name, frame_id, - ts, - getattr(msg, "ts", None), ) - stream.append(msg, ts=ts, pose=pose) + stream.append(msg, ts=ts, pose=pose, tags={"recv_ts": recv_ts}) self.register_disposable(Disposable(input_topic.subscribe(on_msg))) diff --git a/dimos/msgs/geometry_msgs/TwistStamped.py b/dimos/msgs/geometry_msgs/TwistStamped.py index ab3dc507b9..9bac50dcce 100644 --- a/dimos/msgs/geometry_msgs/TwistStamped.py +++ b/dimos/msgs/geometry_msgs/TwistStamped.py @@ -39,11 +39,13 @@ class TwistStamped(Twist, Timestamped): msg_name = "geometry_msgs.TwistStamped" ts: float frame_id: str + seq: int @dispatch - def __init__(self, ts: float = 0.0, frame_id: str = "", **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__(self, ts: float = 0.0, frame_id: str = "", seq: int = 0, **kwargs) -> None: # type: ignore[no-untyped-def] self.frame_id = frame_id self.ts = ts if ts != 0 else time.time() + self.seq = seq super().__init__(**kwargs) def lcm_encode(self) -> bytes: @@ -51,6 +53,7 @@ def lcm_encode(self) -> bytes: lcm_msg.twist = self [lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = sec_nsec(self.ts) # type: ignore[no-untyped-call] lcm_msg.header.frame_id = self.frame_id + lcm_msg.header.seq = self.seq return lcm_msg.lcm_encode() # type: ignore[no-any-return] @classmethod @@ -59,6 +62,7 @@ def lcm_decode(cls, data: bytes | BinaryIO) -> TwistStamped: return cls( ts=lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000), frame_id=lcm_msg.header.frame_id, + seq=lcm_msg.header.seq, linear=[lcm_msg.twist.linear.x, lcm_msg.twist.linear.y, lcm_msg.twist.linear.z], angular=[lcm_msg.twist.angular.x, lcm_msg.twist.angular.y, lcm_msg.twist.angular.z], ) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index efd0f563ba..45049d5e1c 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -72,6 +72,8 @@ "openarm-mock-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_mock_planner_coordinator", "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", + "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", + "teleop-hosted-xarm7": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", "teleop-phone-go2-fleet": "dimos.teleop.phone.blueprints:teleop_phone_go2_fleet", @@ -165,6 +167,10 @@ "gps-nav-skill-container": "dimos.agents.skills.gps_nav_skill.GpsNavSkillContainer", "grasping-module": "dimos.manipulation.grasping.grasping.GraspingModule", "gstreamer-camera-module": "dimos.hardware.sensors.camera.gstreamer.gstreamer_camera.GstreamerCameraModule", + "hosted-arm-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedArmTeleopModule", + "hosted-teleop-module": "dimos.teleop.quest_hosted.hosted_teleop_module.HostedTeleopModule", + "hosted-teleop-recorder": "dimos.teleop.quest_hosted.blueprints.HostedTeleopRecorder", + "hosted-twist-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedTwistTeleopModule", "joint-trajectory-controller": "dimos.manipulation.control.trajectory_controller.joint_trajectory_controller.JointTrajectoryController", "joystick-module": "dimos.robot.unitree.b1.joystick_module.JoystickModule", "keyboard-teleop": "dimos.robot.unitree.keyboard_teleop.KeyboardTeleop", @@ -216,6 +222,7 @@ "spatial-memory": "dimos.perception.spatial_perception.SpatialMemory", "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", "tare-planner": "dimos.navigation.nav_stack.modules.tare_planner.tare_planner.TarePlanner", + "teleop-recorder": "dimos.teleop.utils.recorder.TeleopRecorder", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", "terrain-analysis": "dimos.navigation.nav_stack.modules.terrain_analysis.terrain_analysis.TerrainAnalysis", "terrain-map-ext": "dimos.navigation.nav_stack.modules.terrain_map_ext.terrain_map_ext.TerrainMapExt", diff --git a/dimos/robot/test_all_blueprints.py b/dimos/robot/test_all_blueprints.py index 0bb677ebc5..cdf72e9b6b 100644 --- a/dimos/robot/test_all_blueprints.py +++ b/dimos/robot/test_all_blueprints.py @@ -50,6 +50,8 @@ "coordinator-xarm6", "coordinator-xarm7", "dual-xarm6-planner", + "teleop-hosted-go2", + "teleop-hosted-xarm7", "teleop-quest-dual", "teleop-quest-go2", "teleop-quest-piper", diff --git a/dimos/teleop/README.md b/dimos/teleop/README.md index c29ac5011e..0fe6958c91 100644 --- a/dimos/teleop/README.md +++ b/dimos/teleop/README.md @@ -64,11 +64,16 @@ Filters to mobile-base axes (linear.x, linear.y, angular.z) and publishes as `Tw ``` teleop/ ├── quest/ -│ ├── quest_teleop_module.py # Base Quest teleop module +│ ├── quest_teleop_module.py # Base Quest teleop module (local WebSocket) │ ├── quest_extensions.py # ArmTeleop, TwistTeleop │ ├── quest_types.py # QuestControllerState, Buttons │ └── web/ │ └── static/index.html # WebXR client +├── quest_hosted/ +│ ├── hosted_teleop_module.py # Hosted Quest teleop (Cloudflare SFU broker) +│ ├── hosted_extensions.py # HostedArmTeleop, HostedTwistTeleop +│ ├── blueprints.py # Pre-wired blueprints +│ └── README.md # Channel/CF gotchas, threads, sidecars ├── phone/ │ ├── phone_teleop_module.py # Base Phone teleop module │ ├── phone_extensions.py # SimplePhoneTeleop @@ -77,6 +82,9 @@ teleop/ │ └── static/index.html # Mobile sensor web app ├── utils/ │ ├── teleop_transforms.py # WebXR → robot frame math +│ ├── recorder.py # Generic SQLite recorder (writes .db + report.md on stop) +│ ├── report.py # generate_report(db_path) — read .db, emit report.md + PNGs +│ └── stream_stats.py # LiveStreamStats + pcts/loss_pct (shared math) └── blueprints.py # Module blueprints for easy instantiation ``` diff --git a/dimos/teleop/quest_hosted/README.md b/dimos/teleop/quest_hosted/README.md new file mode 100644 index 0000000000..10582b8fb1 --- /dev/null +++ b/dimos/teleop/quest_hosted/README.md @@ -0,0 +1,119 @@ +# Hosted Teleop Module + +Robot dials out to the [dimensional-teleop](https://github.com/dimensionalOS/dimensional-teleop) +broker (Cloudflare Realtime SFU) — no inbound ports needed. The browser/VR +operator connects through the broker; commands arrive over WebRTC datachannels, +robot video goes out as a WebRTC track. + +## Files + +- **`hosted_teleop_module.py`** — base. Owns the dial-out, datachannel + lifecycle, video send, clock-sync, and the command-plane telemetry pushed + back to the HUD. Subclassed for actuation. +- **`hosted_extensions.py`** — concrete subclasses: arm IK, mobile-base twist. +- **`blueprints.py`** — wires the module to a robot driver. + +The operator HTML lives in the [dimensional-teleop](https://github.com/dimensionalOS/dimensional-teleop) +broker repo (`web/`), not here. + +## How a session connects + +1. Robot creates an `RTCPeerConnection` (MAX_BUNDLE, **must** — see below), + `addTrack(video)`, opens a throwaway negotiated DataChannel on SCTP id 0, + creates an offer, gathers ICE non-trickle. +2. `POST /api/v1/sessions` to the broker with the offer. Broker creates a CF + session, returns the answer + a `session_id` keyed off the robot. +3. SDP answer's candidates are propagated across bundled m-sections (aiortc + workaround — see below) before `setRemoteDescription`. +4. Heartbeat thread polls `/sessions/{id}/heartbeat`; each ack carries the SCTP + ids the broker has assigned for `cmd_unreliable`, `state_reliable`, and + `state_reliable_back`. We open / re-open / close negotiated channels to + track the broker's view. +5. Once `pc.connectionState == "connected"`, `CameraVideoTrack.arm()` starts + delivering frames (drops everything before the operator was actually able + to receive). +6. Telemetry thread pushes command-plane stats (latency / jitter / loss / rate + from the inbound twist stream) on `state_reliable_back` at `telemetry_hz`, + so the operator HUD can show what *arrived* — the operator only knows what + it *sent*. + +## Datachannels (this is the trickiest part) + +CF Realtime bridges datachannels **publisher → subscriber, one direction +only**. That's why we need two reliable channels — one for each direction: + +| Channel | Direction | Reliable? | What it carries | +|---|---|---|---| +| `cmd_unreliable` | operator → robot | no (unordered, 0 retransmits) | TwistStamped / Joy / PoseStamped LCM | +| `state_reliable` | operator → robot | yes | JSON: `ping`, `clock_report`, `video_stats` | +| `state_reliable_back` | robot → operator | yes | JSON: `pong`, `robot_telemetry` | + +All three are **negotiated by SCTP id** (broker assigns; we never pick). + +### SCTP id 0 reservation (the throwaway DC) + +A plain `createDataChannel` auto-grabs SCTP id 1 at connect time — same id the +broker tends to assign `cmd_unreliable`. Collision → `createDataChannel(id=1)` +throws. So at offer time we pin a *throwaway* negotiated channel to id 0 +(reserved, never handed out by the broker). It also forces an SCTP m-line into +the offer so the SFU has a transport to bind the real channels to. + +**Do not close that channel.** Under MAX_BUNDLE the SCTP shares the one bundled +ICE/DTLS transport with the video track; closing the only datachannel risks +the transport. + +## aiortc / Cloudflare quirks (do not regress) + +These are **hard-won, not in any docs** — corresponding fixes are commented at +the call sites but the *why* lives here: + +- **MAX_BUNDLE is mandatory.** aiortc 1.14's default (BALANCED) puts video and + SCTP on separate ICE transports. CF Realtime publishes one bundled transport; + the video one fails ICE and you get a black quad forever. Force + `RTCBundlePolicy.MAX_BUNDLE` on `RTCConfiguration`. + +- **`addTrack` BEFORE `createDataChannel`.** Otherwise the SCTP m-line is + created without a transceiver and aiortc's bundle-collapse discards the + shared transport. ICE never starts. + +- **`_propagate_bundle_candidates`.** aiortc keys remote candidates by transport, + and under one bundled transport the *last* m-section processed wins. CF puts + `a=candidate` only on the video section; the empty SCTP section overwrites + it → remote-candidates=0 → ICE stalls at "checking". The helper replicates + the candidate block into every m-section that lacks one. **Do not remove.** + +- **`makeXRCompatible()` on real hardware.** The operator side, not us, but + worth knowing: `xrCompatible: true` at context creation is not enough on + Quest — `await gl.makeXRCompatible()` is required before building the + `XRWebGLLayer`. + +## Sidecar files + +- **`/tmp/dimos_netem_profile`** — written by `data/notes/benchmarks/netem/apply.sh` + before a run; `TeleopRecorder`'s report writer reads it for the report + header so the netem profile is part of the artifact. We don't touch it. + +## Threads (the runtime) + +- **Event loop thread** (`HostedTeleopLoop`) — runs asyncio for aiortc + httpx. + Datachannel `send()` calls **must** happen here (aiortc datachannels aren't + thread-safe). The pong path is already on the loop (it fires from the + channel's `message` callback); the telemetry thread uses + `loop.call_soon_threadsafe` for the same reason. +- **Heartbeat thread** — HTTP polls the broker; reacts to channel-id changes. +- **Telemetry thread** — pushes `robot_telemetry` JSON on `state_reliable_back` + at `telemetry_hz` (default 3 Hz). +- **Control loop** — `control_loop_hz` (default 50 Hz). Calls the + subclass-overridable engage / publish hooks. + +## Reconnect + +Operator-side reconnect is handled in the broker (`fix3/reconnection`) — it +closes the stale `state_reliable_back` push (CF `datachannels/close`, not in +prose docs but in the OpenAPI spec) before re-pushing. CF does **not** auto-reap +datachannel pushes (the 30s GC is media-only), so without that close, the long- +lived robot session accumulates half-dead pushes and the second bridge 502s +with `repeated_local_track_error`. + +Robot-side auto-redial (R2b in the roadmap) is not yet implemented and is +gated behind TURN landing first. diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py new file mode 100644 index 0000000000..57a3214ae6 --- /dev/null +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hosted teleop blueprints (WebRTC transport).""" + +from pathlib import Path + +from dimos.constants import DIMOS_PROJECT_ROOT +from dimos.control.blueprints.teleop import coordinator_teleop_xarm7 +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.global_config import global_config +from dimos.core.transport import LCMTransport +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.teleop.quest.quest_types import Buttons +from dimos.teleop.quest_hosted.hosted_extensions import ( + HostedArmTeleopModule, + HostedTwistTeleopModule, +) +from dimos.teleop.utils.recorder import TeleopRecorder, TeleopRecorderConfig + +global_config.rerun_open = "none" + +# Single XArm7 teleop via the hosted (WebRTC) client. Pass `--simulation` to +# run the coordinator inside MuJoCo, omit it for real hardware. +teleop_hosted_xarm7 = autoconnect( + HostedArmTeleopModule.blueprint(task_names={"right": "teleop_xarm"}), + coordinator_teleop_xarm7, +).transports( + { + ("right_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", Buttons): LCMTransport("/teleop/buttons", Buttons), + } +) + + +# viewer="none" drops the rerun window (operator gets video over WebRTC, so the +# robot-side rerun view is unwanted here). +teleop_hosted_go2 = autoconnect( + HostedTwistTeleopModule.blueprint(), + unitree_go2_basic, +).global_config(n_workers=8, viewer="none") + + +HOSTED_RECORDINGS_DIR = DIMOS_PROJECT_ROOT / "data/hosted_teleop/recordings" + + +class HostedTeleopRecorderConfig(TeleopRecorderConfig): + # Same generic recorder, just defaulting recordings into the hosted dir. + db_path: str | Path = HOSTED_RECORDINGS_DIR / "recording_hosted.db" + + +class HostedTeleopRecorder(TeleopRecorder): + """Generic ``TeleopRecorder`` defaulting to the hosted recordings dir. + + Ports + per-run timestamping are inherited; this only changes the default + output path. Compose at the CLI:: + + dimos run teleop-hosted-xarm7 hosted-teleop-recorder + dimos run teleop-hosted-go2 hosted-teleop-recorder + """ + + config: HostedTeleopRecorderConfig + + +__all__ = [ + "HostedTeleopRecorder", + "HostedTeleopRecorderConfig", + "teleop_hosted_go2", + "teleop_hosted_xarm7", +] diff --git a/dimos/teleop/quest_hosted/hosted_extensions.py b/dimos/teleop/quest_hosted/hosted_extensions.py new file mode 100644 index 0000000000..c458fdd495 --- /dev/null +++ b/dimos/teleop/quest_hosted/hosted_extensions.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hosted teleop subclasses: arm IK and mobile-base twist.""" + +import time +from typing import Any + +from pydantic import Field + +from dimos.core.stream import Out +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.teleop.quest.quest_types import Buttons, QuestControllerState +from dimos.teleop.quest_hosted.hosted_teleop_module import ( + Hand, + HostedTeleopConfig, + HostedTeleopModule, +) + + +class HostedArmTeleopConfig(HostedTeleopConfig): + # task_names maps "left"/"right" → coordinator task name (e.g. "teleop_xarm"), + # used as frame_id so the coordinator routes to the right TeleopIKTask. + task_names: dict[str, str] = Field(default_factory=dict) + + +class HostedArmTeleopModule(HostedTeleopModule): + """Arm-IK subclass: routes per-hand poses to coordinator tasks + analog triggers.""" + + config: HostedArmTeleopConfig + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._task_names: dict[Hand, str] = { + Hand[k.upper()]: v for k, v in self.config.task_names.items() + } + + def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None: + # Stamp frame_id with the per-hand task name so the coordinator routes. + task_name = self._task_names.get(hand) + if task_name: + output_msg = PoseStamped( + position=output_msg.position, + orientation=output_msg.orientation, + ts=output_msg.ts, + frame_id=task_name, + ) + super()._publish_msg(hand, output_msg) + + def _publish_button_state( + self, + left: QuestControllerState | None, + right: QuestControllerState | None, + ) -> None: + # Same as base, plus analog triggers packed into Buttons bits 16-29. + buttons = Buttons.from_controllers(left, right) + buttons.pack_analog_triggers( + left=left.trigger if left is not None else 0.0, + right=right.trigger if right is not None else 0.0, + ) + self.buttons.publish(buttons) + + +class HostedTwistTeleopConfig(HostedTeleopConfig): + # Operator sends normalized [-1, 1] (Shift=2x, Ctrl=0.5x); we scale here. + linear_speed: float = 0.5 + angular_speed: float = 0.8 + + +class HostedTwistTeleopModule(HostedTeleopModule): + """Mobile-base subclass. Drives cmd_vel from keyboard TwistStamped or VR Joy.""" + + config: HostedTwistTeleopConfig + + cmd_vel: Out[Twist] + + def _publish_twist( + self, lx: float, ly: float, az: float, ts: float, frame_id: str, seq: int + ) -> None: + ls = self.config.linear_speed + as_ = self.config.angular_speed + linear = Vector3(lx * ls, ly * ls, 0.0) + angular = Vector3(0.0, 0.0, az * as_) + self.cmd_vel.publish(Twist(linear=linear, angular=angular)) + self.cmd_vel_stamped.publish( + TwistStamped(ts=ts, frame_id=frame_id, seq=seq, linear=linear, angular=angular) + ) + + def _on_twist_bytes(self, data: bytes) -> None: + # Keyboard/touch path: stamped ts + seq feed the HUD command-plane stats. + msg = TwistStamped.lcm_decode(data) + self._record_cmd_arrival(msg.ts, msg.seq) + self._publish_twist( + msg.linear.x, msg.linear.y, msg.angular.z, msg.ts, msg.frame_id, msg.seq + ) + + def _on_joy_bytes(self, data: bytes) -> None: + # VR thumbsticks → base velocity. Left Y = fwd/back, left X = strafe, + # right X = yaw. Stick conventions are opposite ROS, so negate. + super()._on_joy_bytes(data) + with self._lock: + right = self._controllers.get(Hand.RIGHT) + left = self._controllers.get(Hand.LEFT) + fwd = -left.thumbstick.y if left is not None else 0.0 + strafe = -left.thumbstick.x if left is not None else 0.0 + yaw = -right.thumbstick.x if right is not None else 0.0 + self._publish_twist(fwd, strafe, yaw, time.time(), "vr", 0) diff --git a/dimos/teleop/quest_hosted/hosted_teleop_module.py b/dimos/teleop/quest_hosted/hosted_teleop_module.py new file mode 100644 index 0000000000..e267de1294 --- /dev/null +++ b/dimos/teleop/quest_hosted/hosted_teleop_module.py @@ -0,0 +1,654 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hosted Teleop Module — Cloudflare Realtime SFU client.""" + +from __future__ import annotations + +import asyncio +from enum import IntEnum +import json +import os +import threading +import time +from typing import Any + +from aiortc import ( + RTCBundlePolicy, + RTCConfiguration, + RTCDataChannel, + RTCIceServer, + RTCPeerConnection, + RTCSessionDescription, +) +from dimos_lcm.geometry_msgs import PoseStamped as LCMPoseStamped, TwistStamped as LCMTwistStamped +from dimos_lcm.sensor_msgs import Joy as LCMJoy +import httpx +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.Joy import Joy +from dimos.teleop.quest.quest_types import Buttons, QuestControllerState +from dimos.teleop.quest_hosted.sdp import propagate_bundle_candidates +from dimos.teleop.quest_hosted.video_track import CameraVideoTrack +from dimos.teleop.utils.stream_stats import LiveStreamStats +from dimos.teleop.utils.teleop_transforms import webxr_to_robot +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class Hand(IntEnum): + LEFT = 0 + RIGHT = 1 + + +class HostedTeleopConfig(ModuleConfig): + control_loop_hz: float = 50.0 + + broker_url: str = os.getenv("TELEOP_BROKER_URL", "https://teleop.dimensionalos.com") + broker_api_key: str = os.getenv("TELEOP_API_KEY", "") + robot_id: str = os.getenv("TELEOP_ROBOT_ID", "") + robot_name: str = os.getenv("TELEOP_ROBOT_NAME", "") + + stun_urls: list[str] = ["stun:stun.cloudflare.com:3478"] + turn_urls: list[str] = [] + turn_username: str = "" + turn_credential: str = "" + + heartbeat_hz: float = 1.0 + telemetry_hz: float = 3.0 # robot → operator HUD command-plane stats + + +class HostedTeleopModule(Module): + """Cloudflare-Realtime-based teleop client. + + Override hooks: ``_handle_engage``, ``_should_publish``, + ``_get_output_pose``, ``_publish_msg``, ``_publish_button_state``. + """ + + config: HostedTeleopConfig + + left_controller_output: Out[PoseStamped] + right_controller_output: Out[PoseStamped] + buttons: Out[Buttons] + # cmd_vel actuation port is on the mobile-base subclass; this stays generic. + cmd_vel_stamped: Out[TwistStamped] + # Operator-side video health, sampled in the browser via getStats() and + # relayed over state_reliable. Recorders pick this up like any typed stream. + video_stats: Out[VideoStats] + color_image: In[Image] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + self._is_engaged: dict[Hand, bool] = {Hand.LEFT: False, Hand.RIGHT: False} + self._initial_poses: dict[Hand, PoseStamped | None] = {Hand.LEFT: None, Hand.RIGHT: None} + self._current_poses: dict[Hand, PoseStamped | None] = {Hand.LEFT: None, Hand.RIGHT: None} + self._controllers: dict[Hand, QuestControllerState | None] = { + Hand.LEFT: None, + Hand.RIGHT: None, + } + self._lock = threading.RLock() + + self._loop: asyncio.AbstractEventLoop | None = None + self._loop_thread: threading.Thread | None = None + + self._pc: RTCPeerConnection | None = None + self._http: httpx.AsyncClient | None = None + self._session_id: str | None = None + + # All three datachannels are negotiated; SCTP ids come from the broker + # heartbeat ack (so they're None until an operator joins). + self._cmd_channel: RTCDataChannel | None = None + self._cmd_channel_id: int | None = None + self._state_channel: RTCDataChannel | None = None + self._state_channel_id: int | None = None + self._state_back_channel: RTCDataChannel | None = None + self._state_back_channel_id: int | None = None + + self._video_track = CameraVideoTrack() + self._cmd_stats = LiveStreamStats() + + self._control_loop_thread: threading.Thread | None = None + self._heartbeat_thread: threading.Thread | None = None + self._telemetry_thread: threading.Thread | None = None + self._stop_event = threading.Event() + + self._decoders: dict[bytes, Any] = { + LCMPoseStamped._get_packed_fingerprint(): self._on_pose_bytes, + LCMJoy._get_packed_fingerprint(): self._on_joy_bytes, + LCMTwistStamped._get_packed_fingerprint(): self._on_twist_bytes, + } + + @rpc + def start(self) -> None: + super().start() + self._stop_event.clear() + unsub = self.color_image.subscribe(self._video_track.set_latest) + self.register_disposable(Disposable(unsub)) + self._start_event_loop() + self._connect_blocking() + self._start_heartbeat() + self._start_telemetry() + self._start_control_loop() + logger.info("HostedTeleopModule started") + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._control_loop_thread is not None: + self._control_loop_thread.join(timeout=1.0) + self._control_loop_thread = None + if self._heartbeat_thread is not None: + self._heartbeat_thread.join(timeout=2.0) + self._heartbeat_thread = None + if self._telemetry_thread is not None: + self._telemetry_thread.join(timeout=2.0) + self._telemetry_thread = None + if self._loop is not None and self._loop.is_running(): + try: + asyncio.run_coroutine_threadsafe(self._disconnect(), self._loop).result(timeout=5.0) + except Exception: + logger.exception("Error during disconnect") + self._stop_event_loop() + super().stop() + + def _start_event_loop(self) -> None: + ready = threading.Event() + + def runner() -> None: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + ready.set() + self._loop.run_forever() + + self._loop_thread = threading.Thread(target=runner, daemon=True, name="HostedTeleopLoop") + self._loop_thread.start() + ready.wait() + + def _stop_event_loop(self) -> None: + if self._loop is not None and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + if self._loop_thread is not None: + self._loop_thread.join(timeout=2.0) + self._loop_thread = None + self._loop = None + + def _connect_blocking(self) -> None: + assert self._loop is not None + future = asyncio.run_coroutine_threadsafe(self._connect(), self._loop) + future.result(timeout=45.0) + + async def _connect(self) -> None: + self._http = httpx.AsyncClient(timeout=30.0) + + ice_servers = [RTCIceServer(urls=u) for u in self.config.stun_urls] + for url in self.config.turn_urls or []: + ice_servers.append( + RTCIceServer( + urls=url, + username=self.config.turn_username or None, + credential=self.config.turn_credential or None, + ) + ) + + # MAX_BUNDLE, addTrack-before-createDataChannel, and the id=0 throwaway + # below are all CF/aiortc workarounds — see README before changing. + self._pc = RTCPeerConnection( + RTCConfiguration( + iceServers=ice_servers, + bundlePolicy=RTCBundlePolicy.MAX_BUNDLE, + ) + ) + self._pc.addTrack(self._video_track) + sctp_init = self._pc.createDataChannel("_sctp_init", negotiated=True, id=0) + + @self._pc.on("connectionstatechange") + async def _on_state() -> None: + if self._pc is None: + return + logger.info(f"PC state: {self._pc.connectionState}") + if self._pc.connectionState == "connected": + self._video_track.arm() + + offer = await self._pc.createOffer() + await self._pc.setLocalDescription(offer) + + # Wait for ICE gathering before posting (non-trickle). + if self._pc.iceGatheringState != "complete": + done: asyncio.Future[None] = asyncio.get_event_loop().create_future() + + @self._pc.on("icegatheringstatechange") + def _on_gathering() -> None: + if self._pc is None: + return + if self._pc.iceGatheringState == "complete" and not done.done(): + done.set_result(None) + + await done + + url = f"{self.config.broker_url.rstrip('/')}/api/v1/sessions" + body = { + "robot_id": self.config.robot_id, + "robot_name": self.config.robot_name, + "sdp_offer": self._pc.localDescription.sdp, + } + resp = await self._http.post(url, json=body, headers=self._auth_headers()) + if resp.status_code >= 400: + # raise_for_status drops the body; surface the broker's JSON detail. + logger.error( + "Broker POST /sessions -> %s: %s", + resp.status_code, + resp.text[:1000], + ) + resp.raise_for_status() + data = resp.json() + self._session_id = data["session_id"] + + answer_sdp = propagate_bundle_candidates(data["sdp_answer"]) + await self._pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type="answer")) + + _ = sctp_init # intentionally left open + + logger.info( + f"Registered with broker: session_id={self._session_id}, " + f"cf_session_id={data.get('cf_session_id')}" + ) + + async def _disconnect(self) -> None: + if self._http is not None and self._session_id is not None: + try: + url = f"{self.config.broker_url.rstrip('/')}/api/v1/sessions/{self._session_id}" + await self._http.delete(url, headers=self._auth_headers()) + except Exception: + logger.exception("Failed to deregister with broker") + self._close_cmd_channel() + self._cmd_channel_id = None + self._close_state_channel() + self._state_channel_id = None + self._close_state_back_channel() + self._state_back_channel_id = None + if self._pc is not None: + await self._pc.close() + self._pc = None + if self._http is not None: + await self._http.aclose() + self._http = None + self._session_id = None + + def _auth_headers(self) -> dict[str, str]: + if self.config.broker_api_key: + return {"X-Robot-API-Key": self.config.broker_api_key} + return {} + + def _start_heartbeat(self) -> None: + def runner() -> None: + interval = 1.0 / max(self.config.heartbeat_hz, 0.1) + while not self._stop_event.is_set(): + if self._loop is not None and self._loop.is_running() and self._session_id: + try: + asyncio.run_coroutine_threadsafe(self._heartbeat(), self._loop).result( + timeout=2.0 + ) + except Exception: + logger.exception("Heartbeat/channel-open failed") + self._stop_event.wait(interval) + + self._heartbeat_thread = threading.Thread( + target=runner, daemon=True, name="HostedTeleopHeartbeat" + ) + self._heartbeat_thread.start() + + def _record_cmd_arrival(self, ts: float | None, seq: int | None) -> None: + """Hook for the subclass cmd-decode path; feeds the HUD telemetry.""" + self._cmd_stats.record(ts, seq) + + def _start_telemetry(self) -> None: + """Push command-plane health (latency/jitter/loss/rate) to the operator HUD.""" + + def send_telemetry() -> None: + stats = self._cmd_stats.snapshot() + channel = self._state_back_channel + if stats is None or channel is None or channel.readyState != "open": + return + try: + channel.send( + json.dumps( + { + "type": "robot_telemetry", + "cmd": stats, + "robot_ts": time.time(), + } + ) + ) + except Exception: + logger.debug("telemetry send failed", exc_info=True) + + def runner() -> None: + interval = 1.0 / max(self.config.telemetry_hz, 0.1) + while not self._stop_event.is_set(): + if self._loop is not None and self._loop.is_running(): + self._loop.call_soon_threadsafe(send_telemetry) + self._stop_event.wait(interval) + + self._telemetry_thread = threading.Thread( + target=runner, daemon=True, name="HostedTeleopTelemetry" + ) + self._telemetry_thread.start() + + async def _heartbeat(self) -> None: + """POST a heartbeat; react to the SCTP ids the broker hands back by + opening / re-opening / closing the three negotiated datachannels.""" + if self._http is None or self._session_id is None: + return + url = f"{self.config.broker_url.rstrip('/')}/api/v1/sessions/{self._session_id}/heartbeat" + try: + resp = await self._http.post(url, json={}, headers=self._auth_headers()) + resp.raise_for_status() + except Exception as e: + logger.warning(f"Heartbeat POST failed: {e}") + return + + try: + data = resp.json() + except Exception: + return + + sub_id = data.get("cmd_channel_subscriber_id") + sub_id_int = int(sub_id) if sub_id is not None else None + + if sub_id_int != self._cmd_channel_id: + self._close_cmd_channel() + self._cmd_channel_id = sub_id_int + if sub_id_int is not None: + self._open_cmd_channel(sub_id_int) + + state_sub_id = data.get("state_channel_subscriber_id") + state_sub_id_int = int(state_sub_id) if state_sub_id is not None else None + + if state_sub_id_int != self._state_channel_id: + self._close_state_channel() + self._state_channel_id = state_sub_id_int + if state_sub_id_int is not None: + self._open_state_channel(state_sub_id_int) + + state_back_pub_id = data.get("state_back_channel_publisher_id") + state_back_pub_id_int = int(state_back_pub_id) if state_back_pub_id is not None else None + + if state_back_pub_id_int != self._state_back_channel_id: + self._close_state_back_channel() + self._state_back_channel_id = state_back_pub_id_int + if state_back_pub_id_int is not None: + self._open_state_back_channel(state_back_pub_id_int) + + def _open_cmd_channel(self, sctp_id: int) -> None: + """Operator → robot, unreliable + unordered. Carries LCM-encoded commands.""" + if self._pc is None: + return + logger.info("Opening negotiated cmd_unreliable on SCTP id %d", sctp_id) + channel = self._pc.createDataChannel( + "cmd_unreliable", + ordered=False, + maxRetransmits=0, + negotiated=True, + id=sctp_id, + ) + + @channel.on("message") + def _on_message(data: Any) -> None: + if isinstance(data, bytes): + self._dispatch_bytes(data) + + self._cmd_channel = channel + + def _close_cmd_channel(self) -> None: + if self._cmd_channel is not None: + try: + self._cmd_channel.close() + except Exception: + pass + self._cmd_channel = None + + def _open_state_channel(self, sctp_id: int) -> None: + """Operator → robot, reliable + ordered. JSON control plane (ping, clock_report, video_stats).""" + if self._pc is None: + return + logger.info("Opening negotiated state_reliable on SCTP id %d", sctp_id) + channel = self._pc.createDataChannel( + "state_reliable", + ordered=True, + negotiated=True, + id=sctp_id, + ) + + @channel.on("message") + def _on_message(data: Any) -> None: + self._on_state_message(data) + + self._state_channel = channel + + def _close_state_channel(self) -> None: + if self._state_channel is not None: + try: + self._state_channel.close() + except Exception: + pass + self._state_channel = None + + def _open_state_back_channel(self, sctp_id: int) -> None: + """Robot → operator (CF bridges one-way). Pong replies + robot_telemetry.""" + if self._pc is None: + return + logger.info("Opening negotiated state_reliable_back on SCTP id %d", sctp_id) + channel = self._pc.createDataChannel( + "state_reliable_back", + ordered=True, + negotiated=True, + id=sctp_id, + ) + self._state_back_channel = channel + + def _close_state_back_channel(self) -> None: + if self._state_back_channel is not None: + try: + self._state_back_channel.close() + except Exception: + pass + self._state_back_channel = None + + def _on_state_message(self, data: Any) -> None: + """Dispatch a JSON message from state_reliable: ping → pong, plus clock_report / video_stats logging.""" + if isinstance(data, bytes): + try: + data = data.decode("utf-8") + except UnicodeDecodeError: + logger.warning("state_reliable: non-utf8 payload, dropping") + return + try: + msg = json.loads(data) + except Exception: + logger.warning(f"state_reliable: malformed JSON: {data[:80]!r}") + return + + kind = msg.get("type") + if kind == "ping": + client_ts = msg.get("client_ts") + if client_ts is None: + return + pong = json.dumps({"type": "pong", "client_ts": client_ts, "robot_ts": time.time()}) + out = self._state_back_channel + if out is not None and out.readyState == "open": + out.send(pong) + else: + logger.warning( + "state_reliable: ping received but state_reliable_back not open — pong dropped" + ) + elif kind == "clock_report": + rtt = msg.get("rtt_ms") + off = msg.get("offset_ms") + logger.info( + "clock-sync: operator rtt=%.1fms offset=%.1fms", + float(rtt) if rtt is not None else float("nan"), + float(off) if off is not None else float("nan"), + ) + elif kind == "video_stats": + stats = VideoStats.from_dict(msg) + logger.info("video: %s", stats) + self.video_stats.publish(stats) + else: + logger.debug(f"state_reliable: unknown message type {kind!r}") + + def _dispatch_bytes(self, data: bytes) -> None: + decoder = self._decoders.get(data[:8]) + if decoder: + decoder(data) + else: + logger.warning(f"Unknown message fingerprint: {data[:8].hex()}") + + def _on_pose_bytes(self, data: bytes) -> None: + msg = PoseStamped.lcm_decode(data) + try: + hand = self._resolve_hand(msg.frame_id) + except ValueError: + return + robot_pose = webxr_to_robot(msg, is_left_controller=(hand == Hand.LEFT)) + with self._lock: + self._current_poses[hand] = robot_pose + + def _on_joy_bytes(self, data: bytes) -> None: + msg = Joy.lcm_decode(data) + try: + hand = self._resolve_hand(msg.frame_id) + except ValueError: + return + try: + controller = QuestControllerState.from_joy(msg, is_left=(hand == Hand.LEFT)) + except ValueError: + logger.warning( + f"Malformed Joy for {hand.name}: axes={len(msg.axes or [])}, " + f"buttons={len(msg.buttons or [])}" + ) + return + with self._lock: + self._controllers[hand] = controller + + def _on_twist_bytes(self, data: bytes) -> None: + # Mobile-base subclass overrides this to actuate cmd_vel. + msg = TwistStamped.lcm_decode(data) + self.cmd_vel_stamped.publish(msg) + + @staticmethod + def _resolve_hand(frame_id: str) -> Hand: + if frame_id == "left": + return Hand.LEFT + if frame_id == "right": + return Hand.RIGHT + raise ValueError(f"Unexpected frame_id: {frame_id!r}") + + def _start_control_loop(self) -> None: + # _stop_event already cleared in start() before any thread spawns. + self._control_loop_thread = threading.Thread( + target=self._control_loop, + daemon=True, + name="HostedTeleopControlLoop", + ) + self._control_loop_thread.start() + + def _control_loop(self) -> None: + """Fixed-rate cycle: engage-gating → per-hand publish → buttons publish. + + Drives the subclass-overridable hooks; the inbound cmd / pose / joy + decode happens off this loop (in the datachannel callbacks). + """ + period = 1.0 / self.config.control_loop_hz + while not self._stop_event.is_set(): + loop_start = time.perf_counter() + try: + with self._lock: + self._handle_engage() + for hand in Hand: + if not self._should_publish(hand): + continue + output_pose = self._get_output_pose(hand) + if output_pose is not None: + self._publish_msg(hand, output_pose) + left = self._controllers.get(Hand.LEFT) + right = self._controllers.get(Hand.RIGHT) + self._publish_button_state(left, right) + except Exception: + logger.exception("Error in control loop") + + elapsed = time.perf_counter() - loop_start + sleep_time = period - elapsed + if sleep_time > 0: + self._stop_event.wait(sleep_time) + + def _handle_engage(self) -> None: + for hand in Hand: + controller = self._controllers.get(hand) + if controller is None: + continue + if controller.primary: + if not self._is_engaged[hand]: + pose = self._current_poses.get(hand) + if pose is None: + logger.error( + f"Engage failed: {hand.name.lower()} controller has no pose data" + ) + continue + self._initial_poses[hand] = pose + self._is_engaged[hand] = True + logger.info(f"{hand.name} engaged.") + else: + if self._is_engaged[hand]: + self._is_engaged[hand] = False + logger.info(f"{hand.name} disengaged.") + + def _should_publish(self, hand: Hand) -> bool: + return self._is_engaged[hand] + + def _get_output_pose(self, hand: Hand) -> PoseStamped | None: + current = self._current_poses.get(hand) + initial = self._initial_poses.get(hand) + if current is None or initial is None: + return None + delta = current - initial + return PoseStamped( + position=delta.position, + orientation=delta.orientation, + ts=current.ts, + frame_id=current.frame_id, + ) + + def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None: + if hand == Hand.LEFT: + self.left_controller_output.publish(output_msg) + else: + self.right_controller_output.publish(output_msg) + + def _publish_button_state( + self, + left: QuestControllerState | None, + right: QuestControllerState | None, + ) -> None: + buttons = Buttons.from_controllers(left, right) + self.buttons.publish(buttons) diff --git a/dimos/teleop/quest_hosted/sdp.py b/dimos/teleop/quest_hosted/sdp.py new file mode 100644 index 0000000000..cb1098d25d --- /dev/null +++ b/dimos/teleop/quest_hosted/sdp.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""SDP-string workarounds for the aiortc + Cloudflare Realtime SFU combo. + +Pure functions, easy to unit-test in isolation. The *why* lives in +``dimos/teleop/quest_hosted/README.md`` — these are the workarounds; the +README explains the bugs. +""" + +from __future__ import annotations + +import re + + +def propagate_bundle_candidates(sdp: str) -> str: + """Workaround for aiortc MAX_BUNDLE candidate-dict overwrite (see README).""" + sections = re.split(r"(?m)^(?=m=)", sdp) + cand_lines: list[str] = [] + for s in sections: + if s.startswith("m="): + found = re.findall(r"(?m)^(a=(?:candidate:|end-of-candidates).*)$", s) + if found: + cand_lines = found + break + if not cand_lines: + return sdp + + block = "\r\n".join(cand_lines) + "\r\n" + out = [] + for s in sections: + if s.startswith("m=") and not re.search(r"(?m)^a=candidate:", s): + out.append(s.rstrip("\r\n") + "\r\n" + block) + else: + out.append(s) + return "".join(out) + + +__all__ = ["propagate_bundle_candidates"] diff --git a/dimos/teleop/quest_hosted/test_clock_sync.py b/dimos/teleop/quest_hosted/test_clock_sync.py new file mode 100644 index 0000000000..53aa3b6233 --- /dev/null +++ b/dimos/teleop/quest_hosted/test_clock_sync.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the clock-sync ping/pong protocol. + +Validates ``HostedTeleopModule._on_state_message`` shape without standing up +a WebRTC stack — injects a mocked state channel and inspects what it sends. +""" + +from __future__ import annotations + +import json +import time +from unittest.mock import MagicMock + +import pytest + +from dimos.teleop.quest_hosted.hosted_teleop_module import HostedTeleopModule + + +@pytest.fixture +def module(): + """Bare ``HostedTeleopModule`` with a mocked open state-back channel.""" + m = HostedTeleopModule.__new__(HostedTeleopModule) + m._state_back_channel = MagicMock() + m._state_back_channel.readyState = "open" + m._state_channel = None + return m + + +def _sent_payload(module) -> dict: + """Decode the JSON the mock channel was asked to send.""" + module._state_back_channel.send.assert_called_once() + return json.loads(module._state_back_channel.send.call_args[0][0]) + + +def test_ping_echoes_pong_with_robot_ts(module): + """A well-formed ping → pong with same client_ts + a fresh robot_ts.""" + before = time.time() + module._on_state_message(json.dumps({"type": "ping", "client_ts": 100.5}).encode("utf-8")) + after = time.time() + + sent = _sent_payload(module) + assert sent["type"] == "pong" + assert sent["client_ts"] == 100.5 + assert before <= sent["robot_ts"] <= after + + +def test_ping_accepts_str_data(module): + """aiortc may deliver str OR bytes on a DataChannel; both must work.""" + module._on_state_message('{"type":"ping","client_ts":42.0}') + assert _sent_payload(module)["client_ts"] == 42.0 + + +def test_malformed_json_dropped(module): + """Garbage payload — pong not sent, no exception.""" + module._on_state_message(b"not json at all") + module._state_back_channel.send.assert_not_called() + + +def test_unknown_type_dropped(module): + """Future control-plane messages this version doesn't recognise are no-ops.""" + module._on_state_message(b'{"type":"mode_switch","mode":"arm"}') + module._state_back_channel.send.assert_not_called() + + +def test_ping_missing_client_ts_dropped(module): + """Don't echo nonsense pings — keeps the offset estimator clean.""" + module._on_state_message(b'{"type":"ping"}') + module._state_back_channel.send.assert_not_called() + + +def test_pong_swallowed_when_channel_closed(module): + """If the channel closed between recv and the send attempt, swallow it.""" + module._state_back_channel.readyState = "closed" + module._on_state_message(b'{"type":"ping","client_ts":1.0}') + module._state_back_channel.send.assert_not_called() + + +def test_non_utf8_bytes_dropped(module): + """Random bytes that aren't valid UTF-8 — drop, don't crash.""" + module._on_state_message(b"\xff\xfe\xfd") + module._state_back_channel.send.assert_not_called() + + +def test_pong_dropped_when_state_back_channel_absent(module): + """If state_reliable_back never opened, ping arriving on state_reliable + must NOT fall back to state_reliable (CF wouldn't bridge it back).""" + module._state_back_channel = None + module._state_channel = MagicMock() + module._state_channel.readyState = "open" + module._on_state_message(b'{"type":"ping","client_ts":1.0}') + module._state_channel.send.assert_not_called() diff --git a/dimos/teleop/quest_hosted/video_track.py b/dimos/teleop/quest_hosted/video_track.py new file mode 100644 index 0000000000..653f26ad70 --- /dev/null +++ b/dimos/teleop/quest_hosted/video_track.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""aiortc video track sourced from an Image stream. + +Extracted from ``hosted_teleop_module`` so the broker-handshake file stays +focused on session lifecycle rather than media plumbing. +""" + +from __future__ import annotations + +import asyncio +import threading +import time + +from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE, VideoStreamTrack +import av + +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat + +_AV_FORMAT_MAP = { + ImageFormat.BGR: "bgr24", + ImageFormat.RGB: "rgb24", + ImageFormat.BGRA: "bgra", + ImageFormat.RGBA: "rgba", + ImageFormat.GRAY: "gray", +} + + +class CameraVideoTrack(VideoStreamTrack): + """aiortc video track sourced from the latest Image on the In port. + + Drain-mode (recv only returns on a NEW frame) + wall-clock PTSs — so the + browser paces playback at the source's real cadence, not aiortc's 30fps + schedule, and we don't feed duplicates at startup (would warm up the + encoder and the browser would play the burst in fast-forward). + """ + + def __init__(self) -> None: + super().__init__() + self._lock = threading.Lock() + self._latest: Image | None = None + self._frame_seq = 0 + self._consumed_seq = 0 + self._armed = False + self._first_wall: float | None = None + + def arm(self) -> None: + """Discard buffered frames; start delivering from now. + + Called once the PC is ``connected`` so the operator's video starts at + "this instant", not "whenever the robot booted". + """ + with self._lock: + self._consumed_seq = self._frame_seq + self._armed = True + + def set_latest(self, img: Image) -> None: + with self._lock: + self._latest = img + self._frame_seq += 1 + + async def recv(self) -> av.VideoFrame: + while True: + with self._lock: + if ( + self._armed + and self._latest is not None + and self._frame_seq > self._consumed_seq + ): + img = self._latest + self._consumed_seq = self._frame_seq + break + await asyncio.sleep(0.005) + + now = time.time() + if self._first_wall is None: + self._first_wall = now + pts = int((now - self._first_wall) * VIDEO_CLOCK_RATE) + + frame = av.VideoFrame.from_ndarray(img.data, format=_AV_FORMAT_MAP.get(img.format, "bgr24")) + frame.pts = pts + frame.time_base = VIDEO_TIME_BASE + return frame + + +__all__ = ["CameraVideoTrack"] diff --git a/dimos/teleop/utils/recorder.py b/dimos/teleop/utils/recorder.py new file mode 100644 index 0000000000..c2facc9e42 --- /dev/null +++ b/dimos/teleop/utils/recorder.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Generic teleop stream recorder, shared across all teleop variants. + +One recorder for quest, phone, and hosted teleop. It declares the *superset* +of teleop output ports; autoconnect wires whichever the composed blueprint +actually produces (VR controller poses + buttons for arm teleop, or +``cmd_vel_stamped`` for mobile-base/keyboard teleop). Ports the blueprint +doesn't drive simply stay empty in the DB. + +Compose at the CLI:: + + dimos run teleop-quest-xarm7 teleop-recorder + dimos run teleop-hosted-go2 teleop-recorder +""" + +from datetime import datetime +from pathlib import Path + +from dimos.core.core import rpc +from dimos.core.stream import In +from dimos.memory2.module import Recorder, RecorderConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.teleop.quest.quest_types import Buttons +from dimos.teleop.utils.report import generate_report +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class TeleopRecorderConfig(RecorderConfig): + # Default path is a stem — TeleopRecorder.start() appends a per-run + # timestamp so successive runs don't clobber each other. Pass an absolute + # path with ``.db`` to opt out of timestamping. + db_path: str | Path = "recording_teleop.db" + + # If True (default), generate a transport-stats report next to the .db on + # stop. Set False for a pure recording-only run (skips matplotlib import + + # report formatting). + generate_report: bool = True + + +class TeleopRecorder(Recorder): + """Records teleop streams to a .db + (optionally) a transport-stats report. + + Superset of ports across arm (pose + buttons), mobile-base + (``cmd_vel_stamped``), and hosted-teleop video stats. Unconnected ports stay + empty in the DB. Each run lands in its own ``_.db`` + so runs don't clobber. On stop, if ``generate_report=True``, also writes + ``report.md`` + ``latency.png`` + ``jitter.png`` next to the .db. + """ + + left_controller_output: In[PoseStamped] + right_controller_output: In[PoseStamped] + buttons: In[Buttons] + cmd_vel_stamped: In[TwistStamped] + video_stats: In[VideoStats] + config: TeleopRecorderConfig + + @rpc + def start(self) -> None: + # Append per-run timestamp to the stem so each run is its own file. + base = Path(self.config.db_path) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.config.db_path = base.with_name(f"{base.stem}_{timestamp}{base.suffix}") + # SqliteStore (sqlite3.connect) won't create the parent dir — ensure it. + self.config.db_path.parent.mkdir(parents=True, exist_ok=True) + super().start() + + @rpc + def stop(self) -> None: + # Snapshot the db_path before super().stop() closes the store — once + # closed, we still want to point the report writer at the same file. + db_path = Path(self.config.db_path) if self.config.generate_report else None + super().stop() + if db_path is not None: + try: + generate_report(db_path) + except Exception: + logger.exception("generate_report failed for %s", db_path) + + +__all__ = ["TeleopRecorder", "TeleopRecorderConfig"] diff --git a/dimos/teleop/utils/report.py b/dimos/teleop/utils/report.py new file mode 100644 index 0000000000..eece4714e8 --- /dev/null +++ b/dimos/teleop/utils/report.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Transport-stats report from a recorded teleop ``.db``. + +Reads the streams a ``TeleopRecorder`` writes (twist, poses, buttons, video +stats) and emits ``report.md`` + ``latency.png`` + ``jitter.png`` next to it. +The math (percentiles, loss, reorder, stalls) is the same one the live HUD +uses — both go through ``stream_stats``. + +Importable from ``TeleopRecorder.stop()`` (post-hoc on the run's own .db) or +runnable standalone over an old recording:: + + python -m dimos.teleop.utils.report +""" + +from __future__ import annotations + +from collections import Counter +from datetime import datetime +from pathlib import Path +import sys +from typing import Any + +import numpy as np + +from dimos.memory2.store.sqlite import SqliteStore +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.teleop.quest.quest_types import Buttons +from dimos.teleop.utils.stream_stats import classify_e2e, loss_pct, pcts, reorder_count +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +# E2E latency acceptance bands (ms), keyed on p50. +_LOSS_THRESHOLD_PCT = 1.0 + +# Streams the recorder declares + the dimos msg type to decode each as. Order +# here drives the order in the report. +_STREAM_TYPES = { + "cmd_vel_stamped": TwistStamped, + "left_controller_output": PoseStamped, + "right_controller_output": PoseStamped, + "buttons": Buttons, + "video_stats": VideoStats, +} + + +def generate_report(db_path: Path, out_dir: Path | None = None) -> Path: + """Write ``report.md`` (+ PNGs) for the recording at *db_path*. + + Output lands in *out_dir* if given, else next to the .db. Returns the + written report.md path. Raises if the .db is missing or unreadable. + """ + db_path = Path(db_path) + if not db_path.exists(): + raise FileNotFoundError(f"Recording not found: {db_path}") + if out_dir is None: + out_dir = db_path.parent + out_dir = Path(out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + # Pull each stream's rows out of the SqliteStore + decode by typed schema. + store = SqliteStore(path=str(db_path)) + store.start() + try: + records = _read_all(store) + finally: + store.stop() + + # Per-message-stream → summary stats. video_stats is a separate shape. + twist_streams = {n: r for n, r in records.items() if n != "video_stats" and r} + summaries = {name: _summary(rs, stall_factor=3.0) for name, rs in twist_streams.items()} + active = {n: s for n, s in summaries.items() if s.get("rate_hz")} + video_summary = _summarize_video(records.get("video_stats", [])) + + duration_s = _run_duration(records) + timestamp = datetime.fromtimestamp(db_path.stat().st_mtime).strftime("%Y%m%d_%H%M%S") + + graph_lines = _write_graphs(out_dir, active, twist_streams) + md = _format_report(timestamp, duration_s, active, video_summary, graph_lines) + + report_path = out_dir / "report.md" + report_path.write_text(md) + logger.info("Report written to %s", report_path) + return report_path + + +def _read_all(store: SqliteStore) -> dict[str, list[Any]]: + """Pull every known teleop stream out of *store*, decoded to typed msgs. + + Streams not in this recording yield empty lists. Each list is ordered by + insertion (which equals arrival order, the recorder writes synchronously + in the message-arrival thread). + """ + available = set(store.list_streams()) + out: dict[str, list[Any]] = {} + for name, msg_type in _STREAM_TYPES.items(): + if name not in available: + out[name] = [] + continue + stream: Any = store.stream(name, msg_type) + # Stream.__iter__ yields Observation[T]; we want (ts, payload) so the + # stats math (which reads .ts / .seq on the payload) matches what the + # benchmark module did with in-memory msgs. + out[name] = [obs.data for obs in stream] + return out + + +def _run_duration(records: dict[str, list[Any]]) -> float: + """Wall-clock span across every stream in this recording.""" + all_ts: list[float] = [] + for rs in records.values(): + all_ts.extend(getattr(m, "ts", 0.0) for m in rs if getattr(m, "ts", 0.0) > 0) + if len(all_ts) < 2: + return 0.0 + return max(all_ts) - min(all_ts) + + +def _summary(records: list[Any], stall_factor: float = 3.0) -> dict[str, Any]: + """Stats for one twist/pose/buttons stream. + + Computed from each message's ``.ts`` (sender stamp, clock-sync calibrated) + and ``.seq`` where present. We treat .ts as the arrival time too because + the recorder doesn't persist a separate wall-arrival stamp — for these + streams in practice the recorder writes within microseconds of arrival, so + inter-stamp deltas track inter-arrival deltas closely. + + Buttons lacks ``.ts``/``.seq``, so rate/jitter/loss are all ``None``. + """ + count = len(records) + tss = [float(m.ts) for m in records if getattr(m, "ts", None) is not None] + seqs = [int(m.seq) for m in records if getattr(m, "seq", None) is not None] + + intervals_ms = (np.diff(sorted(tss)) * 1000.0).tolist() if len(tss) >= 2 else [] + span = (tss[-1] - tss[0]) if len(tss) >= 2 else 0.0 + + stalls: list[float] = [] + if intervals_ms: + stall_thresh = stall_factor * float(np.median(intervals_ms)) + stalls = [iv for iv in intervals_ms if iv > stall_thresh] + + return { + "count": count, + "rate_hz": (len(tss) - 1) / span if span > 0 else None, + "jitter_ms": pcts(intervals_ms), + "loss_pct": loss_pct(seqs), + "reorder_count": reorder_count(seqs), + "stall_count": len(stalls), + "stall_total_s": sum(stalls) / 1000.0, + # No wall-arrival stored, so report .ts-based span as E2E is moot + # here. The live HUD has the real E2E (it knows wall − sender ts). + "e2e_ms": None, + } + + +def _summarize_video(samples: list[VideoStats]) -> dict[str, Any] | None: + """Aggregate per-sample VideoStats into report figures, or None. + + fps/kbps/loss/jbuf/decode → p50+p95 percentiles. Resolution → modal WxH. + dropped/freezes → run totals (the operator's monotonic counters). + """ + if not samples: + return None + + def col(attr: str) -> list[float]: + return [float(getattr(s, attr)) for s in samples] + + resolutions = [f"{s.width}x{s.height}" for s in samples if s.width and s.height] + resolution = Counter(resolutions).most_common(1)[0][0] if resolutions else "n/a" + + return { + "count": len(samples), + "resolution": resolution, + "fps": pcts(col("fps")), + "kbps": pcts(col("kbps")), + "loss_pct": pcts(col("loss_pct")), + "jitter_buffer_ms": pcts(col("jitter_buffer_ms")), + "decode_ms": pcts(col("decode_ms")), + "frames_dropped": max((s.frames_dropped for s in samples), default=0), + "freezes": max((s.freezes for s in samples), default=0), + } + + +def _write_graphs( + out_dir: Path, + active: dict[str, dict[str, Any]], + records: dict[str, list[Any]], +) -> list[str]: + """Render ``latency.png`` + ``jitter.png`` into *out_dir*. + + Returns markdown lines that embed them, or an empty list if there's + nothing to plot or matplotlib is unavailable (the report still writes). + """ + try: + import matplotlib + + matplotlib.use("Agg") + import matplotlib.pyplot as plt + except Exception: + logger.warning("matplotlib unavailable — report has no graphs") + return [] + + refs: list[str] = [] + + jitter = [] + for name in active: + msgs = records.get(name, []) + tss = sorted(float(m.ts) for m in msgs if getattr(m, "ts", 0.0) > 0) + if len(tss) >= 2: + jitter.append((name, (np.diff(tss) * 1000.0).tolist())) + + if jitter: + fig, ax = plt.subplots(figsize=(9, 3.2)) + for n, intervals in jitter: + ax.hist(intervals, bins=40, alpha=0.6, label=n) + ax.set( + xlabel="inter-arrival interval (ms)", + ylabel="count", + title="Inter-arrival jitter", + ) + ax.legend(fontsize=8) + fig.tight_layout() + fig.savefig(out_dir / "jitter.png", dpi=110) + plt.close(fig) + refs += ["![jitter](jitter.png)", ""] + + return ["## Graphs", "", *refs] if refs else [] + + +def _format_report( + timestamp: str, + duration_s: float, + active: dict[str, dict[str, Any]], + video: dict[str, Any] | None, + graph_lines: list[str], +) -> str: + # If an external tool (e.g. data/notes/benchmarks/netem/apply.sh) left a + # profile name at this path, record it in the report header. + netem_profile: str | None = None + try: + netem_profile = Path("/tmp/dimos_netem_profile").read_text().strip() or None + except OSError: + pass + + lines = [ + "# Hosted Teleop Recording Report", + "", + f"- **Timestamp:** {timestamp}", + f"- **Duration:** {duration_s:.1f} s", + f"- **Active streams:** {len(active)}", + *([f"- **netem profile:** {netem_profile}"] if netem_profile else []), + "", + "> Generated from the recording's `.db` at session stop. Stream stats " + "are computed from each message's sender timestamp (clock-sync " + "calibrated) and monotonic `seq`. Rate, jitter, loss are clock-" + "independent; video stats come from the operator's `getStats()`.", + "", + ] + if not active: + lines.append("_No messages received on any stream._") + lines += _video_lines(video) + return "\n".join(lines) + "\n" + + for name, s in active.items(): + jitter = s["jitter_ms"] + loss = s["loss_pct"] + + checks: list[str] = [] + if loss is not None: + checks.append(f"loss {'PASS' if loss < _LOSS_THRESHOLD_PCT else 'WARN'} ({loss:.2f}%)") + e2e = s["e2e_ms"] + if e2e is not None: + checks.append(f"E2E {classify_e2e(e2e['p50'])} (p50 {e2e['p50']:.0f}ms)") + + loss_line = f"{loss:.2f}%" if loss is not None else "n/a (no seq)" + jitter_line = ( + f"- Jitter (ms): p50 {jitter['p50']:.1f} / p95 {jitter['p95']:.1f} " + f"/ p99 {jitter['p99']:.1f} / max {jitter['max']:.1f}" + if jitter + else "- Jitter: n/a (need ≥2 messages)" + ) + + lines += [ + f"## {name}", + "", + f"- Messages: {s['count']}", + f"- Rate: {s['rate_hz']:.2f} Hz" if s["rate_hz"] else "- Rate: n/a", + jitter_line, + f"- Loss: {loss_line}", + f"- Reorder: {s['reorder_count']}", + f"- Stalls: {s['stall_count']} ({s['stall_total_s']:.2f} s total)", + f"- **Checks:** {', '.join(checks) if checks else 'n/a'}", + "", + ] + lines += _video_lines(video) + lines += graph_lines + return "\n".join(lines) + "\n" + + +def _video_lines(video: dict[str, Any] | None) -> list[str]: + """Render the operator-side video health section, or a hint if absent. + + These come from the operator's ``pc.getStats()`` (receive side) relayed + over ``state_reliable`` — the robot's send side can't see what actually + arrived. Empty when no operator was streaming video during the run. + """ + if not video: + return [ + "## Video (operator receive-side)", + "", + "_No video_stats received — connect an operator with video to capture them._", + "", + ] + + def pp(stats: dict[str, float] | None, unit: str = "") -> str: + if not stats: + return "n/a" + return f"p50 {stats['p50']:.1f}{unit} / p95 {stats['p95']:.1f}{unit}" + + return [ + "## Video (operator receive-side)", + "", + f"- Samples: {video['count']}", + f"- Resolution (modal): {video['resolution']}", + f"- FPS: {pp(video['fps'])}", + f"- Bitrate: {pp(video['kbps'], ' kbps')}", + f"- Packet loss: {pp(video['loss_pct'], '%')}", + f"- Jitter buffer: {pp(video['jitter_buffer_ms'], ' ms')}", + f"- Decode time: {pp(video['decode_ms'], ' ms')}", + f"- Frames dropped (total): {video['frames_dropped']}", + f"- Freezes (total): {video['freezes']}", + "", + ] + + +def main() -> None: + """CLI: ``python -m dimos.teleop.utils.report ``.""" + if len(sys.argv) != 2: + print(f"usage: python -m {__name__} ", file=sys.stderr) + sys.exit(2) + out = generate_report(Path(sys.argv[1])) + print(out) + + +if __name__ == "__main__": + main() + + +__all__ = ["generate_report"] diff --git a/dimos/teleop/utils/stream_stats.py b/dimos/teleop/utils/stream_stats.py new file mode 100644 index 0000000000..3f85da3381 --- /dev/null +++ b/dimos/teleop/utils/stream_stats.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Stat helpers for teleop streams (latency / jitter / loss / reorder). + +Two flavors live here: + +* **Pure functions** (`pcts`, `loss_pct`, `reorder_count`, `classify_e2e`) — + shared by the post-hoc report writer (``teleop/utils/report.py``) and any + live stats consumer. +* **`LiveStreamStats`** — a rolling-window class for always-on consumers that + only need a recent snapshot (e.g. the operator HUD's command-plane telemetry). +""" + +from __future__ import annotations + +from collections import deque +from collections.abc import Sequence +from itertools import pairwise +import threading +import time + +import numpy as np + + +def pcts(values: Sequence[float]) -> dict[str, float] | None: + """p50/p95/p99/max of *values* in their native unit, or None if empty.""" + if not values: + return None + a = np.asarray(values, dtype=float) + return { + "p50": float(np.percentile(a, 50)), + "p95": float(np.percentile(a, 95)), + "p99": float(np.percentile(a, 99)), + "max": float(a.max()), + } + + +def loss_pct(seqs: Sequence[int]) -> float | None: + """Loss % from gaps in a monotonic sequence; None if fewer than 2 samples. + + ``loss = 1 - distinct_received / (max_seq - min_seq + 1)``. Reorders and + duplicates do not inflate it — only genuinely missing seq values count. + """ + valid = [s for s in seqs if s is not None] + if len(valid) < 2: + return None + expected = max(valid) - min(valid) + 1 + received = len(set(valid)) + return max(0.0, (1.0 - received / expected) * 100.0) + + +def reorder_count(seqs: Sequence[int]) -> int: + """Count messages that arrived with a seq below an already-seen maximum.""" + count = 0 + running_max = -1 + for s in seqs: + if s is None: + continue + if s < running_max: + count += 1 + else: + running_max = s + return count + + +def classify_e2e( + p50_ms: float, + bands: Sequence[tuple[float, str]] = ( + (50.0, "excellent"), + (100.0, "good"), + (150.0, "usable"), + ), +) -> str: + """Map an E2E p50 latency to an acceptance band label. + + ``bands`` is an ascending list of (threshold_ms, label) pairs; anything past + the last threshold falls into ``"degraded"``. A negative p50 (operator/robot + clocks not synced) returns ``"clock skew"``. + """ + if p50_ms < 0: + return "clock skew" + for threshold, label in bands: + if p50_ms < threshold: + return label + return "degraded" + + +class LiveStreamStats: + """Rolling-window health for an always-on stream consumer. + + Records ``(wall, ts, seq)`` per arrival in a bounded deque so old samples + fall off automatically; ``snapshot()`` returns the current window's median + E2E latency, median inter-arrival jitter, seq-gap loss, and arrival rate. + Thread-safe — ``record()`` runs on the transport callback, + ``snapshot()`` on a separate reader. + """ + + def __init__(self, window: int = 120) -> None: + self._lock = threading.Lock() + # (wall_arrival, ts, seq); ts/seq are None when the stream is unstamped. + self._samples: deque[tuple[float, float | None, int | None]] = deque(maxlen=window) + + def record(self, ts: float | None, seq: int | None) -> None: + """Note an inbound message's send-stamp + monotonic counter.""" + with self._lock: + self._samples.append((time.time(), ts, seq)) + + def snapshot(self) -> dict[str, float | None] | None: + """Median latency/jitter (ms), loss (%), rate (Hz), or None. + + Returns ``None`` until at least two samples have landed (one inter-arrival + interval is needed). Uses the module's shared ``pcts`` / ``loss_pct`` so + the math matches the benchmark module's report. + """ + with self._lock: + samples = list(self._samples) + if len(samples) < 2: + return None + + arrivals = [w for w, _, _ in samples] + intervals_ms = [(b - a) * 1000.0 for a, b in pairwise(arrivals)] + e2e_ms = [(w - ts) * 1000.0 for w, ts, _ in samples if ts is not None] + seqs = [s for _, _, s in samples if s is not None] + + e2e = pcts(e2e_ms) + jit = pcts(intervals_ms) + span = arrivals[-1] - arrivals[0] + return { + "latency_ms": e2e["p50"] if e2e else None, + "jitter_ms": jit["p50"] if jit else None, + "loss_pct": loss_pct(seqs), + "rate_hz": (len(samples) - 1) / span if span > 0 else None, + } + + +__all__ = ["LiveStreamStats", "classify_e2e", "loss_pct", "pcts", "reorder_count"] diff --git a/dimos/teleop/utils/test_video_stats.py b/dimos/teleop/utils/test_video_stats.py new file mode 100644 index 0000000000..95b3749899 --- /dev/null +++ b/dimos/teleop/utils/test_video_stats.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Roundtrip + browser-payload tests for VideoStats.""" + +from dimos_lcm.sensor_msgs import Joy as LCMJoy + +from dimos.teleop.utils.video_stats import VideoStats + + +def _sample() -> VideoStats: + return VideoStats( + ts=1700_000_000.5, + frame_id="video", + fps=28.0, + kbps=2100.5, + width=1280, + height=720, + loss_pct=2.1, + jitter_buffer_ms=28.0, + decode_ms=6.0, + frames_dropped=2, + freezes=14, + ) + + +def test_lcm_roundtrip_preserves_all_fields() -> None: + original = _sample() + decoded = VideoStats.lcm_decode(original.lcm_encode()) + assert decoded == original + # ts is float; round-trips through LCM int sec + int nsec, so allow small slack. + assert abs(decoded.ts - original.ts) < 1e-6 + + +def test_from_dict_matches_browser_payload_shape() -> None: + """Browser ships exactly this JSON shape over state_reliable.""" + payload = { + "type": "video_stats", # extra key ignored + "fps": 28.0, + "kbps": 2100.5, + "width": 1280, + "height": 720, + "loss_pct": 2.1, + "jitter_ms": 5.5, # extra unused key in current payload + "frames_dropped": 2, + "freezes": 14, + "jitter_buffer_ms": 28.0, + "decode_ms": 6.0, + } + stats = VideoStats.from_dict(payload) + assert stats.fps == 28.0 + assert stats.kbps == 2100.5 + assert stats.width == 1280 + assert stats.height == 720 + assert stats.frames_dropped == 2 + assert stats.freezes == 14 + + +def test_decode_pads_short_axes_for_forward_compat() -> None: + """An older recording with fewer metrics should still decode (zero-fill).""" + short = LCMJoy() + short.header.frame_id = "video" + short.header.stamp.sec = 100 + short.header.stamp.nsec = 0 + short.axes_length = 3 + short.axes = [25.0, 1500.0, 640.0] # only fps, kbps, width + short.buttons_length = 0 + short.buttons = [] + + decoded = VideoStats.lcm_decode(short.lcm_encode()) + assert decoded.fps == 25.0 + assert decoded.kbps == 1500.0 + assert decoded.width == 640 + # Missing fields zero-filled. + assert decoded.height == 0 + assert decoded.freezes == 0 + + +def test_carrier_is_lcm_joy() -> None: + """Encoded bytes are a valid LCMJoy — proves we ride on the existing type.""" + blob = _sample().lcm_encode() + j = LCMJoy.lcm_decode(blob) + expected = [28.0, 2100.5, 1280.0, 720.0, 2.1, 28.0, 6.0, 2.0, 14.0] + # Joy.axes is float32[], so compare with tolerance. + assert len(j.axes) == len(expected) + for got, want in zip(j.axes, expected, strict=True): + assert abs(got - want) < 1e-5, f"axis mismatch: {got} vs {want}" + assert j.header.frame_id == "video" + + +if __name__ == "__main__": + test_lcm_roundtrip_preserves_all_fields() + test_from_dict_matches_browser_payload_shape() + test_decode_pads_short_axes_for_forward_compat() + test_carrier_is_lcm_joy() + print("all OK") diff --git a/dimos/teleop/utils/video_stats.py b/dimos/teleop/utils/video_stats.py new file mode 100644 index 0000000000..8546d82f5f --- /dev/null +++ b/dimos/teleop/utils/video_stats.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Operator-side video health snapshot, sampled in the browser via getStats(). + +Same trick as ``Buttons`` (which rides on ``UInt32`` with bit packing): we +don't need a new LCM type, we ride on ``sensor_msgs.Joy`` with positional +``axes[]`` slots for each metric. That keeps the message wire-compatible with +the existing dimos_lcm stack while exposing named fields in Python. + +This is a teleop-only construct (not a general sensor message), so it lives +beside the other teleop helpers in ``teleop/utils`` rather than in +``dimos.msgs.sensor_msgs``. +""" + +from __future__ import annotations + +import math +import time +from typing import BinaryIO + +from dimos_lcm.sensor_msgs import Joy as LCMJoy + +from dimos.types.timestamped import Timestamped + + +def _sec_nsec(ts: float) -> list[int]: + s = int(ts) + return [s, int((ts - s) * 1_000_000_000)] + + +# Positional layout inside the carrier's axes[] array. Add new metrics at the +# end so existing recordings stay readable. +_AXES = ( + "fps", + "kbps", + "width", + "height", + "loss_pct", + "jitter_buffer_ms", + "decode_ms", + "frames_dropped", + "freezes", +) + + +class VideoStats(Timestamped): + """One snapshot of operator-side video health. + + Sampled by the operator's browser from ``pc.getStats()`` once per second + and shipped to the robot, where ``HostedTeleopModule`` publishes it on + an ``Out[VideoStats]`` port. The recorder captures it like any other + typed stream — no sidecar file needed. + + All fields are floats on the wire (Joy ``axes[]`` is ``float32[]``); + ``width``, ``height``, ``frames_dropped``, ``freezes`` are integer-valued + but stored as floats. float32 is exact for integers up to 2^24. + """ + + msg_name = "sensor_msgs.VideoStats" + + ts: float + frame_id: str + fps: float + kbps: float + width: int + height: int + loss_pct: float + jitter_buffer_ms: float + decode_ms: float + frames_dropped: int + freezes: int + + def __init__( + self, + ts: float = 0.0, + frame_id: str = "video", + fps: float = 0.0, + kbps: float = 0.0, + width: int = 0, + height: int = 0, + loss_pct: float = 0.0, + jitter_buffer_ms: float = 0.0, + decode_ms: float = 0.0, + frames_dropped: int = 0, + freezes: int = 0, + ) -> None: + self.ts = ts if ts != 0 else time.time() + self.frame_id = frame_id + self.fps = float(fps) + self.kbps = float(kbps) + self.width = int(width) + self.height = int(height) + self.loss_pct = float(loss_pct) + self.jitter_buffer_ms = float(jitter_buffer_ms) + self.decode_ms = float(decode_ms) + self.frames_dropped = int(frames_dropped) + self.freezes = int(freezes) + + @classmethod + def from_dict(cls, d: dict[str, float | int]) -> VideoStats: + """Build from the JSON payload the browser ships (over state_reliable).""" + return cls( + ts=float(d.get("ts", 0.0)), + frame_id=str(d.get("frame_id", "video")), + fps=float(d.get("fps", 0.0)), + kbps=float(d.get("kbps", 0.0)), + width=int(d.get("width", 0)), + height=int(d.get("height", 0)), + loss_pct=float(d.get("loss_pct", 0.0)), + jitter_buffer_ms=float(d.get("jitter_buffer_ms", 0.0)), + decode_ms=float(d.get("decode_ms", 0.0)), + frames_dropped=int(d.get("frames_dropped", 0)), + freezes=int(d.get("freezes", 0)), + ) + + def _as_axes(self) -> list[float]: + # Order must match _AXES. + return [ + self.fps, + self.kbps, + float(self.width), + float(self.height), + self.loss_pct, + self.jitter_buffer_ms, + self.decode_ms, + float(self.frames_dropped), + float(self.freezes), + ] + + def lcm_encode(self) -> bytes: + lcm_msg = LCMJoy() + [lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = _sec_nsec(self.ts) + lcm_msg.header.frame_id = self.frame_id + axes = self._as_axes() + lcm_msg.axes_length = len(axes) + lcm_msg.axes = axes + lcm_msg.buttons_length = 0 + lcm_msg.buttons = [] + return lcm_msg.lcm_encode() # type: ignore[no-any-return] + + @classmethod + def lcm_decode(cls, data: bytes | BinaryIO) -> VideoStats: + lcm_msg = LCMJoy.lcm_decode(data) + axes = list(lcm_msg.axes) if lcm_msg.axes else [] + # Pad short arrays so older recordings (fewer metrics) still decode. + axes += [0.0] * (len(_AXES) - len(axes)) + return cls( + ts=lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000), + frame_id=lcm_msg.header.frame_id or "video", + fps=axes[0], + kbps=axes[1], + width=int(axes[2]), + height=int(axes[3]), + loss_pct=axes[4], + jitter_buffer_ms=axes[5], + decode_ms=axes[6], + frames_dropped=int(axes[7]), + freezes=int(axes[8]), + ) + + def __str__(self) -> str: + return ( + f"VideoStats({self.width}x{self.height} {self.fps:.1f}fps " + f"{self.kbps:.0f}kbps loss={self.loss_pct:.2f}% " + f"jbuf={self.jitter_buffer_ms:.0f}ms decode={self.decode_ms:.1f}ms " + f"dropped={self.frames_dropped} freezes={self.freezes})" + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, VideoStats): + return False + return ( + math.isclose(self.ts, other.ts, rel_tol=1e-4, abs_tol=1e-5) + and self.frame_id == other.frame_id + and math.isclose(self.fps, other.fps, rel_tol=1e-4, abs_tol=1e-5) + and math.isclose(self.kbps, other.kbps, rel_tol=1e-4, abs_tol=1e-5) + and self.width == other.width + and self.height == other.height + and math.isclose(self.loss_pct, other.loss_pct, rel_tol=1e-4, abs_tol=1e-5) + and math.isclose( + self.jitter_buffer_ms, other.jitter_buffer_ms, rel_tol=1e-4, abs_tol=1e-5 + ) + and math.isclose(self.decode_ms, other.decode_ms, rel_tol=1e-4, abs_tol=1e-5) + and self.frames_dropped == other.frames_dropped + and self.freezes == other.freezes + ) + + +__all__ = ["VideoStats"] diff --git a/docs/capabilities/teleoperation/hosted.md b/docs/capabilities/teleoperation/hosted.md new file mode 100644 index 0000000000..5270c84781 --- /dev/null +++ b/docs/capabilities/teleoperation/hosted.md @@ -0,0 +1,144 @@ +# Hosted Teleop + +Operate a DimOS robot remotely from any browser or Quest headset over WebRTC. +The robot dials out to a hosted Cloudflare Realtime SFU broker +([teleop.dimensionalos.com](https://teleop.dimensionalos.com)), so you don't +need to open any inbound ports on the robot's network — it works behind a home +router, on Wi-Fi, or wired LAN. + +## Quick Start + +```bash +TELEOP_API_KEY=dtk_live_... \ +TELEOP_ROBOT_ID=my-robot \ +TELEOP_ROBOT_NAME="Lab Go2" \ +dimos run teleop-hosted-go2 +``` + +The robot registers with the broker. Open +[teleop.dimensionalos.com](https://teleop.dimensionalos.com), log in, and your +robot appears under **Available Robots**. Click **Connect** and you're driving. + +| Module | Role | +|--------|------| +| `HostedTeleopModule` | Dials the broker, owns the WebRTC connection, datachannels, video, clock-sync, and live telemetry | +| `HostedTwistTeleopModule` | Mobile-base subclass: scales operator commands into `cmd_vel` (m/s, rad/s) | +| `HostedArmTeleopModule` | Arm-IK subclass: per-hand pose routing to coordinator tasks | +| `Go2Module` *(in the blueprint)* | The robot driver receiving `cmd_vel` | + +## Get an API key + +1. Visit [teleop.dimensionalos.com](https://teleop.dimensionalos.com) and sign up. +2. On the dashboard, **API Keys → + New Key**. +3. Copy the key (shown once) and pass it as `TELEOP_API_KEY` when launching the blueprint. + +The key is per-robot; one key authenticates one robot. The same user account can +manage many keys for different robots. + +## Available blueprints + +| Blueprint | Use case | Subclass | +|-----------|----------|----------| +| `teleop-hosted-go2` | Mobile base (Unitree Go2, wheeled robots) | `HostedTwistTeleopModule` | +| `teleop-hosted-xarm7` | Arm IK (UFactory xArm7) | `HostedArmTeleopModule` | + +Pair with the recorder to log a session and emit a transport-stats report: + +```bash +dimos run teleop-hosted-go2 teleop-recorder +``` + +This writes `recording_teleop_.db` plus a `report.md` + jitter PNG next to +it on disconnect. Reports can also be regenerated from an existing .db: + +```bash +python -m dimos.teleop.utils.report path/to/recording.db +``` + +## Operator inputs + +The browser is modality-agnostic — it just streams whatever the device gives it, +and the robot blueprint decides what to do with it. + +| Device | Input | Maps to | +|--------|-------|---------| +| Desktop browser | **WASD** keyboard | `TwistStamped` → `cmd_vel` | +| Phone | **On-screen WASD** keys | same path as keyboard | +| Quest 3 (Twist robot) | **Left thumbstick** Y → forward/back, X → strafe; **Right thumbstick** X → yaw | `Joy` → derived twist on the robot | +| Quest 3 (Arm robot) | **Controller poses** + analog triggers | `PoseStamped` → coordinator `TeleopIKTask` | + +Shift = 2× speed, Ctrl = ½× speed and strafe on keyboard. + +## Live metrics HUD + +While connected, the operator sees a metrics overlay (corner pill in the +browser, in-headset stats panel in VR). Color-coded green/amber/red based on +video and command-plane health: + +| Metric | Source | +|--------|--------| +| `fps`, `bitrate`, `loss`, `jitter buffer`, `decode time`, `freezes` | Operator's `getStats()` on the inbound video track | +| `RTT` | NTP-style min-RTT clock sync over the reliable datachannel | +| `cmd latency`, `jitter`, `loss`, `rate` | Robot-measured from the inbound twist stream — what *actually arrived*, sent back over `state_reliable_back` | + +The robot also logs the same metrics to `journalctl`/stdout; benchmark runs +fold them into `report.md`. + +## Configuration + +`HostedTeleopConfig` (base, applies to both subclasses): + +| Field | Default | Notes | +|-------|---------|-------| +| `broker_url` | `https://teleop.dimensionalos.com` | Override with `TELEOP_BROKER_URL` to point at a self-hosted broker | +| `broker_api_key` | `""` | Required. Env: `TELEOP_API_KEY` | +| `robot_id` | `""` | Required, identifies this robot. Env: `TELEOP_ROBOT_ID` | +| `robot_name` | `""` | Display name shown in the dashboard. Env: `TELEOP_ROBOT_NAME` | +| `control_loop_hz` | `50.0` | Per-hand publish + button-state cycle | +| `heartbeat_hz` | `1.0` | HTTP heartbeat to the broker (also drives channel-id sync) | +| `telemetry_hz` | `3.0` | Robot → operator HUD command-plane stats | +| `stun_urls` | `[stun:stun.cloudflare.com:3478]` | STUN servers for ICE | +| `turn_urls`, `turn_username`, `turn_credential` | `""` | TURN credentials. Fields exist; not yet auto-provisioned. | + +`HostedTwistTeleopConfig` adds: + +| Field | Default | Notes | +|-------|---------|-------| +| `linear_speed` | `0.5` | Multiplied into `cmd_vel.linear` (m/s) | +| `angular_speed` | `0.8` | Multiplied into `cmd_vel.angular` (rad/s) | + +`HostedArmTeleopConfig` adds: + +| Field | Default | Notes | +|-------|---------|-------| +| `task_names` | `{}` | Maps `"left"`/`"right"` → coordinator task name (e.g. `"teleop_xarm"`), used as `frame_id` so the coordinator routes to the right IK task | + +## How it connects + +```text +robot broker (Cloudflare SFU) operator browser/Quest +───── ────────────────────── ────────────────────── +HostedTeleopModule + POST /api/v1/sessions ───► CF session + datachannels ◄─── POST /sessions/{id}/join + (operator joins) + cmd_unreliable ◄──── (operator → robot, lossy) ◄──── WASD / Joy / poses + state_reliable ◄──── (operator → robot, json) ◄──── ping, video_stats + state_reliable_back ────► (robot → operator, json) ────► pong, robot_telemetry + video track ────► CF publishes + pulls ────►