rt: shard the multi-thread inject queue to reduce remote spawn contention#7973
alex wants to merge 6 commits into tokio-rs:master
Conversation
Is the inject queue still a FIFO queue?

Approximately -- each of the queue shards is FIFO, but nothing attempts to ensure ordering cross-shard.
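The per-shard FIFO behavior can be illustrated with a toy model (plain `VecDeque`s, not tokio's actual `Shared`/`Synced` lists): each shard preserves its own push order, but draining one shard at a time, as `pop_n` does, can reorder items relative to the global push sequence.

```rust
use std::collections::VecDeque;

// Toy sketch of a sharded FIFO: FIFO within a shard, no cross-shard order.
struct ShardedFifo {
    shards: Vec<VecDeque<u32>>,
    next_pop: usize,
}

impl ShardedFifo {
    fn new(n: usize) -> Self {
        Self { shards: vec![VecDeque::new(); n], next_pop: 0 }
    }

    fn push(&mut self, shard: usize, item: u32) {
        self.shards[shard].push_back(item);
    }

    fn pop(&mut self) -> Option<u32> {
        // Drain the current shard before advancing to the next one.
        for _ in 0..self.shards.len() {
            if let Some(item) = self.shards[self.next_pop].pop_front() {
                return Some(item);
            }
            self.next_pop = (self.next_pop + 1) % self.shards.len();
        }
        None
    }
}

fn main() {
    let mut q = ShardedFifo::new(2);
    // Global push order is 1, 2, 3, but 2 lands on the other shard.
    q.push(0, 1);
    q.push(1, 2);
    q.push(0, 3);
    // Shard 0 drains first, so pops come out 1, 3, 2: each shard is
    // FIFO, yet the global order is not preserved.
    assert_eq!(q.pop(), Some(1));
    assert_eq!(q.pop(), Some(3));
    assert_eq!(q.pop(), Some(2));
    assert_eq!(q.pop(), None);
}
```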
Darksonn left a comment:
In general this looks quite reasonable. Let's do it.
71aadec to
ba043df
Compare
/// Home shard index for the sharded inject queue. External threads
/// pushing tasks are assigned a shard on first push and stick with it
/// for cache locality. `INJECT_SHARD_UNASSIGNED` means not yet assigned.
#[cfg(feature = "rt-multi-thread")]
inject_push_shard: Cell<usize>,
How does this interact with block_in_place? Let's say thread A is assigned shard 4, and then it invokes block_in_place and stops being a worker thread. One of the blocking threads becomes a worker thread to replace it. Then it might pick a shard different from 4, right? I guess this means we can end up with multiple workers using the same shard.
I guess this is the wrong question ... this is for when you push to a runtime from outside it, so you're not talking about your own shard.
Yes, but that's true either way -- our max number of shards is 8, and you can have way more workers than that. A worker does not uniquely own its shard.
Ok, I guess it's okay then. Should we periodically pick a new shard? Perhaps after every 100 spawns? That way, if a program has two threads that continuously spawn a lot on the same shard, then eventually they pick a new shard and stop contending.
Hmm, I suppose we could do that for the pathological case. 100 spawns might be too few though? If you set it too low you risk having a single thread bounce around and just dirty up everyone else's cache lines.
My preference would be to land this without the adaptive behavior and do it as a follow-up -- even for the pathological case of two threads that happen to land on the same shard, this should still Pareto-dominate the unsharded queue.
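The periodic-reassignment idea discussed above is not implemented in this PR, but a sketch of it might look like the following; the shard count, the `REASSIGN_EVERY` interval, and all names here are hypothetical.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

const NUM_SHARDS: usize = 8;
// Hypothetical interval; the thread discusses 100 as possibly too low.
const REASSIGN_EVERY: usize = 1024;

// Global round-robin counter handing out home shards.
static NEXT_SHARD: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static HOME_SHARD: Cell<usize> = Cell::new(usize::MAX); // MAX = unassigned
    static PUSH_COUNT: Cell<usize> = Cell::new(0);
}

/// Shard this thread should push to. Sticky for cache locality, but
/// re-picked every REASSIGN_EVERY pushes so that two threads which
/// collide on the same shard eventually separate.
fn push_shard() -> usize {
    let count = PUSH_COUNT.with(|c| {
        let n = c.get() + 1;
        c.set(n);
        n
    });
    HOME_SHARD.with(|s| {
        if s.get() == usize::MAX || count % REASSIGN_EVERY == 0 {
            s.set(NEXT_SHARD.fetch_add(1, Ordering::Relaxed) % NUM_SHARDS);
        }
        s.get()
    })
}

fn main() {
    let first = push_shard();
    assert!(first < NUM_SHARDS);
    // Sticky until the reassignment interval elapses.
    for _ in 0..10 {
        assert_eq!(push_shard(), first);
    }
}
```

A larger interval trades slower escape from a pathological collision for fewer cache-line migrations, which is the tension discussed above.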
I'm raising the concern as a liveness issue, which per this guarantee makes it a correctness concern.
> how did you end up with an item in B's queue but that worker was never woken up?
The code for waking up a worker after pushing to the queue is here:
tokio/tokio/src/runtime/scheduler/multi_thread/worker.rs, lines 1287 to 1289 at e5ab8fb
There is no relationship between which shard the item was pushed to and which worker is woken up. All this code ensures is that, after pushing an item, if all workers are idle, a worker is woken up. But it might not be the worker whose shard received the item, and if there is already a non-idle worker searching for work, no wakeup occurs at all.
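A simplified model of the wakeup policy described above -- wake a parked worker only when nobody is already searching -- might look like this; the `Notifier` type and its counters are illustrative stand-ins, not tokio's actual idle/searching bookkeeping.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the scheduler's idle state.
struct Notifier {
    num_searching: AtomicUsize, // workers actively looking for work
    num_parked: AtomicUsize,    // workers asleep
}

impl Notifier {
    /// After pushing a task: wake one parked worker, but only if no
    /// worker is already searching (a searcher would find the item
    /// anyway). Which worker wakes is independent of which shard
    /// received the task.
    fn should_notify_after_push(&self) -> bool {
        if self.num_searching.load(Ordering::SeqCst) > 0 {
            return false;
        }
        self.num_parked.load(Ordering::SeqCst) > 0
    }
}

fn main() {
    let n = Notifier {
        num_searching: AtomicUsize::new(1),
        num_parked: AtomicUsize::new(3),
    };
    assert!(!n.should_notify_after_push()); // a searcher will pick it up
    n.num_searching.store(0, Ordering::SeqCst);
    assert!(n.should_notify_after_push()); // everyone idle: wake one
}
```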
I've been thinking more about this, and I've come to the conclusion that it is okay as-is. As a perf matter, it could be beneficial to give notify_parked_remote() a hint about which worker it should prefer to wake up, but that's not a blocker.
I wonder if we can easily produce a test case for this scenario, perhaps by having one worker execute a future that just spawns a big pile of tasks in a loop without yielding? That's not a totally contrived scenario as you might imagine an accept loop or something under load where there's basically always new connections to handle without having to wait for a long period of time.
The reason I'm no longer worried about it is that this scenario will cause all worker threads to wake up, and once all worker threads are alive, you can't have this kind of starvation.
Please rebase or merge master to avoid conflicts with the LIFO slot changes.
Ah, looks like loom timed out?
for shard in self.shards.iter() {
    if !shard.shared.is_empty() {
        return false;
    }
}
true
take it or leave it, but this could also be written as:
self.shards.iter().all(|shard| shard.shared.is_empty())
rt: shard the multi-thread inject queue to reduce remote spawn contention

The multi-threaded scheduler's inject queue was protected by a single global mutex (shared with idle coordination state). Every remote task spawn — any spawn from outside a worker thread — acquired this lock, serializing concurrent spawners and limiting throughput.

This change introduces `inject::Sharded`, which splits the inject queue into up to 8 independent shards, each an existing `Shared`/`Synced` pair with its own mutex and cache-line padding.

Design:
- Push: each thread is assigned a home shard on first push (via a global counter) and sticks with it. This keeps consecutive pushes from one thread cache-local while spreading distinct threads across distinct locks.
- Pop: workers rotate through shards starting at their own index, skipping empty shards via a per-shard atomic length. pop_n drains from one shard at a time to keep critical sections bounded.
- Shard count: capped at 8 (and 1 under loom). Contention drops off steeply past a handful of shards, and is_empty()/len() scan all shards in the worker hot loop.
- is_closed: a single Release atomic set after all shards are closed, so the shutdown check stays lock-free.

Random shard selection via context::thread_rng_n (as used in tokio-rs#7757 for the blocking pool) was measured and found to be 20-33% slower on remote_spawn at 8+ threads. The inject workload is a tight loop of trivial pushes where producer-side cache locality dominates: with RNG, a hot thread bounces between shard cache lines on every push; with sticky assignment it stays hot on one mutex and list tail. RNG did win slightly (5-9%) on single-producer benchmarks where spreading tasks lets workers pop in parallel, but not enough to offset the regression at scale.

The inject state is removed from the global Synced mutex, which now only guards idle coordination. This also helps the single-threaded path since remote pushes no longer contend with worker park/unpark.
Results on remote_spawn benchmark (12,800 no-op tasks, N spawner threads, 64-core box):

threads   before      after     improvement
1          9.38 ms    7.33 ms   -22%
2         14.94 ms    6.64 ms   -56%
4         23.69 ms    5.34 ms   -77%
8         34.81 ms    4.69 ms   -87%
16        32.33 ms    4.54 ms   -86%
32        30.37 ms    4.73 ms   -84%
64        26.59 ms    5.34 ms   -80%

rt_multi_threaded benchmarks: spawn_many_local -8%, spawn_many_remote_idle -7%, yield_many -1%, rest neutral.

Developed in conjunction with Claude.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
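The producer-side contention the benchmark exercises can be modeled with a scaled-down toy (std threads and mutexes only; a hypothetical harness, not the tokio remote_spawn benchmark): N spawner threads push into either one global lock or one lock per shard, with sticky thread-to-shard assignment.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// With num_shards = 1 every producer fights over one lock; with
// num_shards = num_threads the sticky assignment gives each producer
// its own lock and contention disappears.
fn run(num_threads: usize, num_shards: usize, pushes: usize) -> usize {
    let shards: Arc<Vec<Mutex<VecDeque<usize>>>> =
        Arc::new((0..num_shards).map(|_| Mutex::new(VecDeque::new())).collect());
    let handles: Vec<_> = (0..num_threads)
        .map(|t| {
            let shards = Arc::clone(&shards);
            thread::spawn(move || {
                // Sticky assignment: thread t always uses shard t % num_shards.
                let shard = t % num_shards;
                for i in 0..pushes {
                    shards[shard].lock().unwrap().push_back(i);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Every push lands somewhere regardless of sharding.
    shards.iter().map(|s| s.lock().unwrap().len()).sum()
}

fn main() {
    assert_eq!(run(4, 1, 100), 400); // single contended lock
    assert_eq!(run(4, 4, 100), 400); // one lock per producer
}
```

Timing the two calls with larger counts reproduces (in miniature) the gap the table above shows between the unsharded and sharded queues.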
Sharded::pop already checks each shard's emptiness as a fast path before locking, so the outer is_empty scan was iterating all shards twice for no benefit. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix incorrect claim that a closed shard implies all shards are closed (close() operates shard-by-shard), and document that MAX_SHARDS must be a power of two. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
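One common reason for requiring a power-of-two shard count (an assumption here; the commit message doesn't state the motivation) is that reducing an arbitrary counter or worker index to a shard index becomes a single bitmask instead of a division:

```rust
const MAX_SHARDS: usize = 8; // documented as requiring a power of two

/// Reduce an arbitrary index to a shard index. With a power-of-two
/// count this is a single AND; an arbitrary count would need the
/// (slower) modulo operator.
fn shard_index(raw: usize) -> usize {
    debug_assert!(MAX_SHARDS.is_power_of_two());
    raw & (MAX_SHARDS - 1)
}

fn main() {
    assert_eq!(shard_index(0), 0);
    assert_eq!(shard_index(9), 1); // 9 % 8 == 9 & 7 == 1
    assert_eq!(shard_index(usize::MAX), MAX_SHARDS - 1);
}
```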
Is there a recommended way to debug loom test timing? Looks like even in normal operation they take several hours :-( I'm assuming the likely cause of timeouts is accidentally making the state space too large? (I was hoping that limiting to 1 shard on loom would prevent that, but perhaps not)
Limiting it to one shard is not necessarily enough. Any operation involved in concurrency (such as atomics or taking/releasing mutexes) that happens while other threads exist is a location where preemption can happen in loom, so as you introduce more mutex lock/unlocks during the part of the test with more than one thread, that increases the search space.
Ah yeah. Same question though, what's the best way to debug it :-)
If there's an actual bug that it's catching, I'll do it with printlns to understand the specific interleaving it fails on. I'm not sure when it's just too many interleavings. Probably the test has to be simplified.
this may also marginally help performance for real code by removing duplicate checks for shared emptiness under locks
Looks like the latest change was sufficient, hopefully it looks reasonable.
hawkw left a comment:
Given that the previous change to use a sharded queue in spawn_blocking (#7757) introduced a regression that caused programs using spawn_blocking to hang (#8056), and that change ultimately had to be reverted (in #8057), I think we should be cautious about moving forwards with this.
In particular, I feel like --- as a substantial change to runtime internals that potentially affects all uses of the multi-threaded runtime, and which introduces a complex new concurrent data structure involving unsafe code --- this is the type of change which should really be introduced as an opt-in tokio::runtime::Builder setting which requires tokio_unstable. This way, we can allow users to start testing this in production without running the risk of introducing regressions which block users from picking up new tokio releases. We've taken this approach in the past for changes such as the alternative timer implementation (#7467) and eager I/O driver handoff (#8010) [1], and I think it would be appropriate to do something similar here, as well.
Footnotes
[1] The latter of which is a much smaller change!
On my TODO list to figure out how practical this is this weekend.
Great! If it's possible, I think that starting out as an opt-in experimental feature can be a useful way to get big improvements to runtime internals like this one merged faster and start trying them out in production |
The "how practical this is" part was specifically about making this opt-in.
Ok, spent some time this morning looking at a "pre refactor" (pre-factor?) to make it easy for us to support both the shared-lock and sharded-lock queues. Unfortunately, I'm extremely unhappy with the results -- the current diff is +340/-219 (just for the refactor), basically as big as this patch. And there's a bunch of annoying design pieces: it leads to splitting off the Mutex that's currently covering both the inject queue and the idle state, and I also ended up with 2x implementations of the inject queue. All this to say: I think this can be done, but the PR is much larger than I'd like. If anyone has ideas for a better architecture, let me know and I can mull on it. Otherwise I think I'll make #8068 my priority and come back to this once that's done.
(I'm also happy to put it up as a draft PR if it's of interest to anyone.)
So, very naively, I had kind of hoped we would be able to do this by making the number of shards a parameter that's provided when the sharded inject queue is constructed, and having the runtime builder construct it with either 1 or 8 shards depending on whether the sharded queue is enabled. But this only works if the behavior of the queue with a single shard is more or less equivalent to a single queue. It sounds like that may not actually be the case, though, because of splitting the mutex currently guarding both the inject queue and the idle state. I actually think that we should probably be at least a little concerned about separating the inject queue and idle state mutex. Off the top of my head, I can't think of any code that relies on the assumption that these two pieces of shared state are not modified concurrently, but I think we'll have to look closely to make sure that's the case.
Yeah, I think that's the right summary. Maybe there's an even more incremental change of just using different mutexes for idle and inject in the existing shared structure? It's a tiny diff and should cover the biggest semantic risk of the migration.
The multi-threaded scheduler's inject queue was protected by a single global mutex (shared with idle coordination state). Every remote task spawn — any spawn from outside a worker thread — acquired this lock, serializing concurrent spawners and limiting throughput.
This change introduces `inject::Sharded`, which splits the inject queue into up to 8 independent shards, each an existing `Shared`/`Synced` pair with its own mutex and cache-line padding.

Design:
- Push: each thread is assigned a home shard on first push (via a global counter) and sticks with it. This keeps consecutive pushes from one thread cache-local while spreading distinct threads across distinct locks.
- Pop: workers rotate through shards starting at their own index, skipping empty shards via a per-shard atomic length. pop_n drains from one shard at a time to keep critical sections bounded.
- Shard count: capped at 8 (and 1 under loom). Contention drops off steeply past a handful of shards, and is_empty()/len() scan all shards in the worker hot loop.
- is_closed: a single Release atomic set after all shards are closed, so the shutdown check stays lock-free.
Random shard selection via context::thread_rng_n (as used in #7757 for the blocking pool) was measured and found to be 20-33% slower on remote_spawn at 8+ threads. The inject workload is a tight loop of trivial pushes where producer-side cache locality dominates: with RNG, a hot thread bounces between shard cache lines on every push; with sticky assignment it stays hot on one mutex and list tail. RNG did win slightly (5-9%) on single-producer benchmarks where spreading tasks lets workers pop in parallel, but not enough to offset the regression at scale.
The inject state is removed from the global Synced mutex, which now only guards idle coordination. This also helps the single-threaded path since remote pushes no longer contend with worker park/unpark.
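The pop side of the sharded queue can be sketched as follows, with assumed shapes (a `Mutex<VecDeque>` plus an atomic length per shard) rather than tokio's actual types: a worker rotates through shards starting at its own index and skips empty shards without locking.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Assumed shapes for illustration; the real Shared/Synced types differ.
struct Shard {
    len: AtomicUsize,            // lock-free emptiness check
    queue: Mutex<VecDeque<u64>>, // the actual task list
}

struct Sharded {
    shards: Vec<Shard>,
}

impl Sharded {
    fn new(n: usize) -> Self {
        let shards = (0..n)
            .map(|_| Shard { len: AtomicUsize::new(0), queue: Mutex::new(VecDeque::new()) })
            .collect();
        Self { shards }
    }

    fn push(&self, shard: usize, task: u64) {
        self.shards[shard].queue.lock().unwrap().push_back(task);
        self.shards[shard].len.fetch_add(1, Ordering::Release);
    }

    /// Rotate through shards starting at the worker's own index,
    /// skipping shards whose atomic length says they are empty.
    fn pop(&self, worker: usize) -> Option<u64> {
        let n = self.shards.len();
        for i in 0..n {
            let shard = &self.shards[(worker + i) % n];
            if shard.len.load(Ordering::Acquire) == 0 {
                continue; // fast path: no lock taken
            }
            if let Some(task) = shard.queue.lock().unwrap().pop_front() {
                shard.len.fetch_sub(1, Ordering::Release);
                return Some(task);
            }
        }
        None
    }
}

fn main() {
    let q = Sharded::new(4);
    q.push(2, 7);
    // Worker 0 scans shards 0 and 1, then finds the task on shard 2.
    assert_eq!(q.pop(0), Some(7));
    assert_eq!(q.pop(0), None);
}
```

Starting each worker at its own index fans workers out across shards, so concurrent pops tend to land on different locks.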
Results on remote_spawn benchmark (12,800 no-op tasks, N spawner threads, 64-core box):
threads before after improvement
1 9.38 ms 7.33 ms -22%
2 14.94 ms 6.64 ms -56%
4 23.69 ms 5.34 ms -77%
8 34.81 ms 4.69 ms -87%
16 32.33 ms 4.54 ms -86%
32 30.37 ms 4.73 ms -84%
64 26.59 ms 5.34 ms -80%
rt_multi_threaded benchmarks: spawn_many_local -8%, spawn_many_remote_idle -7%, yield_many -1%, rest neutral.
Developed in conjunction with Claude.