Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ yolo11n.pt
# symlink one of .envrc.* if you'd like to use
.envrc
.claude
.opencode/
**/CLAUDE.md
.direnv/
.omo/

/logs

Expand Down
46 changes: 45 additions & 1 deletion dimos/manipulation/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,24 @@
from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera
from dimos.manipulation.manipulation_module import ManipulationModule
from dimos.manipulation.pick_and_place_module import PickAndPlaceModule
from dimos.manipulation.viser_panel.module import ViserManipulationPanelModule
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.geometry_msgs.Vector3 import Vector3
from dimos.msgs.sensor_msgs.JointState import JointState
from dimos.perception.object_scene_registration import ObjectSceneRegistrationModule
from dimos.robot.catalog.ufactory import xarm6 as _catalog_xarm6, xarm7 as _catalog_xarm7


def _quaternion(x: float, y: float, z: float, w: float) -> Quaternion:
quaternion = Quaternion.__new__(Quaternion)
quaternion.x = x
quaternion.y = y
quaternion.z = z
quaternion.w = w
return quaternion


# Single XArm6 planner (standalone, no coordinator)
_xarm6_planner_cfg = _catalog_xarm6(
name="arm",
Expand Down Expand Up @@ -121,6 +132,38 @@
)


_xarm7_viser_mock_cfg = _catalog_xarm7(
name="arm",
adapter_type="mock",
add_gripper=True,
)

