diff --git a/src/clob/ws/client.rs b/src/clob/ws/client.rs index d18d45c8..0a96ecf9 100644 --- a/src/clob/ws/client.rs +++ b/src/clob/ws/client.rs @@ -20,6 +20,7 @@ use crate::types::{Address, B256, Decimal, U256}; use crate::ws::ConnectionManager; use crate::ws::config::Config; use crate::ws::connection::ConnectionState; +use crate::ws::task::AbortOnDrop; /// WebSocket client for real-time market data and user updates. /// @@ -622,6 +623,15 @@ impl ClientInner { struct ChannelResources { connection: ConnectionManager>, subscriptions: Arc, + /// Owns the reconnection task spawned by + /// [`SubscriptionManager::start_reconnection_handler`]. The wrapper + /// aborts the task on drop, which releases the strong + /// `Arc` clone held by the task's future and + /// breaks the reference cycle that would otherwise leak the whole + /// channel (task, WebSocket, subscription manager) for the lifetime + /// of the process — see issue #325 and [`AbortOnDrop`]. + #[expect(dead_code, reason = "Field held only for its Drop side effect")] + reconnect_handle: AbortOnDrop, } impl ChannelResources { @@ -630,11 +640,12 @@ impl ChannelResources { let connection = ConnectionManager::new(endpoint, config, Arc::clone(&interest))?; let subscriptions = Arc::new(SubscriptionManager::new(connection.clone(), interest)); - subscriptions.start_reconnection_handler(); + let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler()); Ok(Self { connection, subscriptions, + reconnect_handle, }) } diff --git a/src/clob/ws/subscription.rs b/src/clob/ws/subscription.rs index d01aa2da..29c480dd 100644 --- a/src/clob/ws/subscription.rs +++ b/src/clob/ws/subscription.rs @@ -105,7 +105,17 @@ impl SubscriptionManager { } /// Start the reconnection handler that re-subscribes on connection recovery. - pub fn start_reconnection_handler(self: &Arc) { + /// + /// Returns the [`tokio::task::JoinHandle`] for the spawned handler so the + /// caller can abort it when the owning client is dropped. The handler + /// holds a strong `Arc` clone and also owns a clone of the + /// underlying [`ConnectionManager`]; without external cancellation, the + /// `watch::Sender` it waits on can never close, so the task (and every + /// `Arc` it transitively keeps alive) leaks for the lifetime of the + /// process. Callers MUST retain the returned handle and `abort()` it in + /// their `Drop` impl to break this cycle — see + /// [`crate::clob::ws::client`] for the canonical pattern. + pub fn start_reconnection_handler(self: &Arc) -> tokio::task::JoinHandle<()> { let this = Arc::clone(self); tokio::spawn(async move { @@ -140,7 +150,7 @@ impl SubscriptionManager { } } } - }); + }) } /// Re-send subscription requests for all tracked assets and markets. @@ -563,3 +573,66 @@ impl SubscriptionManager { Ok(()) } } + +#[cfg(test)] +mod reconnect_handler_tests { + //! Regression tests for issue #325: the reconnection handler task used + //! to hold a strong `Arc` that was only released + //! when the connection's watch `Sender` closed — but the task itself + //! kept that `Sender` alive (via a cloned `ConnectionManager` inside + //! the manager), creating a refcount cycle that leaked the entire + //! channel whenever a `WsClient` was dropped. + + use std::sync::{Arc, Weak}; + use std::time::Duration; + + use super::{InterestTracker, SubscriptionManager}; + use crate::ws::ConnectionManager; + use crate::ws::config::Config; + use crate::ws::task::AbortOnDrop; + + /// Endpoint that resolves immediately and refuses connections, so the + /// underlying connection task never blocks on DNS or a slow TCP handshake. + const UNROUTABLE_ENDPOINT: &str = "ws://127.0.0.1:1"; + + #[tokio::test] + async fn aborting_reconnect_handle_releases_subscription_manager() { + let interest = Arc::new(InterestTracker::new()); + let connection = ConnectionManager::new( + UNROUTABLE_ENDPOINT.to_owned(), + Config::default(), + Arc::clone(&interest), + ) + .expect("ConnectionManager::new"); + + let subscriptions = Arc::new(SubscriptionManager::new(connection, interest)); + let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler()); + + // The spawned task holds an extra strong clone; with the owner clone + // we should observe at least 2 strong refs. + assert!( + Arc::strong_count(&subscriptions) >= 2, + "reconnection task should have cloned an Arc" + ); + + let weak: Weak = Arc::downgrade(&subscriptions); + drop(subscriptions); + drop(reconnect_handle); + + // Abort is observed on the next scheduler tick. Poll briefly rather + // than sleeping a hardcoded amount — on a loaded CI runner the task + // may need a few yields before its future is actually dropped. + let start = std::time::Instant::now(); + while weak.strong_count() != 0 && start.elapsed() < Duration::from_secs(2) { + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(10)).await; + } + + assert!( + weak.upgrade().is_none(), + "SubscriptionManager leaked after reconnect handle aborted: \ + strong_count={} (issue #325 regression)", + weak.strong_count(), + ); + } +} diff --git a/src/rtds/client.rs b/src/rtds/client.rs index 25ba3833..32842c7d 100644 --- a/src/rtds/client.rs +++ b/src/rtds/client.rs @@ -14,6 +14,7 @@ use crate::types::Address; use crate::ws::ConnectionManager; use crate::ws::config::Config; use crate::ws::connection::ConnectionState; +use crate::ws::task::AbortOnDrop; /// RTDS (Real-Time Data Socket) client for streaming Polymarket data. /// @@ -65,6 +66,14 @@ struct ClientInner { connection: ConnectionManager, /// Subscription manager for handling subscriptions subscriptions: Arc, + /// Owns the reconnection task spawned by + /// [`SubscriptionManager::start_reconnection_handler`]. The wrapper + /// aborts the task on drop, which releases the strong + /// `Arc` clone held by the task's future and + /// breaks the reference cycle that would otherwise leak the whole + /// client (task, WebSocket, subscription manager) for the lifetime + /// of the process — see issue #325 and [`AbortOnDrop`]. + reconnect_handle: AbortOnDrop, } impl Client { @@ -73,8 +82,10 @@ impl Client { let connection = ConnectionManager::new(endpoint.to_owned(), config.clone(), SimpleParser)?; let subscriptions = Arc::new(SubscriptionManager::new(connection.clone())); - // Start reconnection handler to re-subscribe on connection recovery - subscriptions.start_reconnection_handler(); + // Start reconnection handler to re-subscribe on connection recovery. + // The handle is retained in an `AbortOnDrop` so the task is + // cancelled when the client is dropped — see the field docs. + let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler()); Ok(Self { inner: Arc::new(ClientInner { @@ -83,6 +94,7 @@ impl Client { endpoint: endpoint.to_owned(), connection, subscriptions, + reconnect_handle, }), }) } @@ -110,6 +122,7 @@ impl Client { endpoint: inner.endpoint, connection: inner.connection, subscriptions: inner.subscriptions, + reconnect_handle: inner.reconnect_handle, }), }) } @@ -325,7 +338,152 @@ impl Client> { endpoint: inner.endpoint, connection: inner.connection, subscriptions: inner.subscriptions, + reconnect_handle: inner.reconnect_handle, }), }) } } + +#[cfg(test)] +mod teardown_tests { + //! RTDS client teardown regression tests for issue #325. These cover + //! the `reconnect_handle` plumbing in `Client::new`, + //! `Client::authenticate`, and `Client::deauthenticate` — each must + //! forward the `AbortOnDrop` into the new `ClientInner` so the + //! spawned task is still tied to the live client, and each must let + //! the wrapper run its `Drop` (aborting the task) when the last + //! client clone goes away. + + use std::sync::Weak; + use std::time::Duration; + + use super::{Client, SubscriptionManager}; + use crate::auth::{Credentials, Uuid}; + use crate::types::Address; + use crate::ws::config::Config; + + /// Resolves immediately and refuses TCP connections. + const UNROUTABLE_ENDPOINT: &str = "ws://127.0.0.1:1"; + + /// Dummy credentials for `authenticate` / `deauthenticate` round-trips. + /// Only the struct shape matters — the test never hits the network. + fn dummy_credentials() -> Credentials { + Credentials::new( + Uuid::nil(), + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=".to_owned(), + "passphrase".to_owned(), + ) + } + + /// Dummy EOA used for the authenticated client state. + fn dummy_address() -> Address { + "0x0000000000000000000000000000000000000001" + .parse() + .expect("valid zero-ish address") + } + + /// Poll the weak reference until the strong count drops to zero, with + /// a generous timeout so a busy CI runner doesn't flake. + async fn wait_for_drop(weak: &Weak) { + let start = std::time::Instant::now(); + while weak.strong_count() != 0 && start.elapsed() < Duration::from_secs(2) { + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + #[tokio::test] + async fn unauthenticated_client_drop_releases_subscription_manager() { + let client = Client::new(UNROUTABLE_ENDPOINT, Config::default()) + .expect("Client::new should not fail for a well-formed endpoint string"); + + let weak = std::sync::Arc::downgrade(&client.inner_subscriptions_for_test()); + + drop(client); + wait_for_drop(&weak).await; + + assert!( + weak.upgrade().is_none(), + "Unauthenticated RTDS Client leaked SubscriptionManager on drop: \ + strong_count={} (issue #325 regression)", + weak.strong_count(), + ); + } + + #[tokio::test] + async fn authenticate_then_drop_releases_subscription_manager() { + let client = Client::new(UNROUTABLE_ENDPOINT, Config::default()).expect("Client::new"); + + let weak = std::sync::Arc::downgrade(&client.inner_subscriptions_for_test()); + + let authenticated = client + .authenticate(dummy_address(), dummy_credentials()) + .expect("authenticate should succeed when no extra clones exist"); + + // `authenticate` moved the reconnect handle + subscription manager + // into a new `ClientInner`, so the weak ref should still upgrade. + assert!( + weak.upgrade().is_some(), + "authenticate prematurely dropped the SubscriptionManager" + ); + + drop(authenticated); + wait_for_drop(&weak).await; + + assert!( + weak.upgrade().is_none(), + "Authenticated RTDS Client leaked SubscriptionManager on drop: \ + strong_count={} (issue #325 regression)", + weak.strong_count(), + ); + } + + #[tokio::test] + async fn deauthenticate_preserves_reconnect_handle_then_drop_cleans_up() { + let client = Client::new(UNROUTABLE_ENDPOINT, Config::default()).expect("Client::new"); + + let weak = std::sync::Arc::downgrade(&client.inner_subscriptions_for_test()); + + let authenticated = client + .authenticate(dummy_address(), dummy_credentials()) + .expect("authenticate"); + + // Round-trip through deauthenticate; the handle must be forwarded + // into the new `ClientInner` so the task stays alive. + let unauth = authenticated + .deauthenticate() + .expect("deauthenticate should succeed when no extra clones exist"); + + // After the round-trip the manager is still reachable — nothing has + // dropped yet. + assert!( + weak.upgrade().is_some(), + "Round-tripping through authenticate/deauthenticate prematurely \ + dropped the SubscriptionManager" + ); + + drop(unauth); + wait_for_drop(&weak).await; + + assert!( + weak.upgrade().is_none(), + "RTDS Client leaked SubscriptionManager after deauthenticate+drop: \ + strong_count={} (issue #325 regression)", + weak.strong_count(), + ); + } +} + +// Test-only accessor: expose the inner Arc so the +// teardown tests can take a `Weak` without widening the public API. The +// field is module-private, so the accessor lives in the same file. +#[cfg(test)] +#[expect( + clippy::multiple_inherent_impl, + reason = "Test-only accessor kept isolated from the main public impl" +)] +impl Client { + fn inner_subscriptions_for_test(&self) -> Arc { + Arc::clone(&self.inner.subscriptions) + } +} diff --git a/src/rtds/subscription.rs b/src/rtds/subscription.rs index 233d4b40..484f5c5d 100644 --- a/src/rtds/subscription.rs +++ b/src/rtds/subscription.rs @@ -83,7 +83,15 @@ impl SubscriptionManager { } /// Start the reconnection handler that re-subscribes on connection recovery. - pub fn start_reconnection_handler(self: &Arc) { + /// + /// Returns the [`tokio::task::JoinHandle`] for the spawned handler so the + /// caller can abort it when the owning client is dropped. The handler + /// holds a strong `Arc` clone and awaits on a `watch::Sender` it + /// transitively keeps alive, so without external cancellation the task + /// (and the whole `SubscriptionManager` graph) leaks — same class of + /// reference cycle as `clob::ws::subscription`. Callers MUST retain the + /// returned handle and `abort()` it on drop. + pub fn start_reconnection_handler(self: &Arc) -> tokio::task::JoinHandle<()> { let this = Arc::clone(self); tokio::spawn(async move { @@ -118,7 +126,7 @@ impl SubscriptionManager { } } } - }); + }) } /// Re-send subscription requests for all tracked topics. @@ -325,3 +333,62 @@ impl SubscriptionManager { Ok(()) } } + +#[cfg(test)] +mod reconnect_handler_tests { + //! RTDS-side regression tests for issue #325. Mirrors the + //! `clob::ws::subscription` test so the same reference-cycle + //! invariant is enforced on both code paths. + + use std::sync::{Arc, Weak}; + use std::time::Duration; + + use super::{SimpleParser, SubscriptionManager}; + use crate::ws::ConnectionManager; + use crate::ws::config::Config; + use crate::ws::task::AbortOnDrop; + + /// Resolves immediately and refuses TCP connections, so the underlying + /// connection task does not block on DNS or a slow handshake. + const UNROUTABLE_ENDPOINT: &str = "ws://127.0.0.1:1"; + + #[tokio::test] + async fn aborting_reconnect_handle_releases_rtds_subscription_manager() { + let connection = ConnectionManager::new( + UNROUTABLE_ENDPOINT.to_owned(), + Config::default(), + SimpleParser, + ) + .expect("ConnectionManager::new"); + + let subscriptions = Arc::new(SubscriptionManager::new(connection)); + let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler()); + + // The spawned reconnect task clones the `Arc`, so with the owner + // clone we must observe at least 2 strong refs before drop. + assert!( + Arc::strong_count(&subscriptions) >= 2, + "reconnection task should have cloned an Arc" + ); + + let weak: Weak = Arc::downgrade(&subscriptions); + drop(subscriptions); + drop(reconnect_handle); + + // Poll briefly: the task future is only dropped once the runtime + // processes the abort, which may take a handful of scheduler ticks + // on a loaded runner. + let start = std::time::Instant::now(); + while weak.strong_count() != 0 && start.elapsed() < Duration::from_secs(2) { + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(10)).await; + } + + assert!( + weak.upgrade().is_none(), + "RTDS SubscriptionManager leaked after reconnect handle aborted: \ + strong_count={} (issue #325 regression)", + weak.strong_count(), + ); + } +} diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 5ab261cd..0e089a1d 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -22,6 +22,7 @@ pub mod config; pub mod connection; pub mod error; +pub(crate) mod task; pub mod traits; pub use connection::ConnectionManager; diff --git a/src/ws/task.rs b/src/ws/task.rs new file mode 100644 index 00000000..89dfa515 --- /dev/null +++ b/src/ws/task.rs @@ -0,0 +1,84 @@ +//! Helpers for owning spawned [`tokio`] tasks. +//! +//! The reconnection handlers in [`crate::clob::ws::subscription`] and +//! [`crate::rtds::subscription`] spawn detached tasks that each hold a +//! strong `Arc` clone. Those tasks also own a clone +//! of the underlying [`crate::ws::ConnectionManager`], whose `watch::Sender` +//! is what the task awaits on for state changes. Because the `Sender` only +//! closes when every clone of it drops, and a strong `Arc` clone inside the +//! spawned task prevents the owning `SubscriptionManager` (and therefore +//! that `ConnectionManager` clone) from dropping, the task can never exit +//! on its own — a reference cycle that leaks the entire channel for the +//! lifetime of the process (issue #325). +//! +//! [`AbortOnDrop`] is a thin wrapper around [`tokio::task::JoinHandle`] +//! that calls `abort()` on drop. Clients store the wrapped handle next to +//! the `Arc`; when the client (and therefore the +//! wrapper) drops, the handler task is aborted, its stack locals — which +//! include the strong `Arc` clone — are released, and the whole graph can +//! drop normally. + +use tokio::task::JoinHandle; + +/// Owns a [`JoinHandle`] and calls [`JoinHandle::abort`] on drop. +/// +/// See the module-level documentation for the cycle this breaks. +pub(crate) struct AbortOnDrop(JoinHandle<()>); + +impl AbortOnDrop { + /// Wrap a spawned task handle so it is aborted when this value drops. + #[must_use] + pub(crate) fn new(handle: JoinHandle<()>) -> Self { + Self(handle) + } +} + +impl Drop for AbortOnDrop { + fn drop(&mut self) { + // `JoinHandle::abort` is a no-op if the task has already completed, + // so this is always safe to call. + self.0.abort(); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::time::Duration; + + use super::AbortOnDrop; + + #[tokio::test] + async fn abort_on_drop_cancels_pending_task() { + let finished = Arc::new(AtomicBool::new(false)); + let finished_task = Arc::clone(&finished); + + let handle = tokio::spawn(async move { + // Park forever unless aborted. + std::future::pending::<()>().await; + finished_task.store(true, Ordering::SeqCst); + }); + + let wrapper = AbortOnDrop::new(handle); + drop(wrapper); + + // Give the runtime a moment to process the abort. + tokio::time::sleep(Duration::from_millis(50)).await; + + assert!( + !finished.load(Ordering::SeqCst), + "task body should never have run to completion after abort" + ); + } + + #[tokio::test] + async fn abort_on_drop_is_noop_for_finished_task() { + // Spawn a task that completes immediately, wait for it, then drop + // the wrapper. `JoinHandle::abort` on a finished task is documented + // as a no-op, so this must not panic or error. + let wrapper = AbortOnDrop::new(tokio::spawn(async {})); + tokio::time::sleep(Duration::from_millis(10)).await; + drop(wrapper); + } +}