Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/clob/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::types::{Address, B256, Decimal, U256};
use crate::ws::ConnectionManager;
use crate::ws::config::Config;
use crate::ws::connection::ConnectionState;
use crate::ws::task::AbortOnDrop;

/// WebSocket client for real-time market data and user updates.
///
Expand Down Expand Up @@ -622,6 +623,15 @@ impl<S: State> ClientInner<S> {
/// Resources owned by one live WebSocket channel: the connection, the
/// subscription bookkeeping, and the reconnection task that ties them
/// together. Dropping this struct tears all three down.
struct ChannelResources {
    /// Connection manager for this channel's WebSocket, parameterized over
    /// the channel's message type and the shared interest tracker.
    connection: ConnectionManager<WsMessage, Arc<InterestTracker>>,
    /// Tracks active subscriptions; shared with the reconnection task so it
    /// can re-send subscription requests after a connection recovery.
    subscriptions: Arc<SubscriptionManager>,
    /// Owns the reconnection task spawned by
    /// [`SubscriptionManager::start_reconnection_handler`]. The wrapper
    /// aborts the task on drop, which releases the strong
    /// `Arc<SubscriptionManager>` clone held by the task's future and
    /// breaks the reference cycle that would otherwise leak the whole
    /// channel (task, WebSocket, subscription manager) for the lifetime
    /// of the process — see issue #325 and [`AbortOnDrop`].
    #[expect(dead_code, reason = "Field held only for its Drop side effect")]
    reconnect_handle: AbortOnDrop,
}

impl ChannelResources {
Expand All @@ -630,11 +640,12 @@ impl ChannelResources {
let connection = ConnectionManager::new(endpoint, config, Arc::clone(&interest))?;
let subscriptions = Arc::new(SubscriptionManager::new(connection.clone(), interest));

subscriptions.start_reconnection_handler();
let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler());

Ok(Self {
connection,
subscriptions,
reconnect_handle,
})
}

Expand Down
77 changes: 75 additions & 2 deletions src/clob/ws/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ impl SubscriptionManager {
}

/// Start the reconnection handler that re-subscribes on connection recovery.
pub fn start_reconnection_handler(self: &Arc<Self>) {
///
/// Returns the [`tokio::task::JoinHandle`] for the spawned handler so the
/// caller can abort it when the owning client is dropped. The handler
/// holds a strong `Arc<Self>` clone and also owns a clone of the
/// underlying [`ConnectionManager`]; without external cancellation, the
/// `watch::Sender` it waits on can never close, so the task (and every
/// `Arc` it transitively keeps alive) leaks for the lifetime of the
/// process. Callers MUST retain the returned handle and `abort()` it in
/// their `Drop` impl to break this cycle — see
/// [`crate::clob::ws::client`] for the canonical pattern.
pub fn start_reconnection_handler(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
let this = Arc::clone(self);

tokio::spawn(async move {
Expand Down Expand Up @@ -140,7 +150,7 @@ impl SubscriptionManager {
}
}
}
});
})
}

/// Re-send subscription requests for all tracked assets and markets.
Expand Down Expand Up @@ -563,3 +573,66 @@ impl SubscriptionManager {
Ok(())
}
}

#[cfg(test)]
mod reconnect_handler_tests {
    //! Regression coverage for issue #325. Before the fix, the reconnection
    //! handler task owned a strong `Arc<SubscriptionManager>` that was only
    //! released once the connection's watch `Sender` closed — but the task
    //! itself kept that `Sender` alive (through a cloned `ConnectionManager`
    //! stored inside the manager), so the refcounts formed a cycle and the
    //! entire channel leaked every time a `WsClient` was dropped.

    use std::sync::{Arc, Weak};
    use std::time::{Duration, Instant};

    use super::{InterestTracker, SubscriptionManager};
    use crate::ws::ConnectionManager;
    use crate::ws::config::Config;
    use crate::ws::task::AbortOnDrop;

    /// Loopback endpoint on a reserved port: name resolution is instant and
    /// the TCP connect is refused right away, so the connection task never
    /// stalls on DNS or a slow handshake.
    const UNROUTABLE_ENDPOINT: &str = "ws://127.0.0.1:1";

