Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions ros2model/api/runtime_parser/rosmodel_runtime_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import shutil
import subprocess
import ros2model.core.metamodels.metamodel_ros as ROSModel
from ros2model.core.metamodels.metamodel_ros import set_full_name
from ros2model.core.metamodels.metamodel_ros import set_full_name, split_interface_name
import ros2node.api as ROS2Node
from ros2cli.node.strategy import NodeStrategy

Expand Down Expand Up @@ -112,23 +112,9 @@ def get_parameter_type_string(parameter_type):


def get_interface_name(node_namespace: str, interface_name: str):
"""get interface relative name

Args:
node_namespace (str): _description_
interface_name (str): _description_

Returns:
_type_: _description_
"""
if node_namespace != "/":
prefix = f"{node_namespace}/"
else:
prefix = node_namespace

if interface_name.startswith(prefix):
interface_name = interface_name[len(prefix) :]
return interface_name
"""get interface relative name"""
_, name = split_interface_name(node_namespace, interface_name)
return name


def parse_interface(
Expand All @@ -149,24 +135,23 @@ def parse_interface(
"parse_interface: interface_name:",
get_interface_name(node.name.namespace, topic_info.name),
)
interface_name = get_interface_name(node.name.namespace, topic_info.name)
interface_namespace, interface_name = split_interface_name(
node.name.namespace, topic_info.name
)

existing = getattr(node, typee)
if any(item.name == interface_name for item in existing):
return
interface = ROSModel.InterfaceTypeImpl(
namespace=node.name.namespace,
namespace=interface_namespace,
name=interface_name,
type=topic_info.types[0],
)
getattr(node, typee).append(interface)


def set_name(namespace: str, full_name: str) -> str:
if namespace == "/":
name = full_name[1:]
else:
name = full_name[len(namespace) + 1 :]
_, name = split_interface_name(namespace, full_name)
return name


Expand All @@ -178,11 +163,13 @@ def parse_interface_verbose(
f"topic name again: {topic_endpoint_info.full_name[len(node.name.namespace) :]}\n"
)
existing = getattr(node, typee)
interface_name = set_name(node.name.namespace, topic_endpoint_info.full_name)
interface_namespace, interface_name = split_interface_name(
node.name.namespace, topic_endpoint_info.full_name
)
if any(item.name == interface_name for item in existing):
return
interface = ROSModel.InterfaceTypeImpl(
namespace=node.name.namespace,
namespace=interface_namespace,
name=interface_name,
type=topic_endpoint_info.info.topic_type,
qos=ROSModel.QualityOfService(
Expand Down
25 changes: 25 additions & 0 deletions ros2model/core/metamodels/metamodel_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ def set_full_name(namespace: str, name: str) -> str:
return full_name


def split_interface_name(node_namespace: str, full_name: str) -> tuple[str, str]:
"""Return the namespace and local name for a ROS interface.

A namespaced node can still use global topics such as /tf. Only strip the
node namespace when the interface full name actually starts with it.
"""
if not full_name:
return node_namespace or "/", full_name

namespace = node_namespace or "/"
if not namespace.startswith("/"):
namespace = f"/{namespace}"

normalized_full_name = full_name
if not normalized_full_name.startswith("/"):
normalized_full_name = f"/{normalized_full_name}"

if namespace != "/":
prefix = namespace.rstrip("/") + "/"
if normalized_full_name.startswith(prefix):
return namespace, normalized_full_name[len(prefix) :]

return "/", normalized_full_name.lstrip("/")


class GraphName(BaseModel):
name: str
namespace: str
Expand Down
2 changes: 1 addition & 1 deletion ros2model/verb/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _format_field_type(field_type) -> str:
array_suffix = ""
array_start = type_repr.find("[")
if array_start != -1:
array_suffix = type_repr[array_start:]
array_suffix = "[]"
type_repr = type_repr[:array_start]
else:
array_start = None
Expand Down
53 changes: 53 additions & 0 deletions test/unittest/test_generate_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,58 @@ def test_generate_pkg(self):
self.assertEqual(expect_result.strip(), data.strip())


class test_runtime_interface_names(unittest.TestCase):
def test_namespaced_topic_is_made_relative(self):
namespace, name = split_interface_name("/mavros", "/mavros/local_position/pose")

self.assertEqual("/mavros", namespace)
self.assertEqual("local_position/pose", name)
self.assertEqual(
"/mavros/local_position/pose",
Publisher(
namespace=namespace,
name=name,
type="geometry_msgs/msg/PoseStamped",
).full_name,
)

def test_namespace_without_leading_slash_is_normalized(self):
self.assertEqual(
("/mavros", "local_position/pose"),
split_interface_name("mavros", "/mavros/local_position/pose"),
)

def test_namespace_with_trailing_slash_is_normalized(self):
self.assertEqual(
("/mavros/", "local_position/pose"),
split_interface_name("/mavros/", "/mavros/local_position/pose"),
)

def test_global_topic_from_namespaced_node_stays_global(self):
namespace, name = split_interface_name("/mavros", "/tf")

self.assertEqual("/", namespace)
self.assertEqual("tf", name)
self.assertEqual(
"/tf",
Publisher(
namespace=namespace,
name=name,
type="tf2_msgs/msg/TFMessage",
).full_name,
)
self.assertEqual(("/", "tf_static"), split_interface_name("/mavros", "/tf_static"))

def test_root_namespace_topic_drops_leading_slash(self):
self.assertEqual(("/", "diagnostics"), split_interface_name("/", "/diagnostics"))

def test_relative_full_name_is_treated_as_root_topic(self):
self.assertEqual(("/", "diagnostics"), split_interface_name("/mavros", "diagnostics"))

def test_empty_full_name_is_preserved(self):
self.assertEqual(("/mavros", ""), split_interface_name("/mavros", ""))
self.assertEqual(("/", ""), split_interface_name("", ""))


if __name__ == "__main__":
unittest.main()
25 changes: 25 additions & 0 deletions test/unittest/test_generate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
from pathlib import Path
from ros2model.api.model_generator.message_generator import MessageGenerator
from ros2model.core.metamodels.metamodel_ros import *
from ros2model.verb.message import _format_field_type


class FieldType:
def __init__(self, type_repr, pkg_name=None):
self.type_repr = type_repr
self.pkg_name = pkg_name

def __str__(self):
return self.type_repr

test_interfaces = Package(
name="test_interfaces",
Expand Down Expand Up @@ -88,5 +98,20 @@ def test_generate_pkg(self):
self.assertEqual(expect_result.strip(), data.strip())


class test_field_type_formatting(unittest.TestCase):
def test_fixed_size_primitive_array_is_normalized(self):
self.assertEqual("float64[]", _format_field_type(FieldType("float64[36]")))
self.assertEqual("uint8[]", _format_field_type(FieldType("uint8[16]")))

def test_bounded_primitive_array_is_normalized(self):
self.assertEqual("float64[]", _format_field_type(FieldType("float64[<=3]")))

def test_bounded_message_array_is_normalized_and_qualified(self):
self.assertEqual(
"pkg/msg/SomeMsg[]",
_format_field_type(FieldType("pkg/SomeMsg[<=1]", pkg_name="pkg")),
)


if __name__ == "__main__":
unittest.main()