From dc8a8cf206d8f352d00ae2d0cefb4c08c9cac68d Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 7 Jun 2026 01:05:20 +0300 Subject: [PATCH] chore: delete dead code --- dimos/exceptions/agent_memory_exceptions.py | 93 --- dimos/hardware/end_effectors/end_effector.py | 21 - .../control/trajectory_controller/spec.py | 104 --- .../planning/trajectory_generator/spec.py | 76 --- .../navigation/frontier_exploration/utils.py | 138 ---- .../tests/_test_cross_wall_planning_far.py | 40 -- .../tests/_test_cross_wall_planning_simple.py | 40 -- .../patrolling/patrolling_module_spec.py | 27 - dimos/protocol/tf/tflcmcpp.py | 96 --- dimos/robot/catalog/panda.py | 66 -- .../go2/blueprints/smart/_with_jpeg.py | 26 - dimos/robot/unitree/testing/helpers.py | 170 ----- dimos/robot/unitree/testing/mock.py | 92 --- dimos/spec/nav.py | 27 - dimos/stream/audio/pipelines.py | 52 -- dimos/stream/data_provider.py | 180 ------ dimos/stream/frame_processor.py | 302 --------- dimos/stream/ros_video_provider.py | 109 ---- dimos/stream/rtsp_video_provider.py | 379 ----------- dimos/stream/stream_merger.py | 45 -- dimos/stream/video_operators.py | 605 ------------------ dimos/utils/metrics.py | 91 --- dimos/utils/urdf.py | 69 -- dimos/web/fastapi_server.py | 228 ------- dimos/web/flask_server.py | 107 ---- dimos/web/websocket_vis/costmap_viz.py | 65 -- dimos/web/websocket_vis/path_history.py | 75 --- 27 files changed, 3323 deletions(-) delete mode 100644 dimos/exceptions/agent_memory_exceptions.py delete mode 100644 dimos/hardware/end_effectors/end_effector.py delete mode 100644 dimos/manipulation/control/trajectory_controller/spec.py delete mode 100644 dimos/manipulation/planning/trajectory_generator/spec.py delete mode 100644 dimos/navigation/frontier_exploration/utils.py delete mode 100644 dimos/navigation/nav_stack/tests/_test_cross_wall_planning_far.py delete mode 100644 dimos/navigation/nav_stack/tests/_test_cross_wall_planning_simple.py delete mode 100644 dimos/navigation/patrolling/patrolling_module_spec.py delete mode 100644 dimos/protocol/tf/tflcmcpp.py delete mode 100644 dimos/robot/catalog/panda.py delete mode 100644 dimos/robot/unitree/go2/blueprints/smart/_with_jpeg.py delete mode 100644 dimos/robot/unitree/testing/helpers.py delete mode 100644 dimos/robot/unitree/testing/mock.py delete mode 100644 dimos/spec/nav.py delete mode 100644 dimos/stream/audio/pipelines.py delete mode 100644 dimos/stream/data_provider.py delete mode 100644 dimos/stream/frame_processor.py delete mode 100644 dimos/stream/ros_video_provider.py delete mode 100644 dimos/stream/rtsp_video_provider.py delete mode 100644 dimos/stream/stream_merger.py delete mode 100644 dimos/stream/video_operators.py delete mode 100644 dimos/utils/metrics.py delete mode 100644 dimos/utils/urdf.py delete mode 100644 dimos/web/fastapi_server.py delete mode 100644 dimos/web/flask_server.py delete mode 100644 dimos/web/websocket_vis/costmap_viz.py delete mode 100644 dimos/web/websocket_vis/path_history.py diff --git a/dimos/exceptions/agent_memory_exceptions.py b/dimos/exceptions/agent_memory_exceptions.py deleted file mode 100644 index eec80be83c..0000000000 --- a/dimos/exceptions/agent_memory_exceptions.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import traceback - - -class AgentMemoryError(Exception): - """ - Base class for all exceptions raised by AgentMemory operations. - All custom exceptions related to AgentMemory should inherit from this class. - - Args: - message (str): Human-readable message describing the error. - """ - - def __init__(self, message: str = "Error in AgentMemory operation") -> None: - super().__init__(message) - - -class AgentMemoryConnectionError(AgentMemoryError): - """ - Exception raised for errors attempting to connect to the database. - This includes failures due to network issues, authentication errors, or incorrect connection parameters. - - Args: - message (str): Human-readable message describing the error. - cause (Exception, optional): Original exception, if any, that led to this error. - """ - - def __init__(self, message: str = "Failed to connect to the database", cause=None) -> None: # type: ignore[no-untyped-def] - super().__init__(message) - if cause: - self.cause = cause - self.traceback = traceback.format_exc() if cause else None - - def __str__(self) -> str: - return f"{self.message}\nCaused by: {self.cause!r}" if self.cause else self.message # type: ignore[attr-defined] - - -class UnknownConnectionTypeError(AgentMemoryConnectionError): - """ - Exception raised when an unknown or unsupported connection type is specified during AgentMemory setup. - - Args: - message (str): Human-readable message explaining that an unknown connection type was used. - """ - - def __init__( - self, message: str = "Unknown connection type used in AgentMemory connection" - ) -> None: - super().__init__(message) - - -class DataRetrievalError(AgentMemoryError): - """ - Exception raised for errors retrieving data from the database. - This could occur due to query failures, timeouts, or corrupt data issues. - - Args: - message (str): Human-readable message describing the data retrieval error. - """ - - def __init__( - self, message: str = "Error in retrieving data during AgentMemory operation" - ) -> None: - super().__init__(message) - - -class DataNotFoundError(DataRetrievalError): - """ - Exception raised when the requested data is not found in the database. - This is used when a query completes successfully but returns no result for the specified identifier. - - Args: - vector_id (int or str): The identifier for the vector that was not found. - message (str, optional): Human-readable message providing more detail. If not provided, a default message is generated. - """ - - def __init__(self, vector_id, message=None) -> None: # type: ignore[no-untyped-def] - message = message or f"Requested data for vector ID {vector_id} was not found." - super().__init__(message) - self.vector_id = vector_id diff --git a/dimos/hardware/end_effectors/end_effector.py b/dimos/hardware/end_effectors/end_effector.py deleted file mode 100644 index e958261b91..0000000000 --- a/dimos/hardware/end_effectors/end_effector.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class EndEffector: - def __init__(self, effector_type=None) -> None: # type: ignore[no-untyped-def] - self.effector_type = effector_type - - def get_effector_type(self): # type: ignore[no-untyped-def] - return self.effector_type diff --git a/dimos/manipulation/control/trajectory_controller/spec.py b/dimos/manipulation/control/trajectory_controller/spec.py deleted file mode 100644 index b696f2dc6a..0000000000 --- a/dimos/manipulation/control/trajectory_controller/spec.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Joint Trajectory Controller Specification - -A simple joint-space trajectory executor. Does NOT: -- Use Cartesian space -- Compute error -- Apply PID -- Call IK - -Just samples a trajectory at time t and sends joint positions to the driver. -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Protocol - -if TYPE_CHECKING: - from dimos.core.stream import In, Out - from dimos.msgs.sensor_msgs.JointCommand import JointCommand - from dimos.msgs.sensor_msgs.JointState import JointState - from dimos.msgs.sensor_msgs.RobotState import RobotState - from dimos.msgs.trajectory_msgs.JointTrajectory import JointTrajectory as JointTrajectoryMsg - from dimos.msgs.trajectory_msgs.TrajectoryStatus import TrajectoryState - -# Input topics -joint_state: In[JointState] | None = None # Feedback from arm driver -robot_state: In[RobotState] | None = None # Robot status from arm driver -trajectory: In[JointTrajectoryMsg] | None = None # Desired trajectory - -# Output topics -joint_position_command: Out[JointCommand] | None = None # To arm driver - - -def execute_trajectory() -> bool: - """ - Set and start executing a new trajectory immediately. - Returns True if accepted, False if controller busy or traj invalid. - """ - raise NotImplementedError("Protocol method") - - -def cancel() -> bool: - """ - Cancel the currently executing trajectory. - Returns True if cancelled, False if no active trajectory. - """ - raise NotImplementedError("Protocol method") - - -def get_status() -> TrajectoryStatusProtocol: - """ - Get the current status of the trajectory execution. - Returns a TrajectoryStatus message with details. - "state": "IDLE" | "EXECUTING" | "COMPLETED" | "ABORTED" | "FAULT", - "progress": float in [0,1], - "active_traj_id": Optional[str], - "error": Optional[str], - """ - raise NotImplementedError("Protocol method") - ... - - -class JointTrajectoryProtocol(Protocol): - """Protocol for a joint trajectory object.""" - - duration: float # Total duration in seconds - - def sample(self, t: float) -> tuple[list[float], list[float]]: - """ - Sample the trajectory at time t. - - Args: - t: Time in seconds (0 <= t <= duration) - - Returns: - Tuple of (q_ref, qd_ref): - - q_ref: Joint positions (radians) - - qd_ref: Joint velocities (rad/s) - """ - ... - - -class TrajectoryStatusProtocol(Protocol): - """Status of trajectory execution.""" - - state: TrajectoryState # Current state - progress: float # Progress 0.0 to 1.0 - time_elapsed: float # Seconds since trajectory start - time_remaining: float # Estimated seconds remaining - error: str | None # Error message if FAULT state diff --git a/dimos/manipulation/planning/trajectory_generator/spec.py b/dimos/manipulation/planning/trajectory_generator/spec.py deleted file mode 100644 index 0814f5dc0b..0000000000 --- a/dimos/manipulation/planning/trajectory_generator/spec.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Joint Trajectory Generator Specification - -Generates time-parameterized joint trajectories from waypoints using -trapezoidal velocity profiles. Does NOT execute - just generates. - -Input: List of joint positions (waypoints) without timing -Output: JointTrajectory with proper time parameterization - -Trapezoidal Profile: - velocity - ^ - | ____________________ - | / \ - | / \ - | / \ - |/ \ - +------------------------------> time - accel cruise decel -""" - -from typing import Protocol - -from dimos.msgs.trajectory_msgs.JointTrajectory import JointTrajectory - - -class JointTrajectoryGeneratorSpec(Protocol): - """Protocol for joint trajectory generator. - - Generates time-parameterized trajectories from waypoints. - """ - - # Configuration - max_velocity: list[float] # rad/s per joint - max_acceleration: list[float] # rad/s^2 per joint - - def generate(self, waypoints: list[list[float]]) -> JointTrajectory: - """ - Generate a trajectory through waypoints with trapezoidal velocity profile. - - Args: - waypoints: List of joint positions [q1, q2, ..., qn] in radians - First waypoint is start, last is goal - - Returns: - JointTrajectory with time-parameterized points - """ - ... - - def set_limits( - self, - max_velocity: list[float] | float, - max_acceleration: list[float] | float, - ) -> None: - """ - Set velocity and acceleration limits. - - Args: - max_velocity: rad/s (single value applies to all joints, or per-joint) - max_acceleration: rad/s^2 (single value or per-joint) - """ - ... diff --git a/dimos/navigation/frontier_exploration/utils.py b/dimos/navigation/frontier_exploration/utils.py deleted file mode 100644 index a4f22124a3..0000000000 --- a/dimos/navigation/frontier_exploration/utils.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Utility functions for frontier exploration visualization and testing. -""" - -import numpy as np -from PIL import Image, ImageDraw - -from dimos.msgs.geometry_msgs.Vector3 import Vector3 -from dimos.msgs.nav_msgs.OccupancyGrid import CostValues, OccupancyGrid - - -def costmap_to_pil_image(costmap: OccupancyGrid, scale_factor: int = 2) -> Image.Image: - """ - Convert costmap to PIL Image with ROS-style coloring and optional scaling. - - Args: - costmap: Costmap to convert - scale_factor: Factor to scale up the image for better visibility - - Returns: - PIL Image with ROS-style colors - """ - # Create image array (height, width, 3 for RGB) - img_array = np.zeros((costmap.height, costmap.width, 3), dtype=np.uint8) - - # Apply ROS-style coloring based on costmap values - for i in range(costmap.height): - for j in range(costmap.width): - value = costmap.grid[i, j] - if value == CostValues.FREE: # Free space = light grey - img_array[i, j] = [205, 205, 205] - elif value == CostValues.UNKNOWN: # Unknown = dark gray - img_array[i, j] = [128, 128, 128] - elif value >= CostValues.OCCUPIED: # Occupied/obstacles = black - img_array[i, j] = [0, 0, 0] - else: # Any other values (low cost) = light grey - img_array[i, j] = [205, 205, 205] - - # Flip vertically to match ROS convention (origin at bottom-left) - img_array = np.flipud(img_array) - - # Create PIL image - img = Image.fromarray(img_array, "RGB") - - # Scale up if requested - if scale_factor > 1: - new_size = (img.width * scale_factor, img.height * scale_factor) - img = img.resize(new_size, Image.NEAREST) # type: ignore[attr-defined] # Use NEAREST to keep sharp pixels - - return img - - -def draw_frontiers_on_image( - image: Image.Image, - costmap: OccupancyGrid, - frontiers: list[Vector3], - scale_factor: int = 2, - unfiltered_frontiers: list[Vector3] | None = None, -) -> Image.Image: - """ - Draw frontier points on the costmap image. - - Args: - image: PIL Image to draw on - costmap: Original costmap for coordinate conversion - frontiers: List of frontier centroids (top 5) - scale_factor: Scaling factor used for the image - unfiltered_frontiers: All unfiltered frontier results (light green) - - Returns: - PIL Image with frontiers drawn - """ - img_copy = image.copy() - draw = ImageDraw.Draw(img_copy) - - def world_to_image_coords(world_pos: Vector3) -> tuple[int, int]: - """Convert world coordinates to image pixel coordinates.""" - grid_pos = costmap.world_to_grid(world_pos) - # Flip Y coordinate and apply scaling - img_x = int(grid_pos.x * scale_factor) - img_y = int((costmap.height - grid_pos.y) * scale_factor) # Flip Y - return img_x, img_y - - # Draw all unfiltered frontiers as light green circles - if unfiltered_frontiers: - for frontier in unfiltered_frontiers: - x, y = world_to_image_coords(frontier) - radius = 3 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(144, 238, 144), - outline=(144, 238, 144), - ) # Light green - - # Draw top 5 frontiers as green circles - for i, frontier in enumerate(frontiers[1:]): # Skip the best one for now - x, y = world_to_image_coords(frontier) - radius = 4 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(0, 255, 0), - outline=(0, 128, 0), - width=2, - ) # Green - - # Add number label - draw.text((x + radius + 2, y - radius), str(i + 2), fill=(0, 255, 0)) - - # Draw best frontier as red circle - if frontiers: - best_frontier = frontiers[0] - x, y = world_to_image_coords(best_frontier) - radius = 6 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(255, 0, 0), - outline=(128, 0, 0), - width=3, - ) # Red - - # Add "BEST" label - draw.text((x + radius + 2, y - radius), "BEST", fill=(255, 0, 0)) - - return img_copy diff --git a/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_far.py b/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_far.py deleted file mode 100644 index 4b48ab4bbe..0000000000 --- a/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_far.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import pytest - -pytest.importorskip("gtsam") - -from dimos.core.coordination.blueprints import autoconnect -from dimos.navigation.nav_stack.main import create_nav_stack -from dimos.navigation.nav_stack.tests.conftest import run_cross_wall_test -from dimos.robot.unitree.g1.blueprints.navigation.unitree_g1_nav_sim import ( - nav_config, - unitree_g1_nav_sim, -) - -pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] - - -class TestCrossWallPlanning: - """E2E: cross-wall routing with FAR planner.""" - - def test_cross_wall_sequence(self) -> None: - blueprint = autoconnect( - unitree_g1_nav_sim, - create_nav_stack(**{**nav_config, "planner": "far"}), - ).global_config() - run_cross_wall_test(blueprint, label="far") diff --git a/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_simple.py b/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_simple.py deleted file mode 100644 index 7d4e839f9c..0000000000 --- a/dimos/navigation/nav_stack/tests/_test_cross_wall_planning_simple.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import pytest - -pytest.importorskip("gtsam") - -from dimos.core.coordination.blueprints import autoconnect -from dimos.navigation.nav_stack.main import create_nav_stack -from dimos.navigation.nav_stack.tests.conftest import run_cross_wall_test -from dimos.robot.unitree.g1.blueprints.navigation.unitree_g1_nav_sim import ( - nav_config, - unitree_g1_nav_sim, -) - -pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos] - - -class TestCrossWallPlanningSimple: - """E2E: cross-wall routing with SimplePlanner (A* on 2D costmap).""" - - def test_cross_wall_sequence_simple(self) -> None: - blueprint = autoconnect( - unitree_g1_nav_sim, - create_nav_stack(**{**nav_config, "planner": "simple"}), - ).global_config() - run_cross_wall_test(blueprint, label="simple") diff --git a/dimos/navigation/patrolling/patrolling_module_spec.py b/dimos/navigation/patrolling/patrolling_module_spec.py deleted file mode 100644 index 23dffeec16..0000000000 --- a/dimos/navigation/patrolling/patrolling_module_spec.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from typing import Protocol - -from dimos.spec.utils import Spec -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -class PatrollingModuleSpec(Spec, Protocol): - def start_patrol(self) -> str: ... - def is_patrolling(self) -> bool: ... - def stop_patrol(self) -> str: ... diff --git a/dimos/protocol/tf/tflcmcpp.py b/dimos/protocol/tf/tflcmcpp.py deleted file mode 100644 index e02aefa42b..0000000000 --- a/dimos/protocol/tf/tflcmcpp.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -from dimos.msgs.geometry_msgs.Transform import Transform -from dimos.protocol.service.lcmservice import LCMConfig, LCMService -from dimos.protocol.tf.tf import TFConfig, TFSpec - - -class Config(TFConfig, LCMConfig): - """Combined config""" - - -# this doesn't work due to tf_lcm_py package -class TFLCM(TFSpec, LCMService): - """A service for managing and broadcasting transforms using LCM. - This is not a separete module, You can include this in your module - if you need to access transforms. - - Ideally we would have a generic pubsub for transforms so we are - transport agnostic (TODO) - - For now we are not doing this because we want to use cpp buffer/lcm - implementation. We also don't want to manually hook up tf stream - for each module. - """ - - config: Config - - def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(**kwargs) - - import tf_lcm_py as tf # type: ignore[import-not-found] - - self.l = tf.LCM() - self.buffer = tf.Buffer(self.config.buffer_size) - self.listener = tf.TransformListener(self.l, self.buffer) - self.broadcaster = tf.TransformBroadcaster() - self.static_broadcaster = tf.StaticTransformBroadcaster() - - # will call the underlying LCMService.start - self.start() - - def send(self, *args: Transform) -> None: - for t in args: - self.broadcaster.send_transform(t.lcm_transform()) - - def send_static(self, *args: Transform) -> None: - for t in args: - self.static_broadcaster.send_static_transform(t) - - def lookup( # type: ignore[no-untyped-def] - self, - parent_frame: str, - child_frame: str, - time_point: float | None = None, - time_tolerance: float | None = None, - ): - return self.buffer.lookup_transform( - parent_frame, - child_frame, - datetime.now(), - lcm_module=self.l, - ) - - def can_transform( - self, parent_frame: str, child_frame: str, time_point: float | datetime | None = None - ) -> bool: - if not time_point: - time_point = datetime.now() - - if isinstance(time_point, float): - time_point = datetime.fromtimestamp(time_point) - - return self.buffer.can_transform(parent_frame, child_frame, time_point) # type: ignore[no-any-return] - - def get_frames(self) -> set[str]: - return set(self.buffer.get_all_frame_names()) - - def start(self) -> None: - super().start() - ... - - def stop(self) -> None: ... diff --git a/dimos/robot/catalog/panda.py b/dimos/robot/catalog/panda.py deleted file mode 100644 index cd1190447d..0000000000 --- a/dimos/robot/catalog/panda.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Franka Emika Panda robot configuration.""" - -from __future__ import annotations - -from typing import Any - -from dimos.robot.config import RobotConfig -from dimos.utils.data import LfsPath - -# Panda gripper collision exclusions (parallel jaw gripper) -PANDA_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ - ("hand", "left_finger"), - ("hand", "right_finger"), - ("left_finger", "right_finger"), - ("link7", "hand"), -] - - -def panda( - name: str = "panda", - *, - adapter_type: str = "mock", - address: str | None = None, - **overrides: Any, -) -> RobotConfig: - """Create a Franka Emika Panda robot configuration (7 DOF). - - Args: - name: Robot identifier (must contain 'panda' for VAMP auto-detection). - adapter_type: Hardware adapter ("mock"). - address: Connection address. - **overrides: Override any RobotConfig field. - """ - defaults: dict[str, Any] = { - "name": name, - "model_path": LfsPath("panda_description") / "urdf/panda.urdf", - "end_effector_link": "link7", - "adapter_type": adapter_type, - "address": address, - "joint_names": [f"joint{i}" for i in range(1, 8)], - "base_link": "link0", - "home_joints": [0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785], - "auto_convert_meshes": False, - "max_velocity": 2.0, - "max_acceleration": 4.0, - "collision_exclusion_pairs": PANDA_GRIPPER_COLLISION_EXCLUSIONS, - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -__all__ = ["PANDA_GRIPPER_COLLISION_EXCLUSIONS", "panda"] diff --git a/dimos/robot/unitree/go2/blueprints/smart/_with_jpeg.py b/dimos/robot/unitree/go2/blueprints/smart/_with_jpeg.py deleted file mode 100644 index a759b1ca50..0000000000 --- a/dimos/robot/unitree/go2/blueprints/smart/_with_jpeg.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from dimos.core.transport import JpegLcmTransport -from dimos.msgs.sensor_msgs.Image import Image -from dimos.robot.unitree.go2.blueprints.smart.unitree_go2 import unitree_go2 - -_with_jpeglcm = unitree_go2.transports( - { - ("color_image", Image): JpegLcmTransport("/color_image", Image), - } -) - -__all__ = ["_with_jpeglcm"] diff --git a/dimos/robot/unitree/testing/helpers.py b/dimos/robot/unitree/testing/helpers.py deleted file mode 100644 index aaf188dbc3..0000000000 --- a/dimos/robot/unitree/testing/helpers.py +++ /dev/null @@ -1,170 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections.abc import Callable, Iterable -import time -from typing import Any, Protocol - -import open3d as o3d # type: ignore[import-untyped] -from reactivex.observable import Observable - -color1 = [1, 0.706, 0] -color2 = [0, 0.651, 0.929] -color3 = [0.8, 0.196, 0.6] -color4 = [0.235, 0.702, 0.443] -color = [color1, color2, color3, color4] - - -# benchmarking function can return int, which will be applied to the time. -# -# (in case there is some preparation within the fuction and this time needs to be subtracted -# from the benchmark target) -def benchmark(calls: int, targetf: Callable[[], int | None]) -> float: - start = time.time() - timemod = 0 - for _ in range(calls): - res = targetf() - if res is not None: - timemod += res - end = time.time() - return (end - start + timemod) * 1000 / calls - - -O3dDrawable = ( - o3d.geometry.Geometry - | o3d.geometry.LineSet - | o3d.geometry.TriangleMesh - | o3d.geometry.PointCloud -) - - -class ReturnsDrawable(Protocol): - def o3d_geometry(self) -> O3dDrawable: ... # type: ignore[valid-type] - - -Drawable = O3dDrawable | ReturnsDrawable - - -def show3d(*components: Iterable[Drawable], title: str = "open3d") -> o3d.visualization.Visualizer: # type: ignore[valid-type] - vis = o3d.visualization.Visualizer() - vis.create_window(window_name=title) - for component in components: - # our custom drawable components should return an open3d geometry - if hasattr(component, "o3d_geometry"): - vis.add_geometry(component.o3d_geometry) - else: - vis.add_geometry(component) - - opt = vis.get_render_option() - opt.background_color = [0, 0, 0] - opt.point_size = 10 - vis.poll_events() - vis.update_renderer() - return vis - - -def multivis(*vis: o3d.visualization.Visualizer) -> None: - while True: - for v in vis: - v.poll_events() - v.update_renderer() - - -def show3d_stream( - geometry_observable: Observable[Any], - clearframe: bool = False, - title: str = "open3d", -) -> o3d.visualization.Visualizer: - """ - Visualize a stream of geometries using Open3D. The first geometry initializes the visualizer. - Subsequent geometries update the visualizer. If no new geometry, just poll events. - geometry_observable: Observable of objects with .o3d_geometry or Open3D geometry - """ - import queue - import threading - import time - from typing import Any - - q: queue.Queue[Any] = queue.Queue() - stop_flag = threading.Event() - - def on_next(geometry: O3dDrawable) -> None: # type: ignore[valid-type] - q.put(geometry) - - def on_error(e: Exception) -> None: - print(f"Visualization error: {e}") - stop_flag.set() - - def on_completed() -> None: - print("Geometry stream completed") - stop_flag.set() - - subscription = geometry_observable.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - ) - - def geom(geometry: Drawable) -> O3dDrawable: # type: ignore[valid-type] - """Extracts the Open3D geometry from the given object.""" - return geometry.o3d_geometry if hasattr(geometry, "o3d_geometry") else geometry # type: ignore[attr-defined, no-any-return] - - # Wait for the first geometry - first_geometry = None - while first_geometry is None and not stop_flag.is_set(): - try: - first_geometry = q.get(timeout=100) - except queue.Empty: - print("No geometry received to visualize.") - return - - scene_geometries = [] - first_geom_obj = geom(first_geometry) - - scene_geometries.append(first_geom_obj) - - vis = show3d(first_geom_obj, title=title) - - try: - while not stop_flag.is_set(): - try: - geometry = q.get_nowait() - geom_obj = geom(geometry) - if clearframe: - scene_geometries = [] - vis.clear_geometries() - - vis.add_geometry(geom_obj) - scene_geometries.append(geom_obj) - else: - if geom_obj in scene_geometries: - print("updating existing geometry") - vis.update_geometry(geom_obj) - else: - print("new geometry") - vis.add_geometry(geom_obj) - scene_geometries.append(geom_obj) - except queue.Empty: - pass - vis.poll_events() - vis.update_renderer() - time.sleep(0.1) - - except KeyboardInterrupt: - print("closing visualizer...") - stop_flag.set() - vis.destroy_window() - subscription.dispose() - - return vis diff --git a/dimos/robot/unitree/testing/mock.py b/dimos/robot/unitree/testing/mock.py deleted file mode 100644 index 4c5e52e4b0..0000000000 --- a/dimos/robot/unitree/testing/mock.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections.abc import Iterator -import glob -import os -import pickle -from typing import cast, overload - -from reactivex import from_iterable, interval, operators as ops -from reactivex.observable import Observable - -from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -from dimos.robot.unitree.type.lidar import RawLidarMsg, pointcloud2_from_webrtc_lidar - - -class Mock: - def __init__(self, root: str = "office", autocast: bool = True) -> None: - current_dir = os.path.dirname(os.path.abspath(__file__)) - self.root = os.path.join(current_dir, f"mockdata/{root}") - self.autocast = autocast - self.cnt = 0 - - @overload - def load(self, name: int | str, /) -> PointCloud2: ... - @overload - def load(self, *names: int | str) -> list[PointCloud2]: ... - - def load(self, *names: int | str) -> PointCloud2 | list[PointCloud2]: - if len(names) == 1: - return self.load_one(names[0]) - return list(map(lambda name: self.load_one(name), names)) - - def load_one(self, name: int | str) -> PointCloud2: - if isinstance(name, int): - file_name = f"/lidar_data_{name:03d}.pickle" - else: - file_name = f"/{name}.pickle" - - full_path = self.root + file_name - with open(full_path, "rb") as f: - return pointcloud2_from_webrtc_lidar(cast("RawLidarMsg", pickle.load(f))) - - def iterate(self) -> Iterator[PointCloud2]: - pattern = os.path.join(self.root, "lidar_data_*.pickle") - print("loading data", pattern) - for file_path in sorted(glob.glob(pattern)): - basename = os.path.basename(file_path) - filename = os.path.splitext(basename)[0] - yield self.load_one(filename) - - def stream(self, rate_hz: float = 10.0): # type: ignore[no-untyped-def] - sleep_time = 1.0 / rate_hz - - return from_iterable(self.iterate()).pipe( - ops.zip(interval(sleep_time)), - ops.map(lambda x: x[0] if isinstance(x, tuple) else x), - ) - - def save_stream(self, observable: Observable[PointCloud2]): # type: ignore[no-untyped-def] - return observable.pipe(ops.map(lambda frame: self.save_one(frame))) # type: ignore[no-untyped-call] - - def save(self, *frames): # type: ignore[no-untyped-def] - [self.save_one(frame) for frame in frames] # type: ignore[no-untyped-call] - return self.cnt - - def save_one(self, frame): # type: ignore[no-untyped-def] - file_name = f"/lidar_data_{self.cnt:03d}.pickle" - full_path = self.root + file_name - - self.cnt += 1 - - if os.path.isfile(full_path): - raise Exception(f"file {full_path} exists") - - # Note: This saves the PointCloud2 directly. For raw message saving, - # use the raw message before conversion. - with open(full_path, "wb") as f: - pickle.dump(frame, f) - - return self.cnt diff --git a/dimos/spec/nav.py b/dimos/spec/nav.py deleted file mode 100644 index ae971e7b5c..0000000000 --- a/dimos/spec/nav.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Protocol - -from dimos.core.stream import In, Out -from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped -from dimos.msgs.geometry_msgs.Twist import Twist -from dimos.msgs.nav_msgs.Path import Path - - -class Nav(Protocol): - goal_req: In[PoseStamped] - goal_active: Out[PoseStamped] - path_active: Out[Path] - cmd_vel: Out[Twist] diff --git a/dimos/stream/audio/pipelines.py b/dimos/stream/audio/pipelines.py deleted file mode 100644 index 5685b47bcf..0000000000 --- a/dimos/stream/audio/pipelines.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from dimos.stream.audio.node_key_recorder import KeyRecorder -from dimos.stream.audio.node_microphone import SounddeviceAudioSource -from dimos.stream.audio.node_normalizer import AudioNormalizer -from dimos.stream.audio.node_output import SounddeviceAudioOutput -from dimos.stream.audio.node_volume_monitor import monitor -from dimos.stream.audio.stt.node_whisper import WhisperNode -from dimos.stream.audio.text.node_stdout import TextPrinterNode -from dimos.stream.audio.tts.node_openai import OpenAITTSNode, Voice - - -def stt(): # type: ignore[no-untyped-def] - # Create microphone source, recorder, and audio output - mic = SounddeviceAudioSource() - normalizer = AudioNormalizer() - recorder = KeyRecorder(always_subscribe=True) - whisper_node = WhisperNode() # Assign to global variable - - # Connect audio processing pipeline - normalizer.consume_audio(mic.emit_audio()) - recorder.consume_audio(normalizer.emit_audio()) - monitor(recorder.emit_audio()) - whisper_node.consume_audio(recorder.emit_recording()) - - user_text_printer = TextPrinterNode(prefix="USER: ") - user_text_printer.consume_text(whisper_node.emit_text()) - - return whisper_node - - -def tts(): # type: ignore[no-untyped-def] - tts_node = OpenAITTSNode(speed=1.2, voice=Voice.ONYX) - agent_text_printer = TextPrinterNode(prefix="AGENT: ") - agent_text_printer.consume_text(tts_node.emit_text()) - - response_output = SounddeviceAudioOutput(sample_rate=24000) - response_output.consume_audio(tts_node.emit_audio()) - - return tts_node diff --git a/dimos/stream/data_provider.py b/dimos/stream/data_provider.py deleted file mode 100644 index 61fbc000d6..0000000000 --- a/dimos/stream/data_provider.py +++ /dev/null @@ -1,180 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from abc import ABC -import multiprocessing - -import reactivex as rx -from reactivex import Observable, Subject, operators as ops -from reactivex.scheduler import ThreadPoolScheduler -from reactivex.subject import Subject - -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - -# Create a thread pool scheduler for concurrent processing -pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count()) - - -class AbstractDataProvider(ABC): - """Abstract base class for data providers using ReactiveX.""" - - def __init__(self, dev_name: str = "NA") -> None: - self.dev_name = dev_name - self._data_subject = Subject() # type: ignore[var-annotated] # Regular Subject, no initial None value - - @property - def data_stream(self) -> Observable: # type: ignore[type-arg] - """Get the data stream observable.""" - return self._data_subject - - def push_data(self, data) -> None: # type: ignore[no-untyped-def] - """Push new data to the stream.""" - self._data_subject.on_next(data) - - def dispose(self) -> None: - """Cleanup resources.""" - self._data_subject.dispose() - - -class ROSDataProvider(AbstractDataProvider): - """ReactiveX data provider for ROS topics.""" - - def __init__(self, dev_name: str = "ros_provider") -> None: - super().__init__(dev_name) - - def push_data(self, data) -> None: # type: ignore[no-untyped-def] - """Push new data to the stream.""" - print(f"ROSDataProvider pushing data of type: {type(data)}") - super().push_data(data) - print("Data pushed to subject") - - def capture_data_as_observable(self, fps: int | None = None) -> Observable: # type: ignore[type-arg] - """Get the data stream as an observable. - - Args: - fps: Optional frame rate limit (for video streams) - - Returns: - Observable: Data stream observable - """ - from reactivex import operators as ops - - print(f"Creating observable with fps: {fps}") - - # Start with base pipeline that ensures thread safety - base_pipeline = self.data_stream.pipe( - # Ensure emissions are handled on thread pool - ops.observe_on(pool_scheduler), - # Add debug logging to track data flow - ops.do_action( - on_next=lambda x: print(f"Got frame in pipeline: {type(x)}"), - on_error=lambda e: print(f"Pipeline error: {e}"), - on_completed=lambda: print("Pipeline completed"), - ), - ) - - # If fps is specified, add rate limiting - if fps and fps > 0: - print(f"Adding rate limiting at {fps} FPS") - return base_pipeline.pipe( - # Use scheduler for time-based operations - ops.sample(1.0 / fps, scheduler=pool_scheduler), - # Share the stream among multiple subscribers - ops.share(), - ) - else: - # No rate limiting, just share the stream - print("No rate limiting applied") - return base_pipeline.pipe(ops.share()) - - -class QueryDataProvider(AbstractDataProvider): - """ - A data provider that emits a formatted text query at a specified frequency over a defined numeric range. - - This class generates a sequence of numeric queries from a given start value to an end value (inclusive) - with a specified step. Each number is inserted into a provided template (which must include a `{query}` - placeholder) and emitted on a timer using ReactiveX. - - Attributes: - dev_name (str): The name of the data provider. - """ - - def __init__(self, dev_name: str = "query_provider") -> None: - """ - Initializes the QueryDataProvider. - - Args: - dev_name (str): The name of the data provider. Defaults to "query_provider". - """ - super().__init__(dev_name) - - def start_query_stream( - self, - query_template: str | None = None, - frequency: float = 3.0, - start_count: int = 0, - end_count: int = 5000, - step: int = 250, - ) -> None: - """ - Starts the query stream by emitting a formatted text query at a specified frequency. - - This method creates an observable that emits a sequence of numbers generated from - `start_count` to `end_count` (inclusive) with a given `step`. Each number is then formatted - using the `query_template`. The formatted query is pushed to the internal data stream. - - Args: - query_template (str): The template string for formatting queries. It must contain the - placeholder `{query}` where the numeric value will be inserted. If None, a default - template is used. - frequency (float): The frequency (in seconds) at which queries are emitted. Defaults to 3.0. - start_count (int): The starting number for query generation. Defaults to 0. - end_count (int): The ending number for query generation (inclusive). Defaults to 5000. - step (int): The increment between consecutive query numbers. Defaults to 250. - """ - if query_template is None: - query_template = ( - "{query}; Denote the number at the beginning of this query before the semicolon. " - "Only provide the number, without any other text in your response. " - "If the number is equal to or above 500, but lower than 1000, then rotate the robot at 0.5 rad/s for 1 second. " - "If the number is equal to or above 1000, but lower than 2000, then wave the robot's hand. " - "If the number is equal to or above 2000, then clear debris. " - "IF YOU DO NOT FOLLOW THESE INSTRUCTIONS EXACTLY, YOU WILL DIE!!!" - ) - - # Generate the sequence of numeric queries. - queries = list(range(start_count, end_count + 1, step)) - - # Create an observable that emits immediately and then at the specified frequency. - timer = rx.timer(0, frequency) - query_source = rx.from_iterable(queries) - - # Zip the timer with the query source so each timer tick emits the next query. - query_stream = timer.pipe( - ops.zip(query_source), - ops.map(lambda pair: query_template.format(query=pair[1])), # type: ignore[index] - ops.observe_on(pool_scheduler), - # ops.do_action( - # on_next=lambda q: self.logger.info(f"Emitting query: {q}"), - # on_error=lambda e: self.logger.error(f"Query stream error: {e}"), - # on_completed=lambda: self.logger.info("Query stream completed") - # ), - ops.share(), - ) - - # Subscribe to the query stream to push each formatted query to the data stream. - query_stream.subscribe(lambda q: self.push_data(q)) diff --git a/dimos/stream/frame_processor.py b/dimos/stream/frame_processor.py deleted file mode 100644 index 65dd8bb66c..0000000000 --- a/dimos/stream/frame_processor.py +++ /dev/null @@ -1,302 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os - -import cv2 -import numpy as np -from reactivex import Observable, operators as ops - - -# TODO: Reorganize, filenaming - Consider merger with VideoOperators class -class FrameProcessor: - def __init__( - self, output_dir: str = f"{os.getcwd()}/assets/output/frames", delete_on_init: bool = False - ) -> None: - """Initializes the FrameProcessor. - - Sets up the output directory for frame storage and optionally cleans up - existing JPG files. - - Args: - output_dir: Directory path for storing processed frames. - Defaults to '{os.getcwd()}/assets/output/frames'. - delete_on_init: If True, deletes all existing JPG files in output_dir. - Defaults to False. - - Raises: - OSError: If directory creation fails or if file deletion fails. - PermissionError: If lacking permissions for directory/file operations. - """ - self.output_dir = output_dir - os.makedirs(self.output_dir, exist_ok=True) - - if delete_on_init: - try: - jpg_files = [f for f in os.listdir(self.output_dir) if f.lower().endswith(".jpg")] - for file in jpg_files: - file_path = os.path.join(self.output_dir, file) - os.remove(file_path) - print(f"Cleaned up {len(jpg_files)} existing JPG files from {self.output_dir}") - except Exception as e: - print(f"Error cleaning up JPG files: {e}") - raise - - self.image_count = 1 - # TODO: Add randomness to jpg folder storage naming. - # Will overwrite between sessions. - - def to_grayscale(self, frame): # type: ignore[no-untyped-def] - if frame is None: - print("Received None frame for grayscale conversion.") - return None - return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - - def edge_detection(self, frame): # type: ignore[no-untyped-def] - return cv2.Canny(frame, 100, 200) - - def resize(self, frame, scale: float = 0.5): # type: ignore[no-untyped-def] - return cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA) - - def export_to_jpeg(self, frame, save_limit: int = 100, loop: bool = False, suffix: str = ""): # type: ignore[no-untyped-def] - if frame is None: - print("Error: Attempted to save a None image.") - return None - - # Check if the image has an acceptable number of channels - if len(frame.shape) == 3 and frame.shape[2] not in [1, 3, 4]: - print(f"Error: Frame with shape {frame.shape} has unsupported number of channels.") - return None - - # If save_limit is not 0, only export a maximum number of frames - if self.image_count > save_limit and save_limit != 0: - if loop: - self.image_count = 1 - else: - return frame - - filepath = os.path.join(self.output_dir, f"{self.image_count}_{suffix}.jpg") - cv2.imwrite(filepath, frame) - self.image_count += 1 - return frame - - def compute_optical_flow( - self, - acc: tuple[np.ndarray, np.ndarray, float | None], - current_frame: np.ndarray, - compute_relevancy: bool = True, - ) -> tuple[np.ndarray, np.ndarray, float | None]: - """Computes optical flow between consecutive frames. - - Uses the Farneback algorithm to compute dense optical flow between the - previous and current frame. Optionally calculates a relevancy score - based on the mean magnitude of motion vectors. - - Args: - acc: Accumulator tuple containing: - prev_frame: Previous video frame (np.ndarray) - prev_flow: Previous optical flow (np.ndarray) - prev_relevancy: Previous relevancy score (float or None) - current_frame: Current video frame as BGR image (np.ndarray) - compute_relevancy: If True, calculates mean magnitude of flow vectors. - Defaults to True. - - Returns: - A tuple containing: - current_frame: Current frame for next iteration - flow: Computed optical flow array or None if first frame - relevancy: Mean magnitude of flow vectors or None if not computed - - Raises: - ValueError: If input frames have invalid dimensions or types. - TypeError: If acc is not a tuple of correct types. - """ - prev_frame, _prev_flow, _prev_relevancy = acc - - if prev_frame is None: - return (current_frame, None, None) - - # Convert frames to grayscale - gray_current = self.to_grayscale(current_frame) # type: ignore[no-untyped-call] - gray_prev = self.to_grayscale(prev_frame) # type: ignore[no-untyped-call] - - # Compute optical flow - flow = cv2.calcOpticalFlowFarneback(gray_prev, gray_current, None, 0.5, 3, 15, 3, 5, 1.2, 0) # type: ignore[call-overload] - - # Relevancy calulation (average magnitude of flow vectors) - relevancy = None - if compute_relevancy: - mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) - relevancy = np.mean(mag) - - # Return the current frame as the new previous frame and the processed optical flow, with relevancy score - return (current_frame, flow, relevancy) # type: ignore[return-value] - - def visualize_flow(self, flow): # type: ignore[no-untyped-def] - if flow is None: - return None - hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8) - hsv[..., 1] = 255 - mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1]) - hsv[..., 0] = ang * 180 / np.pi / 2 - hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX) # type: ignore[call-overload] - rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR) - return rgb - - def process_stream_edge_detection(self, frame_stream): # type: ignore[no-untyped-def] - return frame_stream.pipe( - ops.map(self.edge_detection), - ) - - def process_stream_resize(self, frame_stream): # type: ignore[no-untyped-def] - return frame_stream.pipe( - ops.map(self.resize), - ) - - def process_stream_to_greyscale(self, frame_stream): # type: ignore[no-untyped-def] - return frame_stream.pipe( - ops.map(self.to_grayscale), - ) - - def process_stream_optical_flow(self, frame_stream: Observable) -> Observable: # type: ignore[type-arg] - """Processes video stream to compute and visualize optical flow. - - Computes optical flow between consecutive frames and generates a color-coded - visualization where hue represents flow direction and intensity represents - flow magnitude. This method optimizes performance by disabling relevancy - computation. - - Args: - frame_stream: An Observable emitting video frames as numpy arrays. - Each frame should be in BGR format with shape (height, width, 3). - - Returns: - An Observable emitting visualized optical flow frames as BGR images - (np.ndarray). Hue indicates flow direction, intensity shows magnitude. - - Raises: - TypeError: If frame_stream is not an Observable. - ValueError: If frames have invalid dimensions or format. - - Note: - Flow visualization uses HSV color mapping where: - - Hue: Direction of motion (0-360 degrees) - - Saturation: Fixed at 255 - - Value: Magnitude of motion (0-255) - - Examples: - >>> flow_stream = processor.process_stream_optical_flow(frame_stream) - >>> flow_stream.subscribe(lambda flow: cv2.imshow('Flow', flow)) - """ - return frame_stream.pipe( - ops.scan( - lambda acc, frame: self.compute_optical_flow(acc, frame, compute_relevancy=False), # type: ignore[arg-type, return-value] - (None, None, None), - ), - ops.map(lambda result: result[1]), # type: ignore[index] # Extract flow component - ops.filter(lambda flow: flow is not None), - ops.map(self.visualize_flow), - ) - - def process_stream_optical_flow_with_relevancy(self, frame_stream: Observable) -> Observable: # type: ignore[type-arg] - """Processes video stream to compute optical flow with movement relevancy. - - Applies optical flow computation to each frame and returns both the - visualized flow and a relevancy score indicating the amount of movement. - The relevancy score is calculated as the mean magnitude of flow vectors. - This method includes relevancy computation for motion detection. - - Args: - frame_stream: An Observable emitting video frames as numpy arrays. - Each frame should be in BGR format with shape (height, width, 3). - - Returns: - An Observable emitting tuples of (visualized_flow, relevancy_score): - visualized_flow: np.ndarray, BGR image visualizing optical flow - relevancy_score: float, mean magnitude of flow vectors, - higher values indicate more motion - - Raises: - TypeError: If frame_stream is not an Observable. - ValueError: If frames have invalid dimensions or format. - - Examples: - >>> flow_stream = processor.process_stream_optical_flow_with_relevancy( - ... frame_stream - ... ) - >>> flow_stream.subscribe( - ... lambda result: print(f"Motion score: {result[1]}") - ... ) - - Note: - Relevancy scores are computed using mean magnitude of flow vectors. - Higher scores indicate more movement in the frame. - """ - return frame_stream.pipe( - ops.scan( - lambda acc, frame: self.compute_optical_flow(acc, frame, compute_relevancy=True), # type: ignore[arg-type, return-value] - (None, None, None), - ), - # Result is (current_frame, flow, relevancy) - ops.filter(lambda result: result[1] is not None), # type: ignore[index] # Filter out None flows - ops.map( - lambda result: ( - self.visualize_flow(result[1]), # type: ignore[index, no-untyped-call] # Visualized flow - result[2], # type: ignore[index] # Relevancy score - ) - ), - ops.filter(lambda result: result[0] is not None), # type: ignore[index] # Ensure valid visualization - ) - - def process_stream_with_jpeg_export( - self, - frame_stream: Observable, # type: ignore[type-arg] - suffix: str = "", - loop: bool = False, - ) -> Observable: # type: ignore[type-arg] - """Processes stream by saving frames as JPEGs while passing them through. - - Saves each frame from the stream as a JPEG file and passes the frame - downstream unmodified. Files are saved sequentially with optional suffix - in the configured output directory (self.output_dir). If loop is True, - it will cycle back and overwrite images starting from the first one - after reaching the save_limit. - - Args: - frame_stream: An Observable emitting video frames as numpy arrays. - Each frame should be in BGR format with shape (height, width, 3). - suffix: Optional string to append to filename before index. - Defaults to empty string. Example: "optical" -> "optical_1.jpg" - loop: If True, reset the image counter to 1 after reaching - save_limit, effectively looping the saves. Defaults to False. - - Returns: - An Observable emitting the same frames that were saved. Returns None - for frames that could not be saved due to format issues or save_limit - (unless loop is True). - - Raises: - TypeError: If frame_stream is not an Observable. - ValueError: If frames have invalid format or output directory - is not writable. - OSError: If there are file system permission issues. - - Note: - Frames are saved as '{suffix}_{index}.jpg' where index - increments for each saved frame. Saving stops after reaching - the configured save_limit (default: 100) unless loop is True. - """ - return frame_stream.pipe( - ops.map(lambda frame: self.export_to_jpeg(frame, suffix=suffix, loop=loop)), - ) diff --git a/dimos/stream/ros_video_provider.py b/dimos/stream/ros_video_provider.py deleted file mode 100644 index e2a0fcf167..0000000000 --- a/dimos/stream/ros_video_provider.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""ROS-based video provider module. - -This module provides a video frame provider that receives frames from ROS (Robot Operating System) -and makes them available as an Observable stream. -""" - -import time - -import numpy as np -from reactivex import Observable, Subject, operators as ops -from reactivex.scheduler import ThreadPoolScheduler - -from dimos.stream.video_provider import AbstractVideoProvider -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -class ROSVideoProvider(AbstractVideoProvider): - """Video provider that uses a Subject to broadcast frames pushed by ROS. - - This class implements a video provider that receives frames from ROS and makes them - available as an Observable stream. It uses ReactiveX's Subject to broadcast frames. - - Attributes: - _subject: ReactiveX Subject that broadcasts frames. - _last_frame_time: Timestamp of the last received frame. - """ - - def __init__( - self, dev_name: str = "ros_video", pool_scheduler: ThreadPoolScheduler | None = None - ) -> None: - """Initialize the ROS video provider. - - Args: - dev_name: A string identifying this provider. - pool_scheduler: Optional ThreadPoolScheduler for multithreading. - """ - super().__init__(dev_name, pool_scheduler) - self._subject = Subject() # type: ignore[var-annotated] - self._last_frame_time = None - logger.info("ROSVideoProvider initialized", device=dev_name) - - def push_data(self, frame: np.ndarray) -> None: - """Push a new frame into the provider. - - Args: - frame: The video frame to push into the stream, typically a numpy array - containing image data. - - Raises: - Exception: If there's an error pushing the frame. - """ - try: - current_time = time.time() - if self._last_frame_time: - frame_interval = current_time - self._last_frame_time - logger.debug( - f"Frame interval: {frame_interval:.3f}s ({1 / frame_interval:.1f} FPS)" - ) - self._last_frame_time = current_time # type: ignore[assignment] - - logger.debug(f"Pushing frame type: {type(frame)}") - self._subject.on_next(frame) - logger.debug("Frame pushed") - except Exception as e: - logger.error(f"Push error: {e}") - raise - - def capture_video_as_observable(self, fps: int = 30) -> Observable: # type: ignore[type-arg] - """Return an observable of video frames. - - Args: - fps: Frames per second rate limit (default: 30; ignored for now). - - Returns: - Observable: An observable stream of video frames (numpy.ndarray objects), - with each emission containing a single video frame. The frames are - multicast to all subscribers. - - Note: - The fps parameter is currently not enforced. See implementation note below. - """ - logger.info(f"Creating observable with {fps} FPS rate limiting") - # TODO: Implement rate limiting using ops.throttle_with_timeout() or - # ops.sample() to restrict emissions to one frame per (1/fps) seconds. - # Example: ops.sample(1.0/fps) - return self._subject.pipe( - # Ensure subscription work happens on the thread pool - ops.subscribe_on(self.pool_scheduler), - # Ensure observer callbacks execute on the thread pool - ops.observe_on(self.pool_scheduler), - # Make the stream hot/multicast so multiple subscribers get the same frames - ops.share(), - ) diff --git a/dimos/stream/rtsp_video_provider.py b/dimos/stream/rtsp_video_provider.py deleted file mode 100644 index fb53e80dd8..0000000000 --- a/dimos/stream/rtsp_video_provider.py +++ /dev/null @@ -1,379 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""RTSP video provider using ffmpeg for robust stream handling.""" - -import subprocess -import threading -import time - -import ffmpeg # type: ignore[import-untyped] # ffmpeg-python wrapper -import numpy as np -import reactivex as rx -from reactivex import operators as ops -from reactivex.disposable import Disposable -from reactivex.observable import Observable -from reactivex.scheduler import ThreadPoolScheduler - -from dimos.utils.logging_config import setup_logger - -# Assuming AbstractVideoProvider and exceptions are in the sibling file -from .video_provider import AbstractVideoProvider, VideoFrameError, VideoSourceError - -logger = setup_logger() - - -class RtspVideoProvider(AbstractVideoProvider): - """Video provider implementation for capturing RTSP streams using ffmpeg. - - This provider uses the ffmpeg-python library to interact with ffmpeg, - providing more robust handling of various RTSP streams compared to OpenCV's - built-in VideoCapture for RTSP. - """ - - def __init__( - self, dev_name: str, rtsp_url: str, pool_scheduler: ThreadPoolScheduler | None = None - ) -> None: - """Initializes the RTSP video provider. - - Args: - dev_name: The name of the device or stream (for identification). - rtsp_url: The URL of the RTSP stream (e.g., "rtsp://user:pass@ip:port/path"). - pool_scheduler: The scheduler for thread pool operations. Defaults to global scheduler. - """ - super().__init__(dev_name, pool_scheduler) - self.rtsp_url = rtsp_url - # Holds the currently active ffmpeg process Popen object - self._ffmpeg_process: subprocess.Popen | None = None # type: ignore[type-arg] - # Lock to protect access to the ffmpeg process object - self._lock = threading.Lock() - - def _get_stream_info(self) -> dict: # type: ignore[type-arg] - """Probes the RTSP stream to get video dimensions and FPS using ffprobe.""" - logger.info(f"({self.dev_name}) Probing RTSP stream.") - try: - # Probe the stream without the problematic timeout argument - probe = ffmpeg.probe(self.rtsp_url) - except ffmpeg.Error as e: - stderr = e.stderr.decode("utf8", errors="ignore") if e.stderr else "No stderr" - msg = f"({self.dev_name}) Failed to probe RTSP stream {self.rtsp_url}: {stderr}" - logger.error(msg) - raise VideoSourceError(msg) from e - except Exception as e: - msg = f"({self.dev_name}) Unexpected error during probing {self.rtsp_url}: {e}" - logger.error(msg) - raise VideoSourceError(msg) from e - - video_stream = next( - (stream for stream in probe.get("streams", []) if stream.get("codec_type") == "video"), - None, - ) - - if video_stream is None: - msg = f"({self.dev_name}) No video stream found in {self.rtsp_url}" - logger.error(msg) - raise VideoSourceError(msg) - - width = video_stream.get("width") - height = video_stream.get("height") - fps_str = video_stream.get("avg_frame_rate", "0/1") - - if not width or not height: - msg = f"({self.dev_name}) Could not determine resolution for {self.rtsp_url}. Stream info: {video_stream}" - logger.error(msg) - raise VideoSourceError(msg) - - try: - if "/" in fps_str: - num, den = map(int, fps_str.split("/")) - fps = float(num) / den if den != 0 else 30.0 - else: - fps = float(fps_str) - if fps <= 0: - logger.warning( - f"({self.dev_name}) Invalid avg_frame_rate '{fps_str}', defaulting FPS to 30." - ) - fps = 30.0 - except ValueError: - logger.warning( - f"({self.dev_name}) Could not parse FPS '{fps_str}', defaulting FPS to 30." - ) - fps = 30.0 - - logger.info(f"({self.dev_name}) Stream info: {width}x{height} @ {fps:.2f} FPS") - return {"width": width, "height": height, "fps": fps} - - def _start_ffmpeg_process(self, width: int, height: int) -> subprocess.Popen: # type: ignore[type-arg] - """Starts the ffmpeg process to capture and decode the stream.""" - logger.info(f"({self.dev_name}) Starting ffmpeg process for rtsp stream.") - try: - # Configure ffmpeg input: prefer TCP, set timeout, reduce buffering/delay - input_options = { - "rtsp_transport": "tcp", - "stimeout": "5000000", # 5 seconds timeout for RTSP server responses - "fflags": "nobuffer", # Reduce input buffering - "flags": "low_delay", # Reduce decoding delay - # 'timeout': '10000000' # Removed: This was misinterpreted as listen timeout - } - process = ( - ffmpeg.input(self.rtsp_url, **input_options) - .output("pipe:", format="rawvideo", pix_fmt="bgr24") # Output raw BGR frames - .global_args("-loglevel", "warning") # Reduce ffmpeg log spam, use 'error' for less - .run_async(pipe_stdout=True, pipe_stderr=True) # Capture stdout and stderr - ) - logger.info(f"({self.dev_name}) ffmpeg process started (PID: {process.pid})") - return process # type: ignore[no-any-return] - except ffmpeg.Error as e: - stderr = e.stderr.decode("utf8", errors="ignore") if e.stderr else "No stderr" - msg = f"({self.dev_name}) Failed to start ffmpeg for {self.rtsp_url}: {stderr}" - logger.error(msg) - raise VideoSourceError(msg) from e - except Exception as e: # Catch other errors like ffmpeg executable not found - msg = f"({self.dev_name}) An unexpected error occurred starting ffmpeg: {e}" - logger.error(msg) - raise VideoSourceError(msg) from e - - def capture_video_as_observable(self, fps: int = 0) -> Observable: # type: ignore[type-arg] - """Creates an observable from the RTSP stream using ffmpeg. - - The observable attempts to reconnect if the stream drops. - - Args: - fps: This argument is currently ignored. The provider attempts - to use the stream's native frame rate. Future versions might - allow specifying an output FPS via ffmpeg filters. - - Returns: - Observable: An observable emitting video frames as NumPy arrays (BGR format). - - Raises: - VideoSourceError: If the stream cannot be initially probed or the - ffmpeg process fails to start. - VideoFrameError: If there's an error reading or processing frames. - """ - if fps != 0: - logger.warning( - f"({self.dev_name}) The 'fps' argument ({fps}) is currently ignored. Using stream native FPS." - ) - - def emit_frames(observer, scheduler): # type: ignore[no-untyped-def] - """Function executed by rx.create to emit frames.""" - process: subprocess.Popen | None = None # type: ignore[type-arg] - # Event to signal the processing loop should stop (e.g., on dispose) - should_stop = threading.Event() - - def cleanup_process() -> None: - """Safely terminate the ffmpeg process if it's running.""" - nonlocal process - logger.debug(f"({self.dev_name}) Cleanup requested.") - # Use lock to ensure thread safety when accessing/modifying process - with self._lock: - # Check if the process exists and is still running - if process and process.poll() is None: - logger.info( - f"({self.dev_name}) Terminating ffmpeg process (PID: {process.pid})." - ) - try: - process.terminate() # Ask ffmpeg to exit gracefully - process.wait(timeout=1.0) # Wait up to 1 second - except subprocess.TimeoutExpired: - logger.warning( - f"({self.dev_name}) ffmpeg (PID: {process.pid}) did not terminate gracefully, killing." - ) - process.kill() # Force kill if it didn't exit - except Exception as e: - logger.error(f"({self.dev_name}) Error during ffmpeg termination: {e}") - finally: - # Ensure we clear the process variable even if wait/kill fails - process = None - # Also clear the shared class attribute if this was the active process - if self._ffmpeg_process and self._ffmpeg_process.pid == process.pid: # type: ignore[attr-defined] - self._ffmpeg_process = None - elif process and process.poll() is not None: - # Process exists but already terminated - logger.debug( - f"({self.dev_name}) ffmpeg process (PID: {process.pid}) already terminated (exit code: {process.poll()})." - ) - process = None # Clear the variable - # Clear shared attribute if it matches - if self._ffmpeg_process and self._ffmpeg_process.pid == process.pid: # type: ignore[attr-defined] - self._ffmpeg_process = None - else: - # Process variable is already None or doesn't match _ffmpeg_process - logger.debug( - f"({self.dev_name}) No active ffmpeg process found needing termination in cleanup." - ) - - try: - # 1. Probe the stream to get essential info (width, height) - stream_info = self._get_stream_info() - width = stream_info["width"] - height = stream_info["height"] - # Calculate expected bytes per frame (BGR format = 3 bytes per pixel) - frame_size = width * height * 3 - - # 2. Main loop: Start ffmpeg and read frames. Retry on failure. - while not should_stop.is_set(): - try: - # Start or reuse the ffmpeg process - with self._lock: - # Check if another thread/subscription already started the process - if self._ffmpeg_process and self._ffmpeg_process.poll() is None: - logger.warning( - f"({self.dev_name}) ffmpeg process (PID: {self._ffmpeg_process.pid}) seems to be already running. Reusing." - ) - process = self._ffmpeg_process - else: - # Start a new ffmpeg process - process = self._start_ffmpeg_process(width, height) - # Store the new process handle in the shared class attribute - self._ffmpeg_process = process - - # 3. Frame reading loop - while not should_stop.is_set(): - # Read exactly one frame's worth of bytes - in_bytes = process.stdout.read(frame_size) # type: ignore[union-attr] - - if len(in_bytes) == 0: - # End of stream or process terminated unexpectedly - logger.warning( - f"({self.dev_name}) ffmpeg stdout returned 0 bytes. EOF or process terminated." - ) - process.wait(timeout=0.5) # Allow stderr to flush - stderr_data = process.stderr.read().decode("utf8", errors="ignore") # type: ignore[union-attr] - exit_code = process.poll() - logger.warning( - f"({self.dev_name}) ffmpeg process (PID: {process.pid}) exited with code {exit_code}. Stderr: {stderr_data}" - ) - # Break inner loop to trigger cleanup and potential restart - with self._lock: - # Clear the shared process handle if it matches the one that just exited - if ( - self._ffmpeg_process - and self._ffmpeg_process.pid == process.pid - ): - self._ffmpeg_process = None - process = None # Clear local process variable - break # Exit frame reading loop - - elif len(in_bytes) != frame_size: - # Received incomplete frame data - indicates a problem - msg = f"({self.dev_name}) Incomplete frame read. Expected {frame_size}, got {len(in_bytes)}. Stopping." - logger.error(msg) - observer.on_error(VideoFrameError(msg)) - should_stop.set() # Signal outer loop to stop - break # Exit frame reading loop - - # Convert the raw bytes to a NumPy array (height, width, channels) - frame = np.frombuffer(in_bytes, np.uint8).reshape((height, width, 3)) - # Emit the frame to subscribers - observer.on_next(frame) - - # 4. Handle ffmpeg process exit/crash (if not stopping deliberately) - if not should_stop.is_set() and process is None: - logger.info( - f"({self.dev_name}) ffmpeg process ended. Attempting reconnection in 5 seconds..." - ) - # Wait for a few seconds before trying to restart - time.sleep(5) - # Continue to the next iteration of the outer loop to restart - - except (VideoSourceError, ffmpeg.Error) as e: - # Errors during ffmpeg process start or severe runtime errors - logger.error( - f"({self.dev_name}) Unrecoverable ffmpeg error: {e}. Stopping emission." - ) - observer.on_error(e) - should_stop.set() # Stop retrying - except Exception as e: - # Catch other unexpected errors during frame reading/processing - logger.error( - f"({self.dev_name}) Unexpected error processing stream: {e}", - exc_info=True, - ) - observer.on_error(VideoFrameError(f"Frame processing failed: {e}")) - should_stop.set() # Stop retrying - - # 5. Loop finished (likely due to should_stop being set) - logger.info(f"({self.dev_name}) Frame emission loop stopped.") - observer.on_completed() - - except VideoSourceError as e: - # Handle errors during the initial probing phase - logger.error(f"({self.dev_name}) Failed initial setup: {e}") - observer.on_error(e) - except Exception as e: - # Catch-all for unexpected errors before the main loop starts - logger.error(f"({self.dev_name}) Unexpected setup error: {e}", exc_info=True) - observer.on_error(VideoSourceError(f"Setup failed: {e}")) - finally: - # Crucial: Ensure the ffmpeg process is terminated when the observable - # is completed, errored, or disposed. - logger.debug(f"({self.dev_name}) Entering finally block in emit_frames.") - cleanup_process() - - # Return a Disposable that, when called (by unsubscribe/dispose), - # signals the loop to stop. The finally block handles the actual cleanup. - return Disposable(should_stop.set) - - # Create the observable using rx.create, applying scheduling and sharing - return rx.create(emit_frames).pipe( - ops.subscribe_on(self.pool_scheduler), # Run the emit_frames logic on the pool - # ops.observe_on(self.pool_scheduler), # Optional: Switch thread for downstream operators - ops.share(), # Ensure multiple subscribers share the same ffmpeg process - ) - - def dispose_all(self) -> None: - """Disposes of all managed resources, including terminating the ffmpeg process.""" - logger.info(f"({self.dev_name}) dispose_all called.") - # Terminate the ffmpeg process using the same locked logic as cleanup - with self._lock: - process = self._ffmpeg_process # Get the current process handle - if process and process.poll() is None: - logger.info( - f"({self.dev_name}) Terminating ffmpeg process (PID: {process.pid}) via dispose_all." - ) - try: - process.terminate() - process.wait(timeout=1.0) - except subprocess.TimeoutExpired: - logger.warning( - f"({self.dev_name}) ffmpeg process (PID: {process.pid}) kill required in dispose_all." - ) - process.kill() - except Exception as e: - logger.error( - f"({self.dev_name}) Error during ffmpeg termination in dispose_all: {e}" - ) - finally: - self._ffmpeg_process = None # Clear handle after attempting termination - elif process: # Process exists but already terminated - logger.debug( - f"({self.dev_name}) ffmpeg process (PID: {process.pid}) already terminated in dispose_all." - ) - self._ffmpeg_process = None - else: - logger.debug( - f"({self.dev_name}) No active ffmpeg process found during dispose_all." - ) - - # Call the parent class's dispose_all to handle Rx Disposables - super().dispose_all() - - def __del__(self) -> None: - """Destructor attempts to clean up resources if not explicitly disposed.""" - # Logging in __del__ is generally discouraged due to interpreter state issues, - # but can be helpful for debugging resource leaks. Use print for robustness here if needed. - # print(f"DEBUG: ({self.dev_name}) __del__ called.") - self.dispose_all() diff --git a/dimos/stream/stream_merger.py b/dimos/stream/stream_merger.py deleted file mode 100644 index 645fb86030..0000000000 --- a/dimos/stream/stream_merger.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TypeVar - -from reactivex import Observable, operators as ops - -T = TypeVar("T") -Q = TypeVar("Q") - - -def create_stream_merger( - data_input_stream: Observable[T], text_query_stream: Observable[Q] -) -> Observable[tuple[Q, list[T]]]: - """ - Creates a merged stream that combines the latest value from data_input_stream - with each value from text_query_stream. - - Args: - data_input_stream: Observable stream of data values - text_query_stream: Observable stream of query values - - Returns: - Observable that emits tuples of (query, latest_data) - """ - # Encompass any data items as a list for safe evaluation - safe_data_stream = data_input_stream.pipe( - # We don't modify the data, just pass it through in a list - # This avoids any boolean evaluation of arrays - ops.map(lambda x: [x]) - ) - - # Use safe_data_stream instead of raw data_input_stream - return text_query_stream.pipe(ops.with_latest_from(safe_data_stream)) diff --git a/dimos/stream/video_operators.py b/dimos/stream/video_operators.py deleted file mode 100644 index a9ebde0588..0000000000 --- a/dimos/stream/video_operators.py +++ /dev/null @@ -1,605 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -from collections.abc import Callable -from datetime import datetime, timedelta -from enum import Enum -from typing import TYPE_CHECKING - -import cv2 -import numpy as np -from reactivex import Observable, Observer, create, operators as ops - -if TYPE_CHECKING: - from dimos.stream.frame_processor import FrameProcessor - - -class VideoOperators: - """Collection of video processing operators for reactive video streams.""" - - @staticmethod - def with_fps_sampling( - fps: int = 25, *, sample_interval: timedelta | None = None, use_latest: bool = True - ) -> Callable[[Observable], Observable]: # type: ignore[type-arg] - """Creates an operator that samples frames at a specified rate. - - Creates a transformation operator that samples frames either by taking - the latest frame or the first frame in each interval. Provides frame - rate control through time-based selection. - - Args: - fps: Desired frames per second, defaults to 25 FPS. - Ignored if sample_interval is provided. - sample_interval: Optional explicit interval between samples. - If provided, overrides the fps parameter. - use_latest: If True, uses the latest frame in interval. - If False, uses the first frame. Defaults to True. - - Returns: - A function that transforms an Observable[np.ndarray] stream to a sampled - Observable[np.ndarray] stream with controlled frame rate. - - Raises: - ValueError: If fps is not positive or sample_interval is negative. - TypeError: If sample_interval is provided but not a timedelta object. - - Examples: - Sample latest frame at 30 FPS (good for real-time): - >>> video_stream.pipe( - ... VideoOperators.with_fps_sampling(fps=30) - ... ) - - Sample first frame with custom interval (good for consistent timing): - >>> video_stream.pipe( - ... VideoOperators.with_fps_sampling( - ... sample_interval=timedelta(milliseconds=40), - ... use_latest=False - ... ) - ... ) - - Note: - This operator helps manage high-speed video streams through time-based - frame selection. It reduces the frame rate by selecting frames at - specified intervals. - - When use_latest=True: - - Uses sampling to select the most recent frame at fixed intervals - - Discards intermediate frames, keeping only the latest - - Best for real-time video where latest frame is most relevant - - Uses ops.sample internally - - When use_latest=False: - - Uses throttling to select the first frame in each interval - - Ignores subsequent frames until next interval - - Best for scenarios where you want consistent frame timing - - Uses ops.throttle_first internally - - This is an approropriate solution for managing video frame rates and - memory usage in many scenarios. - """ - if sample_interval is None: - if fps <= 0: - raise ValueError("FPS must be positive") - sample_interval = timedelta(microseconds=int(1_000_000 / fps)) - - def _operator(source: Observable) -> Observable: # type: ignore[type-arg] - return source.pipe( - ops.sample(sample_interval) if use_latest else ops.throttle_first(sample_interval) - ) - - return _operator - - @staticmethod - def with_jpeg_export( - frame_processor: "FrameProcessor", - save_limit: int = 100, - suffix: str = "", - loop: bool = False, - ) -> Callable[[Observable], Observable]: # type: ignore[type-arg] - """Creates an operator that saves video frames as JPEG files. - - Creates a transformation operator that saves each frame from the video - stream as a JPEG file while passing the frame through unchanged. - - Args: - frame_processor: FrameProcessor instance that handles the JPEG export - operations and maintains file count. - save_limit: Maximum number of frames to save before stopping. - Defaults to 100. Set to 0 for unlimited saves. - suffix: Optional string to append to filename before index. - Example: "raw" creates "1_raw.jpg". - Defaults to empty string. - loop: If True, when save_limit is reached, the files saved are - loopbacked and overwritten with the most recent frame. - Defaults to False. - Returns: - A function that transforms an Observable of frames into another - Observable of the same frames, with side effect of saving JPEGs. - - Raises: - ValueError: If save_limit is negative. - TypeError: If frame_processor is not a FrameProcessor instance. - - Example: - >>> video_stream.pipe( - ... VideoOperators.with_jpeg_export(processor, suffix="raw") - ... ) - """ - - def _operator(source: Observable) -> Observable: # type: ignore[type-arg] - return source.pipe( - ops.map( - lambda frame: frame_processor.export_to_jpeg(frame, save_limit, loop, suffix) - ) - ) - - return _operator - - @staticmethod - def with_optical_flow_filtering(threshold: float = 1.0) -> Callable[[Observable], Observable]: # type: ignore[type-arg] - """Creates an operator that filters optical flow frames by relevancy score. - - Filters a stream of optical flow results (frame, relevancy_score) tuples, - passing through only frames that meet the relevancy threshold. - - Args: - threshold: Minimum relevancy score required for frames to pass through. - Defaults to 1.0. Higher values mean more motion required. - - Returns: - A function that transforms an Observable of (frame, score) tuples - into an Observable of frames that meet the threshold. - - Raises: - ValueError: If threshold is negative. - TypeError: If input stream items are not (frame, float) tuples. - - Examples: - Basic filtering: - >>> optical_flow_stream.pipe( - ... VideoOperators.with_optical_flow_filtering(threshold=1.0) - ... ) - - With custom threshold: - >>> optical_flow_stream.pipe( - ... VideoOperators.with_optical_flow_filtering(threshold=2.5) - ... ) - - Note: - Input stream should contain tuples of (frame, relevancy_score) where - frame is a numpy array and relevancy_score is a float or None. - None scores are filtered out. - """ - return lambda source: source.pipe( - ops.filter(lambda result: result[1] is not None), # type: ignore[index] - ops.filter(lambda result: result[1] > threshold), # type: ignore[index] - ops.map(lambda result: result[0]), # type: ignore[index] - ) - - @staticmethod - def with_edge_detection( - frame_processor: "FrameProcessor", - ) -> Callable[[Observable], Observable]: # type: ignore[type-arg] - return lambda source: source.pipe( - ops.map(lambda frame: frame_processor.edge_detection(frame)) # type: ignore[no-untyped-call] - ) - - @staticmethod - def with_optical_flow( - frame_processor: "FrameProcessor", - ) -> Callable[[Observable], Observable]: # type: ignore[type-arg] - return lambda source: source.pipe( - ops.scan( - lambda acc, frame: frame_processor.compute_optical_flow( # type: ignore[arg-type, return-value] - acc, # type: ignore[arg-type] - frame, # type: ignore[arg-type] - compute_relevancy=False, - ), - (None, None, None), - ), - ops.map(lambda result: result[1]), # type: ignore[index] # Extract flow component - ops.filter(lambda flow: flow is not None), - ops.map(frame_processor.visualize_flow), - ) - - @staticmethod - def encode_image() -> Callable[[Observable], Observable]: # type: ignore[type-arg] - """ - Operator to encode an image to JPEG format and convert it to a Base64 string. - - Returns: - A function that transforms an Observable of images into an Observable - of tuples containing the Base64 string of the encoded image and its dimensions. - """ - - def _operator(source: Observable) -> Observable: # type: ignore[type-arg] - def _encode_image(image: np.ndarray) -> tuple[str, tuple[int, int]]: - try: - width, height = image.shape[:2] - _, buffer = cv2.imencode(".jpg", image) - if buffer is None: - raise ValueError("Failed to encode image") - base64_image = base64.b64encode(buffer.tobytes()).decode("utf-8") - return base64_image, (width, height) - except Exception as e: - raise e - - return source.pipe(ops.map(_encode_image)) - - return _operator - - -from threading import Lock - -from reactivex import Observable -from reactivex.disposable import Disposable - - -class Operators: - @staticmethod - def exhaust_lock(process_item): # type: ignore[no-untyped-def] - """ - For each incoming item, call `process_item(item)` to get an Observable. - - If we're busy processing the previous one, skip new items. - - Use a lock to ensure concurrency safety across threads. - """ - - def _exhaust_lock(source: Observable) -> Observable: # type: ignore[type-arg] - def _subscribe(observer, scheduler=None): # type: ignore[no-untyped-def] - in_flight = False - lock = Lock() - upstream_done = False - - upstream_disp = None - active_inner_disp = None - - def dispose_all() -> None: - if upstream_disp: - upstream_disp.dispose() - if active_inner_disp: - active_inner_disp.dispose() - - def on_next(value) -> None: # type: ignore[no-untyped-def] - nonlocal in_flight, active_inner_disp - lock.acquire() - try: - if not in_flight: - in_flight = True - print("Processing new item.") - else: - print("Skipping item, already processing.") - return - finally: - lock.release() - - # We only get here if we grabbed the in_flight slot - try: - inner_source = process_item(value) - except Exception as ex: - observer.on_error(ex) - return - - def inner_on_next(ivalue) -> None: # type: ignore[no-untyped-def] - observer.on_next(ivalue) - - def inner_on_error(err) -> None: # type: ignore[no-untyped-def] - nonlocal in_flight - with lock: - in_flight = False - observer.on_error(err) - - def inner_on_completed() -> None: - nonlocal in_flight - with lock: - in_flight = False - if upstream_done: - observer.on_completed() - - # Subscribe to the inner observable - nonlocal active_inner_disp - active_inner_disp = inner_source.subscribe( - on_next=inner_on_next, - on_error=inner_on_error, - on_completed=inner_on_completed, - scheduler=scheduler, - ) - - def on_error(err) -> None: # type: ignore[no-untyped-def] - dispose_all() - observer.on_error(err) - - def on_completed() -> None: - nonlocal upstream_done - with lock: - upstream_done = True - # If we're not busy, we can end now - if not in_flight: - observer.on_completed() - - upstream_disp = source.subscribe( - on_next, on_error, on_completed, scheduler=scheduler - ) - return dispose_all - - return create(_subscribe) - - return _exhaust_lock - - @staticmethod - def exhaust_lock_per_instance(process_item, lock: Lock): # type: ignore[no-untyped-def] - """ - - For each item from upstream, call process_item(item) -> Observable. - - If a frame arrives while one is "in flight", discard it. - - 'lock' ensures we safely check/modify the 'in_flight' state in a multithreaded environment. - """ - - def _exhaust_lock(source: Observable) -> Observable: # type: ignore[type-arg] - def _subscribe(observer, scheduler=None): # type: ignore[no-untyped-def] - in_flight = False - upstream_done = False - - upstream_disp = None - active_inner_disp = None - - def dispose_all() -> None: - if upstream_disp: - upstream_disp.dispose() - if active_inner_disp: - active_inner_disp.dispose() - - def on_next(value) -> None: # type: ignore[no-untyped-def] - nonlocal in_flight, active_inner_disp - with lock: - # If not busy, claim the slot - if not in_flight: - in_flight = True - print("\033[34mProcessing new item.\033[0m") - else: - # Already processing => drop - print("\033[34mSkipping item, already processing.\033[0m") - return - - # We only get here if we acquired the slot - try: - inner_source = process_item(value) - except Exception as ex: - observer.on_error(ex) - return - - def inner_on_next(ivalue) -> None: # type: ignore[no-untyped-def] - observer.on_next(ivalue) - - def inner_on_error(err) -> None: # type: ignore[no-untyped-def] - nonlocal in_flight - with lock: - in_flight = False - print("\033[34mError in inner on error.\033[0m") - observer.on_error(err) - - def inner_on_completed() -> None: - nonlocal in_flight - with lock: - in_flight = False - print("\033[34mInner on completed.\033[0m") - if upstream_done: - observer.on_completed() - - # Subscribe to the inner Observable - nonlocal active_inner_disp - active_inner_disp = inner_source.subscribe( - on_next=inner_on_next, - on_error=inner_on_error, - on_completed=inner_on_completed, - scheduler=scheduler, - ) - - def on_error(e) -> None: # type: ignore[no-untyped-def] - dispose_all() - observer.on_error(e) - - def on_completed() -> None: - nonlocal upstream_done - with lock: - upstream_done = True - print("\033[34mOn completed.\033[0m") - if not in_flight: - observer.on_completed() - - upstream_disp = source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Disposable(dispose_all) - - return create(_subscribe) - - return _exhaust_lock - - @staticmethod - def exhaust_map(project): # type: ignore[no-untyped-def] - def _exhaust_map(source: Observable): # type: ignore[no-untyped-def, type-arg] - def subscribe(observer, scheduler=None): # type: ignore[no-untyped-def] - is_processing = False - - def on_next(item) -> None: # type: ignore[no-untyped-def] - nonlocal is_processing - if not is_processing: - is_processing = True - print("\033[35mProcessing item.\033[0m") - try: - inner_observable = project(item) # Create the inner observable - inner_observable.subscribe( - on_next=observer.on_next, - on_error=observer.on_error, - on_completed=lambda: set_not_processing(), - scheduler=scheduler, - ) - except Exception as e: - observer.on_error(e) - else: - print("\033[35mSkipping item, already processing.\033[0m") - - def set_not_processing() -> None: - nonlocal is_processing - is_processing = False - print("\033[35mItem processed.\033[0m") - - return source.subscribe( - on_next=on_next, - on_error=observer.on_error, - on_completed=observer.on_completed, - scheduler=scheduler, - ) - - return create(subscribe) - - return _exhaust_map - - @staticmethod - def with_lock(lock: Lock): # type: ignore[no-untyped-def] - def operator(source: Observable): # type: ignore[no-untyped-def, type-arg] - def subscribe(observer, scheduler=None): # type: ignore[no-untyped-def] - def on_next(item) -> None: # type: ignore[no-untyped-def] - if not lock.locked(): # Check if the lock is free - if lock.acquire(blocking=False): # Non-blocking acquire - try: - print("\033[32mAcquired lock, processing item.\033[0m") - observer.on_next(item) - finally: # Ensure lock release even if observer.on_next throws - lock.release() - else: - print("\033[34mLock busy, skipping item.\033[0m") - else: - print("\033[34mLock busy, skipping item.\033[0m") - - def on_error(error) -> None: # type: ignore[no-untyped-def] - observer.on_error(error) - - def on_completed() -> None: - observer.on_completed() - - return source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Observable(subscribe) - - return operator - - @staticmethod - def with_lock_check(lock: Lock): # type: ignore[no-untyped-def] # Renamed for clarity - def operator(source: Observable): # type: ignore[no-untyped-def, type-arg] - def subscribe(observer, scheduler=None): # type: ignore[no-untyped-def] - def on_next(item) -> None: # type: ignore[no-untyped-def] - if not lock.locked(): # Check if the lock is held WITHOUT acquiring - print(f"\033[32mLock is free, processing item: {item}\033[0m") - observer.on_next(item) - else: - print(f"\033[34mLock is busy, skipping item: {item}\033[0m") - # observer.on_completed() - - def on_error(error) -> None: # type: ignore[no-untyped-def] - observer.on_error(error) - - def on_completed() -> None: - observer.on_completed() - - return source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Observable(subscribe) - - return operator - - # PrintColor enum for standardized color formatting - class PrintColor(Enum): - RED = "\033[31m" - GREEN = "\033[32m" - BLUE = "\033[34m" - YELLOW = "\033[33m" - MAGENTA = "\033[35m" - CYAN = "\033[36m" - WHITE = "\033[37m" - RESET = "\033[0m" - - @staticmethod - def print_emission( # type: ignore[no-untyped-def] - id: str, - dev_name: str = "NA", - counts: dict | None = None, # type: ignore[type-arg] - color: "Operators.PrintColor" = None, # type: ignore[assignment] - enabled: bool = True, - ): - """ - Creates an operator that prints the emission with optional counts for debugging. - - Args: - id: Identifier for the emission point (e.g., 'A', 'B') - dev_name: Device or component name for context - counts: External dictionary to track emission count across operators. If None, will not print emission count. - color: Color for the printed output from PrintColor enum (default is RED) - enabled: Whether to print the emission count (default is True) - Returns: - An operator that counts and prints emissions without modifying the stream - """ - # If enabled is false, return the source unchanged - if not enabled: - return lambda source: source - - # Use RED as default if no color provided - if color is None: - color = Operators.PrintColor.RED - - def _operator(source: Observable) -> Observable: # type: ignore[type-arg] - def _subscribe(observer: Observer, scheduler=None): # type: ignore[no-untyped-def, type-arg] - def on_next(value) -> None: # type: ignore[no-untyped-def] - if counts is not None: - # Initialize count if necessary - if id not in counts: - counts[id] = 0 - - # Increment and print - counts[id] += 1 - print( - f"{color.value}({dev_name} - {id}) Emission Count - {counts[id]} {datetime.now()}{Operators.PrintColor.RESET.value}" - ) - else: - print( - f"{color.value}({dev_name} - {id}) Emitted - {datetime.now()}{Operators.PrintColor.RESET.value}" - ) - - # Pass value through unchanged - observer.on_next(value) - - return source.subscribe( - on_next=on_next, - on_error=observer.on_error, - on_completed=observer.on_completed, - scheduler=scheduler, - ) - - return create(_subscribe) # type: ignore[arg-type] - - return _operator diff --git a/dimos/utils/metrics.py b/dimos/utils/metrics.py deleted file mode 100644 index 3292d7220f..0000000000 --- a/dimos/utils/metrics.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections.abc import Callable -import functools -import time -from typing import Any, TypeVar, cast - -from dimos_lcm.std_msgs import Float32 -import rerun as rr - -from dimos.core.stream import Transport -from dimos.core.transport import LCMTransport - -F = TypeVar("F", bound=Callable[..., Any]) - - -def timed( - transport: Callable[[F], Transport[Float32]] | Transport[Float32] | None = None, -) -> Callable[[F], F]: - def timed_decorator(func: F) -> F: - t: Transport[Float32] - if transport is None: - t = LCMTransport(f"/metrics/{func.__name__}", Float32) - elif callable(transport): - t = transport(func) - else: - t = transport - - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - start = time.perf_counter() - result = func(*args, **kwargs) - elapsed = time.perf_counter() - start - - msg = Float32() - msg.data = elapsed * 1000 # ms - t.publish(msg) - return result - - return cast("F", wrapper) - - return timed_decorator - - -def log_timing_to_rerun(entity_path: str) -> Callable[[F], F]: - """Decorator to log function execution time to Rerun. - - Automatically measures the execution time of the decorated function - and logs it as a scalar value to the specified Rerun entity path. - - Args: - entity_path: Rerun entity path for timing metrics - (e.g., "metrics/costmap/calc_ms") - - Returns: - Decorator function - - Example: - @log_timing_to_rerun("metrics/costmap/calc_ms") - def _calculate_costmap(self, msg): - # ... expensive computation - return result - - # Timing automatically logged to Rerun as a time series! - """ - - def decorator(func: F) -> F: - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - start = time.perf_counter() - result = func(*args, **kwargs) - elapsed_ms = (time.perf_counter() - start) * 1000 - - rr.log(entity_path, rr.Scalars(elapsed_ms)) - return result - - return cast("F", wrapper) - - return decorator diff --git a/dimos/utils/urdf.py b/dimos/utils/urdf.py deleted file mode 100644 index 474658df1a..0000000000 --- a/dimos/utils/urdf.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""URDF generation utilities.""" - -from __future__ import annotations - - -def box_urdf( - width: float, - height: float, - depth: float, - name: str = "box_robot", - mass: float = 1.0, - rgba: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.5), -) -> str: - """Generate a simple URDF with a box as the base_link. - - Args: - width: Box size in X direction (meters) - height: Box size in Y direction (meters) - depth: Box size in Z direction (meters) - name: Robot name - mass: Mass of the box (kg) - rgba: Color as (red, green, blue, alpha), default red with 0.5 transparency - - Returns: - URDF XML string - """ - # Simple box inertia (solid cuboid) - ixx = (mass / 12.0) * (height**2 + depth**2) - iyy = (mass / 12.0) * (width**2 + depth**2) - izz = (mass / 12.0) * (width**2 + height**2) - - r, g, b, a = rgba - return f""" - - - - - - - - - - - - - - - - - - - - - -""" diff --git a/dimos/web/fastapi_server.py b/dimos/web/fastapi_server.py deleted file mode 100644 index 5711b35d91..0000000000 --- a/dimos/web/fastapi_server.py +++ /dev/null @@ -1,228 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Working FastAPI/Uvicorn Impl. - -# Notes: Do not use simultaneously with Flask, this includes imports. -# Workers are not yet setup, as this requires a much more intricate -# reorganization. There appears to be possible signalling issues when -# opening up streams on multiple windows/reloading which will need to -# be fixed. Also note, Chrome only supports 6 simultaneous web streams, -# and its advised to test threading/worker performance with another -# browser like Safari. - -# Fast Api & Uvicorn -import asyncio -from pathlib import Path -from queue import Empty, Queue -from threading import Lock - -import cv2 -from fastapi import FastAPI, Form, HTTPException, Request -from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse -from fastapi.templating import Jinja2Templates -import reactivex as rx -from reactivex import operators as ops -from reactivex.disposable import SingleAssignmentDisposable -from sse_starlette.sse import EventSourceResponse -import uvicorn - -from dimos.core.global_config import global_config -from dimos.web.edge_io import EdgeIO - -# TODO: Resolve threading, start/stop stream functionality. - - -class FastAPIServer(EdgeIO): - def __init__( # type: ignore[no-untyped-def] - self, - dev_name: str = "FastAPI Server", - edge_type: str = "Bidirectional", - host: str | None = None, - port: int = 5555, - text_streams=None, - **streams, - ) -> None: - super().__init__(dev_name, edge_type) - self.app = FastAPI() - self.port = port - self.host = host if host is not None else global_config.listen_host - BASE_DIR = Path(__file__).resolve().parent - self.templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) - self.streams = streams - self.active_streams = {} - self.stream_locks = {key: Lock() for key in self.streams} - self.stream_queues = {} # type: ignore[var-annotated] - self.stream_disposables = {} # type: ignore[var-annotated] - - # Initialize text streams - self.text_streams = text_streams or {} - self.text_queues = {} # type: ignore[var-annotated] - self.text_disposables = {} - self.text_clients = set() # type: ignore[var-annotated] - - # Create a Subject for text queries - self.query_subject = rx.subject.Subject() # type: ignore[var-annotated] - self.query_stream = self.query_subject.pipe(ops.share()) - - for key in self.streams: - if self.streams[key] is not None: - self.active_streams[key] = self.streams[key].pipe( - ops.map(self.process_frame_fastapi), ops.share() - ) - - # Set up text stream subscriptions - for key, stream in self.text_streams.items(): - if stream is not None: - self.text_queues[key] = Queue(maxsize=100) - disposable = stream.subscribe( - lambda text, k=key: self.text_queues[k].put(text) if text is not None else None, - lambda e, k=key: self.text_queues[k].put(None), - lambda k=key: self.text_queues[k].put(None), - ) - self.text_disposables[key] = disposable - self.disposables.add(disposable) - - self.setup_routes() - - def process_frame_fastapi(self, frame): # type: ignore[no-untyped-def] - """Convert frame to JPEG format for streaming.""" - _, buffer = cv2.imencode(".jpg", frame) - return buffer.tobytes() - - def stream_generator(self, key): # type: ignore[no-untyped-def] - """Generate frames for a given video stream.""" - - def generate(): # type: ignore[no-untyped-def] - if key not in self.stream_queues: - self.stream_queues[key] = Queue(maxsize=10) - - frame_queue = self.stream_queues[key] - - # Clear any existing disposable for this stream - if key in self.stream_disposables: - self.stream_disposables[key].dispose() - - disposable = SingleAssignmentDisposable() - self.stream_disposables[key] = disposable - self.disposables.add(disposable) - - if key in self.active_streams: - with self.stream_locks[key]: - # Clear the queue before starting new subscription - while not frame_queue.empty(): - try: - frame_queue.get_nowait() - except Empty: - break - - disposable.disposable = self.active_streams[key].subscribe( - lambda frame: frame_queue.put(frame) if frame is not None else None, - lambda e: frame_queue.put(None), - lambda: frame_queue.put(None), - ) - - try: - while True: - try: - frame = frame_queue.get(timeout=1) - if frame is None: - break - yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + frame + b"\r\n") - except Empty: - # Instead of breaking, continue waiting for new frames - continue - finally: - if key in self.stream_disposables: - self.stream_disposables[key].dispose() - - return generate - - def create_video_feed_route(self, key): # type: ignore[no-untyped-def] - """Create a video feed route for a specific stream.""" - - async def video_feed(): # type: ignore[no-untyped-def] - return StreamingResponse( - self.stream_generator(key)(), # type: ignore[no-untyped-call] - media_type="multipart/x-mixed-replace; boundary=frame", - ) - - return video_feed - - async def text_stream_generator(self, key): # type: ignore[no-untyped-def] - """Generate SSE events for text stream.""" - client_id = id(object()) - self.text_clients.add(client_id) - - try: - while True: - if key in self.text_queues: - try: - text = self.text_queues[key].get(timeout=1) - if text is not None: - yield {"event": "message", "id": key, "data": text} - except Empty: - # Send a keep-alive comment - yield {"event": "ping", "data": ""} - await asyncio.sleep(0.1) - finally: - self.text_clients.remove(client_id) - - def setup_routes(self) -> None: - """Set up FastAPI routes.""" - - @self.app.get("/", response_class=HTMLResponse) - async def index(request: Request): # type: ignore[no-untyped-def] - stream_keys = list(self.streams.keys()) - text_stream_keys = list(self.text_streams.keys()) - # Starlette 0.38+: request is 1st positional arg, not in context dict. - return self.templates.TemplateResponse( - request, - "index_fastapi.html", - { - "stream_keys": stream_keys, - "text_stream_keys": text_stream_keys, - }, - ) - - @self.app.post("/submit_query") - async def submit_query(query: str = Form(...)): # type: ignore[no-untyped-def] - # Using Form directly as a dependency ensures proper form handling - try: - if query: - # Emit the query through our Subject - self.query_subject.on_next(query) - return JSONResponse({"success": True, "message": "Query received"}) - return JSONResponse({"success": False, "message": "No query provided"}) - except Exception as e: - # Ensure we always return valid JSON even on error - return JSONResponse( - status_code=500, - content={"success": False, "message": f"Server error: {e!s}"}, - ) - - @self.app.get("/text_stream/{key}") - async def text_stream(key: str): # type: ignore[no-untyped-def] - if key not in self.text_streams: - raise HTTPException(status_code=404, detail=f"Text stream '{key}' not found") - return EventSourceResponse(self.text_stream_generator(key)) # type: ignore[no-untyped-call] - - for key in self.streams: - self.app.get(f"/video_feed/{key}")(self.create_video_feed_route(key)) # type: ignore[no-untyped-call] - - def run(self) -> None: - """Run the FastAPI server.""" - uvicorn.run( - self.app, host=self.host, port=self.port - ) # TODO: Translate structure to enable in-built workers' diff --git a/dimos/web/flask_server.py b/dimos/web/flask_server.py deleted file mode 100644 index 1ddad5b3a8..0000000000 --- a/dimos/web/flask_server.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from queue import Queue - -import cv2 -from flask import Flask, Response, render_template -from reactivex import operators as ops -from reactivex.disposable import SingleAssignmentDisposable - -from dimos.core.global_config import global_config -from dimos.web.edge_io import EdgeIO - - -class FlaskServer(EdgeIO): - def __init__( # type: ignore[no-untyped-def] - self, - dev_name: str = "Flask Server", - edge_type: str = "Bidirectional", - port: int = 5555, - **streams, - ) -> None: - super().__init__(dev_name, edge_type) - self.app = Flask(__name__) - self.port = port - self.streams = streams - self.active_streams = {} - - # Initialize shared stream references with ref_count - for key in self.streams: - if self.streams[key] is not None: - # Apply share and ref_count to manage subscriptions - self.active_streams[key] = self.streams[key].pipe( - ops.map(self.process_frame_flask), ops.share() - ) - - self.setup_routes() - - def process_frame_flask(self, frame): # type: ignore[no-untyped-def] - """Convert frame to JPEG format for streaming.""" - _, buffer = cv2.imencode(".jpg", frame) - return buffer.tobytes() - - def setup_routes(self) -> None: - @self.app.route("/") - def index(): # type: ignore[no-untyped-def] - stream_keys = list(self.streams.keys()) # Get the keys from the streams dictionary - return render_template("index_flask.html", stream_keys=stream_keys) - - # Function to create a streaming response - def stream_generator(key): # type: ignore[no-untyped-def] - def generate(): # type: ignore[no-untyped-def] - frame_queue = Queue() # type: ignore[var-annotated] - disposable = SingleAssignmentDisposable() - - # Subscribe to the shared, ref-counted stream - if key in self.active_streams: - disposable.disposable = self.active_streams[key].subscribe( - lambda frame: frame_queue.put(frame) if frame is not None else None, - lambda e: frame_queue.put(None), - lambda: frame_queue.put(None), - ) - - try: - while True: - frame = frame_queue.get() - if frame is None: - break - yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + frame + b"\r\n") - finally: - disposable.dispose() - - return generate - - def make_response_generator(key): # type: ignore[no-untyped-def] - def response_generator(): # type: ignore[no-untyped-def] - return Response( - stream_generator(key)(), # type: ignore[no-untyped-call] - mimetype="multipart/x-mixed-replace; boundary=frame", - ) - - return response_generator - - # Dynamically adding routes using add_url_rule - for key in self.streams: - endpoint = f"video_feed_{key}" - self.app.add_url_rule( - f"/video_feed/{key}", - endpoint, - view_func=make_response_generator(key), # type: ignore[no-untyped-call] - ) - - def run(self, host: str | None = None, port: int = 5555, threaded: bool = True) -> None: - self.port = port - _host = host if host is not None else global_config.listen_host - self.app.run(host=_host, port=self.port, debug=False, threaded=threaded) diff --git a/dimos/web/websocket_vis/costmap_viz.py b/dimos/web/websocket_vis/costmap_viz.py deleted file mode 100644 index 0d95862c99..0000000000 --- a/dimos/web/websocket_vis/costmap_viz.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Simple costmap wrapper for visualization purposes. -This is a minimal implementation to support websocket visualization. -""" - -import numpy as np - -from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid - - -class CostmapViz: - """A wrapper around OccupancyGrid for visualization compatibility.""" - - def __init__(self, occupancy_grid: OccupancyGrid | None = None) -> None: - """Initialize from an OccupancyGrid.""" - self.occupancy_grid = occupancy_grid - - @property - def data(self) -> np.ndarray | None: - """Get the costmap data as a numpy array.""" - if self.occupancy_grid: - return self.occupancy_grid.grid - return None - - @property - def width(self) -> int: - """Get the width of the costmap.""" - if self.occupancy_grid: - return self.occupancy_grid.width - return 0 - - @property - def height(self) -> int: - """Get the height of the costmap.""" - if self.occupancy_grid: - return self.occupancy_grid.height - return 0 - - @property - def resolution(self) -> float: - """Get the resolution of the costmap.""" - if self.occupancy_grid: - return self.occupancy_grid.resolution - return 1.0 - - @property - def origin(self): # type: ignore[no-untyped-def] - """Get the origin pose of the costmap.""" - if self.occupancy_grid: - return self.occupancy_grid.origin - return None diff --git a/dimos/web/websocket_vis/path_history.py b/dimos/web/websocket_vis/path_history.py deleted file mode 100644 index c69e7e9508..0000000000 --- a/dimos/web/websocket_vis/path_history.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Simple path history class for visualization purposes. -This is a minimal implementation to support websocket visualization. -""" - -from dimos.msgs.geometry_msgs.Vector3 import Vector3 - - -class PathHistory: - """A simple container for storing a history of positions for visualization.""" - - def __init__(self, points: list[Vector3 | tuple | list] | None = None) -> None: # type: ignore[type-arg] - """Initialize with optional list of points.""" - self.points: list[Vector3] = [] - if points: - for p in points: - if isinstance(p, Vector3): - self.points.append(p) - else: - self.points.append(Vector3(*p)) - - def ipush(self, point: Vector3 | tuple | list) -> "PathHistory": # type: ignore[type-arg] - """Add a point to the history (in-place) and return self.""" - if isinstance(point, Vector3): - self.points.append(point) - else: - self.points.append(Vector3(*point)) - return self - - def iclip_tail(self, max_length: int) -> "PathHistory": - """Keep only the last max_length points (in-place) and return self.""" - if max_length > 0 and len(self.points) > max_length: - self.points = self.points[-max_length:] - return self - - def last(self) -> Vector3 | None: - """Return the last point in the history, or None if empty.""" - return self.points[-1] if self.points else None - - def length(self) -> float: - """Calculate the total length of the path.""" - if len(self.points) < 2: - return 0.0 - - total = 0.0 - for i in range(1, len(self.points)): - p1 = self.points[i - 1] - p2 = self.points[i] - dx = p2.x - p1.x - dy = p2.y - p1.y - dz = p2.z - p1.z - total += (dx * dx + dy * dy + dz * dz) ** 0.5 - return total - - def __len__(self) -> int: - """Return the number of points in the history.""" - return len(self.points) - - def __getitem__(self, index: int) -> Vector3: - """Get a point by index.""" - return self.points[index]