xarm7_viser_panel_mock = autoconnect(
ManipulationModule.blueprint(
robots=[_xarm7_viser_mock_cfg.to_robot_model_config()],
planning_timeout=10.0,
enable_viz=True,
),
ControlCoordinator.blueprint(
tick_rate=100.0,
publish_joint_state=True,
joint_state_frame_id="coordinator",
hardware=[_xarm7_viser_mock_cfg.to_hardware_component()],
tasks=[_xarm7_viser_mock_cfg.to_task_config()],
),
ViserManipulationPanelModule.blueprint(
host="127.0.0.1",
port=8095,
default_robot="arm",
allow_plan_execute=True,
),
).transports(
{
("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
}
)


# XArm7 planner + LLM agent for testing base ManipulationModule skills
# No perception — uses the base module's planning + gripper skills only.
# Usage: dimos run coordinator-mock, then dimos run xarm7-planner-coordinator-agent
Expand Down Expand Up @@ -164,7 +207,7 @@
# Usage: dimos run coordinator-mock, then dimos run xarm-perception
_XARM_PERCEPTION_CAMERA_TRANSFORM = Transform(
translation=Vector3(x=0.06693724, y=-0.0309563, z=0.00691482),
rotation=Quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw
rotation=_quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw
)

_xarm7_perception_cfg = _catalog_xarm7(
Expand Down Expand Up @@ -342,6 +385,7 @@
"xarm6_planner_only",
"xarm7_planner_coordinator",
"xarm7_planner_coordinator_agent",
"xarm7_viser_panel_mock",
"xarm_perception",
"xarm_perception_agent",
"xarm_perception_sim",
Expand Down
177 changes: 177 additions & 0 deletions dimos/manipulation/manipulation_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,41 @@ def has_planned_path(self) -> bool:
path = self._planned_paths.get(robot_name)
return path is not None and len(path) > 0

@rpc
def get_planned_path(self, robot_name: RobotName | None = None) -> JointPath | None:
"""Get the currently stored planned path for a robot.
Args:
robot_name: Robot to query (required if multiple robots configured)
Returns:
Planned path waypoints or None if no path is stored
"""
robot = self._get_robot(robot_name)
if robot is None:
return None
return self._planned_paths.get(robot[0])

@rpc
def get_planned_path_poses(self, robot_name: RobotName | None = None) -> list[Pose] | None:
"""Get end-effector poses for the stored planned path.
Args:
robot_name: Robot to query (required if multiple robots configured)
"""
if self._world_monitor is None:
return None
robot = self._get_robot(robot_name)
if robot is None:
return None
robot_name, robot_id, _, _ = robot
path = self._planned_paths.get(robot_name)
if path is None:
return None
return [
self._world_monitor.get_ee_pose(robot_id, joint_state=waypoint) for waypoint in path
]

@rpc
def get_visualization_url(self) -> str | None:
"""Get the visualization URL.
Expand Down Expand Up @@ -633,12 +668,32 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]

robot_name, robot_id, config, _ = robot

joint_limits = None
if config.joint_limits_lower is not None and config.joint_limits_upper is not None:
joint_limits = list(
zip(config.joint_limits_lower, config.joint_limits_upper, strict=False)
)
elif self._world_monitor is not None:
try:
lower, upper = self._world_monitor.get_joint_limits(robot_id)
joint_limits = [
(float(lower_value), float(upper_value))
for lower_value, upper_value in zip(lower, upper, strict=False)
]
except Exception as e:
logger.debug(f"Could not load joint limits for '{robot_name}': {e}")

return {
"name": config.name,
"world_robot_id": robot_id,
"joint_names": config.joint_names,
"end_effector_link": config.end_effector_link,
"base_link": config.base_link,
"model_path": str(config.model_path),
"base_pose": config.base_pose,
"joint_limits": joint_limits,
"package_paths": {package: str(path) for package, path in config.package_paths.items()},
"xacro_args": dict(config.xacro_args),
"max_velocity": config.max_velocity,
"max_acceleration": config.max_acceleration,
"has_joint_name_mapping": bool(config.joint_name_mapping),
Expand All @@ -650,6 +705,128 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]
else None,
}

@rpc
def evaluate_pose_target(
self, pose: Pose, robot_name: RobotName | None = None
) -> dict[str, Any]:
"""Evaluate a pose target without storing, previewing, executing, or moving.
Args:
pose: Target end-effector pose in world coordinates
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None or self._kinematics is None:
return {
"success": False,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
_, robot_id, _, _ = robot
current = self._world_monitor.get_current_joint_state(robot_id)
if current is None:
return {
"success": False,
"joint_state": None,
"status": "NO_JOINT_STATE",
"message": "No joint state",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}

from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped

target_pose = PoseStamped(
frame_id="world",
position=pose.position,
orientation=pose.orientation,
)
ik = self._kinematics.solve(
world=self._world_monitor.world,
robot_id=robot_id,
target_pose=target_pose,
seed=current,
check_collision=True,
)
joint_state = ik.joint_state if ik.is_success() else None
collision_free = bool(
joint_state is not None and self._world_monitor.is_state_valid(robot_id, joint_state)
)
return {
"success": joint_state is not None and collision_free,
"joint_state": joint_state,
"status": ik.status.name,
"message": ik.message,
"position_error": ik.position_error,
"orientation_error": ik.orientation_error,
"collision_free": collision_free,
}

@rpc
def evaluate_joint_target(
self, joints: JointState, robot_name: RobotName | None = None
) -> dict[str, Any]:
"""Evaluate candidate joints without planning or moving.
Args:
joints: Candidate joint state
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"collision_free": False,
}
_, robot_id, config, _ = robot
joint_state = joints
if not joint_state.name:
joint_state = JointState.__new__(JointState)
joint_state.ts = joints.ts
joint_state.frame_id = joints.frame_id
joint_state.name = config.joint_names
joint_state.position = list(joints.position)
joint_state.velocity = list(joints.velocity or [])
joint_state.effort = list(joints.effort or [])
pose = self._world_monitor.get_ee_pose(robot_id, joint_state=joint_state)
collision_free = self._world_monitor.is_state_valid(robot_id, joint_state)
return {
"success": collision_free,
"pose": pose,
"joint_state": joint_state,
"status": "SUCCESS" if collision_free else "COLLISION",
"message": "" if collision_free else "Joint state is in collision or invalid",
"collision_free": collision_free,
}

@rpc
def get_init_joints(self, robot_name: RobotName | None = None) -> JointState | None:
"""Get the init joint state (captured at startup or set manually).
Expand Down
Loading
Loading