diff --git a/dimos/mapping/ray_tracing/test_transformer.py b/dimos/mapping/ray_tracing/test_transformer.py new file mode 100644 index 0000000000..11ef46ddc1 --- /dev/null +++ b/dimos/mapping/ray_tracing/test_transformer.py @@ -0,0 +1,82 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import numpy as np +from numpy.typing import NDArray +import pytest + +from dimos.mapping.ray_tracing.transformer import RayTraceMap +from dimos.memory2.type.observation import Observation +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + + +def _obs( + points: NDArray[np.float32], ts: float, pose: tuple[float, float, float] +) -> Observation[PointCloud2]: + return Observation( + id=0, + ts=ts, + pose=pose, + _data=PointCloud2.from_numpy(points), + ) + + +def _cube(n: int = 100) -> NDArray[np.float32]: + rng = np.random.default_rng(0) + return rng.random((n, 3)).astype(np.float32) + + +def test_emit_every_n_yields_on_cadence_and_flushes_remainder() -> None: + points = _cube() + obs = [_obs(points, ts=float(i), pose=(0.0, 0.0, 0.0)) for i in range(7)] + + results = list(RayTraceMap(emit_every=3)(iter(obs))) + + assert [r.tags["frame_count"] for r in results] == [3, 6, 7] + + +def test_pose_propagates_to_emitted_obs() -> None: + pose = (1.5, -2.0, 0.5) + obs = _obs(_cube(), ts=1.0, pose=pose) + + [emitted] = list(RayTraceMap()(iter([obs]))) + + assert emitted.pose_tuple is not None + assert emitted.pose_tuple[:3] == pose + + +def test_emit_every_zero_yields_single_batch_map() -> None: + obs = [_obs(_cube(), ts=float(i), pose=(0.0, 0.0, 0.0)) for i in range(4)] + + results = list(RayTraceMap(emit_every=0)(iter(obs))) + + assert len(results) == 1 + assert results[0].tags["frame_count"] == 4 + + +def test_poseless_obs_are_skipped() -> None: + points = _cube() + poseless = Observation(id=1, ts=0.0, pose=None, _data=PointCloud2.from_numpy(points)) + posed = _obs(points, ts=1.0, pose=(0.0, 0.0, 0.0)) + + results = list(RayTraceMap()(iter([poseless, posed]))) + + assert [r.tags["frame_count"] for r in results] == [1] + + +def test_negative_emit_every_is_rejected() -> None: + with pytest.raises(ValueError): + RayTraceMap(emit_every=-1) diff --git a/dimos/mapping/ray_tracing/transformer.py b/dimos/mapping/ray_tracing/transformer.py new file mode 100644 index 0000000000..816ce715e6 --- /dev/null +++ b/dimos/mapping/ray_tracing/transformer.py @@ -0,0 +1,86 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import open3d as o3d # type: ignore[import-untyped] +import open3d.core as o3c # type: ignore[import-untyped] + +from dimos.mapping.ray_tracing.voxel_map import VoxelRayMapper +from dimos.memory2.transform import Transformer +from dimos.memory2.utils.poseless import posed_only +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dimos.memory2.type.observation import Observation + + +class RayTraceMap(Transformer[PointCloud2, PointCloud2]): + """Accumulate world-frame lidar into a voxel map with raycast clearing.""" + + def __init__( + self, + *, + voxel_size: float = 0.1, + max_range: float = 30.0, + emit_every: int = 1, + **mapper_kwargs: Any, + ) -> None: + if emit_every < 0: + raise ValueError(f"emit_every must be >= 0, got {emit_every}") + self.voxel_size = voxel_size + self.max_range = max_range + self.emit_every = emit_every + self._mapper_kwargs = mapper_kwargs + + def _make_obs( + self, + mapper: VoxelRayMapper, + last_obs: Observation[PointCloud2], + count: int, + ) -> Observation[PointCloud2]: + positions = mapper.global_map() + pcd = o3d.t.geometry.PointCloud() + pcd.point["positions"] = o3c.Tensor.from_numpy(positions) + cloud = PointCloud2(pointcloud=pcd, frame_id="world", ts=last_obs.ts) + return last_obs.derive( + data=cloud, + tags={**last_obs.tags, "frame_count": count}, + ) + + def __call__( + self, + upstream: Iterator[Observation[PointCloud2]], + ) -> Iterator[Observation[PointCloud2]]: + mapper = VoxelRayMapper( + voxel_size=self.voxel_size, max_range=self.max_range, **self._mapper_kwargs + ) + last_obs: Observation[PointCloud2] | None = None + count = 0 + + for obs, pose in posed_only(upstream, "RayTraceMap"): + x, y, z, *_ = pose + mapper.add_frame(obs.data.points_f32(), (x, y, z)) + last_obs = obs + count += 1 + + if self.emit_every > 0 and count % self.emit_every == 0: + yield self._make_obs(mapper, last_obs, count) + + if last_obs is not None and (self.emit_every == 0 or count % self.emit_every != 0): + yield self._make_obs(mapper, last_obs, count) diff --git a/dimos/memory2/utils/poseless.py b/dimos/memory2/utils/poseless.py new file mode 100644 index 0000000000..84a0744edf --- /dev/null +++ b/dimos/memory2/utils/poseless.py @@ -0,0 +1,60 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, TypeVar + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dimos.memory2.type.observation import Observation, PoseTuple + +logger = setup_logger() + +# Warn once after this many leading pose-less observations. +_POSE_WARN_AFTER = 10 + +T = TypeVar("T") + + +def posed_only( + upstream: Iterator[Observation[T]], label: str +) -> Iterator[tuple[Observation[T], PoseTuple]]: + """Yield each posed observation paired with its pose tuple, dropping pose-less ones. + + Warns once if many leading observations arrive without a pose before anything + has been yielded, which usually points at a misconfigured stream name or + alignment tolerance. + """ + skipped = 0 + yielded = 0 + warned = False + for obs in upstream: + if obs.pose_tuple is None: + skipped += 1 + if not warned and yielded == 0 and skipped >= _POSE_WARN_AFTER: + logger.warning( + "%s: %d observations had no pose; check the lidar/odom " + "stream names and alignment tolerance", + label, + skipped, + ) + warned = True + logger.debug("%s: obs %s has no pose; skipping", label, obs.id) + continue + yielded += 1 + yield obs, obs.pose_tuple diff --git a/dimos/navigation/nav_3d/mls_planner/test_transformer.py b/dimos/navigation/nav_3d/mls_planner/test_transformer.py new file mode 100644 index 0000000000..eb5cd9ee34 --- /dev/null +++ b/dimos/navigation/nav_3d/mls_planner/test_transformer.py @@ -0,0 +1,75 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import numpy as np +from numpy.typing import NDArray + +from dimos.memory2.type.observation import Observation +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.navigation.nav_3d.mls_planner.transformer import MLSPlan + + +def _obs(points: NDArray[np.float32], pose: tuple[float, float, float]) -> Observation[PointCloud2]: + return Observation(id=0, ts=0.0, pose=pose, _data=PointCloud2.from_numpy(points)) + + +def _flat_floor(half_extent: float = 3.0, spacing: float = 0.1) -> NDArray[np.float32]: + coords = np.arange(-half_extent, half_extent, spacing, dtype=np.float32) + xs, ys = np.meshgrid(coords, coords) + zs = np.zeros_like(xs) + return np.stack([xs.ravel(), ys.ravel(), zs.ravel()], axis=1) + + +def test_start_z_is_dropped_by_robot_height() -> None: + obs = _obs(np.zeros((1, 3), dtype=np.float32), pose=(1.0, 2.0, 3.0)) + + [out] = list(MLSPlan(goal=(10.0, 10.0, 0.0), robot_height=0.4)(iter([obs]))) + + assert out.tags["start"] == (1.0, 2.0, 3.0 - 0.4) + + +def test_flat_floor_yields_populated_path_and_planned_true() -> None: + obs = _obs(_flat_floor(), pose=(-2.0, -2.0, 1.0)) + + [out] = list(MLSPlan(goal=(2.0, 2.0, 0.0), voxel_size=0.2, robot_height=1.0)(iter([obs]))) + + assert out.tags["planned"] is True + assert len(out.data.poses) >= 2 + assert out.tags["voxel_map"] is obs.data + assert out.tags["nodes"].shape[1] == 3 + assert out.tags["surface_map"].shape[1] == 3 + + +def test_no_route_yields_empty_path_with_planned_false() -> None: + rng = np.random.default_rng(0) + obs = _obs(rng.random((50, 3)).astype(np.float32), pose=(0.0, 0.0, 0.0)) + + [out] = list(MLSPlan(goal=(100.0, 100.0, 100.0))(iter([obs]))) + + assert out.tags["planned"] is False + assert out.data.poses == [] + + +def test_poseless_obs_is_skipped_and_following_posed_obs_plans() -> None: + poseless = Observation(id=1, ts=0.0, pose=None, _data=PointCloud2.from_numpy(_flat_floor())) + posed = _obs(_flat_floor(), pose=(-2.0, -2.0, 1.0)) + + results = list( + MLSPlan(goal=(2.0, 2.0, 0.0), voxel_size=0.2, robot_height=1.0)(iter([poseless, posed])) + ) + + assert len(results) == 1 + assert results[0].tags["planned"] is True diff --git a/dimos/navigation/nav_3d/mls_planner/transformer.py b/dimos/navigation/nav_3d/mls_planner/transformer.py new file mode 100644 index 0000000000..338a3c248e --- /dev/null +++ b/dimos/navigation/nav_3d/mls_planner/transformer.py @@ -0,0 +1,94 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from dimos.memory2.transform import Transformer +from dimos.memory2.utils.poseless import posed_only +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.nav_msgs.Path import Path +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.navigation.nav_3d.mls_planner.mls_planner import MLSPlanner + +if TYPE_CHECKING: + from collections.abc import Iterator + + import numpy as np + from numpy.typing import NDArray + + from dimos.memory2.type.observation import Observation + + +class MLSPlan(Transformer[PointCloud2, Path]): + """Plan paths from current pose to a fixed goal over an accumulating voxel map.""" + + def __init__( + self, + *, + goal: tuple[float, float, float], + voxel_size: float = 0.1, + robot_height: float = 1.5, + **planner_kwargs: Any, + ) -> None: + self.goal = goal + self.voxel_size = voxel_size + self.robot_height = robot_height + self._planner_kwargs = planner_kwargs + + def _path_from_waypoints(self, waypoints: NDArray[np.float32] | None, ts: float) -> Path: + poses: list[PoseStamped] = [] + if waypoints is not None: + for x, y, z in waypoints: + poses.append( + PoseStamped( + ts=ts, + frame_id="world", + position=(float(x), float(y), float(z)), + orientation=(0.0, 0.0, 0.0, 1.0), + ) + ) + return Path(ts=ts, frame_id="world", poses=poses) + + def __call__( + self, + upstream: Iterator[Observation[PointCloud2]], + ) -> Iterator[Observation[Path]]: + planner = MLSPlanner( + voxel_size=self.voxel_size, + robot_height=self.robot_height, + **self._planner_kwargs, + ) + for obs, pose in posed_only(upstream, "MLSPlan"): + x, y, z, *_ = pose + start = (float(x), float(y), float(z) - self.robot_height) + + voxel_map = obs.data + planner.update_global_map(voxel_map.points_f32()) + waypoints = planner.plan(start, self.goal) + path = self._path_from_waypoints(waypoints, obs.ts) + + yield obs.derive( + data=path, + tags={ + **obs.tags, + "voxel_map": voxel_map, + "surface_map": planner.surface_map(), + "nodes": planner.nodes(), + "node_edges": planner.node_edges(), + "start": start, + "planned": waypoints is not None, + }, + ) diff --git a/dimos/navigation/nav_3d/mls_planner/utils/plan_rrd.py b/dimos/navigation/nav_3d/mls_planner/utils/plan_rrd.py new file mode 100644 index 0000000000..494764695b --- /dev/null +++ b/dimos/navigation/nav_3d/mls_planner/utils/plan_rrd.py @@ -0,0 +1,184 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dev replay util: run a lidar+odometry .db through RayTraceMap and MLSPlan into rerun.""" + +from __future__ import annotations + +from pathlib import Path as FsPath +from typing import TYPE_CHECKING + +import rerun as rr +import typer + +from dimos.mapping.ray_tracing.transformer import RayTraceMap +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.transform import FnTransformer +from dimos.memory2.type.observation import Observation +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.nav_msgs.Path import Path +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2, register_colormap_annotation +from dimos.navigation.nav_3d.mls_planner.transformer import MLSPlan +from dimos.utils.data import resolve_named_path + +if TYPE_CHECKING: + import numpy as np + from numpy.typing import NDArray + +TIMELINE = "ts" + +PairObs = Observation[tuple[Observation[PointCloud2], Observation[Odometry]]] + + +def _attach_pose_from_odom(pair_obs: PairObs) -> Observation[PointCloud2]: + lidar_obs = pair_obs.data[0] + odom_obs = pair_obs.data[1] + odom = odom_obs.data + pose_tuple = ( + float(odom.position.x), + float(odom.position.y), + float(odom.position.z), + float(odom.orientation.x), + float(odom.orientation.y), + float(odom.orientation.z), + float(odom.orientation.w), + ) + return lidar_obs.with_pose(pose_tuple) + + +def _log_edges(edges: NDArray[np.float32], entity: str) -> None: + if edges.size == 0: + rr.log(entity, rr.LineStrips3D([])) + return + segments = [ + [(float(r[0]), float(r[1]), float(r[2])), (float(r[3]), float(r[4]), float(r[5]))] + for r in edges + ] + rr.log(entity, rr.LineStrips3D(segments)) + + +def _log_path(path: Path, entity: str) -> None: + if not path.poses: + rr.log(entity, rr.LineStrips3D([])) + return + points = [(float(p.position.x), float(p.position.y), float(p.position.z)) for p in path.poses] + rr.log(entity, rr.LineStrips3D([points], colors=[[0, 255, 0]], radii=0.05)) + + +def main( + dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), + out: FsPath | None = typer.Option( + None, "--out", help="Output .rrd path. If omitted, spawn rerun live." + ), + lidar_stream: str = typer.Option( + "fastlio_lidar", "--lidar-stream", help="Lidar stream in the recording" + ), + odom_stream: str = typer.Option( + "fastlio_odometry", "--odom-stream", help="Odometry stream in the recording" + ), + align_tol: float = typer.Option(0.05, "--align-tol", help="Lidar/odom alignment tolerance (s)"), + voxel_size: float = typer.Option(0.1, "--voxel-size", help="Voxel edge length (m)"), + max_range: float = typer.Option(30.0, "--max-range", help="Max ray cast distance (m)"), + ray_subsample: int = typer.Option(1, "--ray-subsample", help="Keep every Nth ray"), + emit_every: int = typer.Option(1, "--emit-every", help="Replan every N lidar frames"), + robot_height: float = typer.Option(0.3, "--robot-height", help="Robot height (m)"), + node_spacing: float = typer.Option(1.0, "--node-spacing", help="Graph node spacing (m)"), + goal: tuple[float, float, float] = typer.Option( + (1.25, 35.45, 1.9), + "--goal", + help="Planner goal xyz. Default is dataset-specific; override per recording.", + ), + live: bool = typer.Option( + False, "--live", help="Also spawn the rerun viewer when --out is set" + ), + render_voxel: float = typer.Option(0.05, "--render-voxel", help="Rerun voxel render size (m)"), +) -> None: + db_path = resolve_named_path(dataset, ".db") + + rr.init("plan_rrd", recording_id=db_path.stem) + if out is not None and live: + # Generous viewer memory so the gRPC sink never backpressures the writer. + rr.spawn(connect=False, memory_limit="16GB", server_memory_limit="16GB") + rr.set_sinks(rr.GrpcSink(), rr.FileSink(str(out))) + elif out is not None: + rr.save(str(out)) + else: + rr.spawn() + register_colormap_annotation("turbo") + + store = SqliteStore(path=str(db_path)) + with store: + lidar = store.stream(lidar_stream, PointCloud2).order_by("ts") + odom = store.stream(odom_stream, Odometry).order_by("ts") + + pose_tagged = lidar.align(odom, tolerance=align_tol).transform( + FnTransformer(_attach_pose_from_odom) + ) + pipeline = pose_tagged.transform( + RayTraceMap( + voxel_size=voxel_size, + max_range=max_range, + ray_subsample=ray_subsample, + emit_every=emit_every, + ) + ).transform( + MLSPlan( + goal=goal, + voxel_size=voxel_size, + robot_height=robot_height, + node_spacing_m=node_spacing, + ) + ) + + rr.log("world/goal", rr.Points3D([goal], colors=[[255, 0, 0]], radii=0.1), static=True) + + for obs in pipeline: + rr.set_time(TIMELINE, timestamp=obs.ts) + + start = obs.tags["start"] + rr.log("world/start", rr.Points3D([start], colors=[[0, 255, 0]], radii=0.1)) + + voxel_map = obs.tags["voxel_map"] + rr.log("world/voxel_map", voxel_map.to_rerun(voxel_size=render_voxel)) + + surface = obs.tags["surface_map"] + if surface.size: + rr.log( + "world/surface_map", + rr.Points3D(surface, colors=[[120, 120, 200]], radii=render_voxel / 2), + ) + + nodes = obs.tags["nodes"] + if nodes.size: + rr.log("world/nodes", rr.Points3D(nodes, colors=[[255, 200, 0]], radii=0.05)) + + _log_edges(obs.tags["node_edges"], "world/node_edges") + _log_path(obs.data, "world/path") + + count = obs.tags.get("frame_count", "?") + planned = obs.tags.get("planned", False) + print( + f"frame_count={count} planned={planned} waypoints={len(obs.data.poses)}", + end="\r", + flush=True, + ) + print() + + if out is not None: + print(f"wrote {out}") + print(f"open with: rerun {out}") + + +if __name__ == "__main__": + typer.run(main)