Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions dimos/mapping/ray_tracing/test_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
import pytest

from dimos.mapping.ray_tracing.transformer import RayTraceMap
from dimos.memory2.type.observation import Observation
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2


def _obs(
points: NDArray[np.float32], ts: float, pose: tuple[float, float, float]
) -> Observation[PointCloud2]:
return Observation(
id=0,
ts=ts,
pose=pose,
_data=PointCloud2.from_numpy(points),
)


def _cube(n: int = 100) -> NDArray[np.float32]:
rng = np.random.default_rng(0)
return rng.random((n, 3)).astype(np.float32)


def test_emit_every_n_yields_on_cadence_and_flushes_remainder() -> None:
points = _cube()
obs = [_obs(points, ts=float(i), pose=(0.0, 0.0, 0.0)) for i in range(7)]

results = list(RayTraceMap(emit_every=3)(iter(obs)))

assert [r.tags["frame_count"] for r in results] == [3, 6, 7]


def test_pose_propagates_to_emitted_obs() -> None:
pose = (1.5, -2.0, 0.5)
obs = _obs(_cube(), ts=1.0, pose=pose)

[emitted] = list(RayTraceMap()(iter([obs])))

assert emitted.pose_tuple is not None
assert emitted.pose_tuple[:3] == pose


def test_emit_every_zero_yields_single_batch_map() -> None:
obs = [_obs(_cube(), ts=float(i), pose=(0.0, 0.0, 0.0)) for i in range(4)]

results = list(RayTraceMap(emit_every=0)(iter(obs)))

assert len(results) == 1
assert results[0].tags["frame_count"] == 4


def test_poseless_obs_are_skipped() -> None:
points = _cube()
poseless = Observation(id=1, ts=0.0, pose=None, _data=PointCloud2.from_numpy(points))
posed = _obs(points, ts=1.0, pose=(0.0, 0.0, 0.0))

results = list(RayTraceMap()(iter([poseless, posed])))

assert [r.tags["frame_count"] for r in results] == [1]


def test_negative_emit_every_is_rejected() -> None:
with pytest.raises(ValueError):
RayTraceMap(emit_every=-1)
86 changes: 86 additions & 0 deletions dimos/mapping/ray_tracing/transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import open3d as o3d # type: ignore[import-untyped]
import open3d.core as o3c # type: ignore[import-untyped]

from dimos.mapping.ray_tracing.voxel_map import VoxelRayMapper
from dimos.memory2.transform import Transformer
from dimos.memory2.utils.poseless import posed_only
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2

if TYPE_CHECKING:
from collections.abc import Iterator

from dimos.memory2.type.observation import Observation


class RayTraceMap(Transformer[PointCloud2, PointCloud2]):
"""Accumulate world-frame lidar into a voxel map with raycast clearing."""

def __init__(
self,
*,
voxel_size: float = 0.1,
max_range: float = 30.0,
emit_every: int = 1,
**mapper_kwargs: Any,
) -> None:
if emit_every < 0:
raise ValueError(f"emit_every must be >= 0, got {emit_every}")
self.voxel_size = voxel_size
self.max_range = max_range
self.emit_every = emit_every
self._mapper_kwargs = mapper_kwargs

def _make_obs(
self,
mapper: VoxelRayMapper,
last_obs: Observation[PointCloud2],
count: int,
) -> Observation[PointCloud2]:
positions = mapper.global_map()
pcd = o3d.t.geometry.PointCloud()
pcd.point["positions"] = o3c.Tensor.from_numpy(positions)
cloud = PointCloud2(pointcloud=pcd, frame_id="world", ts=last_obs.ts)
return last_obs.derive(
data=cloud,
tags={**last_obs.tags, "frame_count": count},
)

def __call__(
self,
upstream: Iterator[Observation[PointCloud2]],
) -> Iterator[Observation[PointCloud2]]:
mapper = VoxelRayMapper(
voxel_size=self.voxel_size, max_range=self.max_range, **self._mapper_kwargs
)
last_obs: Observation[PointCloud2] | None = None
count = 0

for obs, pose in posed_only(upstream, "RayTraceMap"):
x, y, z, *_ = pose
mapper.add_frame(obs.data.points_f32(), (x, y, z))
last_obs = obs
count += 1

