107 changes: 107 additions & 0 deletions tokio/tests/rt_spawn_blocking.rs
@@ -0,0 +1,107 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))]

use std::time::Duration;

use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::task::JoinSet;

#[test]
fn issue_8056_regression_test() {
    type Senders = Vec<mpsc::Sender<()>>;
    type Handles = JoinSet<()>;

    fn make_writer() -> (Senders, Handles) {
        let mut senders = vec![];
        let mut handles = JoinSet::new();
        for _ in 0..2 {
            let (tx, mut rx) = mpsc::channel::<()>(1);
            senders.push(tx);
            handles.spawn_blocking(move || while rx.blocking_recv().is_some() {});
        }
        (senders, handles)
    }

    async fn drive_writer(senders: Senders, mut handles: Handles) {
        for tx in &senders {
            tx.send(()).await.unwrap();
        }
        drop(senders);
        while let Some(res) = handles.join_next().await {
            res.unwrap();
        }
    }

    // Regression test for the lost-spawn race in the blocking pool introduced
    // by #7757. See https://github.com/tokio-rs/tokio/issues/8056 for complete
    // details.
    //
    // The bug was a race condition in `Spawner::spawn_task`: after pushing a
    // task to the queue, it chose between "wake an idle worker" and "spawn a
    // new worker" by reading the `num_idle_threads` `AtomicUsize`. The
    // `num_idle_threads` counter was incremented by a worker before it called
    // `wait_for_task`, and was decremented in `Inner::run` after
    // `wait_for_task` had already returned a `Task`.
    //
    // In that window, the counter indicates a worker is idle even though it
    // has already claimed a notification and is about to run a task. In the
    // omicron test case that reproduced this, the task was long-lived, which
    // would essentially result in "under-spawning" workers.
    //
    // This test attempts to reproduce that scenario by doing the following on
    // a fresh runtime:
    // - Two awaited `spawn_blocking` calls to get pool workers cycling through
    //   idle → notified → busy transitions.
    // - `make_writer` spawns two closures that park in
    //   `mpsc::Receiver::blocking_recv` — long-lived blocking tasks that turn
    //   a stranded spawn into a real deadlock instead of a transient stall.
    // - Two more rounds of "create a writer and immediately drop it" churn the
    //   worker pool while those persistent tasks are still blocked, and then a
    //   second persistent writer is created — this `spawn_blocking` is most
    //   likely to trigger the bug, because the prior churn has left a worker
    //   mid-transition with a stale `num_idle_threads`.
    // - Finally, we finish all the tasks -- if any of them never got pulled
    //   from the `spawn_blocking` queue then the test hits the timeout.

    // Run multiple times to make hitting the race condition very likely.
    for _ in 0..512 {
Comment on lines +67 to +68 (alex marked this conversation as resolved)

Member:
Since this is a race condition that we have to run multiple iterations in order to catch, I wonder if this might make sense to try and reproduce using loom (perhaps in addition to a normal test). It would be nice if we could use the model checker to deterministically simulate the interleaving that reproduces the bug, rather than just trying to run some code a whole bunch of times and hope one of them hits the race. It's hard to say for sure without a better understanding of the bug, but I wonder if loom's deadlock detection could catch this...

Contributor Author:
I originally thought loom would be a good fit, but this ended up being a quicker path for me to get a reproducer. Is there an existing loom test you think would be a good model for this type of test?

A sketch of one possible standalone loom model appears after the diff.

        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        let completed = rt.block_on(async {
            // We use a timeout to turn a hang into a reliable (and fast)
            // failure. On working code this takes about a millisecond, so any
            // timeout will do.
            tokio::time::timeout(Duration::from_secs(1), async {
                let quick = || async {
                    tokio::task::spawn_blocking(|| {}).await.unwrap();
                };

                quick().await;
                quick().await;
                let (persistent_senders, persistent_handles) = make_writer();

                for _ in 0..2 {
                    quick().await;
                    quick().await;
                    let (senders, mut handles) = make_writer();
                    drop(senders);
                    while let Some(res) = handles.join_next().await {
                        res.unwrap();
                    }
                }

                quick().await;
                quick().await;
                let (second_senders, second_handles) = make_writer();

                drive_writer(persistent_senders, persistent_handles).await;
                drive_writer(second_senders, second_handles).await;
            })
            .await
            .is_ok()
        });

        assert!(completed);
    }
}
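
Following up on the review exchange above, here is a rough, standalone sketch of the kind of loom model that suggestion points at. It is not part of this PR and does not touch tokio's internals: `Pool`, `State`, `spawn_task`, and `worker` are invented stand-ins that only mimic the bookkeeping the test's comment block describes (push a task, then consult a `num_idle` counter that a worker decrements only after it has already claimed a task), and the `loom` dev-dependency and `#[should_panic]` framing are assumptions. The idea is that loom's exhaustive schedule exploration plus deadlock detection can pin down the lost-spawn interleaving deterministically, rather than looping 512 times and hoping to hit it.

// Sketch only: invented stand-ins for the described bookkeeping, not tokio's blocking pool.
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Condvar, Mutex};
use loom::thread;

struct State { queue: Vec<&'static str>, shutdown: bool }
struct Pool { state: Mutex<State>, task_ready: Condvar, num_idle: AtomicUsize }

// Mirrors the racy decision: push the task, then either notify an "idle" worker
// or tell the caller to spawn a new one, based on the `num_idle` counter.
fn spawn_task(pool: &Pool, task: &'static str) -> bool {
    pool.state.lock().unwrap().queue.push(task);
    if pool.num_idle.load(Ordering::SeqCst) == 0 {
        true // caller must spawn a new worker
    } else {
        pool.task_ready.notify_one();
        false
    }
}

fn worker(pool: Arc<Pool>, quick_done: Arc<(Mutex<bool>, Condvar)>) {
    // Advertise idleness before waiting for work.
    pool.num_idle.fetch_add(1, Ordering::SeqCst);
    let task = {
        let mut state = pool.state.lock().unwrap();
        loop {
            if let Some(task) = state.queue.pop() { break Some(task); }
            if state.shutdown { break None; }
            state = pool.task_ready.wait(state).unwrap();
        }
    };
    // The modeled bug: idleness is cleared only after a task has already been
    // claimed, so a concurrent `spawn_task` can still see this worker as idle.
    pool.num_idle.fetch_sub(1, Ordering::SeqCst);

    let (done, cvar) = &*quick_done;
    match task {
        // The task the "runtime" waits on: flag completion.
        Some("quick") => {
            *done.lock().unwrap() = true;
            cvar.notify_all();
        }
        // A long-lived blocking task: hold this worker until "quick" is done,
        // so a lost spawn becomes a deadlock rather than a transient stall.
        Some(_) => {
            let mut done = done.lock().unwrap();
            while !*done {
                done = cvar.wait(done).unwrap();
            }
        }
        None => {} // shut down without ever having received a task
    }
}

#[test]
#[should_panic] // loom finds an interleaving in which the second spawn is lost
fn stale_idle_counter_can_strand_a_task() {
    loom::model(|| {
        let pool = Arc::new(Pool {
            state: Mutex::new(State { queue: Vec::new(), shutdown: false }),
            task_ready: Condvar::new(),
            num_idle: AtomicUsize::new(0),
        });
        let quick_done = Arc::new((Mutex::new(false), Condvar::new()));
        let mut workers = Vec::new();
        let spawn_worker = |workers: &mut Vec<thread::JoinHandle<()>>| {
            let (pool, quick_done) = (pool.clone(), quick_done.clone());
            workers.push(thread::spawn(move || worker(pool, quick_done)));
        };

        // One pre-existing worker, then a long-lived task followed by the task
        // whose completion this thread waits on.
        spawn_worker(&mut workers);
        if spawn_task(&pool, "long-lived") {
            spawn_worker(&mut workers);
        }
        if spawn_task(&pool, "quick") {
            spawn_worker(&mut workers);
        }

        // Wait for "quick". If its spawn was lost, this thread and the worker
        // holding "long-lived" are both blocked, and loom reports the deadlock.
        let (done, cvar) = &*quick_done;
        let mut done = done.lock().unwrap();
        while !*done {
            done = cvar.wait(done).unwrap();
        }
        drop(done); // release the lock so the long-lived worker can finish

        // Let any worker that never received a task exit, then join them all.
        pool.state.lock().unwrap().shutdown = true;
        pool.task_ready.notify_all();
        for handle in workers {
            handle.join().unwrap();
        }
    });
}

Because the buggy ordering is baked into the model, this test is expected to panic once loom finds the schedule where the second task is neither handed to a woken worker nor given a new one. Whether an equivalent model can be wired into the blocking pool's existing loom tests is the open question raised in the thread above.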