diff --git a/spellcheck.dic b/spellcheck.dic index e3cbfe0772b..fe298051483 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -318 +321 & + < @@ -148,6 +148,7 @@ implementor implementors incrementing inlining +interleavings interoperate invariants Invariants @@ -188,6 +189,7 @@ Multithreaded mut mutex Mutex +mutexes Nagle namespace nonblocking @@ -285,6 +287,7 @@ tx udp UDP UID +uncontended unhandled unix unlink diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 42e17bc66ed..2395678c5b9 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -60,6 +60,12 @@ struct Context { #[cfg(any(feature = "rt", feature = "macros"))] rng: Cell>, + /// Home shard index for the sharded inject queue. External threads + /// pushing tasks are assigned a shard on first push and stick with it + /// for cache locality. `INJECT_SHARD_UNASSIGNED` means not yet assigned. + #[cfg(feature = "rt-multi-thread")] + inject_push_shard: Cell, + /// Tracks the amount of "work" a task may still do before yielding back to /// the scheduler budget: Cell, @@ -92,6 +98,9 @@ tokio_thread_local! { #[cfg(feature = "rt")] current_task_id: Cell::new(None), + #[cfg(feature = "rt-multi-thread")] + inject_push_shard: Cell::new(INJECT_SHARD_UNASSIGNED), + // Tracks if the current thread is currently driving a runtime. // Note, that if this is set to "entered", the current scheduler // handle may not reference the runtime currently executing. This @@ -203,3 +212,27 @@ cfg_rt! { } } } + +cfg_rt_multi_thread! { + /// Sentinel indicating the per-thread inject push shard has not been assigned. + const INJECT_SHARD_UNASSIGNED: usize = usize::MAX; + + /// Returns the calling thread's inject-queue push shard, assigning one + /// via `init` on first use. Falls back to `init()` if the thread local + /// is inaccessible (e.g. during thread shutdown). 
+    pub(crate) fn inject_push_shard(init: impl FnOnce() -> usize) -> usize {
+        let mut init = Some(init);
+        CONTEXT
+            .try_with(|ctx| {
+                let idx = ctx.inject_push_shard.get();
+                if idx == INJECT_SHARD_UNASSIGNED {
+                    let new = (init.take().unwrap())();
+                    ctx.inject_push_shard.set(new);
+                    new
+                } else {
+                    idx
+                }
+            })
+            .unwrap_or_else(|_| (init.take().unwrap())())
+    }
+}
diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs
index 6b02ecc3247..86dd67afdec 100644
--- a/tokio/src/runtime/scheduler/inject.rs
+++ b/tokio/src/runtime/scheduler/inject.rs
@@ -14,6 +14,9 @@ pub(crate) use synced::Synced;
 
 cfg_rt_multi_thread! {
     mod rt_multi_thread;
+
+    mod sharded;
+    pub(crate) use sharded::Sharded;
 }
 
 mod metrics;