if self.emit_every > 0 and count % self.emit_every == 0:
yield self._make_obs(mapper, last_obs, count)

if last_obs is not None and (self.emit_every == 0 or count % self.emit_every != 0):
yield self._make_obs(mapper, last_obs, count)
60 changes: 60 additions & 0 deletions dimos/memory2/utils/poseless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, TypeVar

from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from collections.abc import Iterator

from dimos.memory2.type.observation import Observation, PoseTuple

logger = setup_logger()

# Warn once after this many leading pose-less observations.
_POSE_WARN_AFTER = 10

T = TypeVar("T")


def posed_only(
upstream: Iterator[Observation[T]], label: str
) -> Iterator[tuple[Observation[T], PoseTuple]]:
"""Yield each posed observation paired with its pose tuple, dropping pose-less ones.

Warns once if many leading observations arrive without a pose before anything
has been yielded, which usually points at a misconfigured stream name or
alignment tolerance.
"""
skipped = 0
yielded = 0
warned = False
for obs in upstream:
if obs.pose_tuple is None:
skipped += 1
if not warned and yielded == 0 and skipped >= _POSE_WARN_AFTER:
logger.warning(
"%s: %d observations had no pose; check the lidar/odom "
"stream names and alignment tolerance",
label,
skipped,
)
warned = True
logger.debug("%s: obs %s has no pose; skipping", label, obs.id)
continue
yielded += 1
yield obs, obs.pose_tuple
75 changes: 75 additions & 0 deletions dimos/navigation/nav_3d/mls_planner/test_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray

from dimos.memory2.type.observation import Observation
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2
from dimos.navigation.nav_3d.mls_planner.transformer import MLSPlan


def _obs(points: NDArray[np.float32], pose: tuple[float, float, float]) -> Observation[PointCloud2]:
return Observation(id=0, ts=0.0, pose=pose, _data=PointCloud2.from_numpy(points))


def _flat_floor(half_extent: float = 3.0, spacing: float = 0.1) -> NDArray[np.float32]:
coords = np.arange(-half_extent, half_extent, spacing, dtype=np.float32)
xs, ys = np.meshgrid(coords, coords)
zs = np.zeros_like(xs)
return np.stack([xs.ravel(), ys.ravel(), zs.ravel()], axis=1)


def test_start_z_is_dropped_by_robot_height() -> None:
obs = _obs(np.zeros((1, 3), dtype=np.float32), pose=(1.0, 2.0, 3.0))

[out] = list(MLSPlan(goal=(10.0, 10.0, 0.0), robot_height=0.4)(iter([obs])))

assert out.tags["start"] == (1.0, 2.0, 3.0 - 0.4)


def test_flat_floor_yields_populated_path_and_planned_true() -> None:
obs = _obs(_flat_floor(), pose=(-2.0, -2.0, 1.0))

[out] = list(MLSPlan(goal=(2.0, 2.0, 0.0), voxel_size=0.2, robot_height=1.0)(iter([obs])))

assert out.tags["planned"] is True
assert len(out.data.poses) >= 2
assert out.tags["voxel_map"] is obs.data
assert out.tags["nodes"].shape[1] == 3
assert out.tags["surface_map"].shape[1] == 3


def test_no_route_yields_empty_path_with_planned_false() -> None:
rng = np.random.default_rng(0)
obs = _obs(rng.random((50, 3)).astype(np.float32), pose=(0.0, 0.0, 0.0))

[out] = list(MLSPlan(goal=(100.0, 100.0, 100.0))(iter([obs])))

assert out.tags["planned"] is False
assert out.data.poses == []


def test_poseless_obs_is_skipped_and_following_posed_obs_plans() -> None:
poseless = Observation(id=1, ts=0.0, pose=None, _data=PointCloud2.from_numpy(_flat_floor()))
posed = _obs(_flat_floor(), pose=(-2.0, -2.0, 1.0))

results = list(
MLSPlan(goal=(2.0, 2.0, 0.0), voxel_size=0.2, robot_height=1.0)(iter([poseless, posed]))
)

assert len(results) == 1
assert results[0].tags["planned"] is True
Loading
Loading