Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ yolo11n.pt
# symlink one of .envrc.* if you'd like to use
.envrc
.claude
.opencode/
**/CLAUDE.md
.direnv/
.omo/

/logs

Expand Down
33 changes: 33 additions & 0 deletions dimos/manipulation/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera
from dimos.manipulation.manipulation_module import ManipulationModule
from dimos.manipulation.pick_and_place_module import PickAndPlaceModule
from dimos.manipulation.viser_panel.module import ViserManipulationPanelModule
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.geometry_msgs.Vector3 import Vector3
Expand Down Expand Up @@ -121,6 +122,38 @@
)


_xarm7_viser_mock_cfg = _catalog_xarm7(
name="arm",
adapter_type="mock",
add_gripper=True,
)

xarm7_viser_panel_mock = autoconnect(
ManipulationModule.blueprint(
robots=[_xarm7_viser_mock_cfg.to_robot_model_config()],
planning_timeout=10.0,
enable_viz=True,
),
ControlCoordinator.blueprint(
tick_rate=100.0,
publish_joint_state=True,
joint_state_frame_id="coordinator",
hardware=[_xarm7_viser_mock_cfg.to_hardware_component()],
tasks=[_xarm7_viser_mock_cfg.to_task_config()],
),
ViserManipulationPanelModule.blueprint(
host="127.0.0.1",
port=8095,
default_robot="arm",
allow_plan_execute=True,
),
).transports(
{
("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
}
)


# XArm7 planner + LLM agent for testing base ManipulationModule skills
# No perception — uses the base module's planning + gripper skills only.
# Usage: dimos run coordinator-mock, then dimos run xarm7-planner-coordinator-agent
Expand Down
149 changes: 149 additions & 0 deletions dimos/manipulation/manipulation_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,21 @@ def has_planned_path(self) -> bool:
path = self._planned_paths.get(robot_name)
return path is not None and len(path) > 0

@rpc
def get_planned_path(self, robot_name: RobotName | None = None) -> JointPath | None:
"""Get the currently stored planned path for a robot.

Args:
robot_name: Robot to query (required if multiple robots configured)

Returns:
Planned path waypoints or None if no path is stored
"""
robot = self._get_robot(robot_name)
if robot is None:
return None
return self._planned_paths.get(robot[0])

@rpc
def get_visualization_url(self) -> str | None:
"""Get the visualization URL.
Expand Down Expand Up @@ -633,12 +648,32 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]

robot_name, robot_id, config, _ = robot

joint_limits = None
if config.joint_limits_lower is not None and config.joint_limits_upper is not None:
joint_limits = list(
zip(config.joint_limits_lower, config.joint_limits_upper, strict=False)
)
elif self._world_monitor is not None:
try:
lower, upper = self._world_monitor.get_joint_limits(robot_id)
joint_limits = [
(float(lower_value), float(upper_value))
for lower_value, upper_value in zip(lower, upper, strict=False)
]
except Exception as e:
logger.debug(f"Could not load joint limits for '{robot_name}': {e}")

return {
"name": config.name,
"world_robot_id": robot_id,
"joint_names": config.joint_names,
"end_effector_link": config.end_effector_link,
"base_link": config.base_link,
"model_path": str(config.model_path),
"base_pose": config.base_pose,
"joint_limits": joint_limits,
"package_paths": {package: str(path) for package, path in config.package_paths.items()},
"xacro_args": dict(config.xacro_args),
"max_velocity": config.max_velocity,
"max_acceleration": config.max_acceleration,
"has_joint_name_mapping": bool(config.joint_name_mapping),
Expand All @@ -650,6 +685,120 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, Any]
else None,
}

@rpc
def solve_ik_preview(self, pose: Pose, robot_name: RobotName | None = None) -> dict[str, Any]:
"""Preview IK for a target pose without storing, previewing, executing, or moving.

Args:
pose: Target end-effector pose in world coordinates
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None or self._kinematics is None:
return {
"success": False,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}
_, robot_id, _, _ = robot
current = self._world_monitor.get_current_joint_state(robot_id)
if current is None:
return {
"success": False,
"joint_state": None,
"status": "NO_JOINT_STATE",
"message": "No joint state",
"position_error": None,
"orientation_error": None,
"collision_free": False,
}

from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped

target_pose = PoseStamped(
frame_id="world",
position=pose.position,
orientation=pose.orientation,
)
ik = self._kinematics.solve(
world=self._world_monitor.world,
robot_id=robot_id,
target_pose=target_pose,
seed=current,
check_collision=True,
)
joint_state = ik.joint_state if ik.is_success() else None
collision_free = bool(
joint_state is not None and self._world_monitor.is_state_valid(robot_id, joint_state)
)
return {
"success": joint_state is not None and collision_free,
"joint_state": joint_state,
"status": ik.status.name,
"message": ik.message,
"position_error": ik.position_error,
"orientation_error": ik.orientation_error,
"collision_free": collision_free,
}

@rpc
def solve_fk_preview(
self, joints: JointState, robot_name: RobotName | None = None
) -> dict[str, Any]:
"""Preview FK and feasibility for candidate joints without planning or moving.

Args:
joints: Candidate joint state
robot_name: Robot to solve for (required if multiple robots configured)
"""
if self._world_monitor is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNAVAILABLE",
"message": "Planning is not initialized",
"collision_free": False,
}
robot = self._get_robot(robot_name)
if robot is None:
return {
"success": False,
"pose": None,
"joint_state": None,
"status": "UNKNOWN_ROBOT",
"message": "Robot not found",
"collision_free": False,
}
_, robot_id, config, _ = robot
joint_state = joints
if not joint_state.name:
joint_state = JointState(name=config.joint_names, position=list(joints.position))
pose = self._world_monitor.get_ee_pose(robot_id, joint_state=joint_state)
collision_free = self._world_monitor.is_state_valid(robot_id, joint_state)
return {
"success": collision_free,
"pose": pose,
"joint_state": joint_state,
"status": "SUCCESS" if collision_free else "COLLISION",
"message": "" if collision_free else "Joint state is in collision or invalid",
"collision_free": collision_free,
}

