Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions rmf_fleet_adapter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ target_link_libraries(rmf_fleet_adapter
rmf_websocket::rmf_websocket
nlohmann_json::nlohmann_json
rmf_api_msgs::rmf_api_msgs
nlohmann_json_schema_validator
nlohmann_json_schema_validator::validator
)

target_include_directories(rmf_fleet_adapter
Expand All @@ -118,7 +118,6 @@ target_include_directories(rmf_fleet_adapter
PRIVATE
${rmf_api_msgs_INCLUDE_DIRS}
${rmf_websocket_INCLUDE_DIR}
${nlohmann_json_schema_validator_INCLUDE_DIRS}
)

if (BUILD_TESTING)
Expand Down Expand Up @@ -147,7 +146,6 @@ if (BUILD_TESTING)
# private includes of rmf_fleet_adapter
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rmf_fleet_adapter>
${rmf_api_msgs_INCLUDE_DIRS}
${nlohmann_json_schema_validator_INCLUDE_DIRS}
)
target_link_libraries(test_rmf_fleet_adapter
PUBLIC
Expand All @@ -166,7 +164,7 @@ if (BUILD_TESTING)
rmf_fleet_adapter
rmf_utils::rmf_utils
rmf_api_msgs::rmf_api_msgs
nlohmann_json_schema_validator
nlohmann_json_schema_validator::validator
)

