Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ yolo11n.pt
# symlink one of .envrc.* if you'd like to use
.envrc
.claude
.opencode/
**/CLAUDE.md
.direnv/
.omo/

/logs

Expand Down
46 changes: 45 additions & 1 deletion dimos/manipulation/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,24 @@
from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera
from dimos.manipulation.manipulation_module import ManipulationModule
from dimos.manipulation.pick_and_place_module import PickAndPlaceModule
from dimos.manipulation.viser_panel.module import ViserManipulationPanelModule
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.geometry_msgs.Vector3 import Vector3
from dimos.msgs.sensor_msgs.JointState import JointState
from dimos.perception.object_scene_registration import ObjectSceneRegistrationModule
from dimos.robot.catalog.ufactory import xarm6 as _catalog_xarm6, xarm7 as _catalog_xarm7


def _quaternion(x: float, y: float, z: float, w: float) -> Quaternion:
quaternion = Quaternion.__new__(Quaternion)
quaternion.x = x
quaternion.y = y
quaternion.z = z
quaternion.w = w
return quaternion


# Single XArm6 planner (standalone, no coordinator)
_xarm6_planner_cfg = _catalog_xarm6(
name="arm",
Expand Down Expand Up @@ -121,6 +132,38 @@
)


_xarm7_viser_mock_cfg = _catalog_xarm7(
name="arm",
adapter_type="mock",
add_gripper=True,
)

