diff --git a/tokio-util/src/loom.rs b/tokio-util/src/loom.rs index 563e6d5ad25..bd8859a9949 100644 --- a/tokio-util/src/loom.rs +++ b/tokio-util/src/loom.rs @@ -6,4 +6,11 @@ pub(crate) mod sync { pub(crate) use loom::sync::{Arc, Mutex, MutexGuard}; #[cfg(not(all(test, loom)))] pub(crate) use std::sync::{Arc, Mutex, MutexGuard}; + + pub(crate) mod atomic { + #[cfg(all(test, loom))] + pub(crate) use loom::sync::atomic::{AtomicBool, Ordering}; + #[cfg(not(all(test, loom)))] + pub(crate) use std::sync::atomic::{AtomicBool, Ordering}; + } } diff --git a/tokio-util/src/sync/cancellation_token.rs b/tokio-util/src/sync/cancellation_token.rs index 1b397c2bbc2..6f0a4a27f81 100644 --- a/tokio-util/src/sync/cancellation_token.rs +++ b/tokio-util/src/sync/cancellation_token.rs @@ -206,6 +206,12 @@ impl CancellationToken { tree_node::is_cancelled(&self.inner) } + /// Internal method that checks cancellation with mutex lock for proper + /// synchronization in `WaitForCancellationFuture::poll()`. + fn is_cancelled_with_lock(&self) -> bool { + tree_node::is_cancelled_with_lock(&self.inner) + } + /// Returns a [`Future`] that gets fulfilled when cancellation is requested. /// /// Equivalent to: @@ -329,14 +335,14 @@ impl<'a> Future for WaitForCancellationFuture<'a> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut this = self.project(); loop { - if this.cancellation_token.is_cancelled() { + if this.cancellation_token.is_cancelled_with_lock() { return Poll::Ready(()); } // No wakeups can be lost here because there is always a call to - // `is_cancelled` between the creation of the future and the call to + // `is_cancelled_with_lock` between the creation of the future and the call to // `poll`, and the code that sets the cancelled flag does so before - // waking the `Notified`. + // waking the `Notified`. The mutex lock ensures proper synchronization. if this.future.as_mut().poll(cx).is_pending() { return Poll::Pending; } @@ -390,14 +396,14 @@ impl Future for WaitForCancellationFutureOwned { let mut this = self.project(); loop { - if this.cancellation_token.is_cancelled() { + if this.cancellation_token.is_cancelled_with_lock() { return Poll::Ready(()); } // No wakeups can be lost here because there is always a call to - // `is_cancelled` between the creation of the future and the call to + // `is_cancelled_with_lock` between the creation of the future and the call to // `poll`, and the code that sets the cancelled flag does so before - // waking the `Notified`. + // waking the `Notified`. The mutex lock ensures proper synchronization. if this.future.as_mut().poll(cx).is_pending() { return Poll::Pending; } diff --git a/tokio-util/src/sync/cancellation_token/tree_node.rs b/tokio-util/src/sync/cancellation_token/tree_node.rs index eabe947c18e..c66bb5bae38 100644 --- a/tokio-util/src/sync/cancellation_token/tree_node.rs +++ b/tokio-util/src/sync/cancellation_token/tree_node.rs @@ -38,6 +38,7 @@ //! Specifically, through invariant #2, we know that we always have to lock a parent //! before its child. //! +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex, MutexGuard}; /// A node of the cancellation tree structure @@ -46,6 +47,10 @@ use crate::loom::sync::{Arc, Mutex, MutexGuard}; pub(crate) struct TreeNode { inner: Mutex, waker: tokio::sync::Notify, + /// Atomic flag for lock-free `is_cancelled()` checks. + /// This is set to `true` when the node is cancelled, using `Release` ordering + /// to ensure all prior writes are visible to readers using `Acquire` ordering. + is_cancelled: AtomicBool, } impl TreeNode { pub(crate) fn new() -> Self { @@ -54,10 +59,10 @@ impl TreeNode { parent: None, parent_idx: 0, children: vec![], - is_cancelled: false, num_handles: 1, }), waker: tokio::sync::Notify::new(), + is_cancelled: AtomicBool::new(false), } } @@ -69,18 +74,25 @@ impl TreeNode { /// The data contained inside a `TreeNode`. /// /// This struct exists so that the data of the node can be wrapped -/// in a Mutex. +/// in a `Mutex`. struct Inner { parent: Option>, parent_idx: usize, children: Vec>, - is_cancelled: bool, num_handles: usize, } /// Returns whether or not the node is cancelled pub(crate) fn is_cancelled(node: &Arc) -> bool { - node.inner.lock().unwrap().is_cancelled + node.is_cancelled.load(Ordering::Acquire) +} + +/// Returns whether or not the node is cancelled, using a mutex lock for proper +/// synchronization. This is used in `WaitForCancellationFuture::poll()` to ensure +/// correct memory ordering with the cancellation process. +pub(crate) fn is_cancelled_with_lock(node: &Arc) -> bool { + let _guard = node.inner.lock().unwrap(); + node.is_cancelled.load(Ordering::Acquire) } /// Creates a child node @@ -90,16 +102,16 @@ pub(crate) fn child_node(parent: &Arc) -> Arc { // Do not register as child if we are already cancelled. // Cancelled trees can never be uncancelled and therefore // need no connection to parents or children any more. - if locked_parent.is_cancelled { + if parent.is_cancelled.load(Ordering::Acquire) { return Arc::new(TreeNode { inner: Mutex::new(Inner { parent: None, parent_idx: 0, children: vec![], - is_cancelled: true, num_handles: 1, }), waker: tokio::sync::Notify::new(), + is_cancelled: AtomicBool::new(true), }); } @@ -108,10 +120,10 @@ pub(crate) fn child_node(parent: &Arc) -> Arc { parent: Some(parent.clone()), parent_idx: locked_parent.children.len(), children: vec![], - is_cancelled: false, num_handles: 1, }), waker: tokio::sync::Notify::new(), + is_cancelled: AtomicBool::new(false), }); locked_parent.children.push(child.clone()); @@ -297,7 +309,7 @@ pub(crate) fn decrease_handle_refcount(node: &Arc) { pub(crate) fn cancel(node: &Arc) { let mut locked_node = node.inner.lock().unwrap(); - if locked_node.is_cancelled { + if node.is_cancelled.load(Ordering::Acquire) { return; } @@ -313,7 +325,7 @@ pub(crate) fn cancel(node: &Arc) { locked_child.parent_idx = 0; // If child is already cancelled, detaching is enough - if locked_child.is_cancelled { + if child.is_cancelled.load(Ordering::Acquire) { continue; } @@ -328,7 +340,7 @@ pub(crate) fn cancel(node: &Arc) { locked_grandchild.parent_idx = 0; // If grandchild is already cancelled, detaching is enough - if locked_grandchild.is_cancelled { + if grandchild.is_cancelled.load(Ordering::Acquire) { continue; } @@ -336,7 +348,7 @@ pub(crate) fn cancel(node: &Arc) { // Otherwise, just cancel them right away, no need for another iteration. if locked_grandchild.children.is_empty() { // Cancel the grandchild - locked_grandchild.is_cancelled = true; + grandchild.is_cancelled.store(true, Ordering::Release); locked_grandchild.children = Vec::new(); drop(locked_grandchild); grandchild.waker.notify_waiters(); @@ -350,7 +362,7 @@ pub(crate) fn cancel(node: &Arc) { } // Cancel the child - locked_child.is_cancelled = true; + child.is_cancelled.store(true, Ordering::Release); locked_child.children = Vec::new(); drop(locked_child); child.waker.notify_waiters(); @@ -360,7 +372,7 @@ pub(crate) fn cancel(node: &Arc) { } // Cancel the node itself. - locked_node.is_cancelled = true; + node.is_cancelled.store(true, Ordering::Release); locked_node.children = Vec::new(); drop(locked_node); node.waker.notify_waiters();