rt: shard the multi-thread inject queue to reduce remote spawn contention #7973
Open

alex wants to merge 6 commits into tokio-rs:master from alex:shard-remote-lock
Changes from all commits
Commits (6):

e834ffa  rt: shard the multi-thread inject queue to reduce remote spawn conten… (alex)
16a5b46  rt: address review feedback on sharded inject queue (alex)
f5e3fdc  rt: remove redundant is_empty check in next_remote_task (alex)
9e765c1  fix spellcheck.dic (alex)
8913d32  rt: address review feedback on sharded inject doc comments (alex)
deb01d3  attempt to reduce the state-space for loom to model. (alex)
@@ -0,0 +1,288 @@

```rust
//! Sharded inject queue for the multi-threaded scheduler.
//!
//! A single global mutex is the dominant source of contention when many
//! external threads spawn into the runtime concurrently. `Sharded` splits
//! the inject queue into independent shards, each with its own mutex and
//! intrusive linked list. Pushes are distributed across shards using a
//! per-thread counter, so uncontended threads never touch the same lock.
//! Workers drain shards starting from their own index and rotate.

use super::{Pop, Shared, Synced};

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::runtime::task;
use crate::util::cacheline::CachePadded;

use std::sync::atomic::Ordering::{Acquire, Release};

/// Sharded inject queue.
///
/// Internally composed of `N` independent [`Shared`] / [`Synced`] pairs,
/// each protected by its own mutex and padded to avoid false sharing.
pub(crate) struct Sharded<T: 'static> {
    /// One entry per shard.
    shards: Box<[CachePadded<Shard<T>>]>,

    /// `shards.len() - 1`, used for fast modulo. Shard count is always
    /// a power of two.
    shard_mask: usize,

    /// Set once all shards have been closed. Allows `is_closed` to be
    /// checked without locking a shard.
    is_closed: AtomicBool,
}

struct Shard<T: 'static> {
    shared: Shared<T>,
    synced: Mutex<Synced>,
}

cfg_not_loom! {
    use std::sync::atomic::{AtomicUsize as StdAtomicUsize, Ordering::Relaxed};

    /// Hands out shard indices to threads on first push. Shared across all
    /// `Sharded` instances, which is fine: it only needs to spread threads
    /// out. Uses `std` atomics directly (not loom) because shard selection
    /// has no correctness implications and loom caps shards at 1 anyway.
    static NEXT_SHARD: StdAtomicUsize = StdAtomicUsize::new(0);
}

/// Upper bound on shard count. More shards reduce push contention but
/// make `is_empty`/`len` (which scan every shard) slower, and those are
/// called in the worker hot loop. Contention drops off steeply past a
/// handful of shards, so a small cap captures the win. Must be a power
/// of two.
///
/// Under loom, additional shards would multiply the modeled state space
/// without testing any new interleavings: each shard is an independent
/// instance of the already-loom-tested `Shared`/`Synced` pair, and the
/// cross-shard rotation is plain sequential code.
#[cfg(loom)]
const MAX_SHARDS: usize = 1;

#[cfg(not(loom))]
const MAX_SHARDS: usize = 8;

impl<T: 'static> Sharded<T> {
    /// Creates a new sharded inject queue with a shard count derived
    /// from the requested hint (rounded up to a power of two).
    pub(crate) fn new(shard_hint: usize) -> Sharded<T> {
        let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two();

        let shards = (0..num_shards)
            .map(|_| {
                let (shared, synced) = Shared::new();
                CachePadded::new(Shard {
                    shared,
                    synced: Mutex::new(synced),
                })
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Sharded {
            shards,
            shard_mask: num_shards - 1,
            is_closed: AtomicBool::new(false),
        }
    }

    /// Returns the total number of tasks across all shards.
    ///
    /// This is a sum of per-shard atomic reads and is thus an
    /// approximation under concurrent modification. With the shard
    /// count capped small, the scan is cheap.
    pub(crate) fn len(&self) -> usize {
        let mut len = 0;
        for shard in self.shards.iter() {
            len += shard.shared.len();
        }
        len
    }

    /// Returns `true` if every shard reports empty.
    pub(crate) fn is_empty(&self) -> bool {
        for shard in self.shards.iter() {
            if !shard.shared.is_empty() {
                return false;
            }
        }
        true
    }
```
Member commented on lines +106 to +111: take it or leave it, but this could also be written as: (suggested change)
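The suggestion body itself is not shown above; an iterator-based form along those lines (a hypothetical reconstruction, not the reviewer's actual text) would be:

```rust
pub(crate) fn is_empty(&self) -> bool {
    self.shards.iter().all(|shard| shard.shared.is_empty())
}
```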
```rust
    /// Returns `true` if `close` has been called.
    pub(crate) fn is_closed(&self) -> bool {
        self.is_closed.load(Acquire)
    }

    /// Closes all shards and prevents further pushes.
    ///
    /// Returns `true` if the queue was open when the transition was made.
    pub(crate) fn close(&self) -> bool {
        // Close each shard under its own lock. After this loop no shard
        // will accept a push.
        let mut was_open = false;
        for shard in self.shards.iter() {
            let mut synced = shard.synced.lock();
            was_open |= shard.shared.close(&mut synced);
        }

        // Publish the closed state for lock-free observers.
        if was_open {
            self.is_closed.store(true, Release);
        }

        was_open
    }

    /// Pushes a task into the queue.
    ///
    /// Selects a shard using the calling thread's home-shard index. Does
    /// nothing if the queue is closed.
    pub(crate) fn push(&self, task: task::Notified<T>) {
        let idx = self.next_push_shard();

        let shard = &*self.shards[idx];

        let mut synced = shard.synced.lock();
        // safety: `synced` belongs to `shard.shared`
        unsafe { shard.shared.push(&mut synced, task) };
    }

    /// Pushes a batch of tasks. The whole batch is placed in a single
    /// shard to avoid taking multiple locks.
    pub(crate) fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        let idx = self.next_push_shard();

        let shard = &*self.shards[idx];

        // safety: `&shard.synced` yields `&mut Synced` for the same
        // `Shared` instance that `push_batch` operates on. The underlying
        // implementation links the batch outside the lock and only
        // acquires it for the list splice.
        unsafe { shard.shared.push_batch(&shard.synced, iter) };
    }

    /// Pops a single task, rotating through shards starting at `hint`.
    pub(crate) fn pop(&self, hint: usize) -> Option<task::Notified<T>> {
        let num_shards = self.shards.len();
        let start = hint & self.shard_mask;

        for i in 0..num_shards {
            let idx = (start + i) & self.shard_mask;
            let shard = &*self.shards[idx];

            // Fast path: skip empty shards without locking. Skip under loom to
            // minimize the number of atomic ops to model.
            #[cfg(not(loom))]
            {
                if shard.shared.is_empty() {
                    continue;
                }
            }

            let mut synced = shard.synced.lock();

            // safety: `synced` belongs to `shard.shared`
            if let Some(task) = unsafe { shard.shared.pop(&mut synced) } {
                return Some(task);
            }
        }

        None
    }

    /// Pops up to `n` tasks from the first non-empty shard, starting the
    /// search at `hint`, and passes them to `f`.
    ///
    /// Draining from a single shard keeps the critical section short and
    /// bounded; if that shard has fewer than `n` tasks, fewer are yielded.
    /// The caller will return for more on a subsequent tick.
    ///
    /// Returns `None` (without calling `f`) if no shard has any tasks.
    pub(crate) fn pop_n<R>(
        &self,
        hint: usize,
        n: usize,
        f: impl FnOnce(Pop<'_, T>) -> R,
    ) -> Option<R> {
        debug_assert!(n > 0);

        let num_shards = self.shards.len();
        let start = hint & self.shard_mask;

        for i in 0..num_shards {
            let idx = (start + i) & self.shard_mask;
            let shard = &*self.shards[idx];

            // Skip under loom to minimize the number of atomic ops to be
            // tracked.
            #[cfg(not(loom))]
            {
                if shard.shared.is_empty() {
                    continue;
                }
            }

            let mut synced = shard.synced.lock();

            // safety: `synced` belongs to `shard.shared`
            let pop = unsafe { shard.shared.pop_n(&mut synced, n) };

            // Check the length of the pop result rather than pre-checking
            // `shard.shared.is_empty()` to avoid checking twice.
            if pop.len() == 0 {
                continue;
            }
            return Some(f(pop));
        }

        None
    }

    /// Picks the shard for a push operation.
    ///
    /// Each thread is assigned a shard on first push and sticks with it.
    /// This keeps a single thread's pushes cache-local while spreading
    /// distinct threads across distinct mutexes.
    ///
    /// The shard index is stored in the per-thread `CONTEXT` thread local
    /// (shared with the rest of the runtime) to avoid consuming an extra
    /// thread-local slot.
    #[cfg(not(loom))]
    fn next_push_shard(&self) -> usize {
        // If there's only one shard, skip the thread-local lookup.
        if self.shard_mask == 0 {
            return 0;
        }

        crate::runtime::context::inject_push_shard(|| NEXT_SHARD.fetch_add(1, Relaxed))
            & self.shard_mask
    }

    #[cfg(loom)]
    fn next_push_shard(&self) -> usize {
        // Shard count is capped at 1 under loom.
        debug_assert_eq!(self.shard_mask, 0);
        0
    }
}

// `Shared::push_batch` links the batch before acquiring the lock via the
// `Lock` trait. Implementing `Lock` on a shard's mutex reference lets us
// reuse that machinery, keeping the critical section to just the splice.
impl<'a> super::super::Lock<Synced> for &'a Mutex<Synced> {
    type Handle = MutexGuard<'a, Synced>;

    fn lock(self) -> Self::Handle {
        self.lock()
    }
}

impl AsMut<Synced> for MutexGuard<'_, Synced> {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}
```
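For orientation, here is a standalone, much-simplified model of the scheme above, using only std types (a plain `Mutex<VecDeque>` in place of the intrusive list, and no close/len/loom handling). It is illustrative only, not the code from this PR:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct ShardedQueue<T> {
    shards: Box<[Mutex<VecDeque<T>>]>,
    mask: usize, // shards.len() - 1; length is a power of two
}

impl<T> ShardedQueue<T> {
    fn new(num_shards: usize) -> Self {
        let n = num_shards.next_power_of_two();
        ShardedQueue {
            shards: (0..n).map(|_| Mutex::new(VecDeque::new())).collect(),
            mask: n - 1,
        }
    }

    /// Push into the caller's home shard. `home` would come from a
    /// per-thread counter, as `next_push_shard` does in the real code.
    fn push(&self, home: usize, value: T) {
        self.shards[home & self.mask].lock().unwrap().push_back(value);
    }

    /// Pop, scanning shards starting at `hint` (a worker's own index)
    /// and rotating, so no single shard is always drained first.
    fn pop(&self, hint: usize) -> Option<T> {
        for i in 0..self.shards.len() {
            let idx = (hint + i) & self.mask;
            if let Some(v) = self.shards[idx].lock().unwrap().pop_front() {
                return Some(v);
            }
        }
        None
    }
}
```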
How does this interact with `block_in_place`? Let's say thread A is assigned shard 4, and then it invokes `block_in_place` and stops being a worker thread. One of the blocking threads becomes a worker thread to replace it. Then it might pick a different shard from 4, right? I guess this means we can end up with multiple workers using the same shard.
I guess this is the wrong question ... this is for when you push to a runtime from outside it, so you're not talking about your own shard.
Yes, but that's true either way -- our max number of shards is 8, and you can have way more workers than that. A worker does not uniquely own its shard.
Ok, I guess it's okay then. Should we periodically pick a new shard? Perhaps after every 100 spawns? That way, if a program has two threads that continuously spawn a lot on the same shard, then eventually they pick a new shard and stop contending.
Hmm, I suppose we could do that for the pathological case. 100 spawns might be too few though? If you set it too low you risk having a single thread bounce around and just dirty up everyone else's cache lines.
My preference would be to land this without the adaptive behavior and do that as a follow-up -- even for the pathological case of two threads that happen to land on the same shard, this should still Pareto-dominate being unsharded.
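A minimal standalone sketch of the periodic re-homing idea being discussed, using std types only. `REHOME_EVERY`, `push_shard`, and the tuple-in-a-`Cell` bookkeeping are all hypothetical names for illustration, not part of the PR; picking the interval is exactly the tuning question raised above:

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

/// Global counter handing out home shards, as in the PR.
static NEXT_SHARD: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical re-home interval; too low and a thread bounces around,
/// dirtying other threads' cache lines.
const REHOME_EVERY: usize = 1024;

thread_local! {
    // (home shard, pushes since it was assigned); usize::MAX = unassigned.
    static HOME: Cell<(usize, usize)> = Cell::new((usize::MAX, 0));
}

/// Returns the shard index to push to, re-picking the home shard
/// every `REHOME_EVERY` pushes.
fn push_shard(shard_mask: usize) -> usize {
    HOME.with(|home| {
        let (mut shard, mut pushes) = home.get();
        if shard == usize::MAX || pushes >= REHOME_EVERY {
            shard = NEXT_SHARD.fetch_add(1, Relaxed);
            pushes = 0;
        }
        home.set((shard, pushes + 1));
        shard & shard_mask
    })
}
```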
I'm raising the concern as a liveness issue, which per this guarantee makes it a correctness concern.

The code for waking up a worker after pushing to the queue is here:

tokio/tokio/src/runtime/scheduler/multi_thread/worker.rs, lines 1287 to 1289 in e5ab8fb

There is no relationship between which shard the item was pushed to and which worker is woken up. All this code ensures is that, after an item is pushed, a worker is woken up if all workers are idle. But it might not be the worker associated with the shard that received the item, and if there is already a non-idle worker searching for work, no wakeup occurs.
See also #8029
I've been thinking more about this, and I've come to the conclusion that it is okay as-is. As a perf matter, it could be beneficial to give `notify_parked_remote()` a hint about which worker it should prefer to wake up, but that's not a blocker.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can easily produce a test case for this scenario, perhaps by having one worker execute a future that just spawns a big pile of tasks in a loop without yielding? That's not a totally contrived scenario as you might imagine an accept loop or something under load where there's basically always new connections to handle without having to wait for a long period of time.
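A rough sketch of the test shape suggested here, using only public tokio APIs. It reproduces the load pattern (one worker spawning a pile of tasks in a loop without yielding); turning it into an actual starvation detector, rather than just a reproduction of the load, is left open:

```rust
#[test]
fn spawn_flood_from_one_worker() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .build()
        .unwrap();

    rt.block_on(async {
        // One task spawns a big pile of tasks in a tight loop with no
        // `.await`, so its worker never yields while the pile builds up,
        // roughly like an accept loop under constant load.
        let flood = tokio::spawn(async {
            (0..100_000)
                .map(|_| tokio::spawn(async {}))
                .collect::<Vec<_>>()
        });

        // Every spawned task must still get to run somewhere; a hang
        // here would indicate the starvation being discussed.
        for handle in flood.await.unwrap() {
            handle.await.unwrap();
        }
    });
}
```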
The reason I'm no longer worried about it is that this scenario will cause all worker threads to wake up, and once all worker threads are alive, you can't have this kind of starvation.