diff --git a/src/clob/ws/client.rs b/src/clob/ws/client.rs index d18d45c8..c5aec6ad 100644 --- a/src/clob/ws/client.rs +++ b/src/clob/ws/client.rs @@ -358,6 +358,19 @@ impl Client { })) } + /// All market events on a single stream (`custom_feature_enabled`). + /// + /// Single broadcast receiver for all [`WsMessage`] variants. + pub fn subscribe_all_market( + &self, + asset_ids: Vec, + ) -> Result> + use> { + self.inner + .get_or_create_channel(ChannelType::Market)? + .subscriptions + .subscribe_market_with_options(asset_ids, true) + } + /// Get the current connection state for a specific channel. /// /// Returns [`ConnectionState::Disconnected`] if the channel has not been diff --git a/tests/websocket.rs b/tests/websocket.rs index 404ac156..208a6e48 100644 --- a/tests/websocket.rs +++ b/tests/websocket.rs @@ -1465,6 +1465,38 @@ mod custom_features { }) } + #[tokio::test] + async fn subscribe_all_market_receives_book_and_bba() { + let mut server = MockWsServer::start().await; + let endpoint = server.ws_url("/ws/market"); + + let client = Client::new(&endpoint, Config::default()).unwrap(); + + let stream = client + .subscribe_all_market(vec![payloads::asset_id()]) + .unwrap(); + let mut stream = Box::pin(stream); + + let sub_request = server.recv_subscription().await.unwrap(); + assert!(sub_request.contains("\"custom_feature_enabled\":true")); + + server.send(&payloads::book().to_string()); + let msg = timeout(Duration::from_secs(2), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert!(matches!(msg, WsMessage::Book(_))); + + server.send(&best_bid_ask().to_string()); + let msg = timeout(Duration::from_secs(2), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert!(matches!(msg, WsMessage::BestBidAsk(_))); + } + #[tokio::test] async fn subscribe_best_bid_ask_receives_updates() { let mut server = MockWsServer::start().await;