Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tokio-util/src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,11 @@ pub(crate) mod sync {
pub(crate) use loom::sync::{Arc, Mutex, MutexGuard};
#[cfg(not(all(test, loom)))]
pub(crate) use std::sync::{Arc, Mutex, MutexGuard};

/// Atomic primitives used by the crate.
///
/// Resolves to the `loom` implementations when built with `cfg(all(test, loom))`
/// and to the standard library implementations in every other configuration.
pub(crate) mod atomic {
    // Regular builds: standard library atomics.
    #[cfg(not(all(test, loom)))]
    pub(crate) use std::sync::atomic::{AtomicBool, Ordering};

    // Loom test builds: loom's instrumented atomics.
    #[cfg(all(test, loom))]
    pub(crate) use loom::sync::atomic::{AtomicBool, Ordering};
}
}
18 changes: 12 additions & 6 deletions tokio-util/src/sync/cancellation_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ impl CancellationToken {
tree_node::is_cancelled(&self.inner)
}

/// Internal method that checks cancellation while holding the tree node's
/// mutex.
///
/// Delegates to `tree_node::is_cancelled_with_lock`. The `poll()`
/// implementations of the wait-for-cancellation futures call this instead of
/// the lock-free `is_cancelled()` so that their check is synchronized with
/// the cancellation process.
fn is_cancelled_with_lock(&self) -> bool {
tree_node::is_cancelled_with_lock(&self.inner)
}