target_compile_definitions(test_rmf_fleet_adapter
Expand Down
2 changes: 1 addition & 1 deletion rmf_task_ros2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ target_link_libraries(rmf_task_ros2
${rmf_task_msgs_LIBRARIES}
${rclcpp_LIBRARIES}
nlohmann_json::nlohmann_json
nlohmann_json_schema_validator
nlohmann_json_schema_validator::validator
)

target_include_directories(rmf_task_ros2
Expand Down
44 changes: 19 additions & 25 deletions rmf_websocket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
cmake_minimum_required(VERSION 3.5)
cmake_minimum_required(VERSION 3.22)

project(rmf_websocket)

# Websocketpp is not compatible with CXX above 17
set(CMAKE_CXX_STANDARD 17)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
# we dont use add_compile_options with pedantic in message packages
# because the Python C extensions dont comply with it
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic")
endif()

if(POLICY CMP0167)
cmake_policy(SET CMP0167 NEW)
endif()

include(GNUInstallDirs)

find_package(ament_cmake REQUIRED)
Expand All @@ -19,10 +21,10 @@ find_package(rclcpp REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(nlohmann_json_schema_validator_vendor REQUIRED)
find_package(nlohmann_json_schema_validator REQUIRED)
find_package(websocketpp REQUIRED)
find_package(Boost COMPONENTS system REQUIRED)
find_package(Boost REQUIRED)
find_package(Threads)

add_subdirectory(websocketpp)

add_library(rmf_websocket SHARED
src/rmf_websocket/client/ClientWebSocketEndpoint.cpp
Expand All @@ -33,30 +35,29 @@ add_library(rmf_websocket SHARED
target_link_libraries(rmf_websocket
PUBLIC
${rclcpp_LIBRARIES}
${websocketpp_LIBRARIES}
rmf_utils::rmf_utils
nlohmann_json::nlohmann_json
nlohmann_json_schema_validator
nlohmann_json_schema_validator::validator
PRIVATE
Boost::system
${Boost_LIBRARIES}
Threads::Threads
websocketpp
)

target_include_directories(rmf_websocket
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
${rclcpp_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
)

add_executable(example_client
examples/client.cpp)
examples/client.cpp
)

target_link_libraries(example_client
PUBLIC
rmf_websocket
${websocketpp_LIBRARIES}
PRIVATE
Threads::Threads
)
Expand All @@ -66,12 +67,11 @@ target_include_directories(example_client
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
${rclcpp_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
)


ament_export_targets(export_rmf_websocket HAS_LIBRARY_TARGET)
ament_export_dependencies(rmf_traffic rclcpp nlohmann_json websocketpp)
ament_export_dependencies(rmf_traffic rclcpp nlohmann_json)

#===============================================================================
install(
Expand Down Expand Up @@ -112,8 +112,8 @@ if(BUILD_TESTING)
target_link_libraries(test_ring_buffer
PRIVATE
rmf_utils::rmf_utils
Boost::system
)
${Boost_LIBRARIES}
)

#integration test
find_package(OpenSSL REQUIRED)
Expand All @@ -122,21 +122,19 @@ if(BUILD_TESTING)
TIMEOUT 300)
target_link_libraries(test_client
PUBLIC
${websocketpp_LIBRARIES}
${rclcpp_LIBRARIES}
rmf_utils::rmf_utils
Boost::system
${Boost_LIBRARIES}
rmf_websocket
PRIVATE
Threads::Threads
OpenSSL::Crypto
)
)
target_include_directories(test_client
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
${rclcpp_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
)


Expand All @@ -145,10 +143,9 @@ if(BUILD_TESTING)
TIMEOUT 300)
target_link_libraries(test_client_no_server
PUBLIC
${websocketpp_LIBRARIES}
${rclcpp_LIBRARIES}
rmf_utils::rmf_utils
Boost::system
${Boost_LIBRARIES}
rmf_websocket
PRIVATE
Threads::Threads
Expand All @@ -159,18 +156,16 @@ if(BUILD_TESTING)
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
${rclcpp_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
)

ament_add_catch2(test_client_with_update_cb
test/test_client_with_update_cb.cpp
TIMEOUT 300)
target_link_libraries(test_client_with_update_cb
PUBLIC
${websocketpp_LIBRARIES}
${rclcpp_LIBRARIES}
rmf_utils::rmf_utils
Boost::system
${Boost_LIBRARIES}
rmf_websocket
PRIVATE
Threads::Threads
Expand All @@ -181,7 +176,6 @@ if(BUILD_TESTING)
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
${rclcpp_INCLUDE_DIRS}
${WEBSOCKETPP_INCLUDE_DIR}
)
endif()

Expand Down
27 changes: 18 additions & 9 deletions rmf_websocket/src/rmf_websocket/BroadcastClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,23 @@ class BroadcastClient::Implementation
const std::shared_ptr<rclcpp::Node>& node,
ProvideJsonUpdates get_json_updates_cb)
: _uri{std::move(uri)},
_io_service{},
_io_context{},
_node{std::move(node)},
_queue(1000),
_get_json_updates_cb{std::move(get_json_updates_cb)},
_endpoint(_uri,
_node,
&_io_service,
&_io_context,
std::bind(&BroadcastClient::Implementation::on_connect, this))
{
_consumer_thread = std::thread([this]()
{
_io_service.run();
_io_context.run();
});

_io_service.dispatch([this]()
boost::asio::dispatch(
_io_context,
[this]()
{
_endpoint.connect();
});
Expand Down Expand Up @@ -113,7 +115,9 @@ class BroadcastClient::Implementation
RCLCPP_INFO(
this->_node->get_logger(),
"Attempting queue flush if connected");
_io_service.dispatch([this]()
boost::asio::dispatch(
_io_context,
[this]()
{
_flush_queue_if_connected();
});
Expand All @@ -134,7 +138,9 @@ class BroadcastClient::Implementation
{
/// _queue is thread safe. No need to lock.
_queue.push(msg);
_io_service.dispatch([this]()
boost::asio::dispatch(
_io_context,
[this]()
{
_flush_queue_if_connected();
});
Expand All @@ -151,7 +157,10 @@ class BroadcastClient::Implementation
log("Buffer full dropping oldest message");
}
}
_io_service.dispatch([this]()

boost::asio::dispatch(
_io_context,
[this]()
{
_flush_queue_if_connected();
});
Expand All @@ -168,7 +177,7 @@ class BroadcastClient::Implementation
//============================================================================
~Implementation()
{
_io_service.stop();
_io_context.stop();
_consumer_thread.join();
}

Expand Down Expand Up @@ -217,7 +226,7 @@ class BroadcastClient::Implementation
}
// create pimpl
std::string _uri;
boost::asio::io_service _io_service;
boost::asio::io_context _io_context;
std::shared_ptr<rclcpp::Node> _node;
RingBuffer<nlohmann::json> _queue;
ProvideJsonUpdates _get_json_updates_cb;
Expand Down
7 changes: 4 additions & 3 deletions rmf_websocket/src/rmf_websocket/BroadcastServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class BroadcastServer::Implementation
"Starting BroadcastServer on port " << endpoint.port());
}

// Start the ASIO io_service run loop
// Start the ASIO io_context run loop
_server_thread = std::thread(
[data = _data]() { data->echo_server.run(); });
}
Expand All @@ -128,8 +128,9 @@ class BroadcastServer::Implementation
_logger_interface->get_logger(), "Stopping BroadcastServer");
}

_data->echo_server.get_io_service().post(
[data = _data]()
boost::asio::post(
_data->echo_server.get_io_context(),
[data = _data]()
{
data->echo_server.stop_listening();
data->echo_server.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void ConnectionMetadata::on_fail(WsClient* c, websocketpp::connection_hdl hdl)
WsClient::connection_ptr con = c->get_con_from_hdl(hdl);
_server = con->get_response_header("Server");
_error_reason = con->get_ec().message();
c->get_io_service().post(_reconnection_cb);
boost::asio::post(c->get_io_context(), _reconnection_cb);
}

//=============================================================================
Expand All @@ -54,7 +54,7 @@ void ConnectionMetadata::on_close(WsClient* c, websocketpp::connection_hdl hdl)
<< websocketpp::close::status::get_string(con->get_remote_close_code())
<< "), close reason: " << con->get_remote_close_reason();
_error_reason = s.str();
c->get_io_service().post(_reconnection_cb);
boost::asio::post(c->get_io_context(), _reconnection_cb);
}

//=============================================================================
Expand Down Expand Up @@ -108,15 +108,15 @@ websocketpp::connection_hdl ConnectionMetadata::get_hdl() const
//=============================================================================
ClientWebSocketEndpoint::ClientWebSocketEndpoint(
std::string const& uri, std::shared_ptr<rclcpp::Node> node,
asio::io_service* io_service,
asio::io_context* io_context,
ConnectionCallback cb)
: _uri(uri), _node(node), _init{false}, _reconnect_enqueued(false),
_connection_cb(std::move(cb))
{
_endpoint = std::make_unique<WsClient>();
_endpoint->clear_access_channels(websocketpp::log::alevel::all);
_endpoint->clear_error_channels(websocketpp::log::elevel::all);
_endpoint->init_asio(io_service);
_endpoint->init_asio(io_context);
_endpoint->start_perpetual();
}

Expand Down Expand Up @@ -144,11 +144,11 @@ websocketpp::lib::error_code ClientWebSocketEndpoint::connect()
"> Reconnecting in 1s\n"
"> Host: %s", _uri.c_str());
_endpoint->stop_perpetual();
auto io_service = &_endpoint->get_io_service();
auto io_context = &_endpoint->get_io_context();
_endpoint = std::make_unique<WsClient>();
_endpoint->clear_access_channels(websocketpp::log::alevel::all);
_endpoint->clear_error_channels(websocketpp::log::elevel::all);
_endpoint->init_asio(io_service);
_endpoint->init_asio(io_context);
_endpoint->start_perpetual();
websocketpp::lib::error_code ec;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ClientWebSocketEndpoint
ClientWebSocketEndpoint(
std::string const& uri,
std::shared_ptr<rclcpp::Node> node,
boost::asio::io_service* io_service,
boost::asio::io_context* io_context,
ConnectionCallback cb);

/// Delete move constructor
Expand Down
Loading
Loading