From 466caac0942393735d49021c136726d258d03dcb Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 12:08:31 -0400 Subject: [PATCH 01/11] Updated gripper camera's pipeline to match stretch4_gripper_modeling_and_control's settings --- stretch4_body/subsystem/cameras/__init__.py | 3 +- .../luxonis_gripper_camera_adapter.py | 91 +++++++++++++++---- .../controllers/camera_pipeline_controller.py | 29 ++++-- .../subsystem/cameras/emulated_rgbd.py | 24 +++++ .../subsystem/cameras/enums/rgb_camera.py | 3 +- .../subsystem/cameras/stream_cameras.py | 7 +- stretch4_body/tools/stretch_rgbd_show.py | 14 ++- 7 files changed, 135 insertions(+), 36 deletions(-) diff --git a/stretch4_body/subsystem/cameras/__init__.py b/stretch4_body/subsystem/cameras/__init__.py index 7131790..f2454aa 100644 --- a/stretch4_body/subsystem/cameras/__init__.py +++ b/stretch4_body/subsystem/cameras/__init__.py @@ -19,5 +19,6 @@ stream_right_rgbd, stream_center_rgbd, stream_left_right_rgbd, - stream_left_right_center_rgbd + stream_left_right_center_rgbd, + stream_gripper_rgbd ) \ No newline at end of file diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 1995632..909c23e 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -9,13 +9,16 @@ from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameraConfig, RGBCameras from stretch4_body.subsystem.cameras.adapters.luxonis_camera_adapter import LuxonisCameraAdapter, clear_device_cache from stretch4_body.subsystem.cameras.adapters.synced_camera import SyncedCamera -from stretch4_body.subsystem.cameras.models.image_frame import SyncedImageFrame +from stretch4_body.subsystem.cameras.models.image_frame import SyncedImageFrame, ImageFrame +import dataclasses +import numpy as np class GripperCameraLuxonis(SyncedCamera): """Start a stream with the gripper left/right stereo cameras and the point cloud pipeline.""" - def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig): + def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcloud: bool = False): self.do_sync_frames = True + self.enable_pointcloud = enable_pointcloud self.left = left self.right = right @@ -26,15 +29,34 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig): self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(left.camera_device) self.camera = self.pipeline - self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) + left_cfg = dataclasses.replace(left, is_compressed=False) + + self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left_cfg) self.right_camera_node, node_right = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) - stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) - - self.left_output = stereo.syncedLeft.createOutputQueue(maxSize=1) - self.right_output = stereo.syncedRight.createOutputQueue(maxSize=1) - self.depth_output = stereo.depth.createOutputQueue(maxSize=1) - self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1) + if not self.enable_pointcloud: + stereo = self.pipeline.create(dai.node.StereoDepth) + stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) + stereo.setDepthAlign(dai.CameraBoardSocket.CAM_C) + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = 10000 + + node_left.link(stereo.left) + node_right.link(stereo.right) + + import datetime + sync = self.pipeline.create(dai.node.Sync) + sync.setSyncThreshold(datetime.timedelta(milliseconds=15)) + + node_right.link(sync.inputs["right"]) + stereo.depth.link(sync.inputs["depth"]) + + self.q_sync = sync.out.createOutputQueue(maxSize=1, blocking=False) + else: + stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) + + self.right_output = node_right.createOutputQueue(maxSize=1) + self.depth_output = stereo.depth.createOutputQueue(maxSize=1) + self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() @@ -53,16 +75,47 @@ def get_frames(self): raise RuntimeError("Camera is not running.") while True: - - left_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output)) - right_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) - depth_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.depth_output)) - - points, points_rgb, points_sequence_number = next(LuxonisCameraAdapter.get_pointcloud_from_output_queue(self.pointcloud_output)) - - synced_image = SyncedImageFrame(timestamp=left_callback.timestamp, left=left_callback, right=right_callback, center=None, pointcloud=points, pointcloud_color=points_rgb, depth=depth_callback.image) - - yield synced_image + if not self.enable_pointcloud: + msgGroup = self.q_sync.get() + if msgGroup is not None: + msgNames = msgGroup.getMessageNames() + frame_right_msg = msgGroup["right"] if "right" in msgNames else None + frame_depth_msg = msgGroup["depth"] if "depth" in msgNames else None + + if frame_right_msg and frame_depth_msg: + timestamp = frame_right_msg.getTimestamp().total_seconds() + sequence_num = frame_right_msg.getSequenceNum() + + if self.right.is_compressed: + img_right = frame_right_msg.getData() + right_frame = ImageFrame(image=img_right, timestamp=timestamp, frame_number=sequence_num, compression_format="jpeg") + else: + img_right = frame_right_msg.getCvFrame() + right_frame = ImageFrame(image=img_right, timestamp=timestamp, frame_number=sequence_num) + + depth_frame = frame_depth_msg.getFrame() + + left_frame = ImageFrame(image=np.zeros((1, 1, 3), dtype=np.uint8), timestamp=timestamp, frame_number=sequence_num) + + synced_image = SyncedImageFrame( + timestamp=timestamp, + left=left_frame, + right=right_frame, + center=None, + depth=depth_frame + ) + yield synced_image + else: + right_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) + depth_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.depth_output)) + + points, points_rgb, points_sequence_number = next(LuxonisCameraAdapter.get_pointcloud_from_output_queue(self.pointcloud_output)) + + left_callback = ImageFrame(image=np.zeros((1, 1, 3), dtype=np.uint8), timestamp=right_callback.timestamp, frame_number=right_callback.frame_number) + + synced_image = SyncedImageFrame(timestamp=right_callback.timestamp, left=left_callback, right=right_callback, center=None, pointcloud=points, pointcloud_color=points_rgb, depth=depth_callback.image) + + yield synced_image def stop(self): self.pipeline.stop() diff --git a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py index cf56309..4ad72bc 100644 --- a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py +++ b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py @@ -45,7 +45,8 @@ def __init__( is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, - is_open_camera: bool = True + is_open_camera: bool = True, + enable_pointcloud: bool = False ): """ `detect_aruco_marker_size`: Runs ArUco detection if a float >= 0.0 is provided. If length is 0.0, the ArUco markers will be detected, but distance will not be printed. If length > 0.0 and calibration is available, ArUco pose and L2 distance to the marker will be displayed. @@ -59,6 +60,7 @@ def __init__( self.detect_aruco_marker_size = detect_aruco_marker_size self.detect_aruco_dictionaries_to_detect = ArucoDictionary.all_1000() self.is_rotate = is_rotate + self.enable_pointcloud = enable_pointcloud self.ai_models: list[AIModelWrapper] = [] self.rectify_maps: "RectifyMaps | None" = None @@ -96,7 +98,7 @@ def __init__( def open_camera(self): if self.camera_type.is_synced_camera_type(): - self._camera = self.camera_type.start_synced(stop_event=self.stop_event) + self._camera = self.camera_type.start_synced(stop_event=self.stop_event,enable_pointcloud=self.enable_pointcloud) else: self._camera = self.camera_type.start(stop_event=self.stop_event) @@ -293,13 +295,17 @@ def get_frame_synced(self, is_run_pipeline: bool) -> Generator[SyncedImageFrame, break if is_run_pipeline and frame is not None: - left_pipeline_controller.run_pipeline(frame.left) - right_pipeline_controller.run_pipeline(frame.right) + if frame.left is not None and frame.left.image is not None and frame.left.image.shape[:2] != (1, 1): + left_pipeline_controller.run_pipeline(frame.left) + if frame.right is not None and frame.right.image is not None and frame.right.image.shape[:2] != (1, 1): + right_pipeline_controller.run_pipeline(frame.right) if frame.center is not None: center_pipeline_controller.run_pipeline(frame.center) - left_pipeline_controller.show_image(frame.left.image) - right_pipeline_controller.show_image(frame.right.image) + if frame.left is not None and frame.left.image is not None and frame.left.image.shape[:2] != (1, 1): + left_pipeline_controller.show_image(frame.left.image) + if frame.right is not None and frame.right.image is not None and frame.right.image.shape[:2] != (1, 1): + right_pipeline_controller.show_image(frame.right.image) if frame.center is not None: center_pipeline_controller.show_image(frame.center.image) @@ -372,7 +378,8 @@ def copy_for(self, camera_type: RGBCameras, is_open_camera: bool): is_crop=self.is_crop, ai_models_to_use=self.ai_models_to_use, detect_aruco_marker_size=self.detect_aruco_marker_size, - is_open_camera=is_open_camera + is_open_camera=is_open_camera, + enable_pointcloud=self.enable_pointcloud ) return copy @@ -478,7 +485,7 @@ class RGBPipelineControllerROS(RGBPipelineController): A specialized controller that leverages stretch_python_bridge's StreamManager for camera streams. This adapter allows using camera tools with ROS2 camera nodes. """ - def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, show_image_in: "RecordRgbShowImageIn | None", is_rotate: bool, is_rectify: bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, is_open_camera: bool = True): + def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, show_image_in: "RecordRgbShowImageIn | None", is_rotate: bool, is_rectify: bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, is_open_camera: bool = True, enable_pointcloud: bool = False): super().__init__( camera_type=camera_type, recording_directory=recording_directory, @@ -488,7 +495,8 @@ def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, s is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, - is_open_camera=False + is_open_camera=False, + enable_pointcloud=enable_pointcloud ) try: @@ -510,7 +518,8 @@ def copy_for(self, camera_type: "RGBCameras", is_open_camera: bool): is_crop=self.is_crop, ai_models_to_use=self.ai_models_to_use, detect_aruco_marker_size=self.detect_aruco_marker_size, - is_open_camera=is_open_camera + is_open_camera=is_open_camera, + enable_pointcloud=self.enable_pointcloud ) return copy diff --git a/stretch4_body/subsystem/cameras/emulated_rgbd.py b/stretch4_body/subsystem/cameras/emulated_rgbd.py index ce6cfe4..e7d56c2 100644 --- a/stretch4_body/subsystem/cameras/emulated_rgbd.py +++ b/stretch4_body/subsystem/cameras/emulated_rgbd.py @@ -16,6 +16,7 @@ stream_center_camera, stream_left_right_camera, stream_left_right_center_camera, + stream_gripper_camera, ) from stretch4_body.subsystem.cameras.models.image_frame import ( ImageFrame, @@ -515,3 +516,26 @@ def stream_left_right_center_rgbd(*, is_rotate=True, use_left_lidar=True, use_ri yield ret finally: streamer.stop() + + +def stream_gripper_rgbd(*, is_rotate=True, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False) -> Generator[RGBDFrame, None, None]: + for synced_frame in stream_gripper_camera(is_rotate=is_rotate, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, enable_pointcloud=True): + if synced_frame is None: + continue + image_frame = synced_frame.right + if image_frame is None: + continue + + pointcloud = synced_frame.pointcloud if synced_frame.pointcloud is not None else np.zeros((0, 3)) + pointcloud_colors = synced_frame.pointcloud_color if synced_frame.pointcloud_color is not None else np.zeros((0, 3)) + depth_image = synced_frame.depth if synced_frame.depth is not None else np.zeros((0, 0)) + + yield RGBDFrame( + timestamp=synced_frame.timestamp, + image_frame=image_frame, + camera_type=RGBCameras.gripper_rgbd, + pointcloud=pointcloud, + pointcloud_base=np.zeros((0, 3)), + pointcloud_colors=pointcloud_colors, + depth_image=depth_image + ) diff --git a/stretch4_body/subsystem/cameras/enums/rgb_camera.py b/stretch4_body/subsystem/cameras/enums/rgb_camera.py index d0f0b01..490ed52 100644 --- a/stretch4_body/subsystem/cameras/enums/rgb_camera.py +++ b/stretch4_body/subsystem/cameras/enums/rgb_camera.py @@ -129,7 +129,7 @@ def start(self, stop_event: threading.Event | None = None) -> "CameraAdapter": raise NotImplementedError(f"{self}'s start() method is not implemented.") - def start_synced(self, stop_event: "threading.Event | None" = None) -> "SyncedCamera": + def start_synced(self, stop_event: threading.Event | None = None, enable_pointcloud: bool = False) -> "SyncedCamera": """Use `start_synced()` to start sync'd frame grabbing.""" from stretch4_body.subsystem.cameras.adapters.luxonis_gripper_camera_adapter import ( GripperCameraLuxonis # import here to avoid circular import @@ -160,6 +160,7 @@ def start_synced(self, stop_event: "threading.Event | None" = None) -> "SyncedCa return GripperCameraLuxonis( RGBCameras.gripper_left.config, RGBCameras.gripper_right.config, + enable_pointcloud=enable_pointcloud, ) raise NotImplementedError(f"{self}'s start_synced() method is not implemented.") diff --git a/stretch4_body/subsystem/cameras/stream_cameras.py b/stretch4_body/subsystem/cameras/stream_cameras.py index 8fe7d13..8e5ed2f 100644 --- a/stretch4_body/subsystem/cameras/stream_cameras.py +++ b/stretch4_body/subsystem/cameras/stream_cameras.py @@ -21,7 +21,7 @@ def _start_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_cr return rgb_pipeline_controller.get_frame(is_run_pipeline=is_run_pipeline) -def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper]|None, detect_aruco_marker_size: float|None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True) -> Generator[SyncedImageFrame, None, None]: +def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper]|None, detect_aruco_marker_size: float|None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True, enable_pointcloud:bool=False) -> Generator[SyncedImageFrame, None, None]: cls = RGBPipelineControllerROS if use_ros_for_cameras else RGBPipelineController rgb_pipeline_controller = cls( camera_type=camera_type, @@ -32,6 +32,7 @@ def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool is_crop=is_crop, ai_models_to_use=ai_models_to_use or [], detect_aruco_marker_size=detect_aruco_marker_size, + enable_pointcloud=enable_pointcloud, ) return rgb_pipeline_controller.get_frame_synced(is_run_pipeline=is_run_pipeline) @@ -57,9 +58,9 @@ def stream_left_right_center_camera(*, is_rotate:bool=True, is_rectify:bool=Fals """Stream the center, left and right head cameras""" return _start_synced_camera(camera_type=RGBCameras.head_left_right_center, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline) -def stream_gripper_camera(*, is_rotate:bool=True, is_rectify:bool=False, is_crop: bool=False, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True) -> Generator[SyncedImageFrame, None, None]: +def stream_gripper_camera(*, is_rotate:bool=True, is_rectify:bool=False, is_crop: bool=False, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True, enable_pointcloud:bool=False) -> Generator[SyncedImageFrame, None, None]: """Stream the gripper RGBD camera""" - return _start_synced_camera(camera_type=RGBCameras.gripper_rgbd, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline) + return _start_synced_camera(camera_type=RGBCameras.gripper_rgbd, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline, enable_pointcloud=enable_pointcloud) if __name__ == "__main__": diff --git a/stretch4_body/tools/stretch_rgbd_show.py b/stretch4_body/tools/stretch_rgbd_show.py index 119603e..2c484e1 100644 --- a/stretch4_body/tools/stretch_rgbd_show.py +++ b/stretch4_body/tools/stretch_rgbd_show.py @@ -11,6 +11,7 @@ stream_center_rgbd, stream_left_right_rgbd, stream_left_right_center_rgbd, + stream_gripper_rgbd, RGBDFrame, EmulatedRGBDStreamer, ) @@ -27,6 +28,7 @@ def _parse_args(): parser.add_argument("-c", "--center", action="store_true", help="Display RGBD stream from center camera") parser.add_argument("-lr", "--left_right", action="store_true", help="Display RGBD streams from left and right cameras") parser.add_argument("-lrc", "--left_right_center", action="store_true", help="Display RGBD streams from all cameras") + parser.add_argument("-g", "--gripper", action="store_true", help="Display RGBD stream from gripper camera") # Lidar selection flags parser.add_argument("--lidar_left", action="store_true", help="Use left lidar") @@ -83,11 +85,12 @@ def main(): use_center = args.center use_left_right = args.left_right use_left_right_center = args.left_right_center + use_gripper = args.gripper use_ros_for_cameras = args.use_ros_for_cameras use_ros_for_lidars = args.use_ros_for_lidars - if not (use_left or use_right or use_center or use_left_right or use_left_right_center): + if not (use_left or use_right or use_center or use_left_right or use_left_right_center or use_gripper): use_left_right = True # Resolve lidar flags (both by default) @@ -145,13 +148,15 @@ def main(): camera_name = "center" elif use_right: camera_name = "right" + elif use_gripper: + camera_name = "gripper" blueprint = rrb.Blueprint( rrb.Horizontal( rrb.Vertical( rrb.Spatial2DView(name="Camera Rotated", origin=f"Cameras/{camera_name}_rotated"), rrb.Spatial2DView(name="Depth Camera", origin=f"Cameras/{camera_name}"), ), - rrb.Spatial3DView(name="Base Frame", origin="/", contents=["+ Pointclouds/base_frame/**"]), + rrb.Spatial3DView(name="Base Frame", origin="/", contents=["+ Pointclouds/base_frame/**", "+ Pointclouds/camera_frame/**"]), column_shares=[1,5] ), collapse_panels=True @@ -200,6 +205,11 @@ def print_loop_timer(): render_rgbd("center", frame) print_loop_timer() + elif use_gripper: + for frame in stream_gripper_rgbd(use_ros_for_cameras=use_ros_for_cameras): + render_rgbd("gripper", frame) + print_loop_timer() + except KeyboardInterrupt: print("Stopping... (Force quitting due to background threads)") os._exit(0) From 6252c30b99e4c8e7be2dba1eccc1214efda30f30 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 13:08:34 -0400 Subject: [PATCH 02/11] Changed to using non-blocking queue's --- .../subsystem/cameras/adapters/luxonis_camera_adapter.py | 2 +- .../cameras/adapters/luxonis_gripper_camera_adapter.py | 6 +++--- .../cameras/adapters/luxonis_synced_camera_adapter.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py index e0897ae..8d254fb 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py @@ -202,7 +202,7 @@ def open(self): pipeline=self.pipeline, camera_config=self.camera_config ) - self.output_queue = node_output.createOutputQueue(maxSize=1) + self.output_queue = node_output.createOutputQueue(maxSize=1, blocking=False) self.input_queue = self.camera_node.inputControl.createInputQueue() try: diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 909c23e..e69c5a7 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -54,9 +54,9 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl else: stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) - self.right_output = node_right.createOutputQueue(maxSize=1) - self.depth_output = stereo.depth.createOutputQueue(maxSize=1) - self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1) + self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) + self.depth_output = stereo.depth.createOutputQueue(maxSize=1, blocking=False) + self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1, blocking=False) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index ac654df..b995ffd 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -35,11 +35,11 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer if center is not None: self.center_camera_node, node_center = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) - self.center_output = node_center.createOutputQueue(maxSize=1) + self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) self.center_input_queue = self.center_camera_node.inputControl.createInputQueue() - self.left_output = node_left.createOutputQueue(maxSize=1) - self.right_output = node_right.createOutputQueue(maxSize=1) + self.left_output = node_left.createOutputQueue(maxSize=1, blocking=False) + self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() From 753c88cf8edbb50946a285b3a6a174754e3828d7 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 13:29:12 -0400 Subject: [PATCH 03/11] Added a sync node to the luxonis camera pipeline --- .../adapters/luxonis_synced_camera_adapter.py | 87 +++++++++++-------- 1 file changed, 53 insertions(+), 34 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index b995ffd..cca1e78 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -35,15 +35,36 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer if center is not None: self.center_camera_node, node_center = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) - self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) self.center_input_queue = self.center_camera_node.inputControl.createInputQueue() - self.left_output = node_left.createOutputQueue(maxSize=1, blocking=False) - self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) - self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() + if self.do_sync_frames: + sync = self.pipeline.create(dai.node.Sync) + buffer_size = left.buffer_size + sync.setNumFramesPool(buffer_size) + + sync.inputs["left"].setQueueSize(1) + sync.inputs["left"].setBlocking(False) + node_left.link(sync.inputs["left"]) + + sync.inputs["right"].setQueueSize(1) + sync.inputs["right"].setBlocking(False) + node_right.link(sync.inputs["right"]) + + if center is not None: + sync.inputs["center"].setQueueSize(1) + sync.inputs["center"].setBlocking(False) + node_center.link(sync.inputs["center"]) + + self.q_sync = sync.out.createOutputQueue(maxSize=1, blocking=False) + else: + if center is not None: + self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) + self.left_output = node_left.createOutputQueue(maxSize=1, blocking=False) + self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) + try: self.pipeline.start() except Exception: @@ -57,36 +78,34 @@ def get_frames(self): if not self.is_open(): raise RuntimeError("Camera is not running.") - while True: - if self.stop_event is not None and self.stop_event.is_set(): - return - - left_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output, stop_event=self.stop_event)) - right_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output, stop_event=self.stop_event)) - - center_frame = None - if self.center is not None: - center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) - - synced_image = SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) - - if not self.do_sync_frames: - # If the user is not requesting synced images, return them as is. - yield synced_image - - # sync the left and right images, otherwise drop the frame if they are not synced. - left_right_diff = abs(left_frame.frame_number - right_frame.frame_number) - if left_right_diff <= 4: - if synced_image.center is not None: - # sync left and center based on timestamp - left_center_diff = abs(left_frame.timestamp - synced_image.center.timestamp) - if left_center_diff > 1.5/self.center.fps: # A looser tolerance for left-center sync to account for the different requested FPS's - logging.warning(f"Center frame is not synced, ignoring center frame. Off by {left_center_diff:.3f}s") - synced_image.center = None - - yield synced_image - else: - logging.warn(f"Left and Right frames are not synced, dropping synced frame. Off by {left_right_diff} frames.") + if self.do_sync_frames: + while True: + if self.stop_event is not None and self.stop_event.is_set(): + return + msg_group = self.q_sync.get() + left_msg = msg_group["left"] + right_msg = msg_group["right"] + + left_frame = LuxonisCameraAdapter.dai_message_to_image_frame(left_msg) + right_frame = LuxonisCameraAdapter.dai_message_to_image_frame(right_msg) + + center_frame = None + if self.center is not None: + center_msg = msg_group["center"] + if center_msg is not None: + center_frame = LuxonisCameraAdapter.dai_message_to_image_frame(center_msg) + + yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) + else: + while True: + left_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output)) + right_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) + + center_frame = None + if self.center is not None: + center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + + yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) def stop(self): try: From c49cf7cb74127ea674eac5e5a02e9cb6bfab449a Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 10:35:41 -0700 Subject: [PATCH 04/11] Fixed bug with setNumFramesPool not being available --- .../cameras/adapters/luxonis_synced_camera_adapter.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index cca1e78..00b7261 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -43,18 +43,17 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer if self.do_sync_frames: sync = self.pipeline.create(dai.node.Sync) buffer_size = left.buffer_size - sync.setNumFramesPool(buffer_size) - sync.inputs["left"].setQueueSize(1) + sync.inputs["left"].setMaxSize(1) sync.inputs["left"].setBlocking(False) node_left.link(sync.inputs["left"]) - sync.inputs["right"].setQueueSize(1) + sync.inputs["right"].setMaxSize(1) sync.inputs["right"].setBlocking(False) node_right.link(sync.inputs["right"]) if center is not None: - sync.inputs["center"].setQueueSize(1) + sync.inputs["center"].setMaxSize(1) sync.inputs["center"].setBlocking(False) node_center.link(sync.inputs["center"]) From 518d6d3102021cdfe8f026036de43165aec836a4 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 11:06:28 -0700 Subject: [PATCH 05/11] Added an asynchronous logger for Rerun that offloads CPU-intensive tasks to a background thread to maintain high frame rates. --- .../controllers/camera_pipeline_controller.py | 15 +++-- .../subsystem/cameras/rerun_utils.py | 65 +++++++++++++++++++ stretch4_body/tools/stretch_rgbd_show.py | 22 +++++-- 3 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 stretch4_body/subsystem/cameras/rerun_utils.py diff --git a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py index 4ad72bc..f991f24 100644 --- a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py +++ b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py @@ -23,6 +23,9 @@ from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameras from stretch4_body.subsystem.cameras.models.image_write_to_disk import RgbImageToWriteToDisk, add_image_to_save_queue, create_directory_if_it_does_not_exist, saver_thread from stretch4_body.subsystem.cameras.models.image_frame import ImageFrame, SyncedImageFrame +from stretch4_body.subsystem.cameras.rerun_utils import RerunAsyncLogger + + class RecordRgbShowImageIn(Enum): @@ -83,6 +86,8 @@ def __init__( target=saver_thread, args=(self.stop_event, self.save_rgb_queue), daemon=True ) + self.rerun_logger: RerunAsyncLogger | None = None + self.save_directory = None if self.recording_directory: if not self.camera_type.is_synced_camera_type(): @@ -109,10 +114,10 @@ def camera(self): return self._camera def _show_rererun(self, color_image: np.ndarray): - rr.log( - f"{self.camera_type.name.upper()} Camera", - rr.Image(color_image, color_model="BGR").compress(), - ) + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + + self.rerun_logger.log_image(f"{self.camera_type.name.upper()} Camera", color_image) def _show_cvimshow(self, color_image: np.ndarray): cv2.namedWindow(self.camera_type.name, cv2.WINDOW_NORMAL) @@ -479,6 +484,8 @@ def stop(self): self._camera.stop() if self.save_thread.is_alive(): self.save_thread.join(timeout=5) + if self.rerun_logger is not None: + self.rerun_logger.stop() class RGBPipelineControllerROS(RGBPipelineController): """ diff --git a/stretch4_body/subsystem/cameras/rerun_utils.py b/stretch4_body/subsystem/cameras/rerun_utils.py new file mode 100644 index 0000000..554f7ac --- /dev/null +++ b/stretch4_body/subsystem/cameras/rerun_utils.py @@ -0,0 +1,65 @@ +import threading +import queue +import cv2 +import rerun as rr +import numpy as np + +class RerunAsyncLogger: + """ + An asynchronous logger for Rerun that offloads CPU-intensive tasks like + image compression to a background thread to maintain high frame rates. + """ + def __init__(self, camera_name: str, queue_size: int = 10): + self.camera_name = camera_name.upper() + self.queue = queue.Queue(maxsize=queue_size) + self.stop_event = threading.Event() + self.thread = threading.Thread(target=self._worker, daemon=True) + self.thread.start() + + def _worker(self): + while not self.stop_event.is_set(): + try: + # Task format: (entity_path, data, type) + task = self.queue.get(timeout=0.1) + if task is None: + continue + + path, data, task_type = task + + if task_type == "image": + # Use OpenCV for faster encoding. Quality 80 is a good balance. + success, jpeg_bytes = cv2.imencode('.jpg', data, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if success: + rr.log( + path, + rr.EncodedImage(contents=jpeg_bytes.tobytes(), media_type="image/jpeg"), + ) + elif task_type == "direct": + # For DepthImage, Points3D, etc. that don't need host-side compression + rr.log(path, data) + + except queue.Empty: + continue + except Exception as e: + import logging + logging.error(f"Error in RerunAsyncLogger worker for {self.camera_name}: {e}") + + def log_image(self, path: str, image: np.ndarray): + """Queue an image for asynchronous compression and logging.""" + try: + self.queue.put( (path, image, "image"), block=True) + except Exception: + pass + + def log_any(self, path: str, archetype: "Any"): + """Queue any Rerun archetype for asynchronous logging.""" + try: + self.queue.put( (path, archetype, "direct"), block=True) + except Exception: + pass + + def stop(self, timeout: float = 2.0): + """Stop the background thread.""" + self.stop_event.set() + if self.thread.is_alive(): + self.thread.join(timeout=timeout) diff --git a/stretch4_body/tools/stretch_rgbd_show.py b/stretch4_body/tools/stretch_rgbd_show.py index 2c484e1..252cd41 100644 --- a/stretch4_body/tools/stretch_rgbd_show.py +++ b/stretch4_body/tools/stretch_rgbd_show.py @@ -15,6 +15,14 @@ RGBDFrame, EmulatedRGBDStreamer, ) +from stretch4_body.subsystem.cameras.rerun_utils import RerunAsyncLogger + +rerun_loggers: dict[str, RerunAsyncLogger] = {} + +def get_rerun_logger(c_name: str) -> RerunAsyncLogger: + if c_name not in rerun_loggers: + rerun_loggers[c_name] = RerunAsyncLogger(camera_name=c_name) + return rerun_loggers[c_name] def _parse_args(): @@ -56,19 +64,21 @@ def _parse_args(): def render_rgbd(c_name: str, frame: RGBDFrame): - rr.log(f"Cameras/{c_name}_rotated", rr.Image(frame.image_frame.image, color_model="BGR").compress()) - rr.log(f"Cameras/{c_name}/rgb_raw", rr.Image(frame.image_frame.image_raw, color_model="BGR").compress()) + logger = get_rerun_logger(c_name) + + logger.log_image(f"Cameras/{c_name}_rotated", frame.image_frame.image) + logger.log_image(f"Cameras/{c_name}/rgb_raw", frame.image_frame.image_raw) if frame.depth_image is not None and frame.depth_image.shape[0] > 0: - rr.log(f"Cameras/{c_name}/depth", rr.DepthImage(frame.depth_image, meter=1.0)) + logger.log_any(f"Cameras/{c_name}/depth", rr.DepthImage(frame.depth_image, meter=1.0)) if len(frame.pointcloud) > 0: - rr.log( + logger.log_any( f"Pointclouds/camera_frame/{c_name}", rr.Points3D(frame.pointcloud, colors=frame.pointcloud_colors, radii=[0.0025]), ) if len(frame.pointcloud_base) > 0: - rr.log( + logger.log_any( f"Pointclouds/base_frame/{c_name}", rr.Points3D(frame.pointcloud_base, colors=frame.pointcloud_colors, radii=[0.0025]), ) @@ -217,6 +227,8 @@ def print_loop_timer(): print(f"Stopping due to error: {e=}") raise e finally: + for logger in rerun_loggers.values(): + logger.stop() EmulatedRGBDStreamer.get_instance().stop() From 84edff62447f226eba00c9072bb95cd8fa6cde03 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 11:13:30 -0700 Subject: [PATCH 06/11] Updated luxonis camera adapters to use the robot params --- stretch4_body/robot/robot_params_SE4.py | 16 ++++++++++------ .../adapters/luxonis_gripper_camera_adapter.py | 12 ++++++------ .../adapters/luxonis_synced_camera_adapter.py | 2 +- .../subsystem/cameras/enums/rgb_camera.py | 4 +++- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/stretch4_body/robot/robot_params_SE4.py b/stretch4_body/robot/robot_params_SE4.py index 45bfe89..d5fbeab 100644 --- a/stretch4_body/robot/robot_params_SE4.py +++ b/stretch4_body/robot/robot_params_SE4.py @@ -1359,7 +1359,7 @@ "image_size": (1200, 1920), "fps": 30, "rotate_number_of_times": 1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1377,7 +1377,7 @@ "image_size": (1200, 1920), "fps": 30, "rotate_number_of_times": -1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1396,7 +1396,7 @@ "image_size": (3040, 4032), # Almost full 12MP resolution, 24 pixels subtracted to be divisible by 16 for compression "fps": 10, "rotate_number_of_times": -1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1416,7 +1416,7 @@ # "image_size": (800, 1280), "fps": 30, "rotate_number_of_times": 0, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1424,7 +1424,9 @@ "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True "exposure_time": None, # Only used if use_auto_exposure is False - "iso": None # Only used if use_auto_exposure is False + "iso": None, # Only used if use_auto_exposure is False + "sync_threshold_ms": 15.0, + "stereo_max_range_mm": 10000.0 } }, 'gripper_right': { @@ -1443,7 +1445,9 @@ "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True "exposure_time": None, # Only used if use_auto_exposure is False - "iso": None # Only used if use_auto_exposure is False + "iso": None, # Only used if use_auto_exposure is False + "sync_threshold_ms": 15.0, + "stereo_max_range_mm": 10000.0 } }, }, diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index e69c5a7..3be0093 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -38,25 +38,25 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl stereo = self.pipeline.create(dai.node.StereoDepth) stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) stereo.setDepthAlign(dai.CameraBoardSocket.CAM_C) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = 10000 + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = self.right.stereo_max_range_mm or 10000.0 node_left.link(stereo.left) node_right.link(stereo.right) import datetime sync = self.pipeline.create(dai.node.Sync) - sync.setSyncThreshold(datetime.timedelta(milliseconds=15)) + sync.setSyncThreshold(datetime.timedelta(milliseconds=self.right.sync_threshold_ms or 15)) node_right.link(sync.inputs["right"]) stereo.depth.link(sync.inputs["depth"]) - self.q_sync = sync.out.createOutputQueue(maxSize=1, blocking=False) + self.q_sync = sync.out.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) else: stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) - self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) - self.depth_output = stereo.depth.createOutputQueue(maxSize=1, blocking=False) - self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1, blocking=False) + self.right_output = node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + self.depth_output = stereo.depth.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index 00b7261..7d549d1 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -57,7 +57,7 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer sync.inputs["center"].setBlocking(False) node_center.link(sync.inputs["center"]) - self.q_sync = sync.out.createOutputQueue(maxSize=1, blocking=False) + self.q_sync = sync.out.createOutputQueue(maxSize=buffer_size, blocking=False) else: if center is not None: self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) diff --git a/stretch4_body/subsystem/cameras/enums/rgb_camera.py b/stretch4_body/subsystem/cameras/enums/rgb_camera.py index 490ed52..a02a85a 100644 --- a/stretch4_body/subsystem/cameras/enums/rgb_camera.py +++ b/stretch4_body/subsystem/cameras/enums/rgb_camera.py @@ -30,6 +30,8 @@ class RGBCameraConfig: limit_max: int | None = None exposure_time: int | None = None iso: int | None = None + sync_threshold_ms: float | None = None + stereo_max_range_mm: float | None = None class CameraDevice(Device): """Sets up a Stretch Body camera device to pull params from robot params.""" @@ -45,7 +47,7 @@ def get_config(self, camera_type: "RGBCameras") -> "RGBCameraConfig": config = RGBCameraConfig(**config_dict) config.distortion_model = DistortionModels[config_dict["distortion_model"].replace("DistortionModels.", "")] if config_dict["distortion_model"] is not None else None - + return config class RGBCameras(Enum): From 1a313c7ca59ea1e161fda44eefb71dfa7aaf5b74 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 11:17:01 -0700 Subject: [PATCH 07/11] Left/right maintain 30fps while center camera is 10fps when all three are used --- .../cameras/adapters/luxonis_synced_camera_adapter.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index 7d549d1..ae9a7a6 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -52,10 +52,9 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer sync.inputs["right"].setBlocking(False) node_right.link(sync.inputs["right"]) + # CENTER is NOT in sync to keep it at its own 10fps while left/right are at 30fps if center is not None: - sync.inputs["center"].setMaxSize(1) - sync.inputs["center"].setBlocking(False) - node_center.link(sync.inputs["center"]) + self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) self.q_sync = sync.out.createOutputQueue(maxSize=buffer_size, blocking=False) else: @@ -90,9 +89,8 @@ def get_frames(self): center_frame = None if self.center is not None: - center_msg = msg_group["center"] - if center_msg is not None: - center_frame = LuxonisCameraAdapter.dai_message_to_image_frame(center_msg) + # Get the LATEST center frame, but don't block + center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) else: From 5ca83890d146d4e9081f04584576f5e3889476ed Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 11:30:06 -0700 Subject: [PATCH 08/11] Ensures when a center frame is available, it's synced with left/right frames too --- .../luxonis_gripper_camera_adapter.py | 4 +-- .../adapters/luxonis_synced_camera_adapter.py | 35 +++++++++++++++++-- .../subsystem/cameras/enums/rgb_camera.py | 4 +-- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 3be0093..084d5e6 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -38,14 +38,14 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl stereo = self.pipeline.create(dai.node.StereoDepth) stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) stereo.setDepthAlign(dai.CameraBoardSocket.CAM_C) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = self.right.stereo_max_range_mm or 10000.0 + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = self.right.stereo_max_range_mm node_left.link(stereo.left) node_right.link(stereo.right) import datetime sync = self.pipeline.create(dai.node.Sync) - sync.setSyncThreshold(datetime.timedelta(milliseconds=self.right.sync_threshold_ms or 15)) + sync.setSyncThreshold(datetime.timedelta(milliseconds=self.right.sync_threshold_ms)) node_right.link(sync.inputs["right"]) stereo.depth.link(sync.inputs["depth"]) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index ae9a7a6..ed18ee8 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -5,6 +5,7 @@ import time import threading import logging +import collections import depthai as dai from stretch4_body.subsystem.cameras.cv_utils import RectifyMaps from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameraConfig, RGBCameras @@ -39,6 +40,7 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() + self.center_buffer = collections.deque(maxlen=10) if self.do_sync_frames: sync = self.pipeline.create(dai.node.Sync) @@ -89,8 +91,37 @@ def get_frames(self): center_frame = None if self.center is not None: - # Get the LATEST center frame, but don't block - center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + # Drain center queue into buffer + while True: + c_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + if c_frame is None: + break + self.center_buffer.append(c_frame) + + # Find best match in center_buffer to keep center synced with left/right frames + if self.center_buffer: + lr_timestamp = left_msg.getTimestamp().total_seconds() + + # Find the frame with the closest timestamp + # We also discard frames that are significantly older than the current LR frame + best_diff = float('inf') + best_idx = -1 + for i, c_f in enumerate(self.center_buffer): + diff = abs(c_f.timestamp - lr_timestamp) + if diff < best_diff: + best_diff = diff + best_idx = i + + # If a match is found within a reasonable window + if best_idx != -1 and best_diff < 1/self.center.fps: + center_frame = self.center_buffer[best_idx] + # Discard the used frame and all older ones + for _ in range(best_idx + 1): + self.center_buffer.popleft() + elif best_idx != -1 and self.center_buffer[0].timestamp < lr_timestamp - 0.1: + # Discard frames that are way too old + while self.center_buffer and self.center_buffer[0].timestamp < lr_timestamp - 0.1: + self.center_buffer.popleft() yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) else: diff --git a/stretch4_body/subsystem/cameras/enums/rgb_camera.py b/stretch4_body/subsystem/cameras/enums/rgb_camera.py index a02a85a..25db945 100644 --- a/stretch4_body/subsystem/cameras/enums/rgb_camera.py +++ b/stretch4_body/subsystem/cameras/enums/rgb_camera.py @@ -30,8 +30,8 @@ class RGBCameraConfig: limit_max: int | None = None exposure_time: int | None = None iso: int | None = None - sync_threshold_ms: float | None = None - stereo_max_range_mm: float | None = None + sync_threshold_ms: float = 15.0 + stereo_max_range_mm: float = 10000.0 class CameraDevice(Device): """Sets up a Stretch Body camera device to pull params from robot params.""" From 7d6d2a15e215dd4b37ca323b39ae9dc2d254b7dc Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 11:44:17 -0700 Subject: [PATCH 09/11] Fixed gripper camera params wrong data type crashing the pipeline --- .../cameras/adapters/luxonis_gripper_camera_adapter.py | 4 ++-- stretch4_body/subsystem/cameras/enums/rgb_camera.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 084d5e6..8a58d4e 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -38,14 +38,14 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl stereo = self.pipeline.create(dai.node.StereoDepth) stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) stereo.setDepthAlign(dai.CameraBoardSocket.CAM_C) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = self.right.stereo_max_range_mm + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = int(self.right.stereo_max_range_mm) node_left.link(stereo.left) node_right.link(stereo.right) import datetime sync = self.pipeline.create(dai.node.Sync) - sync.setSyncThreshold(datetime.timedelta(milliseconds=self.right.sync_threshold_ms)) + sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.right.sync_threshold_ms))) node_right.link(sync.inputs["right"]) stereo.depth.link(sync.inputs["depth"]) diff --git a/stretch4_body/subsystem/cameras/enums/rgb_camera.py b/stretch4_body/subsystem/cameras/enums/rgb_camera.py index 25db945..e51b702 100644 --- a/stretch4_body/subsystem/cameras/enums/rgb_camera.py +++ b/stretch4_body/subsystem/cameras/enums/rgb_camera.py @@ -30,8 +30,8 @@ class RGBCameraConfig: limit_max: int | None = None exposure_time: int | None = None iso: int | None = None - sync_threshold_ms: float = 15.0 - stereo_max_range_mm: float = 10000.0 + sync_threshold_ms: int = 15 + stereo_max_range_mm: int = 10000 class CameraDevice(Device): """Sets up a Stretch Body camera device to pull params from robot params.""" From 8bc23817d03682982bb2162ec5c8ffe107b53b81 Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Thu, 11 Jun 2026 12:06:24 -0700 Subject: [PATCH 10/11] Updated rerun blueprints for gripper camera --- .../controllers/camera_pipeline_controller.py | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py index f991f24..afc783b 100644 --- a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py +++ b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py @@ -117,7 +117,7 @@ def _show_rererun(self, color_image: np.ndarray): if self.rerun_logger is None: self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) - self.rerun_logger.log_image(f"{self.camera_type.name.upper()} Camera", color_image) + self.rerun_logger.log_image(f"Cameras/{self.camera_type.name}/image", color_image) def _show_cvimshow(self, color_image: np.ndarray): cv2.namedWindow(self.camera_type.name, cv2.WINDOW_NORMAL) @@ -229,21 +229,38 @@ def run_pipeline(self, frame: ImageFrame): def _start_rerun(self): rr.init(self.camera_type.name, spawn=False) rr.spawn(memory_limit="4GB") - + import rerun.blueprint as rrb + from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameras - if self.camera_type.is_synced_camera_type(): + if self.camera_type == RGBCameras.gripper_rgbd: + blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Spatial2DView(name="Gripper Depth", origin="Cameras/gripper_rgbd/depth"), + rrb.Spatial2DView(name="Gripper Right", origin="Cameras/gripper_right/image"), + ), + collapse_panels=True + ) + elif self.camera_type == RGBCameras.head_left_right: + blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Spatial2DView(name="Head Left", origin="Cameras/head_left/image"), + rrb.Spatial2DView(name="Head Right", origin="Cameras/head_right/image"), + ), + collapse_panels=True + ) + elif self.camera_type == RGBCameras.head_left_right_center: blueprint = rrb.Blueprint( rrb.Horizontal( - rrb.Spatial2DView(name="Left Camera", origin="HEAD_LEFT Camera"), - rrb.Spatial2DView(name="Center Camera", origin="HEAD_CENTER Camera"), - rrb.Spatial2DView(name="Right Camera", origin="HEAD_RIGHT Camera"), + rrb.Spatial2DView(name="Head Left", origin="Cameras/head_left/image"), + rrb.Spatial2DView(name="Head Center", origin="Cameras/head_center/image"), + rrb.Spatial2DView(name="Head Right", origin="Cameras/head_right/image"), ), collapse_panels=True ) else: blueprint = rrb.Blueprint( - rrb.Spatial2DView(name=f"{self.camera_type.name} Camera", origin=f"{self.camera_type.name.upper()} Camera"), + rrb.Spatial2DView(name=self.camera_type.name.upper(), origin=f"Cameras/{self.camera_type.name}/image"), collapse_panels=True ) rr.send_blueprint(blueprint) @@ -316,19 +333,20 @@ def get_frame_synced(self, is_run_pipeline: bool) -> Generator[SyncedImageFrame, center_pipeline_controller.show_image(frame.center.image) if frame.depth is not None and self.show_image_in is RecordRgbShowImageIn.RERUN: - rr.log( - f"{self.camera_type.name}/depth", + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + self.rerun_logger.log_any( + f"Cameras/{self.camera_type.name}/depth", rr.DepthImage(frame.depth), ) if frame.pointcloud is not None and self.show_image_in is RecordRgbShowImageIn.RERUN: - rr.log( - f"{self.camera_type.name}/pcl", - rr.Transform3D( - rotation=rr.RotationAxisAngle(axis=[0, 0, 1], angle=rr.Angle(rad=np.pi)), - ) + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + self.rerun_logger.log_any( + f"Pointclouds/{self.camera_type.name}", + rr.Points3D(frame.pointcloud, colors=frame.pointcloud_colors, radii=[0.0025]), ) - rr.log(f"{self.camera_type.name}/pcl", rr.Points3D(frame.pointcloud, colors=frame.pointcloud_color, radii=[0.002])) yield frame From 76a8599c0e471d81dc7004743e299f4ecd81a78c Mon Sep 17 00:00:00 2001 From: Shehab Attia Date: Mon, 22 Jun 2026 15:52:47 -0700 Subject: [PATCH 11/11] Changed left/right gripper camera to use compressed, made compressed imagery work with synced pipelines --- stretch4_body/robot/robot_params_SE4.py | 10 +- .../adapters/luxonis_camera_adapter.py | 37 ++-- .../luxonis_gripper_camera_adapter.py | 158 +++++++++++------- .../adapters/luxonis_synced_camera_adapter.py | 84 ++++------ .../controllers/camera_pipeline_controller.py | 12 +- 5 files changed, 150 insertions(+), 151 deletions(-) diff --git a/stretch4_body/robot/robot_params_SE4.py b/stretch4_body/robot/robot_params_SE4.py index d5fbeab..858272a 100644 --- a/stretch4_body/robot/robot_params_SE4.py +++ b/stretch4_body/robot/robot_params_SE4.py @@ -1417,9 +1417,9 @@ "fps": 30, "rotate_number_of_times": 0, "buffer_size": 1, - "is_compressed": False, + "is_compressed": True, "is_lossless": False, # Only used if is_compressed is true - "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False + "jpeg_quality": 80, # Only used if is_compressed is true and is_lossless is False "distortion_model": None, "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True @@ -1437,10 +1437,10 @@ # "image_size": (800, 1280), "fps": 30, "rotate_number_of_times": 0, - "buffer_size": 2, - "is_compressed": False, + "buffer_size": 1, + "is_compressed": True, "is_lossless": False, # Only used if is_compressed is true - "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False + "jpeg_quality": 80, # Only used if is_compressed is true and is_lossless is False "distortion_model": None, "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py index 8d254fb..d2e6c81 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py @@ -108,7 +108,7 @@ def get_depthai_camera_socket(camera_type: RGBCameras): raise Exception(f"{camera_type} is not supported as a Luxonis device.") @staticmethod - def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) -> tuple[dai.node.Camera, dai.Node.Output]: + def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) -> tuple[dai.node.Camera, dai.Node.Output, dai.Node.Output|None]: """ Takes a dai.Pipeline reference and adds a camera node to it. """ @@ -144,6 +144,7 @@ def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) - ) print("camera_config:", camera_config) + camera_output_compressed = None if camera_config.is_compressed: videoEncoder = pipeline.create(dai.node.VideoEncoder) videoEncoder.setNumFramesPool(buffer_size) @@ -154,9 +155,9 @@ def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) - lossless=camera_config.is_lossless, quality=camera_config.jpeg_quality ) - camera_output = videoEncoder.bitstream + camera_output_compressed = videoEncoder.bitstream - return node, camera_output + return node, camera_output, camera_output_compressed @staticmethod def create_pipeline(luxonis_device_product_name:str) -> tuple[dai.Pipeline, dai.Device]: @@ -171,25 +172,6 @@ def create_pipeline(luxonis_device_product_name:str) -> tuple[dai.Pipeline, dai. pipeline.setXLinkChunkSize(0) return pipeline, device - - @staticmethod - def create_rgbd_node(pipeline: dai.Pipeline, left_rgb_output: dai.Node.Output, right_rgb_out: dai.Node.Output): - - stereo = pipeline.create(dai.node.StereoDepth).build(left_rgb_output, right_rgb_out) - rgbd = pipeline.create(dai.node.RGBD) - - stereo.setRectifyEdgeFillColor(0) - stereo.enableDistortionCorrection(True) - # https://docs.luxonis.com/software-v3/depthai/depthai-components/nodes/stereo_depth - stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = 1000 * 5 # in mm - rgbd.setDepthUnits(dai.StereoDepthConfig.AlgorithmControl.DepthUnit.METER) - - stereo.syncedLeft.link(rgbd.inColor) - stereo.depth.link(rgbd.inDepth) - left_rgb_output.link(stereo.inputAlignTo) - - return stereo, rgbd def is_open(self): return self.pipeline is not None and self.device is not None and self.pipeline.isRunning() and not self.device.isClosed() @@ -198,11 +180,13 @@ def open(self): self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(self.camera_config.camera_device) self.camera = self.pipeline - self.camera_node, node_output = LuxonisCameraAdapter.create_camera_node( + self.camera_node, node_output, node_output_compressed = LuxonisCameraAdapter.create_camera_node( pipeline=self.pipeline, camera_config=self.camera_config ) - self.output_queue = node_output.createOutputQueue(maxSize=1, blocking=False) + output_node = node_output_compressed if node_output_compressed is not None else node_output + + self.output_queue = output_node.createOutputQueue(maxSize=1, blocking=False) self.input_queue = self.camera_node.inputControl.createInputQueue() try: @@ -256,11 +240,10 @@ def get_frame_from_output_queue( @staticmethod def get_frame_from_output_queue_no_block( - output_queue: dai.MessageQueue, - timeout: float = 1/120 + output_queue: dai.MessageQueue ): while True: - message: dai.ImgFrame | None = output_queue.get(timeout=timeout) + message: dai.ImgFrame | None = output_queue.tryGet() if message: yield LuxonisCameraAdapter.dai_message_to_image_frame(message) else: diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 8a58d4e..6ed778b 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -12,6 +12,7 @@ from stretch4_body.subsystem.cameras.models.image_frame import SyncedImageFrame, ImageFrame import dataclasses import numpy as np +import datetime class GripperCameraLuxonis(SyncedCamera): @@ -29,34 +30,54 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(left.camera_device) self.camera = self.pipeline - left_cfg = dataclasses.replace(left, is_compressed=False) + self.left_camera_node, node_left, node_left_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) + self.right_camera_node, node_right, node_right_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) - self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left_cfg) - self.right_camera_node, node_right = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + stereo = self.pipeline.create(dai.node.StereoDepth) + # stereo.setRectifyEdgeFillColor(0) + # stereo.enableDistortionCorrection(True) + # https://docs.luxonis.com/software-v3/depthai/depthai-components/nodes/stereo_depth + stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) + stereo.setDepthAlign(LuxonisCameraAdapter.get_depthai_camera_socket(self.right.camera_type)) # Align to right camera + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = int(self.right.stereo_max_range_mm) + + node_left.link(stereo.left) + node_right.link(stereo.right) - if not self.enable_pointcloud: - stereo = self.pipeline.create(dai.node.StereoDepth) - stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) - stereo.setDepthAlign(dai.CameraBoardSocket.CAM_C) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = int(self.right.stereo_max_range_mm) - - node_left.link(stereo.left) - node_right.link(stereo.right) + sync = self.pipeline.create(dai.node.Sync) + sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.right.sync_threshold_ms))) - import datetime - sync = self.pipeline.create(dai.node.Sync) - sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.right.sync_threshold_ms))) + output_node_left = node_left_compressed if node_left_compressed is not None else node_left + output_node_right = node_right_compressed if node_right_compressed is not None else node_right - node_right.link(sync.inputs["right"]) - stereo.depth.link(sync.inputs["depth"]) + sync.inputs["left"].setMaxSize(self.left.buffer_size) + sync.inputs["left"].setBlocking(False) + output_node_left.link(sync.inputs["left"]) + + sync.inputs["right"].setMaxSize(self.right.buffer_size) + sync.inputs["right"].setBlocking(False) + output_node_right.link(sync.inputs["right"]) + + sync.inputs["depth"].setMaxSize(self.right.buffer_size) + sync.inputs["depth"].setBlocking(False) + stereo.depth.link(sync.inputs["depth"]) + + if self.enable_pointcloud: + rgbd = self.pipeline.create(dai.node.RGBD) + rgbd.setDepthUnits(dai.StereoDepthConfig.AlgorithmControl.DepthUnit.METER) - self.q_sync = sync.out.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) - else: - stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) - - self.right_output = node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) - self.depth_output = stereo.depth.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) - self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + + node_left.link(rgbd.inColor) + stereo.depth.link(rgbd.inDepth) + sync.inputs["pointcloud"].setBlocking(False) + rgbd.pcl.link(sync.inputs["pointcloud"]) + sync.inputs["pointcloud"].setMaxSize(self.right.buffer_size) + node_left.link(stereo.inputAlignTo) + # self.right_output = node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + # self.depth_output = stereo.depth.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + # self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + + self.q_sync = sync.out.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() @@ -66,6 +87,17 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcl except Exception: clear_device_cache() raise + + def get_gripper_intrinsics(self, camera_type: RGBCameras): + """Returns M and D from the hardware factory calibration if available.""" + try: + calib = self.device.readCalibration() + M = np.array(calib.getCameraIntrinsics(LuxonisCameraAdapter.get_depthai_camera_socket(camera_type), camera_type.config.image_size[0], camera_type.config.image_size[1]), dtype=np.float64) + D = np.array(calib.getDistortionCoefficients(LuxonisCameraAdapter.get_depthai_camera_socket(camera_type)), dtype=np.float64) + return M, D + except Exception as e: + print(f"Warning: could not read calibration from OAK-D: {e}") + return None, None def is_open(self): return self.pipeline is not None and self.device is not None and self.pipeline.isRunning() and not self.device.isClosed() @@ -73,49 +105,49 @@ def is_open(self): def get_frames(self): if not self.is_open(): raise RuntimeError("Camera is not running.") + + empty_left_or_right_frame = ImageFrame(image=np.zeros((self.left.image_size[1], self.left.image_size[0], 3), dtype=np.uint8), timestamp=0, frame_number=0) while True: - if not self.enable_pointcloud: - msgGroup = self.q_sync.get() - if msgGroup is not None: - msgNames = msgGroup.getMessageNames() - frame_right_msg = msgGroup["right"] if "right" in msgNames else None - frame_depth_msg = msgGroup["depth"] if "depth" in msgNames else None + msgGroup = self.q_sync.get() + if msgGroup is not None: + msgNames = msgGroup.getMessageNames() + frame_left_msg = msgGroup["left"] if "left" in msgNames else None + frame_right_msg = msgGroup["right"] if "right" in msgNames else None + frame_depth_msg = msgGroup["depth"] if "depth" in msgNames else None + frame_pointcloud_msg = msgGroup["pointcloud"] if "pointcloud" in msgNames else None + + pointcloud = None + pointcloud_color = None + if frame_pointcloud_msg: + pointcloud, pointcloud_color = frame_pointcloud_msg.getPointsRGB() + + if frame_depth_msg: + timestamp = frame_depth_msg.getTimestamp().total_seconds() + sequence_num = frame_depth_msg.getSequenceNum() + if frame_left_msg: + left_frame = LuxonisCameraAdapter.dai_message_to_image_frame(frame_left_msg) + else: + left_frame = empty_left_or_right_frame + if frame_right_msg: + right_frame = LuxonisCameraAdapter.dai_message_to_image_frame(frame_right_msg) + else: + right_frame = empty_left_or_right_frame + + depth_frame = frame_depth_msg.getFrame() - if frame_right_msg and frame_depth_msg: - timestamp = frame_right_msg.getTimestamp().total_seconds() - sequence_num = frame_right_msg.getSequenceNum() - - if self.right.is_compressed: - img_right = frame_right_msg.getData() - right_frame = ImageFrame(image=img_right, timestamp=timestamp, frame_number=sequence_num, compression_format="jpeg") - else: - img_right = frame_right_msg.getCvFrame() - right_frame = ImageFrame(image=img_right, timestamp=timestamp, frame_number=sequence_num) - - depth_frame = frame_depth_msg.getFrame() - - left_frame = ImageFrame(image=np.zeros((1, 1, 3), dtype=np.uint8), timestamp=timestamp, frame_number=sequence_num) - - synced_image = SyncedImageFrame( - timestamp=timestamp, - left=left_frame, - right=right_frame, - center=None, - depth=depth_frame - ) - yield synced_image - else: - right_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) - depth_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.depth_output)) - - points, points_rgb, points_sequence_number = next(LuxonisCameraAdapter.get_pointcloud_from_output_queue(self.pointcloud_output)) - - left_callback = ImageFrame(image=np.zeros((1, 1, 3), dtype=np.uint8), timestamp=right_callback.timestamp, frame_number=right_callback.frame_number) - - synced_image = SyncedImageFrame(timestamp=right_callback.timestamp, left=left_callback, right=right_callback, center=None, pointcloud=points, pointcloud_color=points_rgb, depth=depth_callback.image) - - yield synced_image + synced_image = SyncedImageFrame( + timestamp=timestamp, + left=left_frame, + right=right_frame, + center=None, + depth=depth_frame, + pointcloud=pointcloud, + pointcloud_color=pointcloud_color + ) + yield synced_image + else: + print("No depth frame received.") def stop(self): self.pipeline.stop() diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index ed18ee8..402deb9 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -1,11 +1,10 @@ """ Adapter for connecting to and controlling the Luxonis head cameras using the DepthAI API for the Luxonis OAK-FFC 3P board. """ - +import datetime import time import threading import logging -import collections import depthai as dai from stretch4_body.subsystem.cameras.cv_utils import RectifyMaps from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameraConfig, RGBCameras @@ -31,39 +30,50 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(left.camera_device or right.camera_device) self.camera = self.pipeline - self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) - self.right_camera_node, node_right = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + self.left_camera_node, node_left, node_left_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) + self.right_camera_node, node_right, node_right_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + node_center = None + node_center_compressed = None if center is not None: - self.center_camera_node, node_center = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) + self.center_camera_node, node_center, node_center_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) self.center_input_queue = self.center_camera_node.inputControl.createInputQueue() self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() - self.center_buffer = collections.deque(maxlen=10) + output_node_left = node_left_compressed if node_left_compressed is not None else node_left + output_node_right = node_right_compressed if node_right_compressed is not None else node_right + output_node_center = node_center_compressed if node_center_compressed is not None else node_center + + self.center_output = None if self.do_sync_frames: sync = self.pipeline.create(dai.node.Sync) - buffer_size = left.buffer_size + sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.left.sync_threshold_ms))) - sync.inputs["left"].setMaxSize(1) + sync.inputs["left"].setMaxSize(self.left.buffer_size) sync.inputs["left"].setBlocking(False) - node_left.link(sync.inputs["left"]) + output_node_left.link(sync.inputs["left"]) - sync.inputs["right"].setMaxSize(1) + sync.inputs["right"].setMaxSize(self.right.buffer_size) sync.inputs["right"].setBlocking(False) - node_right.link(sync.inputs["right"]) + output_node_right.link(sync.inputs["right"]) - # CENTER is NOT in sync to keep it at its own 10fps while left/right are at 30fps + # sync.inputs["center"].setMaxSize(self.center.buffer_size) + # sync.inputs["center"].setBlocking(False) + + # output_node_center.link(sync.inputs["center"]) + + # CENTER is NOT in the sync pipeline to keep it at its own 10fps while left/right are at 30fps if center is not None: - self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) + self.center_output = output_node_center.createOutputQueue(maxSize=self.center.buffer_size, blocking=False) - self.q_sync = sync.out.createOutputQueue(maxSize=buffer_size, blocking=False) + self.q_sync = sync.out.createOutputQueue(maxSize= self.left.buffer_size, blocking=False) else: if center is not None: - self.center_output = node_center.createOutputQueue(maxSize=1, blocking=False) - self.left_output = node_left.createOutputQueue(maxSize=1, blocking=False) - self.right_output = node_right.createOutputQueue(maxSize=1, blocking=False) + self.center_output = output_node_center.createOutputQueue(maxSize=self.center.buffer_size, blocking=False) + self.left_output = output_node_left.createOutputQueue(maxSize=self.left.buffer_size, blocking=False) + self.right_output = output_node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) try: self.pipeline.start() @@ -85,43 +95,21 @@ def get_frames(self): msg_group = self.q_sync.get() left_msg = msg_group["left"] right_msg = msg_group["right"] + center_msg = msg_group["center"] if "center" in msg_group else None left_frame = LuxonisCameraAdapter.dai_message_to_image_frame(left_msg) right_frame = LuxonisCameraAdapter.dai_message_to_image_frame(right_msg) center_frame = None - if self.center is not None: - # Drain center queue into buffer - while True: - c_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) - if c_frame is None: - break - self.center_buffer.append(c_frame) - - # Find best match in center_buffer to keep center synced with left/right frames - if self.center_buffer: + # if center_msg is not None: + # c_frame = LuxonisCameraAdapter.dai_message_to_image_frame(center_msg) + if self.center_output: + c_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + if c_frame: lr_timestamp = left_msg.getTimestamp().total_seconds() - - # Find the frame with the closest timestamp - # We also discard frames that are significantly older than the current LR frame - best_diff = float('inf') - best_idx = -1 - for i, c_f in enumerate(self.center_buffer): - diff = abs(c_f.timestamp - lr_timestamp) - if diff < best_diff: - best_diff = diff - best_idx = i - - # If a match is found within a reasonable window - if best_idx != -1 and best_diff < 1/self.center.fps: - center_frame = self.center_buffer[best_idx] - # Discard the used frame and all older ones - for _ in range(best_idx + 1): - self.center_buffer.popleft() - elif best_idx != -1 and self.center_buffer[0].timestamp < lr_timestamp - 0.1: - # Discard frames that are way too old - while self.center_buffer and self.center_buffer[0].timestamp < lr_timestamp - 0.1: - self.center_buffer.popleft() + diff = abs(c_frame.timestamp - lr_timestamp) + if diff < 1/self.center.fps: + center_frame = c_frame yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) else: diff --git a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py index afc783b..430b74e 100644 --- a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py +++ b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py @@ -317,17 +317,13 @@ def get_frame_synced(self, is_run_pipeline: bool) -> Generator[SyncedImageFrame, break if is_run_pipeline and frame is not None: - if frame.left is not None and frame.left.image is not None and frame.left.image.shape[:2] != (1, 1): - left_pipeline_controller.run_pipeline(frame.left) - if frame.right is not None and frame.right.image is not None and frame.right.image.shape[:2] != (1, 1): - right_pipeline_controller.run_pipeline(frame.right) + left_pipeline_controller.run_pipeline(frame.left) + right_pipeline_controller.run_pipeline(frame.right) if frame.center is not None: center_pipeline_controller.run_pipeline(frame.center) - if frame.left is not None and frame.left.image is not None and frame.left.image.shape[:2] != (1, 1): - left_pipeline_controller.show_image(frame.left.image) - if frame.right is not None and frame.right.image is not None and frame.right.image.shape[:2] != (1, 1): - right_pipeline_controller.show_image(frame.right.image) + left_pipeline_controller.show_image(frame.left.image) + right_pipeline_controller.show_image(frame.right.image) if frame.center is not None: center_pipeline_controller.show_image(frame.center.image)