diff --git a/stretch4_body/robot/robot_params_SE4.py b/stretch4_body/robot/robot_params_SE4.py index 45bfe89..858272a 100644 --- a/stretch4_body/robot/robot_params_SE4.py +++ b/stretch4_body/robot/robot_params_SE4.py @@ -1359,7 +1359,7 @@ "image_size": (1200, 1920), "fps": 30, "rotate_number_of_times": 1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1377,7 +1377,7 @@ "image_size": (1200, 1920), "fps": 30, "rotate_number_of_times": -1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1396,7 +1396,7 @@ "image_size": (3040, 4032), # Almost full 12MP resolution, 24 pixels subtracted to be divisible by 16 for compression "fps": 10, "rotate_number_of_times": -1, - "buffer_size": 2, + "buffer_size": 1, "is_compressed": False, "is_lossless": False, # Only used if is_compressed is true "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False @@ -1416,15 +1416,17 @@ # "image_size": (800, 1280), "fps": 30, "rotate_number_of_times": 0, - "buffer_size": 2, - "is_compressed": False, + "buffer_size": 1, + "is_compressed": True, "is_lossless": False, # Only used if is_compressed is true - "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False + "jpeg_quality": 80, # Only used if is_compressed is true and is_lossless is False "distortion_model": None, "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True "exposure_time": None, # Only used if use_auto_exposure is False - "iso": None # Only used if use_auto_exposure is False + "iso": None, # Only used if use_auto_exposure is False + "sync_threshold_ms": 15.0, + "stereo_max_range_mm": 10000.0 } }, 'gripper_right': { @@ -1435,15 +1437,17 @@ # "image_size": (800, 1280), "fps": 30, "rotate_number_of_times": 0, - "buffer_size": 2, - "is_compressed": False, + "buffer_size": 1, + "is_compressed": True, "is_lossless": False, # Only used if is_compressed is true - "jpeg_quality": 90, # Only used if is_compressed is true and is_lossless is False + "jpeg_quality": 80, # Only used if is_compressed is true and is_lossless is False "distortion_model": None, "use_auto_exposure": True, "limit_max": None, # Only used if use_auto_exposure is True "exposure_time": None, # Only used if use_auto_exposure is False - "iso": None # Only used if use_auto_exposure is False + "iso": None, # Only used if use_auto_exposure is False + "sync_threshold_ms": 15.0, + "stereo_max_range_mm": 10000.0 } }, }, diff --git a/stretch4_body/subsystem/cameras/__init__.py b/stretch4_body/subsystem/cameras/__init__.py index 7131790..f2454aa 100644 --- a/stretch4_body/subsystem/cameras/__init__.py +++ b/stretch4_body/subsystem/cameras/__init__.py @@ -19,5 +19,6 @@ stream_right_rgbd, stream_center_rgbd, stream_left_right_rgbd, - stream_left_right_center_rgbd + stream_left_right_center_rgbd, + stream_gripper_rgbd ) \ No newline at end of file diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py index e0897ae..d2e6c81 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_camera_adapter.py @@ -108,7 +108,7 @@ def get_depthai_camera_socket(camera_type: RGBCameras): raise Exception(f"{camera_type} is not supported as a Luxonis device.") @staticmethod - def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) -> tuple[dai.node.Camera, dai.Node.Output]: + def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) -> tuple[dai.node.Camera, dai.Node.Output, dai.Node.Output|None]: """ Takes a dai.Pipeline reference and adds a camera node to it. """ @@ -144,6 +144,7 @@ def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) - ) print("camera_config:", camera_config) + camera_output_compressed = None if camera_config.is_compressed: videoEncoder = pipeline.create(dai.node.VideoEncoder) videoEncoder.setNumFramesPool(buffer_size) @@ -154,9 +155,9 @@ def create_camera_node(pipeline: dai.Pipeline, camera_config: RGBCameraConfig) - lossless=camera_config.is_lossless, quality=camera_config.jpeg_quality ) - camera_output = videoEncoder.bitstream + camera_output_compressed = videoEncoder.bitstream - return node, camera_output + return node, camera_output, camera_output_compressed @staticmethod def create_pipeline(luxonis_device_product_name:str) -> tuple[dai.Pipeline, dai.Device]: @@ -171,25 +172,6 @@ def create_pipeline(luxonis_device_product_name:str) -> tuple[dai.Pipeline, dai. pipeline.setXLinkChunkSize(0) return pipeline, device - - @staticmethod - def create_rgbd_node(pipeline: dai.Pipeline, left_rgb_output: dai.Node.Output, right_rgb_out: dai.Node.Output): - - stereo = pipeline.create(dai.node.StereoDepth).build(left_rgb_output, right_rgb_out) - rgbd = pipeline.create(dai.node.RGBD) - - stereo.setRectifyEdgeFillColor(0) - stereo.enableDistortionCorrection(True) - # https://docs.luxonis.com/software-v3/depthai/depthai-components/nodes/stereo_depth - stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) - stereo.initialConfig.postProcessing.thresholdFilter.maxRange = 1000 * 5 # in mm - rgbd.setDepthUnits(dai.StereoDepthConfig.AlgorithmControl.DepthUnit.METER) - - stereo.syncedLeft.link(rgbd.inColor) - stereo.depth.link(rgbd.inDepth) - left_rgb_output.link(stereo.inputAlignTo) - - return stereo, rgbd def is_open(self): return self.pipeline is not None and self.device is not None and self.pipeline.isRunning() and not self.device.isClosed() @@ -198,11 +180,13 @@ def open(self): self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(self.camera_config.camera_device) self.camera = self.pipeline - self.camera_node, node_output = LuxonisCameraAdapter.create_camera_node( + self.camera_node, node_output, node_output_compressed = LuxonisCameraAdapter.create_camera_node( pipeline=self.pipeline, camera_config=self.camera_config ) - self.output_queue = node_output.createOutputQueue(maxSize=1) + output_node = node_output_compressed if node_output_compressed is not None else node_output + + self.output_queue = output_node.createOutputQueue(maxSize=1, blocking=False) self.input_queue = self.camera_node.inputControl.createInputQueue() try: @@ -256,11 +240,10 @@ def get_frame_from_output_queue( @staticmethod def get_frame_from_output_queue_no_block( - output_queue: dai.MessageQueue, - timeout: float = 1/120 + output_queue: dai.MessageQueue ): while True: - message: dai.ImgFrame | None = output_queue.get(timeout=timeout) + message: dai.ImgFrame | None = output_queue.tryGet() if message: yield LuxonisCameraAdapter.dai_message_to_image_frame(message) else: diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py index 1995632..6ed778b 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_gripper_camera_adapter.py @@ -9,13 +9,17 @@ from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameraConfig, RGBCameras from stretch4_body.subsystem.cameras.adapters.luxonis_camera_adapter import LuxonisCameraAdapter, clear_device_cache from stretch4_body.subsystem.cameras.adapters.synced_camera import SyncedCamera -from stretch4_body.subsystem.cameras.models.image_frame import SyncedImageFrame +from stretch4_body.subsystem.cameras.models.image_frame import SyncedImageFrame, ImageFrame +import dataclasses +import numpy as np +import datetime class GripperCameraLuxonis(SyncedCamera): """Start a stream with the gripper left/right stereo cameras and the point cloud pipeline.""" - def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig): + def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig, enable_pointcloud: bool = False): self.do_sync_frames = True + self.enable_pointcloud = enable_pointcloud self.left = left self.right = right @@ -26,15 +30,54 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig): self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(left.camera_device) self.camera = self.pipeline - self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) - self.right_camera_node, node_right = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + self.left_camera_node, node_left, node_left_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) + self.right_camera_node, node_right, node_right_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) - stereo, rgbd = LuxonisCameraAdapter.create_rgbd_node(self.pipeline, node_left, node_right) + stereo = self.pipeline.create(dai.node.StereoDepth) + # stereo.setRectifyEdgeFillColor(0) + # stereo.enableDistortionCorrection(True) + # https://docs.luxonis.com/software-v3/depthai/depthai-components/nodes/stereo_depth + stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.ROBOTICS) + stereo.setDepthAlign(LuxonisCameraAdapter.get_depthai_camera_socket(self.right.camera_type)) # Align to right camera + stereo.initialConfig.postProcessing.thresholdFilter.maxRange = int(self.right.stereo_max_range_mm) - self.left_output = stereo.syncedLeft.createOutputQueue(maxSize=1) - self.right_output = stereo.syncedRight.createOutputQueue(maxSize=1) - self.depth_output = stereo.depth.createOutputQueue(maxSize=1) - self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=1) + node_left.link(stereo.left) + node_right.link(stereo.right) + + sync = self.pipeline.create(dai.node.Sync) + sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.right.sync_threshold_ms))) + + output_node_left = node_left_compressed if node_left_compressed is not None else node_left + output_node_right = node_right_compressed if node_right_compressed is not None else node_right + + sync.inputs["left"].setMaxSize(self.left.buffer_size) + sync.inputs["left"].setBlocking(False) + output_node_left.link(sync.inputs["left"]) + + sync.inputs["right"].setMaxSize(self.right.buffer_size) + sync.inputs["right"].setBlocking(False) + output_node_right.link(sync.inputs["right"]) + + sync.inputs["depth"].setMaxSize(self.right.buffer_size) + sync.inputs["depth"].setBlocking(False) + stereo.depth.link(sync.inputs["depth"]) + + if self.enable_pointcloud: + rgbd = self.pipeline.create(dai.node.RGBD) + rgbd.setDepthUnits(dai.StereoDepthConfig.AlgorithmControl.DepthUnit.METER) + + + node_left.link(rgbd.inColor) + stereo.depth.link(rgbd.inDepth) + sync.inputs["pointcloud"].setBlocking(False) + rgbd.pcl.link(sync.inputs["pointcloud"]) + sync.inputs["pointcloud"].setMaxSize(self.right.buffer_size) + node_left.link(stereo.inputAlignTo) + # self.right_output = node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + # self.depth_output = stereo.depth.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + # self.pointcloud_output = rgbd.pcl.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + + self.q_sync = sync.out.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() @@ -44,6 +87,17 @@ def __init__(self, left: RGBCameraConfig, right: RGBCameraConfig): except Exception: clear_device_cache() raise + + def get_gripper_intrinsics(self, camera_type: RGBCameras): + """Returns M and D from the hardware factory calibration if available.""" + try: + calib = self.device.readCalibration() + M = np.array(calib.getCameraIntrinsics(LuxonisCameraAdapter.get_depthai_camera_socket(camera_type), camera_type.config.image_size[0], camera_type.config.image_size[1]), dtype=np.float64) + D = np.array(calib.getDistortionCoefficients(LuxonisCameraAdapter.get_depthai_camera_socket(camera_type)), dtype=np.float64) + return M, D + except Exception as e: + print(f"Warning: could not read calibration from OAK-D: {e}") + return None, None def is_open(self): return self.pipeline is not None and self.device is not None and self.pipeline.isRunning() and not self.device.isClosed() @@ -51,18 +105,49 @@ def is_open(self): def get_frames(self): if not self.is_open(): raise RuntimeError("Camera is not running.") + + empty_left_or_right_frame = ImageFrame(image=np.zeros((self.left.image_size[1], self.left.image_size[0], 3), dtype=np.uint8), timestamp=0, frame_number=0) while True: - - left_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output)) - right_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) - depth_callback = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.depth_output)) - - points, points_rgb, points_sequence_number = next(LuxonisCameraAdapter.get_pointcloud_from_output_queue(self.pointcloud_output)) - - synced_image = SyncedImageFrame(timestamp=left_callback.timestamp, left=left_callback, right=right_callback, center=None, pointcloud=points, pointcloud_color=points_rgb, depth=depth_callback.image) - - yield synced_image + msgGroup = self.q_sync.get() + if msgGroup is not None: + msgNames = msgGroup.getMessageNames() + frame_left_msg = msgGroup["left"] if "left" in msgNames else None + frame_right_msg = msgGroup["right"] if "right" in msgNames else None + frame_depth_msg = msgGroup["depth"] if "depth" in msgNames else None + frame_pointcloud_msg = msgGroup["pointcloud"] if "pointcloud" in msgNames else None + + pointcloud = None + pointcloud_color = None + if frame_pointcloud_msg: + pointcloud, pointcloud_color = frame_pointcloud_msg.getPointsRGB() + + if frame_depth_msg: + timestamp = frame_depth_msg.getTimestamp().total_seconds() + sequence_num = frame_depth_msg.getSequenceNum() + if frame_left_msg: + left_frame = LuxonisCameraAdapter.dai_message_to_image_frame(frame_left_msg) + else: + left_frame = empty_left_or_right_frame + if frame_right_msg: + right_frame = LuxonisCameraAdapter.dai_message_to_image_frame(frame_right_msg) + else: + right_frame = empty_left_or_right_frame + + depth_frame = frame_depth_msg.getFrame() + + synced_image = SyncedImageFrame( + timestamp=timestamp, + left=left_frame, + right=right_frame, + center=None, + depth=depth_frame, + pointcloud=pointcloud, + pointcloud_color=pointcloud_color + ) + yield synced_image + else: + print("No depth frame received.") def stop(self): self.pipeline.stop() diff --git a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py index ac654df..402deb9 100644 --- a/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py +++ b/stretch4_body/subsystem/cameras/adapters/luxonis_synced_camera_adapter.py @@ -1,7 +1,7 @@ """ Adapter for connecting to and controlling the Luxonis head cameras using the DepthAI API for the Luxonis OAK-FFC 3P board. """ - +import datetime import time import threading import logging @@ -30,20 +30,51 @@ def __init__(self, left: RGBCameraConfig, right:RGBCameraConfig, center:RGBCamer self.pipeline, self.device = LuxonisCameraAdapter.create_pipeline(left.camera_device or right.camera_device) self.camera = self.pipeline - self.left_camera_node, node_left = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) - self.right_camera_node, node_right = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + self.left_camera_node, node_left, node_left_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=left) + self.right_camera_node, node_right, node_right_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=right) + node_center = None + node_center_compressed = None if center is not None: - self.center_camera_node, node_center = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) - self.center_output = node_center.createOutputQueue(maxSize=1) + self.center_camera_node, node_center, node_center_compressed = LuxonisCameraAdapter.create_camera_node(pipeline=self.pipeline, camera_config=center) self.center_input_queue = self.center_camera_node.inputControl.createInputQueue() - self.left_output = node_left.createOutputQueue(maxSize=1) - self.right_output = node_right.createOutputQueue(maxSize=1) - self.left_input_queue = self.left_camera_node.inputControl.createInputQueue() self.right_input_queue = self.right_camera_node.inputControl.createInputQueue() + output_node_left = node_left_compressed if node_left_compressed is not None else node_left + output_node_right = node_right_compressed if node_right_compressed is not None else node_right + output_node_center = node_center_compressed if node_center_compressed is not None else node_center + + self.center_output = None + if self.do_sync_frames: + sync = self.pipeline.create(dai.node.Sync) + sync.setSyncThreshold(datetime.timedelta(milliseconds=int(self.left.sync_threshold_ms))) + + sync.inputs["left"].setMaxSize(self.left.buffer_size) + sync.inputs["left"].setBlocking(False) + output_node_left.link(sync.inputs["left"]) + + sync.inputs["right"].setMaxSize(self.right.buffer_size) + sync.inputs["right"].setBlocking(False) + output_node_right.link(sync.inputs["right"]) + + # sync.inputs["center"].setMaxSize(self.center.buffer_size) + # sync.inputs["center"].setBlocking(False) + + # output_node_center.link(sync.inputs["center"]) + + # CENTER is NOT in the sync pipeline to keep it at its own 10fps while left/right are at 30fps + if center is not None: + self.center_output = output_node_center.createOutputQueue(maxSize=self.center.buffer_size, blocking=False) + + self.q_sync = sync.out.createOutputQueue(maxSize= self.left.buffer_size, blocking=False) + else: + if center is not None: + self.center_output = output_node_center.createOutputQueue(maxSize=self.center.buffer_size, blocking=False) + self.left_output = output_node_left.createOutputQueue(maxSize=self.left.buffer_size, blocking=False) + self.right_output = output_node_right.createOutputQueue(maxSize=self.right.buffer_size, blocking=False) + try: self.pipeline.start() except Exception: @@ -57,36 +88,40 @@ def get_frames(self): if not self.is_open(): raise RuntimeError("Camera is not running.") - while True: - if self.stop_event is not None and self.stop_event.is_set(): - return - - left_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output, stop_event=self.stop_event)) - right_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output, stop_event=self.stop_event)) - - center_frame = None - if self.center is not None: - center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) - - synced_image = SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) - - if not self.do_sync_frames: - # If the user is not requesting synced images, return them as is. - yield synced_image - - # sync the left and right images, otherwise drop the frame if they are not synced. - left_right_diff = abs(left_frame.frame_number - right_frame.frame_number) - if left_right_diff <= 4: - if synced_image.center is not None: - # sync left and center based on timestamp - left_center_diff = abs(left_frame.timestamp - synced_image.center.timestamp) - if left_center_diff > 1.5/self.center.fps: # A looser tolerance for left-center sync to account for the different requested FPS's - logging.warning(f"Center frame is not synced, ignoring center frame. Off by {left_center_diff:.3f}s") - synced_image.center = None - - yield synced_image - else: - logging.warn(f"Left and Right frames are not synced, dropping synced frame. Off by {left_right_diff} frames.") + if self.do_sync_frames: + while True: + if self.stop_event is not None and self.stop_event.is_set(): + return + msg_group = self.q_sync.get() + left_msg = msg_group["left"] + right_msg = msg_group["right"] + center_msg = msg_group["center"] if "center" in msg_group else None + + left_frame = LuxonisCameraAdapter.dai_message_to_image_frame(left_msg) + right_frame = LuxonisCameraAdapter.dai_message_to_image_frame(right_msg) + + center_frame = None + # if center_msg is not None: + # c_frame = LuxonisCameraAdapter.dai_message_to_image_frame(center_msg) + if self.center_output: + c_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + if c_frame: + lr_timestamp = left_msg.getTimestamp().total_seconds() + diff = abs(c_frame.timestamp - lr_timestamp) + if diff < 1/self.center.fps: + center_frame = c_frame + + yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) + else: + while True: + left_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.left_output)) + right_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue(self.right_output)) + + center_frame = None + if self.center is not None: + center_frame = next(LuxonisCameraAdapter.get_frame_from_output_queue_no_block(self.center_output)) + + yield SyncedImageFrame(timestamp=time.time(), left=left_frame, right=right_frame, center=center_frame) def stop(self): try: diff --git a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py index cf56309..430b74e 100644 --- a/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py +++ b/stretch4_body/subsystem/cameras/controllers/camera_pipeline_controller.py @@ -23,6 +23,9 @@ from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameras from stretch4_body.subsystem.cameras.models.image_write_to_disk import RgbImageToWriteToDisk, add_image_to_save_queue, create_directory_if_it_does_not_exist, saver_thread from stretch4_body.subsystem.cameras.models.image_frame import ImageFrame, SyncedImageFrame +from stretch4_body.subsystem.cameras.rerun_utils import RerunAsyncLogger + + class RecordRgbShowImageIn(Enum): @@ -45,7 +48,8 @@ def __init__( is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, - is_open_camera: bool = True + is_open_camera: bool = True, + enable_pointcloud: bool = False ): """ `detect_aruco_marker_size`: Runs ArUco detection if a float >= 0.0 is provided. If length is 0.0, the ArUco markers will be detected, but distance will not be printed. If length > 0.0 and calibration is available, ArUco pose and L2 distance to the marker will be displayed. @@ -59,6 +63,7 @@ def __init__( self.detect_aruco_marker_size = detect_aruco_marker_size self.detect_aruco_dictionaries_to_detect = ArucoDictionary.all_1000() self.is_rotate = is_rotate + self.enable_pointcloud = enable_pointcloud self.ai_models: list[AIModelWrapper] = [] self.rectify_maps: "RectifyMaps | None" = None @@ -81,6 +86,8 @@ def __init__( target=saver_thread, args=(self.stop_event, self.save_rgb_queue), daemon=True ) + self.rerun_logger: RerunAsyncLogger | None = None + self.save_directory = None if self.recording_directory: if not self.camera_type.is_synced_camera_type(): @@ -96,7 +103,7 @@ def __init__( def open_camera(self): if self.camera_type.is_synced_camera_type(): - self._camera = self.camera_type.start_synced(stop_event=self.stop_event) + self._camera = self.camera_type.start_synced(stop_event=self.stop_event,enable_pointcloud=self.enable_pointcloud) else: self._camera = self.camera_type.start(stop_event=self.stop_event) @@ -107,10 +114,10 @@ def camera(self): return self._camera def _show_rererun(self, color_image: np.ndarray): - rr.log( - f"{self.camera_type.name.upper()} Camera", - rr.Image(color_image, color_model="BGR").compress(), - ) + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + + self.rerun_logger.log_image(f"Cameras/{self.camera_type.name}/image", color_image) def _show_cvimshow(self, color_image: np.ndarray): cv2.namedWindow(self.camera_type.name, cv2.WINDOW_NORMAL) @@ -222,21 +229,38 @@ def run_pipeline(self, frame: ImageFrame): def _start_rerun(self): rr.init(self.camera_type.name, spawn=False) rr.spawn(memory_limit="4GB") - + import rerun.blueprint as rrb + from stretch4_body.subsystem.cameras.enums.rgb_camera import RGBCameras - if self.camera_type.is_synced_camera_type(): + if self.camera_type == RGBCameras.gripper_rgbd: + blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Spatial2DView(name="Gripper Depth", origin="Cameras/gripper_rgbd/depth"), + rrb.Spatial2DView(name="Gripper Right", origin="Cameras/gripper_right/image"), + ), + collapse_panels=True + ) + elif self.camera_type == RGBCameras.head_left_right: + blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Spatial2DView(name="Head Left", origin="Cameras/head_left/image"), + rrb.Spatial2DView(name="Head Right", origin="Cameras/head_right/image"), + ), + collapse_panels=True + ) + elif self.camera_type == RGBCameras.head_left_right_center: blueprint = rrb.Blueprint( rrb.Horizontal( - rrb.Spatial2DView(name="Left Camera", origin="HEAD_LEFT Camera"), - rrb.Spatial2DView(name="Center Camera", origin="HEAD_CENTER Camera"), - rrb.Spatial2DView(name="Right Camera", origin="HEAD_RIGHT Camera"), + rrb.Spatial2DView(name="Head Left", origin="Cameras/head_left/image"), + rrb.Spatial2DView(name="Head Center", origin="Cameras/head_center/image"), + rrb.Spatial2DView(name="Head Right", origin="Cameras/head_right/image"), ), collapse_panels=True ) else: blueprint = rrb.Blueprint( - rrb.Spatial2DView(name=f"{self.camera_type.name} Camera", origin=f"{self.camera_type.name.upper()} Camera"), + rrb.Spatial2DView(name=self.camera_type.name.upper(), origin=f"Cameras/{self.camera_type.name}/image"), collapse_panels=True ) rr.send_blueprint(blueprint) @@ -305,19 +329,20 @@ def get_frame_synced(self, is_run_pipeline: bool) -> Generator[SyncedImageFrame, center_pipeline_controller.show_image(frame.center.image) if frame.depth is not None and self.show_image_in is RecordRgbShowImageIn.RERUN: - rr.log( - f"{self.camera_type.name}/depth", + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + self.rerun_logger.log_any( + f"Cameras/{self.camera_type.name}/depth", rr.DepthImage(frame.depth), ) if frame.pointcloud is not None and self.show_image_in is RecordRgbShowImageIn.RERUN: - rr.log( - f"{self.camera_type.name}/pcl", - rr.Transform3D( - rotation=rr.RotationAxisAngle(axis=[0, 0, 1], angle=rr.Angle(rad=np.pi)), - ) + if self.rerun_logger is None: + self.rerun_logger = RerunAsyncLogger(camera_name=self.camera_type.name) + self.rerun_logger.log_any( + f"Pointclouds/{self.camera_type.name}", + rr.Points3D(frame.pointcloud, colors=frame.pointcloud_colors, radii=[0.0025]), ) - rr.log(f"{self.camera_type.name}/pcl", rr.Points3D(frame.pointcloud, colors=frame.pointcloud_color, radii=[0.002])) yield frame @@ -372,7 +397,8 @@ def copy_for(self, camera_type: RGBCameras, is_open_camera: bool): is_crop=self.is_crop, ai_models_to_use=self.ai_models_to_use, detect_aruco_marker_size=self.detect_aruco_marker_size, - is_open_camera=is_open_camera + is_open_camera=is_open_camera, + enable_pointcloud=self.enable_pointcloud ) return copy @@ -472,13 +498,15 @@ def stop(self): self._camera.stop() if self.save_thread.is_alive(): self.save_thread.join(timeout=5) + if self.rerun_logger is not None: + self.rerun_logger.stop() class RGBPipelineControllerROS(RGBPipelineController): """ A specialized controller that leverages stretch_python_bridge's StreamManager for camera streams. This adapter allows using camera tools with ROS2 camera nodes. """ - def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, show_image_in: "RecordRgbShowImageIn | None", is_rotate: bool, is_rectify: bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, is_open_camera: bool = True): + def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, show_image_in: "RecordRgbShowImageIn | None", is_rotate: bool, is_rectify: bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper], detect_aruco_marker_size: float|None, is_open_camera: bool = True, enable_pointcloud: bool = False): super().__init__( camera_type=camera_type, recording_directory=recording_directory, @@ -488,7 +516,8 @@ def __init__(self, camera_type: "RGBCameras", recording_directory: str | None, s is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, - is_open_camera=False + is_open_camera=False, + enable_pointcloud=enable_pointcloud ) try: @@ -510,7 +539,8 @@ def copy_for(self, camera_type: "RGBCameras", is_open_camera: bool): is_crop=self.is_crop, ai_models_to_use=self.ai_models_to_use, detect_aruco_marker_size=self.detect_aruco_marker_size, - is_open_camera=is_open_camera + is_open_camera=is_open_camera, + enable_pointcloud=self.enable_pointcloud ) return copy diff --git a/stretch4_body/subsystem/cameras/emulated_rgbd.py b/stretch4_body/subsystem/cameras/emulated_rgbd.py index ce6cfe4..e7d56c2 100644 --- a/stretch4_body/subsystem/cameras/emulated_rgbd.py +++ b/stretch4_body/subsystem/cameras/emulated_rgbd.py @@ -16,6 +16,7 @@ stream_center_camera, stream_left_right_camera, stream_left_right_center_camera, + stream_gripper_camera, ) from stretch4_body.subsystem.cameras.models.image_frame import ( ImageFrame, @@ -515,3 +516,26 @@ def stream_left_right_center_rgbd(*, is_rotate=True, use_left_lidar=True, use_ri yield ret finally: streamer.stop() + + +def stream_gripper_rgbd(*, is_rotate=True, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False) -> Generator[RGBDFrame, None, None]: + for synced_frame in stream_gripper_camera(is_rotate=is_rotate, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, enable_pointcloud=True): + if synced_frame is None: + continue + image_frame = synced_frame.right + if image_frame is None: + continue + + pointcloud = synced_frame.pointcloud if synced_frame.pointcloud is not None else np.zeros((0, 3)) + pointcloud_colors = synced_frame.pointcloud_color if synced_frame.pointcloud_color is not None else np.zeros((0, 3)) + depth_image = synced_frame.depth if synced_frame.depth is not None else np.zeros((0, 0)) + + yield RGBDFrame( + timestamp=synced_frame.timestamp, + image_frame=image_frame, + camera_type=RGBCameras.gripper_rgbd, + pointcloud=pointcloud, + pointcloud_base=np.zeros((0, 3)), + pointcloud_colors=pointcloud_colors, + depth_image=depth_image + ) diff --git a/stretch4_body/subsystem/cameras/enums/rgb_camera.py b/stretch4_body/subsystem/cameras/enums/rgb_camera.py index d0f0b01..e51b702 100644 --- a/stretch4_body/subsystem/cameras/enums/rgb_camera.py +++ b/stretch4_body/subsystem/cameras/enums/rgb_camera.py @@ -30,6 +30,8 @@ class RGBCameraConfig: limit_max: int | None = None exposure_time: int | None = None iso: int | None = None + sync_threshold_ms: int = 15 + stereo_max_range_mm: int = 10000 class CameraDevice(Device): """Sets up a Stretch Body camera device to pull params from robot params.""" @@ -45,7 +47,7 @@ def get_config(self, camera_type: "RGBCameras") -> "RGBCameraConfig": config = RGBCameraConfig(**config_dict) config.distortion_model = DistortionModels[config_dict["distortion_model"].replace("DistortionModels.", "")] if config_dict["distortion_model"] is not None else None - + return config class RGBCameras(Enum): @@ -129,7 +131,7 @@ def start(self, stop_event: threading.Event | None = None) -> "CameraAdapter": raise NotImplementedError(f"{self}'s start() method is not implemented.") - def start_synced(self, stop_event: "threading.Event | None" = None) -> "SyncedCamera": + def start_synced(self, stop_event: threading.Event | None = None, enable_pointcloud: bool = False) -> "SyncedCamera": """Use `start_synced()` to start sync'd frame grabbing.""" from stretch4_body.subsystem.cameras.adapters.luxonis_gripper_camera_adapter import ( GripperCameraLuxonis # import here to avoid circular import @@ -160,6 +162,7 @@ def start_synced(self, stop_event: "threading.Event | None" = None) -> "SyncedCa return GripperCameraLuxonis( RGBCameras.gripper_left.config, RGBCameras.gripper_right.config, + enable_pointcloud=enable_pointcloud, ) raise NotImplementedError(f"{self}'s start_synced() method is not implemented.") diff --git a/stretch4_body/subsystem/cameras/rerun_utils.py b/stretch4_body/subsystem/cameras/rerun_utils.py new file mode 100644 index 0000000..554f7ac --- /dev/null +++ b/stretch4_body/subsystem/cameras/rerun_utils.py @@ -0,0 +1,65 @@ +import threading +import queue +import cv2 +import rerun as rr +import numpy as np + +class RerunAsyncLogger: + """ + An asynchronous logger for Rerun that offloads CPU-intensive tasks like + image compression to a background thread to maintain high frame rates. + """ + def __init__(self, camera_name: str, queue_size: int = 10): + self.camera_name = camera_name.upper() + self.queue = queue.Queue(maxsize=queue_size) + self.stop_event = threading.Event() + self.thread = threading.Thread(target=self._worker, daemon=True) + self.thread.start() + + def _worker(self): + while not self.stop_event.is_set(): + try: + # Task format: (entity_path, data, type) + task = self.queue.get(timeout=0.1) + if task is None: + continue + + path, data, task_type = task + + if task_type == "image": + # Use OpenCV for faster encoding. Quality 80 is a good balance. + success, jpeg_bytes = cv2.imencode('.jpg', data, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if success: + rr.log( + path, + rr.EncodedImage(contents=jpeg_bytes.tobytes(), media_type="image/jpeg"), + ) + elif task_type == "direct": + # For DepthImage, Points3D, etc. that don't need host-side compression + rr.log(path, data) + + except queue.Empty: + continue + except Exception as e: + import logging + logging.error(f"Error in RerunAsyncLogger worker for {self.camera_name}: {e}") + + def log_image(self, path: str, image: np.ndarray): + """Queue an image for asynchronous compression and logging.""" + try: + self.queue.put( (path, image, "image"), block=True) + except Exception: + pass + + def log_any(self, path: str, archetype: "Any"): + """Queue any Rerun archetype for asynchronous logging.""" + try: + self.queue.put( (path, archetype, "direct"), block=True) + except Exception: + pass + + def stop(self, timeout: float = 2.0): + """Stop the background thread.""" + self.stop_event.set() + if self.thread.is_alive(): + self.thread.join(timeout=timeout) diff --git a/stretch4_body/subsystem/cameras/stream_cameras.py b/stretch4_body/subsystem/cameras/stream_cameras.py index 8fe7d13..8e5ed2f 100644 --- a/stretch4_body/subsystem/cameras/stream_cameras.py +++ b/stretch4_body/subsystem/cameras/stream_cameras.py @@ -21,7 +21,7 @@ def _start_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_cr return rgb_pipeline_controller.get_frame(is_run_pipeline=is_run_pipeline) -def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper]|None, detect_aruco_marker_size: float|None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True) -> Generator[SyncedImageFrame, None, None]: +def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool, is_crop: bool, ai_models_to_use: list[AIModelWrapper]|None, detect_aruco_marker_size: float|None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True, enable_pointcloud:bool=False) -> Generator[SyncedImageFrame, None, None]: cls = RGBPipelineControllerROS if use_ros_for_cameras else RGBPipelineController rgb_pipeline_controller = cls( camera_type=camera_type, @@ -32,6 +32,7 @@ def _start_synced_camera(camera_type:RGBCameras, is_rotate:bool, is_rectify:bool is_crop=is_crop, ai_models_to_use=ai_models_to_use or [], detect_aruco_marker_size=detect_aruco_marker_size, + enable_pointcloud=enable_pointcloud, ) return rgb_pipeline_controller.get_frame_synced(is_run_pipeline=is_run_pipeline) @@ -57,9 +58,9 @@ def stream_left_right_center_camera(*, is_rotate:bool=True, is_rectify:bool=Fals """Stream the center, left and right head cameras""" return _start_synced_camera(camera_type=RGBCameras.head_left_right_center, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline) -def stream_gripper_camera(*, is_rotate:bool=True, is_rectify:bool=False, is_crop: bool=False, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True) -> Generator[SyncedImageFrame, None, None]: +def stream_gripper_camera(*, is_rotate:bool=True, is_rectify:bool=False, is_crop: bool=False, ai_models_to_use: list[AIModelWrapper]|None=None, detect_aruco_marker_size: float|None=None, use_ros_for_cameras:bool=False, is_run_pipeline:bool=True, enable_pointcloud:bool=False) -> Generator[SyncedImageFrame, None, None]: """Stream the gripper RGBD camera""" - return _start_synced_camera(camera_type=RGBCameras.gripper_rgbd, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline) + return _start_synced_camera(camera_type=RGBCameras.gripper_rgbd, is_rotate=is_rotate, is_rectify=is_rectify, is_crop=is_crop, ai_models_to_use=ai_models_to_use, detect_aruco_marker_size=detect_aruco_marker_size, use_ros_for_cameras=use_ros_for_cameras, is_run_pipeline=is_run_pipeline, enable_pointcloud=enable_pointcloud) if __name__ == "__main__": diff --git a/stretch4_body/tools/stretch_rgbd_show.py b/stretch4_body/tools/stretch_rgbd_show.py index 119603e..252cd41 100644 --- a/stretch4_body/tools/stretch_rgbd_show.py +++ b/stretch4_body/tools/stretch_rgbd_show.py @@ -11,9 +11,18 @@ stream_center_rgbd, stream_left_right_rgbd, stream_left_right_center_rgbd, + stream_gripper_rgbd, RGBDFrame, EmulatedRGBDStreamer, ) +from stretch4_body.subsystem.cameras.rerun_utils import RerunAsyncLogger + +rerun_loggers: dict[str, RerunAsyncLogger] = {} + +def get_rerun_logger(c_name: str) -> RerunAsyncLogger: + if c_name not in rerun_loggers: + rerun_loggers[c_name] = RerunAsyncLogger(camera_name=c_name) + return rerun_loggers[c_name] def _parse_args(): @@ -27,6 +36,7 @@ def _parse_args(): parser.add_argument("-c", "--center", action="store_true", help="Display RGBD stream from center camera") parser.add_argument("-lr", "--left_right", action="store_true", help="Display RGBD streams from left and right cameras") parser.add_argument("-lrc", "--left_right_center", action="store_true", help="Display RGBD streams from all cameras") + parser.add_argument("-g", "--gripper", action="store_true", help="Display RGBD stream from gripper camera") # Lidar selection flags parser.add_argument("--lidar_left", action="store_true", help="Use left lidar") @@ -54,19 +64,21 @@ def _parse_args(): def render_rgbd(c_name: str, frame: RGBDFrame): - rr.log(f"Cameras/{c_name}_rotated", rr.Image(frame.image_frame.image, color_model="BGR").compress()) - rr.log(f"Cameras/{c_name}/rgb_raw", rr.Image(frame.image_frame.image_raw, color_model="BGR").compress()) + logger = get_rerun_logger(c_name) + + logger.log_image(f"Cameras/{c_name}_rotated", frame.image_frame.image) + logger.log_image(f"Cameras/{c_name}/rgb_raw", frame.image_frame.image_raw) if frame.depth_image is not None and frame.depth_image.shape[0] > 0: - rr.log(f"Cameras/{c_name}/depth", rr.DepthImage(frame.depth_image, meter=1.0)) + logger.log_any(f"Cameras/{c_name}/depth", rr.DepthImage(frame.depth_image, meter=1.0)) if len(frame.pointcloud) > 0: - rr.log( + logger.log_any( f"Pointclouds/camera_frame/{c_name}", rr.Points3D(frame.pointcloud, colors=frame.pointcloud_colors, radii=[0.0025]), ) if len(frame.pointcloud_base) > 0: - rr.log( + logger.log_any( f"Pointclouds/base_frame/{c_name}", rr.Points3D(frame.pointcloud_base, colors=frame.pointcloud_colors, radii=[0.0025]), ) @@ -83,11 +95,12 @@ def main(): use_center = args.center use_left_right = args.left_right use_left_right_center = args.left_right_center + use_gripper = args.gripper use_ros_for_cameras = args.use_ros_for_cameras use_ros_for_lidars = args.use_ros_for_lidars - if not (use_left or use_right or use_center or use_left_right or use_left_right_center): + if not (use_left or use_right or use_center or use_left_right or use_left_right_center or use_gripper): use_left_right = True # Resolve lidar flags (both by default) @@ -145,13 +158,15 @@ def main(): camera_name = "center" elif use_right: camera_name = "right" + elif use_gripper: + camera_name = "gripper" blueprint = rrb.Blueprint( rrb.Horizontal( rrb.Vertical( rrb.Spatial2DView(name="Camera Rotated", origin=f"Cameras/{camera_name}_rotated"), rrb.Spatial2DView(name="Depth Camera", origin=f"Cameras/{camera_name}"), ), - rrb.Spatial3DView(name="Base Frame", origin="/", contents=["+ Pointclouds/base_frame/**"]), + rrb.Spatial3DView(name="Base Frame", origin="/", contents=["+ Pointclouds/base_frame/**", "+ Pointclouds/camera_frame/**"]), column_shares=[1,5] ), collapse_panels=True @@ -200,6 +215,11 @@ def print_loop_timer(): render_rgbd("center", frame) print_loop_timer() + elif use_gripper: + for frame in stream_gripper_rgbd(use_ros_for_cameras=use_ros_for_cameras): + render_rgbd("gripper", frame) + print_loop_timer() + except KeyboardInterrupt: print("Stopping... (Force quitting due to background threads)") os._exit(0) @@ -207,6 +227,8 @@ def print_loop_timer(): print(f"Stopping due to error: {e=}") raise e finally: + for logger in rerun_loggers.values(): + logger.stop() EmulatedRGBDStreamer.get_instance().stop()