    #[tokio::test]
    async fn aborting_reconnect_handle_releases_subscription_manager() {
        let tracker = Arc::new(InterestTracker::new());
        let conn = ConnectionManager::new(
            UNROUTABLE_ENDPOINT.to_owned(),
            Config::default(),
            Arc::clone(&tracker),
        )
        .expect("ConnectionManager::new");

        let manager = Arc::new(SubscriptionManager::new(conn, tracker));
        let handle = AbortOnDrop::new(manager.start_reconnection_handler());

        // Owner clone plus the clone captured by the spawned task: the
        // strong count must be at least 2 once the handler is running.
        assert!(
            Arc::strong_count(&manager) >= 2,
            "reconnection task should have cloned an Arc<SubscriptionManager>"
        );

        let observer: Weak<SubscriptionManager> = Arc::downgrade(&manager);
        drop(manager);
        drop(handle);

        // The abort only takes effect on a later scheduler tick, so poll up
        // to a deadline instead of sleeping a fixed amount — a loaded CI
        // runner may need several yields before the task's future is truly
        // dropped.
        let deadline = Instant::now() + Duration::from_secs(2);
        while observer.strong_count() != 0 && Instant::now() < deadline {
            tokio::task::yield_now().await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert!(
            observer.upgrade().is_none(),
            "SubscriptionManager leaked after reconnect handle aborted: \
             strong_count={} (issue #325 regression)",
            observer.strong_count(),
        );
    }
}
162 changes: 160 additions & 2 deletions src/rtds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::types::Address;
use crate::ws::ConnectionManager;
use crate::ws::config::Config;
use crate::ws::connection::ConnectionState;
use crate::ws::task::AbortOnDrop;

/// RTDS (Real-Time Data Socket) client for streaming Polymarket data.
///
Expand Down Expand Up @@ -65,6 +66,14 @@ struct ClientInner<S: State> {
connection: ConnectionManager<RtdsMessage, SimpleParser>,
/// Subscription manager for handling subscriptions
subscriptions: Arc<SubscriptionManager>,
/// Owns the reconnection task spawned by
/// [`SubscriptionManager::start_reconnection_handler`]. The wrapper
/// aborts the task on drop, which releases the strong
/// `Arc<SubscriptionManager>` clone held by the task's future and
/// breaks the reference cycle that would otherwise leak the whole
/// client (task, WebSocket, subscription manager) for the lifetime
/// of the process — see issue #325 and [`AbortOnDrop`].
reconnect_handle: AbortOnDrop,
}

impl Client<Unauthenticated> {
Expand All @@ -73,8 +82,10 @@ impl Client<Unauthenticated> {
let connection = ConnectionManager::new(endpoint.to_owned(), config.clone(), SimpleParser)?;
let subscriptions = Arc::new(SubscriptionManager::new(connection.clone()));

// Start reconnection handler to re-subscribe on connection recovery
subscriptions.start_reconnection_handler();
// Start reconnection handler to re-subscribe on connection recovery.
// The handle is retained in an `AbortOnDrop` so the task is
// cancelled when the client is dropped — see the field docs.
let reconnect_handle = AbortOnDrop::new(subscriptions.start_reconnection_handler());

Ok(Self {
inner: Arc::new(ClientInner {
Expand All @@ -83,6 +94,7 @@ impl Client<Unauthenticated> {
endpoint: endpoint.to_owned(),
connection,
subscriptions,
reconnect_handle,
}),
})
}
Expand Down Expand Up @@ -110,6 +122,7 @@ impl Client<Unauthenticated> {
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
reconnect_handle: inner.reconnect_handle,
}),
})
}
Expand Down Expand Up @@ -325,7 +338,152 @@ impl Client<Authenticated<Normal>> {
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
reconnect_handle: inner.reconnect_handle,
}),
})
}
}

#[cfg(test)]
mod teardown_tests {
    //! Teardown regression tests for the RTDS client (issue #325). They
    //! exercise the `reconnect_handle` plumbing across `Client::new`,
    //! `Client::authenticate`, and `Client::deauthenticate`: every state
    //! transition must move the `AbortOnDrop` into the freshly built
    //! `ClientInner` so the spawned task stays tied to the live client, and
    //! dropping the last client clone must run the wrapper's `Drop` (which
    //! aborts the task).

    use std::sync::{Arc, Weak};
    use std::time::{Duration, Instant};

    use super::{Client, SubscriptionManager};
    use crate::auth::{Credentials, Uuid};
    use crate::types::Address;
    use crate::ws::config::Config;