@rpc
def get_init_joints(self, robot_name: RobotName | None = None) -> JointState | None:
"""Get the init joint state (captured at startup or set manually).
Expand Down
78 changes: 78 additions & 0 deletions dimos/manipulation/test_manipulation_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
ManipulationState,
)
from dimos.manipulation.planning.spec.config import RobotModelConfig
from dimos.manipulation.planning.spec.enums import IKStatus
from dimos.manipulation.planning.spec.models import IKResult
from dimos.msgs.geometry_msgs.Pose import Pose
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3
Expand All @@ -45,6 +48,10 @@ def robot_config():
joint_names=["joint1", "joint2", "joint3"],
end_effector_link="link_tcp",
base_link="link_base",
package_paths={"robot_description": Path("/path/to")},
joint_limits_lower=[-1.0, -2.0, -3.0],
joint_limits_upper=[1.0, 2.0, 3.0],
xacro_args={"prefix": "test_"},
max_velocity=1.0,
max_acceleration=2.0,
coordinator_task_name="traj_arm",
Expand Down Expand Up @@ -269,6 +276,77 @@ def test_execute_rejected(self, robot_config, simple_trajectory):
assert module._state == ManipulationState.FAULT


class TestManipulationPanelRPCs:
def test_get_robot_info_includes_panel_metadata(self, robot_config):
module = _make_module()
module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())}
module._init_joints = {
"test_arm": JointState(
name=robot_config.joint_names,
position=[0.1, 0.2, 0.3],
)
}

info = module.get_robot_info("test_arm")

assert info is not None
assert info["model_path"] == "/path/to/robot.urdf"
assert info["base_pose"] == robot_config.base_pose
assert info["package_paths"] == {"robot_description": "/path/to"}
assert info["xacro_args"] == {"prefix": "test_"}
assert info["joint_limits"] == [(-1.0, 1.0), (-2.0, 2.0), (-3.0, 3.0)]
assert info["init_joints"] == [0.1, 0.2, 0.3]

def test_get_planned_path_returns_stored_path(self, robot_config):
module = _make_module()
module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())}
path = [JointState(name=robot_config.joint_names, position=[0.0, 0.0, 0.0])]
module._planned_paths = {"test_arm": path}

assert module.get_planned_path("test_arm") is path

def test_solve_ik_preview_does_not_store_path_or_change_state(self, robot_config):
module = _make_module_with_monitor(robot_config)
module._kinematics = MagicMock()
current = JointState(name=robot_config.joint_names, position=[0.0, 0.0, 0.0])
solution = JointState(name=robot_config.joint_names, position=[0.1, 0.2, 0.3])
module._world_monitor.get_current_joint_state.return_value = current
module._world_monitor.is_state_valid.return_value = True
module._kinematics.solve.return_value = IKResult(
status=IKStatus.SUCCESS,
joint_state=solution,
position_error=0.001,
orientation_error=0.002,
message="ok",
)

result = module.solve_ik_preview(Pose(), "test_arm")

assert result["success"] is True
assert result["joint_state"] is solution
assert result["status"] == "SUCCESS"
assert module._planned_paths == {}
assert module._planned_trajectories == {}
assert module._state == ManipulationState.IDLE

def test_solve_fk_preview_returns_pose_and_feasibility(self, robot_config):
module = _make_module_with_monitor(robot_config)
target = JointState(name=[], position=[0.1, 0.2, 0.3])
pose = PoseStamped(position=Vector3(0.4, 0.5, 0.6), orientation=Quaternion())
module._world_monitor.get_ee_pose.return_value = pose
module._world_monitor.is_state_valid.return_value = True

result = module.solve_fk_preview(target, "test_arm")

assert result["success"] is True
assert result["pose"] is pose
assert result["joint_state"].name == robot_config.joint_names
assert result["joint_state"].position == [0.1, 0.2, 0.3]
assert result["collision_free"] is True
assert module._planned_paths == {}
assert module._state == ManipulationState.IDLE


class TestRobotModelConfigMapping:
"""Test RobotModelConfig joint name mapping helpers."""

Expand Down
Loading
Loading