diff --git a/tokio/src/runtime/scheduler/inject/sharded.rs b/tokio/src/runtime/scheduler/inject/sharded.rs
new file mode 100644
index 00000000000..d3a4a7aefab
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/sharded.rs
@@ -0,0 +1,288 @@
+//! Sharded inject queue for the multi-threaded scheduler.
+//!
+//! A single global mutex is the dominant source of contention when many
+//! external threads spawn into the runtime concurrently. `Sharded` splits
+//! the inject queue into independent shards, each with its own mutex and
+//! intrusive linked list. Each pushing thread is assigned a home shard, so
+//! pushes from distinct threads rarely contend on the same lock.
+//! Workers drain shards starting from their own index and rotate.
+
+use super::{Pop, Shared, Synced};
+
+use crate::loom::sync::atomic::AtomicBool;
+use crate::loom::sync::{Mutex, MutexGuard};
+use crate::runtime::task;
+use crate::util::cacheline::CachePadded;
+
+use std::sync::atomic::Ordering::{Acquire, Release};
+
+/// Sharded inject queue.
+///
+/// Internally composed of `N` independent [`Shared`] / [`Synced`] pairs,
+/// each protected by its own mutex and padded to avoid false sharing.
+pub(crate) struct Sharded<T: 'static> {
+    /// One entry per shard.
+    shards: Box<[CachePadded<Shard<T>>]>,
+
+    /// `shards.len() - 1`, used for fast modulo. Shard count is always
+    /// a power of two.
+    shard_mask: usize,
+
+    /// Set once all shards have been closed. Allows `is_closed` to be
+    /// checked without locking a shard.
+    is_closed: AtomicBool,
+}
+
+struct Shard<T: 'static> {
+    shared: Shared<T>,
+    synced: Mutex<Synced>,
+}
+
+cfg_not_loom! {
+    use std::sync::atomic::{AtomicUsize as StdAtomicUsize, Ordering::Relaxed};
+
+    /// Hands out shard indices to threads on first push. Shared across all
+    /// `Sharded` instances, which is fine: it only needs to spread threads
+    /// out. Uses `std` atomics directly (not loom) because shard selection
+    /// has no correctness implications and loom caps shards at 1 anyway.
+    static NEXT_SHARD: StdAtomicUsize = StdAtomicUsize::new(0);
+}
+
+/// Upper bound on shard count. More shards reduce push contention but
+/// make `is_empty`/`len` (which scan every shard) slower, and those are
+/// called in the worker hot loop. Contention drops off steeply past a
+/// handful of shards, so a small cap captures the win. Must be a power
+/// of two.
+///
+/// Under loom, additional shards would multiply the modeled state space
+/// without testing any new interleavings: each shard is an independent
+/// instance of the already-loom-tested `Shared`/`Synced` pair, and the
+/// cross-shard rotation is plain sequential code.
+#[cfg(loom)]
+const MAX_SHARDS: usize = 1;
+
+#[cfg(not(loom))]
+const MAX_SHARDS: usize = 8;
+
+impl<T: 'static> Sharded<T> {
+    /// Creates a new sharded inject queue with a shard count derived
+    /// from the requested hint (rounded up to a power of two).
+    pub(crate) fn new(shard_hint: usize) -> Sharded<T> {
+        let num_shards = shard_hint.clamp(1, MAX_SHARDS).next_power_of_two();
+
+        let shards = (0..num_shards)
+            .map(|_| {
+                let (shared, synced) = Shared::new();
+                CachePadded::new(Shard {
+                    shared,
+                    synced: Mutex::new(synced),
+                })
+            })
+            .collect::<Vec<_>>()
+            .into_boxed_slice();
+
+        Sharded {
+            shards,
+            shard_mask: num_shards - 1,
+            is_closed: AtomicBool::new(false),
+        }
+    }
+
+    /// Returns the total number of tasks across all shards.
+    ///
+    /// This is a sum of per-shard atomic reads and is thus an
+    /// approximation under concurrent modification. With the shard
+    /// count capped small, the scan is cheap.
+    pub(crate) fn len(&self) -> usize {
+        let mut len = 0;
+        for shard in self.shards.iter() {
+            len += shard.shared.len();
+        }
+        len
+    }
+
+    /// Returns `true` if every shard reports empty.
+    pub(crate) fn is_empty(&self) -> bool {
+        for shard in self.shards.iter() {
+            if !shard.shared.is_empty() {
+                return false;
+            }
+        }
+        true
+    }
+
+    /// Returns `true` if `close` has been called.
+    pub(crate) fn is_closed(&self) -> bool {
+        self.is_closed.load(Acquire)
+    }
+
+    /// Closes all shards and prevents further pushes.
+    ///
+    /// Returns `true` if the queue was open when the transition was made.
+    pub(crate) fn close(&self) -> bool {
+        // Close each shard under its own lock. After this loop no shard
+        // will accept a push.
+        let mut was_open = false;
+        for shard in self.shards.iter() {
+            let mut synced = shard.synced.lock();
+            was_open |= shard.shared.close(&mut synced);
+        }
+
+        // Publish the closed state for lock-free observers.
+        if was_open {
+            self.is_closed.store(true, Release);
+        }
+
+        was_open
+    }
+
+    /// Pushes a task into the queue.
+    ///
+    /// Selects a shard using the calling thread's home-shard index. Does
+    /// nothing if the queue is closed.
+    pub(crate) fn push(&self, task: task::Notified<T>) {
+        let idx = self.next_push_shard();
+        let shard = &*self.shards[idx];
+
+        let mut synced = shard.synced.lock();
+        // safety: `synced` belongs to `shard.shared`
+        unsafe { shard.shared.push(&mut synced, task) };
+    }
+
+    /// Pushes a batch of tasks. The whole batch is placed in a single
+    /// shard to avoid taking multiple locks.
+    pub(crate) fn push_batch<I>(&self, iter: I)
+    where
+        I: Iterator<Item = task::Notified<T>>,
+    {
+        let idx = self.next_push_shard();
+        let shard = &*self.shards[idx];
+
+        // safety: `&shard.synced` yields `&mut Synced` for the same
+        // `Shared` instance that `push_batch` operates on. The underlying
+        // implementation links the batch outside the lock and only
+        // acquires it for the list splice.
+        unsafe { shard.shared.push_batch(&shard.synced, iter) };
+    }
+
+    /// Pops a single task, rotating through shards starting at `hint`.
+    pub(crate) fn pop(&self, hint: usize) -> Option<task::Notified<T>> {
+        let num_shards = self.shards.len();
+        let start = hint & self.shard_mask;
+
+        for i in 0..num_shards {
+            let idx = (start + i) & self.shard_mask;
+            let shard = &*self.shards[idx];
+
+            // Fast path: skip empty shards without locking. Skip under loom to
+            // minimize the number of atomic ops to model.
+            #[cfg(not(loom))]
+            {
+                if shard.shared.is_empty() {
+                    continue;
+                }
+            }
+
+            let mut synced = shard.synced.lock();
+
+            // safety: `synced` belongs to `shard.shared`
+            if let Some(task) = unsafe { shard.shared.pop(&mut synced) } {
+                return Some(task);
+            }
+        }
+
+        None
+    }
+
+    /// Pops up to `n` tasks from the first non-empty shard, starting the
+    /// search at `hint`, and passes them to `f`.
+    ///
+    /// Draining from a single shard keeps the critical section short and
+    /// bounded; if that shard has fewer than `n` tasks, fewer are yielded.
+    /// The caller will return for more on a subsequent tick.
+    ///
+    /// Returns `None` (without calling `f`) if no shard has any tasks.
+    pub(crate) fn pop_n<R>(
+        &self,
+        hint: usize,
+        n: usize,
+        f: impl FnOnce(Pop<'_, T>) -> R,
+    ) -> Option<R> {
+        debug_assert!(n > 0);
+
+        let num_shards = self.shards.len();
+        let start = hint & self.shard_mask;
+
+        for i in 0..num_shards {
+            let idx = (start + i) & self.shard_mask;
+            let shard = &*self.shards[idx];
+
+            // Skip under loom to minimize the number of atomic ops to be
+            // tracked.
+            #[cfg(not(loom))]
+            {
+                if shard.shared.is_empty() {
+                    continue;
+                }
+            }
+
+            let mut synced = shard.synced.lock();
+
+            // safety: `synced` belongs to `shard.shared`
+            let pop = unsafe { shard.shared.pop_n(&mut synced, n) };
+
+            // Check the length of the pop result rather than pre-checking
+            // `shard.shared.is_empty()` to avoid needing to check 2x.
+            if pop.len() == 0 {
+                continue;
+            }
+            return Some(f(pop));
+        }
+
+        None
+    }
+
+    /// Picks the shard for a push operation.
+    ///
+    /// Each thread is assigned a shard on first push and sticks with it.
+    /// This keeps a single thread's pushes cache-local while spreading
+    /// distinct threads across distinct mutexes.
+    ///
+    /// The shard index is stored in the per-thread `CONTEXT` thread local
+    /// (shared with the rest of the runtime) to avoid consuming an extra
+    /// thread-local slot.
+    #[cfg(not(loom))]
+    fn next_push_shard(&self) -> usize {
+        // If there's only one shard, skip the thread-local lookup.
+        if self.shard_mask == 0 {
+            return 0;
+        }
+
+        crate::runtime::context::inject_push_shard(|| NEXT_SHARD.fetch_add(1, Relaxed))
+            & self.shard_mask
+    }
+
+    #[cfg(loom)]
+    fn next_push_shard(&self) -> usize {
+        // Shard count is capped at 1 under loom.
+        debug_assert_eq!(self.shard_mask, 0);
+        0
+    }
+}
+
+// `Shared::push_batch` links the batch before acquiring the lock via the
+// `Lock` trait. Implementing `Lock` on a shard's mutex reference lets us
+// reuse that machinery, keeping the critical section to just the splice.
+impl<'a> super::super::Lock for &'a Mutex { + type Handle = MutexGuard<'a, Synced>; + + fn lock(self) -> Self::Handle { + self.lock() + } +} + +impl AsMut for MutexGuard<'_, Synced> { + fn as_mut(&mut self) -> &mut Synced { + self + } +} diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs index a73e7fb3448..6bec66ca7a6 100644 --- a/tokio/src/runtime/scheduler/inject/shared.rs +++ b/tokio/src/runtime/scheduler/inject/shared.rs @@ -38,7 +38,7 @@ impl Shared { } // Kind of annoying to have to include the cfg here - #[cfg(any(feature = "taskdump", feature = "rt-multi-thread"))] + #[cfg(feature = "taskdump")] pub(crate) fn is_closed(&self, synced: &Synced) -> bool { synced.is_closed } diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index c04a3ef55a6..5bc7f100b7e 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -26,8 +26,6 @@ pub(crate) use worker::{Context, Launch, Shared}; cfg_taskdump! { mod trace; use trace::TraceStatus; - - pub(crate) use worker::Synced; } cfg_not_taskdump! { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index ab3b38c3fd8..0ae0855da39 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -61,7 +61,7 @@ use crate::runtime; use crate::runtime::scheduler::multi_thread::{ idle, park, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker, }; -use crate::runtime::scheduler::{inject, Defer, Lock}; +use crate::runtime::scheduler::{inject, Defer}; use crate::runtime::task::OwnedTasks; use crate::runtime::{ blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics, @@ -166,7 +166,10 @@ pub(crate) struct Shared { /// Global task queue used for: /// 1. 
Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated - pub(super) inject: inject::Shared>, + /// + /// The queue is sharded across multiple mutexes to reduce contention + /// when many external threads spawn tasks concurrently. + pub(super) inject: inject::Sharded>, /// Coordinates idle workers idle: Idle, @@ -203,13 +206,10 @@ pub(crate) struct Shared { } /// Data synchronized by the scheduler mutex -pub(crate) struct Synced { +pub(super) struct Synced { /// Synchronized state for `Idle`. pub(super) idle: idle::Synced, - /// Synchronized state for `Inject`. - pub(crate) inject: inject::Synced, - #[cfg(all(tokio_unstable, feature = "time"))] /// Timers pending to be registered. /// This is used to register a timer but the [`Core`] @@ -302,7 +302,7 @@ pub(super) fn create( } let (idle, idle_synced) = Idle::new(size); - let (inject, inject_synced) = inject::Shared::new(); + let inject = inject::Sharded::new(size); let remotes_len = remotes.len(); let handle = Arc::new(Handle { @@ -315,7 +315,6 @@ pub(super) fn create( owned: OwnedTasks::new(size), synced: Mutex::new(Synced { idle: idle_synced, - inject: inject_synced, #[cfg(all(tokio_unstable, feature = "time"))] inject_timers: Vec::new(), }), @@ -1058,7 +1057,7 @@ impl Core { worker .handle - .next_remote_task() + .next_remote_task(worker.index) .or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -1107,17 +1106,17 @@ impl Core { // and not pushed onto the local queue. let n = usize::max(1, n); - let mut synced = worker.handle.shared.synced.lock(); - // safety: passing in the correct `inject::Synced`. - let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) }; - - // Pop the first task to return immediately - let ret = tasks.next(); + // Drain up to `n` tasks from one shard. The shard lock is held + // only for the duration of the closure. 
+ worker.inject().pop_n(worker.index, n, |mut tasks| { + // Pop the first task to return immediately + let ret = tasks.next(); - // Push the rest of the on the run queue - self.run_queue.push_back(tasks); + // Push the rest of the on the run queue + self.run_queue.push_back(tasks); - ret + ret + })? } } @@ -1157,7 +1156,7 @@ impl Core { } // Fallback on checking the global queue - worker.handle.next_remote_task() + worker.handle.next_remote_task(worker.index) } fn transition_to_searching(&mut self, worker: &Worker) -> bool { @@ -1257,8 +1256,7 @@ impl Core { if !self.is_shutdown { // Check if the scheduler has been shutdown - let synced = worker.handle.shared.synced.lock(); - self.is_shutdown = worker.inject().is_closed(&synced.inject); + self.is_shutdown = worker.inject().is_closed(); } if !self.is_traced { @@ -1310,7 +1308,7 @@ impl Core { impl Worker { /// Returns a reference to the scheduler's injection queue. - fn inject(&self) -> &inject::Shared> { + fn inject(&self) -> &inject::Sharded> { &self.handle.shared.inject } } @@ -1370,24 +1368,14 @@ impl Handle { } } - fn next_remote_task(&self) -> Option { - if self.shared.inject.is_empty() { - return None; - } - - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { self.shared.inject.pop(&mut synced.inject) } + fn next_remote_task(&self, hint: usize) -> Option { + self.shared.inject.pop(hint) } fn push_remote_task(&self, task: Notified) { self.shared.scheduler_metrics.inc_remote_schedule_count(); - let mut synced = self.shared.synced.lock(); - // safety: passing in correct `idle::Synced` - unsafe { - self.shared.inject.push(&mut synced.inject, task); - } + self.shared.inject.push(task); } #[cfg(all(tokio_unstable, feature = "time"))] @@ -1412,11 +1400,7 @@ impl Handle { } pub(super) fn close(&self) { - if self - .shared - .inject - .close(&mut self.shared.synced.lock().inject) - { + if self.shared.inject.close() { self.notify_all(); } } @@ -1493,7 +1477,7 @@ impl 
Handle { // Drain the injection queue // // We already shut down every task, so we can simply drop the tasks. - while let Some(task) = self.next_remote_task() { + while let Some(task) = self.next_remote_task(0) { drop(task); } } @@ -1512,29 +1496,7 @@ impl Overflow> for Handle { where I: Iterator>>, { - unsafe { - self.shared.inject.push_batch(self, iter); - } - } -} - -pub(crate) struct InjectGuard<'a> { - lock: crate::loom::sync::MutexGuard<'a, Synced>, -} - -impl<'a> AsMut for InjectGuard<'a> { - fn as_mut(&mut self) -> &mut inject::Synced { - &mut self.lock.inject - } -} - -impl<'a> Lock for &'a Handle { - type Handle = InjectGuard<'a>; - - fn lock(self) -> Self::Handle { - InjectGuard { - lock: self.shared.synced.lock(), - } + self.shared.inject.push_batch(iter); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs index 312673034d3..bd057629323 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs @@ -35,12 +35,9 @@ impl Handle { let owned = &self.shared.owned; let mut local = self.shared.steal_all(); - let synced = &self.shared.synced; let injection = &self.shared.inject; - // safety: `trace_multi_thread` is invoked with the same `synced` that `injection` - // was created with. - let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) } + let traces = trace_multi_thread(owned, &mut local, injection) .into_iter() .map(|(id, trace)| dump::Task::new(id, trace)) .collect(); diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 5455e1133db..1ac81b56b34 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -351,21 +351,14 @@ pub(in crate::runtime) fn trace_current_thread( } cfg_rt_multi_thread! 
{ - use crate::loom::sync::Mutex; use crate::runtime::scheduler::multi_thread; - use crate::runtime::scheduler::multi_thread::Synced; - use crate::runtime::scheduler::inject::Shared; + use crate::runtime::scheduler::inject::Sharded; /// Trace and poll all tasks of the `current_thread` runtime. - /// - /// ## Safety - /// - /// Must be called with the same `synced` that `injection` was created with. - pub(in crate::runtime) unsafe fn trace_multi_thread( + pub(in crate::runtime) fn trace_multi_thread( owned: &OwnedTasks>, local: &mut multi_thread::queue::Local>, - synced: &Mutex, - injection: &Shared>, + injection: &Sharded>, ) -> Vec<(Id, Trace)> { let mut dequeued = Vec::new(); @@ -375,14 +368,10 @@ cfg_rt_multi_thread! { } // clear the injection queue - let mut synced = synced.lock(); - // Safety: exactly the same safety requirements as `trace_multi_thread` function. - while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } { + while let Some(notified) = injection.pop(0) { dequeued.push(notified); } - drop(synced); - // precondition: we have drained the tasks from the local and injection // queues. trace_owned(owned, dequeued)