    /// Loopback endpoint that resolves instantly and refuses the TCP
    /// connect, so no test ever waits on the network.
    const UNROUTABLE_ENDPOINT: &str = "ws://127.0.0.1:1";

    /// Placeholder credentials for the `authenticate` / `deauthenticate`
    /// round-trips. Only the struct shape matters — nothing here ever
    /// reaches the network.
    fn stub_credentials() -> Credentials {
        Credentials::new(
            Uuid::nil(),
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=".to_owned(),
            "passphrase".to_owned(),
        )
    }

    /// Placeholder EOA for the authenticated client state.
    fn stub_address() -> Address {
        "0x0000000000000000000000000000000000000001"
            .parse()
            .expect("valid zero-ish address")
    }

    /// Spin on the weak handle until its strong count reaches zero or a
    /// generous deadline passes — keeps a busy CI runner from flaking.
    async fn spin_until_released(weak: &Weak<SubscriptionManager>) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while weak.strong_count() != 0 && Instant::now() < deadline {
            tokio::task::yield_now().await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    #[tokio::test]
    async fn unauthenticated_client_drop_releases_subscription_manager() {
        let client = Client::new(UNROUTABLE_ENDPOINT, Config::default())
            .expect("Client::new should not fail for a well-formed endpoint string");

        let probe = Arc::downgrade(&client.inner_subscriptions_for_test());

        drop(client);
        spin_until_released(&probe).await;

        assert!(
            probe.upgrade().is_none(),
            "Unauthenticated RTDS Client leaked SubscriptionManager on drop: \
             strong_count={} (issue #325 regression)",
            probe.strong_count(),
        );
    }

    #[tokio::test]
    async fn authenticate_then_drop_releases_subscription_manager() {
        let client = Client::new(UNROUTABLE_ENDPOINT, Config::default()).expect("Client::new");

        let probe = Arc::downgrade(&client.inner_subscriptions_for_test());

        let authed = client
            .authenticate(stub_address(), stub_credentials())
            .expect("authenticate should succeed when no extra clones exist");

        // `authenticate` moved the reconnect handle and the subscription
        // manager into a new `ClientInner`, so the weak ref must still
        // upgrade at this point.
        assert!(
            probe.upgrade().is_some(),
            "authenticate prematurely dropped the SubscriptionManager"
        );

        drop(authed);
        spin_until_released(&probe).await;

        assert!(
            probe.upgrade().is_none(),
            "Authenticated RTDS Client leaked SubscriptionManager on drop: \
             strong_count={} (issue #325 regression)",
            probe.strong_count(),
        );
    }

    #[tokio::test]
    async fn deauthenticate_preserves_reconnect_handle_then_drop_cleans_up() {
        let client = Client::new(UNROUTABLE_ENDPOINT, Config::default()).expect("Client::new");

        let probe = Arc::downgrade(&client.inner_subscriptions_for_test());

        let authed = client
            .authenticate(stub_address(), stub_credentials())
            .expect("authenticate");

        // Round-trip back out of the authenticated state; the handle must be
        // forwarded into the new `ClientInner` so the task stays alive.
        let unauthed = authed
            .deauthenticate()
            .expect("deauthenticate should succeed when no extra clones exist");

        // Nothing has been dropped yet, so the manager is still reachable
        // after the full round-trip.
        assert!(
            probe.upgrade().is_some(),
            "Round-tripping through authenticate/deauthenticate prematurely \
             dropped the SubscriptionManager"
        );

        drop(unauthed);
        spin_until_released(&probe).await;

        assert!(
            probe.upgrade().is_none(),
            "RTDS Client leaked SubscriptionManager after deauthenticate+drop: \
             strong_count={} (issue #325 regression)",
            probe.strong_count(),
        );
    }
}

// Test-only accessor: expose the inner Arc<SubscriptionManager> so the
// teardown tests can take a `Weak` without widening the public API. The
// field is module-private, so the accessor lives in the same file.
#[cfg(test)]
#[expect(
    clippy::multiple_inherent_impl,
    reason = "Test-only accessor kept isolated from the main public impl"
)]
impl<S: State> Client<S> {
    /// Returns a fresh strong clone of the client's subscription manager.
    /// Tests immediately downgrade it to a `Weak` so they can observe
    /// whether the client's teardown releases the last strong reference.
    fn inner_subscriptions_for_test(&self) -> Arc<SubscriptionManager> {
        Arc::clone(&self.inner.subscriptions)
    }
}
Loading
Loading