xarm7_viser_panel_mock = autoconnect(
ManipulationModule.blueprint(
robots=[_xarm7_viser_mock_cfg.to_robot_model_config()],
planning_timeout=10.0,
enable_viz=True,
),
ControlCoordinator.blueprint(
tick_rate=100.0,
publish_joint_state=True,
joint_state_frame_id="coordinator",
hardware=[_xarm7_viser_mock_cfg.to_hardware_component()],
tasks=[_xarm7_viser_mock_cfg.to_task_config()],
),
ViserManipulationPanelModule.blueprint(
host="127.0.0.1",
port=8095,
default_robot="arm",
allow_plan_execute=True,
),
).transports(
{
("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
}
)


# XArm7 planner + LLM agent for testing base ManipulationModule skills
# No perception — uses the base module's planning + gripper skills only.
# Usage: dimos run coordinator-mock, then dimos run xarm7-planner-coordinator-agent
Expand Down Expand Up @@ -164,7 +207,7 @@
# Usage: dimos run coordinator-mock, then dimos run xarm-perception
_XARM_PERCEPTION_CAMERA_TRANSFORM = Transform(
translation=Vector3(x=0.06693724, y=-0.0309563, z=0.00691482),
rotation=Quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw
rotation=_quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw
)

_xarm7_perception_cfg = _catalog_xarm7(
Expand Down Expand Up @@ -342,6 +385,7 @@
"xarm6_planner_only",
"xarm7_planner_coordinator",
"xarm7_planner_coordinator_agent",
"xarm7_viser_panel_mock",
"xarm_perception",
"xarm_perception_agent",
"xarm_perception_sim",
Expand Down
177 changes: 177 additions & 0 deletions dimos/manipulation/manipulation_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,41 @@ def has_planned_path(self) -> bool:
path = self._planned_paths.get(robot_name)
return path is not None and len(path) > 0

@rpc
def get_planned_path(self, robot_name: RobotName | None = None) -> JointPath | None:
"""Get the currently stored planned path for a robot.

Args:
robot_name: Robot to query (required if multiple robots configured)

Returns:
Planned path waypoints or None if no path is stored
"""
robot = self._get_robot(robot_name)
if robot is None:
return None
return self._planned_paths.get(robot[0])

@rpc
def get_planned_path_poses(self, robot_name: RobotName | None = None) -> list[Pose] | None:
"""Get end-effector poses for the stored planned path.

Args:
robot_name: Robot to query (required if multiple robots configured)
"""
if self._world_monitor is None:
return None
robot = self._get_robot(robot_name)
if robot is None:
return None
robot_name, robot_id, _, _ = robot
path = self._planned_paths.get(robot_name)
if path is None:
return None
return [
self._world_monitor.get_ee_pose(robot_id, joint_state=waypoint) for waypoint in path
]

@rpc
def get_visualization_url(self) -> str | None:
"""Get the visualization URL.
Expand Down Expand Up @@ -633,12 +668,32 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]

robot_name, robot_id, config, _ = robot

joint_limits = None
if config.joint_limits_lower is not None and config.joint_limits_upper is not None:
joint_limits = list(
zip(config.joint_limits_lower, config.joint_limits_upper, strict=False)
)
elif self._world_monitor is not None:
try:
lower, upper = self._world_monitor.get_joint_limits(robot_id)
joint_limits = [
(float(lower_value), float(upper_value))
for lower_value, upper_value in zip(lower, upper, strict=False)
]
except Exception as e:
logger.debug(f"Could not load joint limits for '{robot_name}': {e}")

return {
"name": config.name,
"world_robot_id": robot_id,
"joint_names": config.joint_names,
"end_effector_link": config.end_effector_link,
"base_link": config.base_link,
"model_path": str(config.model_path),
"base_pose": config.base_pose,
"joint_limits": joint_limits,
"package_paths": {package: str(path) for package, path in config.package_paths.items()},
"xacro_args": dict(config.xacro_args),
"max_velocity": config.max_velocity,
"max_acceleration": config.max_acceleration,
"has_joint_name_mapping": bool(config.joint_name_mapping),
Expand All @@ -650,6 +705,128 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]
else None,
}

@rpc
def evaluate_pose_target(
self, pose: Pose, robot_name: RobotName | None = None
) -> dict[str, Any]:
"""Evaluate a pose target without storing, previewing, executing, or moving.

Args:
pose: Target end-effector pose in world coordinates
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None or self._kinematics is None:
return {
"success": False,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
_, robot_id, _, _ = robot
current = self._world_monitor.get_current_joint_state(robot_id)
if current is None:
return {
"success": False,
"joint_state": None,
"status": "NO_JOINT_STATE",
"message": "No joint state",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}

from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped

target_pose = PoseStamped(
frame_id="world",
position=pose.position,
orientation=pose.orientation,
)
ik = self._kinematics.solve(
world=self._world_monitor.world,
robot_id=robot_id,
target_pose=target_pose,
seed=current,
check_collision=True,
)
joint_state = ik.joint_state if ik.is_success() else None
collision_free = bool(
joint_state is not None and self._world_monitor.is_state_valid(robot_id, joint_state)
)
return {
"success": joint_state is not None and collision_free,
"joint_state": joint_state,
"status": ik.status.name,
"message": ik.message,
"position_error": ik.position_error,
"orientation_error": ik.orientation_error,
"collision_free": collision_free,
}

@rpc
def evaluate_joint_target(
self, joints: JointState, robot_name: RobotName | None = None
) -> dict[str, Any]:
"""Evaluate candidate joints without planning or moving.

Args:
joints: Candidate joint state
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"collision_free": False,
}
_, robot_id, config, _ = robot
joint_state = joints
if not joint_state.name:
joint_state = JointState.__new__(JointState)
joint_state.ts = joints.ts
joint_state.frame_id = joints.frame_id
joint_state.name = config.joint_names
joint_state.position = list(joints.position)
joint_state.velocity = list(joints.velocity or [])
joint_state.effort = list(joints.effort or [])
pose = self._world_monitor.get_ee_pose(robot_id, joint_state=joint_state)
collision_free = self._world_monitor.is_state_valid(robot_id, joint_state)
return {
"success": collision_free,
"pose": pose,
"joint_state": joint_state,
"status": "SUCCESS" if collision_free else "COLLISION",
"message": "" if collision_free else "Joint state is in collision or invalid",
"collision_free": collision_free,
}

@rpc
def get_init_joints(self, robot_name: RobotName | None = None) -> JointState | None:
"""Get the init joint state (captured at startup or set manually).
Expand Down
Loading
Loading