/// Returns a [`Future`] that gets fulfilled when cancellation is requested.
///
/// Equivalent to:
Expand Down Expand Up @@ -329,14 +335,14 @@ impl<'a> Future for WaitForCancellationFuture<'a> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
loop {
if this.cancellation_token.is_cancelled() {
if this.cancellation_token.is_cancelled_with_lock() {
return Poll::Ready(());
}

// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `is_cancelled_with_lock` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
// waking the `Notified`. The mutex lock ensures proper synchronization.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
Expand Down Expand Up @@ -390,14 +396,14 @@ impl Future for WaitForCancellationFutureOwned {
let mut this = self.project();

loop {
if this.cancellation_token.is_cancelled() {
if this.cancellation_token.is_cancelled_with_lock() {
return Poll::Ready(());
}

// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `is_cancelled_with_lock` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
// waking the `Notified`. The mutex lock ensures proper synchronization.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
Expand Down
38 changes: 25 additions & 13 deletions tokio-util/src/sync/cancellation_token/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
//! Specifically, through invariant #2, we know that we always have to lock a parent
//! before its child.
//!
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex, MutexGuard};

/// A node of the cancellation tree structure
Expand All @@ -46,6 +47,10 @@ use crate::loom::sync::{Arc, Mutex, MutexGuard};
pub(crate) struct TreeNode {
/// Tree links (`parent`, `children`) and the handle count, protected by a
/// mutex.
inner: Mutex<Inner>,
/// Notifier on which `cancel()` calls `notify_waiters()` once the node has
/// been marked as cancelled.
waker: tokio::sync::Notify,
/// Atomic flag for lock-free `is_cancelled()` checks.
/// This is set to `true` when the node is cancelled, using `Release` ordering
/// to ensure all prior writes are visible to readers using `Acquire` ordering.
/// `cancel()` performs the store while it still holds the lock on this
/// node's `inner`.
is_cancelled: AtomicBool,
}
impl TreeNode {
pub(crate) fn new() -> Self {
Expand All @@ -54,10 +59,10 @@ impl TreeNode {
parent: None,
parent_idx: 0,
children: vec![],
is_cancelled: false,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
is_cancelled: AtomicBool::new(false),
}
}

Expand All @@ -69,18 +74,25 @@ impl TreeNode {
/// The data contained inside a `TreeNode`.
///
/// This struct exists so that the data of the node can be wrapped
/// in a Mutex.
/// in a `Mutex`.
struct Inner {
parent: Option<Arc<TreeNode>>,
parent_idx: usize,
children: Vec<Arc<TreeNode>>,
is_cancelled: bool,
num_handles: usize,
}

/// Returns whether or not the node is cancelled
pub(crate) fn is_cancelled(node: &Arc<TreeNode>) -> bool {
node.inner.lock().unwrap().is_cancelled
node.is_cancelled.load(Ordering::Acquire)
Comment on lines 85 to +87
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Acquire is not actually strong enough here. When WaitForCancellationFuture::poll() returns pending in parallel with being woken up, we currently rely on the following logic:

  1. Inside WaitForCancellationFuture, we create a Notified future.
  2. Inside WaitForCancellationFuture, we read false from is_cancelled.
  3. Inside CancellationToken::cancel(), we write true to is_cancelled.
  4. Inside CancellationToken::cancel(), we call notify_waiters().

And here it is really important that notify_waiters() will wake up the Notified future created in step 1. Otherwise the WaitForCancellationFuture might sleep forever. With a mutex, that's fine, but with the current atomics, it's not actually satisfied!

That's because what we need is that step 2 happens-before step 3. However, for that to be true, operation 2 must be Release, and operation 3 must be Acquire. Unfortunately, it's reversed! Operation 2 is Acquire, and operation 3 is Release.

Unfortunately, a load() can't be Release and a store() can't be Acquire, so we have to upgrade them to read-modify-write operations for this to work:

  1. Use AtomicBool::swap(true, AcqRel) to write true.
  2. Use AtomicBool::compare_exchange(false, false, AcqRel, Acquire) to read. (fetch_and(true, AcqRel) would also work)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can just take the mutex in WaitForCancellationFuture::poll(), and leave the AtomicBool optimization for users of is_cancelled() only.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again for the review and explaining the memory order issue. I am learning a ton.

I've gone with your simpler suggestion and added a separate
is_cancelled_with_lock() function that takes the mutex for proper synchronization in
WaitForCancellationFuture::poll(), while keeping the lock-free AtomicBool path for direct
is_cancelled() calls.

I've also just pushed a fix for the cargo-spellcheck CI failures

Let me know if you need anything else.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO using both AtomicBool and Mutex for the same purpose but for different use cases is not a good idea. It makes the reasoning harder: should I use the atomics-based solution or the mutex one for use case 3 (a use case from the future)?
If it could be implemented fully with atomics (swap + compare_exchange) it would be better!

A good start would be to add an integration (or bench) test that reproduces the problem with Acquire+Release and then fixes it by using swap+compare_exchange.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by a use-case from the future? As #7775 indicates, there are users today that call is_cancelled() instead of .awaiting a future, and such users would get the benefit today.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and such users would get the benefit today

This is clear!

What bothers me is that some use cases can use is_cancelled() (lock-free, for both external and internal users), while other (internal) use cases such as WaitForCancellationFuture::poll() can't and have to depend on the Mutex.
By a "use case from the future" I mean the second kind: new Tokio-internal functionality that needs to know whether a token is cancelled. What should the developer use: is_cancelled() or is_cancelled_with_lock()? I guess the answer is "it depends".
But if using swap+compare_exchange removes the need for is_cancelled_with_lock(), then the decision will be simple.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are any changes I should make, let me know!

}

/// Reports whether the node is cancelled, reading the flag only after the
/// node's mutex has been acquired.
///
/// `WaitForCancellationFuture::poll()` uses this variant so that its check is
/// serialized against `cancel()`, which sets the flag while holding the same
/// lock.
pub(crate) fn is_cancelled_with_lock(node: &Arc<TreeNode>) -> bool {
    // Hold the lock for the duration of the read, then release it explicitly.
    let guard = node.inner.lock().unwrap();
    let cancelled = node.is_cancelled.load(Ordering::Acquire);
    drop(guard);
    cancelled
}

/// Creates a child node
Expand All @@ -90,16 +102,16 @@ pub(crate) fn child_node(parent: &Arc<TreeNode>) -> Arc<TreeNode> {
// Do not register as child if we are already cancelled.
// Cancelled trees can never be uncancelled and therefore
// need no connection to parents or children any more.
if locked_parent.is_cancelled {
if parent.is_cancelled.load(Ordering::Acquire) {
return Arc::new(TreeNode {
inner: Mutex::new(Inner {
parent: None,
parent_idx: 0,
children: vec![],
is_cancelled: true,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
is_cancelled: AtomicBool::new(true),
});
}

Expand All @@ -108,10 +120,10 @@ pub(crate) fn child_node(parent: &Arc<TreeNode>) -> Arc<TreeNode> {
parent: Some(parent.clone()),
parent_idx: locked_parent.children.len(),
children: vec![],
is_cancelled: false,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
is_cancelled: AtomicBool::new(false),
});

locked_parent.children.push(child.clone());
Expand Down Expand Up @@ -297,7 +309,7 @@ pub(crate) fn decrease_handle_refcount(node: &Arc<TreeNode>) {
pub(crate) fn cancel(node: &Arc<TreeNode>) {
let mut locked_node = node.inner.lock().unwrap();

if locked_node.is_cancelled {
if node.is_cancelled.load(Ordering::Acquire) {
return;
}

Expand All @@ -313,7 +325,7 @@ pub(crate) fn cancel(node: &Arc<TreeNode>) {
locked_child.parent_idx = 0;

// If child is already cancelled, detaching is enough
if locked_child.is_cancelled {
if child.is_cancelled.load(Ordering::Acquire) {
continue;
}

Expand All @@ -328,15 +340,15 @@ pub(crate) fn cancel(node: &Arc<TreeNode>) {
locked_grandchild.parent_idx = 0;

// If grandchild is already cancelled, detaching is enough
if locked_grandchild.is_cancelled {
if grandchild.is_cancelled.load(Ordering::Acquire) {
continue;
}

// For performance reasons, only adopt grandchildren that have children.
// Otherwise, just cancel them right away, no need for another iteration.
if locked_grandchild.children.is_empty() {
// Cancel the grandchild
locked_grandchild.is_cancelled = true;
grandchild.is_cancelled.store(true, Ordering::Release);
locked_grandchild.children = Vec::new();
drop(locked_grandchild);
grandchild.waker.notify_waiters();
Expand All @@ -350,7 +362,7 @@ pub(crate) fn cancel(node: &Arc<TreeNode>) {
}

// Cancel the child
locked_child.is_cancelled = true;
child.is_cancelled.store(true, Ordering::Release);
locked_child.children = Vec::new();
drop(locked_child);
child.waker.notify_waiters();
Expand All @@ -360,7 +372,7 @@ pub(crate) fn cancel(node: &Arc<TreeNode>) {
}

// Cancel the node itself.
locked_node.is_cancelled = true;
node.is_cancelled.store(true, Ordering::Release);
locked_node.children = Vec::new();
drop(locked_node);
node.waker.notify_waiters();
Expand Down