From 096b3a4a10095bdb5661c6e9faad4603fa228079 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Thu, 5 Mar 2026 12:09:34 -0500 Subject: [PATCH 01/12] metrics: add task schedule latency metric This new metric tracks the amount of time between when a task is scheduled and when it is polled, also known as queue delay. This duration is recorded in a histogram, just like the poll time metric. This metric is useful for implementing queue management algorithms in systems using tokio. For example, it could be used to implement a generic http load shedder using the CoDel algorithm. --- tokio/src/runtime/builder.rs | 137 +++++++++++++ tokio/src/runtime/config.rs | 3 + tokio/src/runtime/metrics/batch.rs | 24 ++- tokio/src/runtime/metrics/runtime.rs | 184 ++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 7 + .../runtime/scheduler/current_thread/mod.rs | 51 +++-- .../runtime/scheduler/multi_thread/stats.rs | 4 +- .../runtime/scheduler/multi_thread/worker.rs | 13 +- tokio/src/runtime/task/core.rs | 27 +++ tokio/src/runtime/task/mod.rs | 14 ++ tokio/tests/rt_unstable_metrics.rs | 47 +++++ 11 files changed, 488 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index aea4de7503e..8519cc2027f 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -133,6 +133,10 @@ pub struct Builder { /// Configures the task poll count histogram pub(super) metrics_poll_count_histogram: HistogramBuilder, + pub(super) metrics_schedule_latency_histogram_enable: bool, + + pub(super) metrics_schedule_latency_histogram: HistogramBuilder, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -323,6 +327,10 @@ impl Builder { metrics_poll_count_histogram: HistogramBuilder::default(), + metrics_schedule_latency_histogram_enable: false, + + metrics_schedule_latency_histogram: HistogramBuilder::default(), + disable_lifo_slot: false, timer_flavor: TimerFlavor::Traditional, @@ -1556,6 +1564,124 @@ impl Builder { 
self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); self } + + /// Enables tracking the distribution of task schedule latencies. Task + /// schedule latency is the time between when a task is scheduled for + /// execution and when it is polled. + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll, which + /// could add measurable overhead. Use the [`Handle::metrics()`] to + /// access the metrics data. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] + /// to select [`LogHistogram`] instead. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap(); + /// # // Test default values here + /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } + /// # let m = rt.handle().metrics(); + /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); + /// # } + /// ``` + /// + /// [`Handle::metrics()`]: crate::runtime::Handle::metrics + /// [`Instant::now()`]: std::time::Instant::now + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration + pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { + self.metrics_schedule_latency_histogram_enable = true; + self + } + + /// Configure the 
histogram for tracking task schedule latencies. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`LogHistogram`] instead. + /// + /// # Examples + /// Configure a [`LogHistogram`] with [default configuration]: + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::default()) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a linear histogram with 100 buckets, each 10μs wide + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// use tokio::runtime::HistogramConfiguration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::linear(Duration::from_micros(10), 100) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a [`LogHistogram`] with the following settings: + /// - Measure times from 100ns to 120s + /// - Max error of 0.1 + /// - No more than 1024 buckets + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use std::time::Duration; + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::builder() + /// 
.max_value(Duration::from_secs(120)) + /// .min_value(Duration::from_nanos(100)) + /// .max_error(0.1) + /// .max_buckets(1024) + /// .expect("configuration uses 488 buckets") + /// ) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// [`LogHistogram`]: crate::runtime::LogHistogram + pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { + self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; + self + } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -1631,6 +1757,8 @@ impl Builder { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self + .metrics_schedule_latency_histogram_builder(), }, local_tid, ); @@ -1649,6 +1777,14 @@ impl Builder { None } } + + fn metrics_schedule_latency_histogram_builder(&self) -> Option { + if self.metrics_schedule_latency_histogram_enable { + Some(self.metrics_schedule_latency_histogram.clone()) + } else { + None + } + } } cfg_io_driver! { @@ -1812,6 +1948,7 @@ cfg_rt_multi_thread! { disable_lifo_slot: self.disable_lifo_slot, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self.metrics_schedule_latency_histogram_builder(), }, self.timer_flavor, ); diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index b79df96e1e2..549af9975ab 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -48,6 +48,9 @@ pub(crate) struct Config { /// How to build poll time histograms pub(crate) metrics_poll_count_histogram: Option, + /// How to build schedule latency histograms + pub(crate) metrics_schedule_latency_histogram: Option, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. 
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index fe2f4a9da4e..afab4a3cdab 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -53,6 +53,9 @@ pub(crate) struct MetricsBatch { #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, + + #[cfg(tokio_unstable)] + schedule_latencies: Option, } cfg_unstable_metrics! { @@ -95,6 +98,10 @@ impl MetricsBatch { poll_started_at: now, }) }); + let schedule_latencies = worker_metrics + .schedule_latency_histogram + .as_ref() + .map(HistogramBatch::from_histogram); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -108,6 +115,7 @@ impl MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: maybe_now, poll_timer, + schedule_latencies, } } } @@ -155,6 +163,11 @@ impl MetricsBatch { let dst = worker.poll_count_histogram.as_ref().unwrap(); poll_timer.poll_counts.submit(dst); } + + if let Some(schedule_latencies) = &self.schedule_latencies { + let dst = worker.schedule_latency_histogram.as_ref().unwrap(); + schedule_latencies.submit(dst); + } } } } @@ -206,15 +219,22 @@ impl MetricsBatch { cfg_metrics_variant! 
{ stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } + if let Some(task_scheduled_at) = task_scheduled_at { + if let Some(schedule_latencies) = &mut self.schedule_latencies { + let now = self.poll_timer.as_ref().map(|p| p.poll_started_at).unwrap_or_else(Instant::now); + let elapsed = duration_as_u64(now - task_scheduled_at); + schedule_latencies.measure(elapsed, 1); + } + } } } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8aeb608bd02..187d41ed231 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -571,6 +571,124 @@ impl RuntimeMetrics { pub fn blocking_queue_depth(&self) -> usize { self.handle.inner.blocking_queue_depth() } + + /// Returns `true` if the runtime is tracking the distribution of task + /// schedule latencies. + /// + /// Task schedule latencies times are not instrumented by default as doing + /// so requires calling [`Instant::now()`] twice per task poll. The feature + /// is enabled by calling [`enable_metrics_schedule_latency_histogram()`] + /// when building the runtime. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.schedule_latency_histogram_enabled(); + /// + /// println!("Tracking task schedule latency distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn schedule_latency_histogram_enabled(&self) -> bool { + self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() + } + + /// Returns the number of histogram buckets tracking the distribution of + /// task schedule latencies. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } + + /// Returns the range of task schedule latencies tracked by the given bucket. 
+ /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } } feature! { @@ -1027,6 +1145,72 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + + /// Returns the number of times the given worker polled tasks with a schedule + /// latency within the given bucket's range. + /// + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the time elapsed between when the task was scheduled and + /// when it was polled and increments the associated bucket by 1. 
+ /// + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`schedule_latency_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// let count = metrics.schedule_latency_histogram_bucket_count(worker, i); + /// println!("{} tasks encountered a scheduling latency between {}us and {}us", count, range.start.as_micros(), range.end.as_micros()); + /// } + /// } + /// }); + /// } + /// ``` + /// + /// [`schedule_latency_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::schedule_latency_histogram_bucket_range + #[track_caller] + pub fn schedule_latency_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .schedule_latency_histogram + 
.as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() + } } feature! { diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 30926b2a6c2..7d3f0ce40cd 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -65,6 +65,9 @@ pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, + + #[cfg(tokio_unstable)] + pub(super) schedule_latency_histogram: Option, } impl WorkerMetrics { @@ -93,6 +96,10 @@ impl WorkerMetrics { .metrics_poll_count_histogram .as_ref() .map(|histogram_builder| histogram_builder.build()); + worker_metrics.schedule_latency_histogram = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); worker_metrics } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 68ab17f1402..e4e1f8d0f9a 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -3,7 +3,8 @@ use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; use crate::runtime::scheduler::{self, Defer, Inject}; use crate::runtime::task::{ - self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks, + self, JoinHandle, LocalNotified, OwnedTasks, Schedule, SpawnLocation, Task, + TaskHarnessScheduleHooks, }; use crate::runtime::{ blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, @@ -363,11 +364,28 @@ fn wake_deferred_tasks_and_free(context: &Context) { impl Context { /// Execute the closure with the given scheduler core stored in the /// thread-local context. 
- fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { - core.metrics.start_poll(); - let mut ret = self.enter(core, || crate::task::coop::budget(f)); - ret.0.metrics.end_poll(); - ret + fn run_task(&self, task: LocalNotified>, mut core: Box) -> Box { + #[cfg(tokio_unstable)] + let task_meta = task.task_meta(); + + #[cfg(tokio_unstable)] + core.metrics.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.metrics.start_poll(None); + + let (mut c, ()) = self.enter(core, || { + crate::task::coop::budget(|| { + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_start_callback(&task_meta); + + task.run(); + + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_stop_callback(&task_meta); + }) + }); + c.metrics.end_poll(); + c } /// Blocks the current thread until an event is received by the driver, @@ -657,6 +675,14 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. 
+ #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + context::with_scheduler(|maybe_cx| match maybe_cx { Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { let mut core = cx.core.borrow_mut(); @@ -806,18 +832,7 @@ impl CoreGuard<'_> { let task = context.handle.shared.owned.assert_owner(task); - #[cfg(tokio_unstable)] - let task_meta = task.task_meta(); - - let (c, ()) = context.run_task(core, || { - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_start_callback(&task_meta); - - task.run(); - - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_stop_callback(&task_meta); - }); + let c = context.run_task(task, core); core = c; } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index c59d4373ab8..840d88c66f0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -113,8 +113,8 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self) { - self.batch.start_poll(); + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 72bdc2bd31c..c51d7e60aac 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -629,7 +629,10 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. 
- core.stats.start_poll(); + #[cfg(tokio_unstable)] + core.stats.start_poll(task.get_scheduled_at()); + #[cfg(not(tokio_unstable))] + core.stats.start_poll(None); // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -1271,6 +1274,14 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + #[cfg(tokio_unstable)] + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } + with_current(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index aa3f61a2217..c94bcc3e991 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,6 +32,8 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; +#[cfg(tokio_unstable)] +use std::time::Instant; /// The task cell. Contains the components of the task. /// @@ -191,6 +193,10 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] pub(super) tracing_id: Option, + + /// The last time this task was scheduled. Used to measure schedule latency. + #[cfg(tokio_unstable)] + pub(super) scheduled_at: UnsafeCell>, } unsafe impl Send for Header {} @@ -247,6 +253,8 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, + #[cfg(tokio_unstable)] + scheduled_at: UnsafeCell::new(None), } } @@ -534,6 +542,25 @@ impl Header { pub(super) unsafe fn get_tracing_id(me: &NonNull
) -> Option<&tracing::Id> { me.as_ref().tracing_id.as_ref() } + + /// Updates the last time this task was scheduled. Used to calculate + /// the time elapsed between task scheduling and polling. + /// + /// # Safety + /// + /// The caller must guarantee exclusive access to this field. + #[cfg(tokio_unstable)] + pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { + self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); + } + + /// Gets the last time this task was scheduled. + #[cfg(tokio_unstable)] + pub(super) fn get_scheduled_at(&self) -> Option { + // Safety: If there are concurrent writes, then that write has violated + // the safety requirements on `set_scheduled_at`. + unsafe { self.scheduled_at.with(|ptr| *ptr) } + } } impl Trailer { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 53c477d52de..2689d6bd1f7 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -225,6 +225,8 @@ use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; use std::ptr::NonNull; +#[cfg(tokio_unstable)] +use std::time::Instant; use std::{fmt, mem}; /// An owned handle to the task, tracked by ref count. @@ -247,6 +249,13 @@ impl Notified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.0.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { + unsafe { + self.0.header().set_scheduled_at(now); + } + } } // safety: This type cannot be used to touch the task without first verifying @@ -268,6 +277,11 @@ impl LocalNotified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.task.task_meta() } + + #[cfg(tokio_unstable)] + pub(crate) fn get_scheduled_at(&self) -> Option { + self.task.header().get_scheduled_at() + } } /// A task that is not owned by any `OwnedTasks`. Used for blocking tasks. 
diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index b6de3159134..1086e14f206 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -800,6 +800,53 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +#[test] +fn schedule_latency_counts() { + const N: u64 = 50; + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async {}).await.unwrap(); + } + }); + drop(rt); + + let num_workers = metrics.num_workers(); + let num_buckets = metrics.schedule_latency_histogram_num_buckets(); + + assert!(metrics.schedule_latency_histogram_enabled()); + assert_eq!(num_buckets, 3); + + let n = (0..num_workers) + .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) + .map(|(worker, bucket)| metrics.schedule_latency_histogram_bucket_count(worker, bucket)) + .sum(); + assert_eq!(N, n); + } +} + async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { // We use a blocking channel to synchronize the tasks. 
let (tx, rx) = mpsc::channel(); From a355889cba25b8e98b649b18dc55d44b1b29ca40 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:12:09 -0400 Subject: [PATCH 02/12] metrics: clarify schedule latency docs --- tokio/src/runtime/builder.rs | 11 ++++++++--- tokio/src/runtime/metrics/runtime.rs | 8 ++++---- tokio/src/runtime/metrics/worker.rs | 1 + 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 8519cc2027f..c04a44a9d6e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1570,9 +1570,9 @@ impl Builder { /// execution and when it is polled. /// /// Task schedule latencies are not instrumented by default as doing - /// so requires calling [`Instant::now()`] twice per task poll, which - /// could add measurable overhead. Use the [`Handle::metrics()`] to - /// access the metrics data. + /// so requires calling [`Instant::now()`] when a task is scheduled + /// and when it is polled, which could add measurable overhead. Use + /// the [`Handle::metrics()`] to access the metrics data. /// /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. /// This has an extremely low memory footprint, but may not provide enough granularity. For @@ -1610,6 +1610,10 @@ impl Builder { /// Configure the histogram for tracking task schedule latencies. /// + /// Tracking of task schedule latencies must be enabled with + /// [`enable_metrics_schedule_latency_histogram()`] for this function + /// to have any effect. + /// /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. /// This has an extremely low memory footprint, but may not provide enough granularity. For /// better granularity with low memory usage, use [`LogHistogram`] instead. 
@@ -1678,6 +1682,7 @@ impl Builder { /// ``` /// /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`enable_metrics_schedule_latency_histogram()`]: Builder::enable_metrics_schedule_latency_histogram pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; self diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 187d41ed231..e14eddfc68e 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -575,10 +575,10 @@ impl RuntimeMetrics { /// Returns `true` if the runtime is tracking the distribution of task /// schedule latencies. /// - /// Task schedule latencies times are not instrumented by default as doing - /// so requires calling [`Instant::now()`] twice per task poll. The feature - /// is enabled by calling [`enable_metrics_schedule_latency_histogram()`] - /// when building the runtime. + /// Task schedule latencies are not instrumented by default as doing so + /// requires calling [`Instant::now()`] when a task is scheduled and when + /// it is polled. The feature is enabled by calling + /// [`enable_metrics_schedule_latency_histogram()`] when building the runtime. /// /// # Examples /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 7d3f0ce40cd..e65c6e335de 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -67,6 +67,7 @@ pub(crate) struct WorkerMetrics { pub(super) poll_count_histogram: Option, #[cfg(tokio_unstable)] + /// If `Some`, tracks the number of times tasks were scheduled by duration range. 
pub(super) schedule_latency_histogram: Option, } From eb3cd6e6101ef6553c36732da6bf258cbcccc56c Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:13:26 -0400 Subject: [PATCH 03/12] metrics: restrict supported platforms for schedule latency --- tokio/src/runtime/metrics/batch.rs | 14 +++++++---- .../runtime/scheduler/current_thread/mod.rs | 23 ++++++++++++------- .../runtime/scheduler/multi_thread/worker.rs | 23 ++++++++++++------- tokio/src/runtime/task/core.rs | 11 +++++---- tokio/src/runtime/task/mod.rs | 4 ++-- tokio/tests/rt_unstable_metrics.rs | 1 + 6 files changed, 49 insertions(+), 27 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index afab4a3cdab..acace513fa5 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -98,10 +98,13 @@ impl MetricsBatch { poll_started_at: now, }) }); - let schedule_latencies = worker_metrics + // Schedule latencies cannot be tracked if `Instant::now()` is unavailable + let schedule_latencies = maybe_now.and_then(|_| { + worker_metrics .schedule_latency_histogram .as_ref() - .map(HistogramBatch::from_histogram); + .map(HistogramBatch::from_histogram) + }); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -230,9 +233,10 @@ impl MetricsBatch { } if let Some(task_scheduled_at) = task_scheduled_at { if let Some(schedule_latencies) = &mut self.schedule_latencies { - let now = self.poll_timer.as_ref().map(|p| p.poll_started_at).unwrap_or_else(Instant::now); - let elapsed = duration_as_u64(now - task_scheduled_at); - schedule_latencies.measure(elapsed, 1); + if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { + let elapsed = duration_as_u64(now.saturating_duration_since(task_scheduled_at)); + schedule_latencies.measure(elapsed, 1); + } } } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index e4e1f8d0f9a..98ef69d0b34 100644 
--- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -368,9 +368,9 @@ impl Context { #[cfg(tokio_unstable)] let task_meta = task.task_meta(); - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] core.metrics.start_poll(task.get_scheduled_at()); - #[cfg(not(tokio_unstable))] + #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] core.metrics.start_poll(None); let (mut c, ()) = self.enter(core, || { @@ -675,12 +675,19 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - #[cfg(tokio_unstable)] - unsafe { - task.set_scheduled_at(std::time::Instant::now()); + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index c51d7e60aac..e03793ee9f3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -629,9 +629,9 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. 
- #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] core.stats.start_poll(task.get_scheduled_at()); - #[cfg(not(tokio_unstable))] + #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] core.stats.start_poll(None); // Make the core available to the runtime context @@ -1274,12 +1274,19 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - #[cfg(tokio_unstable)] - unsafe { - task.set_scheduled_at(std::time::Instant::now()); + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + // SAFETY: There are no concurrent writes because tasks cannot be scheduled in + // multiple places concurrently. There are no concurrent reads because this field + // is only read when polling the task, which can only happen after it's scheduled. + unsafe { + task.set_scheduled_at(std::time::Instant::now()); + } } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index c94bcc3e991..04871f372e1 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -195,7 +195,10 @@ pub(crate) struct Header { pub(super) tracing_id: Option, /// The last time this task was scheduled. Used to measure schedule latency. - #[cfg(tokio_unstable)] + /// + /// Only enabled when the target supports 64-bit atomics because the metric + /// that uses this field also requires 64-bit atomics. 
+ #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) scheduled_at: UnsafeCell>, } @@ -253,7 +256,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] scheduled_at: UnsafeCell::new(None), } } @@ -549,13 +552,13 @@ impl Header { /// # Safety /// /// The caller must guarantee exclusive access to this field. - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); } /// Gets the last time this task was scheduled. - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(super) fn get_scheduled_at(&self) -> Option { // Safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_scheduled_at`. diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 2689d6bd1f7..0da8d13d4d6 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -250,7 +250,7 @@ impl Notified { self.0.task_meta() } - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { unsafe { self.0.header().set_scheduled_at(now); @@ -278,7 +278,7 @@ impl LocalNotified { self.task.task_meta() } - #[cfg(tokio_unstable)] + #[cfg(all(tokio_unstable, target_has_atomic = "64"))] pub(crate) fn get_scheduled_at(&self) -> Option { self.task.header().get_scheduled_at() } diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 1086e14f206..05ef9f05fcb 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -814,6 +814,7 @@ fn schedule_latency_counts() { .build() .unwrap(), tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) .enable_all() 
.enable_metrics_schedule_latency_histogram() .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( From 46db3a285c9153017fab946eca9e367f59ea0b03 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 11:55:56 -0400 Subject: [PATCH 04/12] fix unused import on targets without 64-bit atomics --- tokio/src/runtime/task/core.rs | 2 +- tokio/src/runtime/task/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 04871f372e1..72c35c1c7cb 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,7 +32,7 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; -#[cfg(tokio_unstable)] +#[cfg(all(tokio_unstable, target_has_atomic = "64"))] use std::time::Instant; /// The task cell. Contains the components of the task. diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 0da8d13d4d6..a2fe47b378c 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -225,7 +225,7 @@ use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; use std::ptr::NonNull; -#[cfg(tokio_unstable)] +#[cfg(all(tokio_unstable, target_has_atomic = "64"))] use std::time::Instant; use std::{fmt, mem}; From 804b42a38fefff9edc8bea40223e6117980f4dc7 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 25 Mar 2026 13:24:56 -0400 Subject: [PATCH 05/12] make Notified::set_scheduled_at safe --- tokio/src/runtime/scheduler/current_thread/mod.rs | 7 +------ tokio/src/runtime/scheduler/multi_thread/worker.rs | 7 +------ tokio/src/runtime/task/mod.rs | 5 ++++- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 98ef69d0b34..a917ee48790 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ 
b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -682,12 +682,7 @@ impl Schedule for Arc { .metrics_schedule_latency_histogram .is_some() { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - unsafe { - task.set_scheduled_at(std::time::Instant::now()); - } + task.set_scheduled_at(std::time::Instant::now()); } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e03793ee9f3..8e3bd88db20 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1281,12 +1281,7 @@ impl Handle { .metrics_schedule_latency_histogram .is_some() { - // SAFETY: There are no concurrent writes because tasks cannot be scheduled in - // multiple places concurrently. There are no concurrent reads because this field - // is only read when polling the task, which can only happen after it's scheduled. - unsafe { - task.set_scheduled_at(std::time::Instant::now()); - } + task.set_scheduled_at(std::time::Instant::now()); } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index a2fe47b378c..aa514a0b21f 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -251,7 +251,10 @@ impl Notified { } #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) unsafe fn set_scheduled_at(&self, now: Instant) { + pub(crate) fn set_scheduled_at(&self, now: Instant) { + // SAFETY: There are no concurrent writes because there is only ever one `Notified` + // reference per task. There are no concurrent reads because this field is only read + // when polling the task, which can only happen after it's scheduled. 
unsafe { self.0.header().set_scheduled_at(now); } From f409f157d0d34a055114a87c6c9dc356b06b6f8f Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Fri, 27 Mar 2026 14:17:45 -0400 Subject: [PATCH 06/12] use u64 for scheduled_at instead of Instant --- tokio/src/runtime/builder.rs | 2 ++ tokio/src/runtime/metrics/batch.rs | 16 +++++++--- tokio/src/runtime/metrics/runtime.rs | 7 +++++ .../runtime/scheduler/current_thread/mod.rs | 27 +++++++++++++---- .../runtime/scheduler/multi_thread/stats.rs | 2 +- .../runtime/scheduler/multi_thread/worker.rs | 29 +++++++++++++++---- tokio/src/runtime/task/core.rs | 29 +++++++++++-------- tokio/src/runtime/task/mod.rs | 14 ++++----- 8 files changed, 92 insertions(+), 34 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index c04a44a9d6e..487bb82b1e8 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1569,6 +1569,8 @@ impl Builder { /// schedule latency is the time between when a task is scheduled for /// execution and when it is polled. /// + /// **This feature is only supported on 64-bit targets.** + /// /// Task schedule latencies are not instrumented by default as doing /// so requires calling [`Instant::now()`] when a task is scheduled /// and when it is polled, which could add measurable overhead. Use diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index acace513fa5..d6247c8120d 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -222,19 +222,27 @@ impl MetricsBatch { cfg_metrics_variant! 
{ stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option<(Instant, u64)>) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + /// + /// # Arguments + /// + /// `task_scheduled_at` is an optional tuple containing the Instant the scheduler + /// was started and the number of nanoseconds elapsed between that instant and + /// the time the task being polled was scheduled. + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } - if let Some(task_scheduled_at) = task_scheduled_at { + if let Some((runtime_started_at, task_scheduled_at)) = task_scheduled_at { if let Some(schedule_latencies) = &mut self.schedule_latencies { if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { - let elapsed = duration_as_u64(now.saturating_duration_since(task_scheduled_at)); + // `u64::MAX` as nanoseconds is equal to 584 years + let nanos_since_start = now.saturating_duration_since(runtime_started_at).as_nanos() as u64; + let elapsed = nanos_since_start.saturating_sub(task_scheduled_at); schedule_latencies.measure(elapsed, 1); } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index e14eddfc68e..c96b70cc6e6 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1145,7 +1145,14 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + } + feature! { + #![all( + tokio_unstable, + target_has_atomic = "64", + target_pointer_width = "64" + )] /// Returns the number of times the given worker polled tasks with a schedule /// latency within the given bucket's range. 
/// diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index a917ee48790..f28154b7d7d 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -21,6 +21,8 @@ use std::task::Poll::{Pending, Ready}; use std::task::Waker; use std::thread::ThreadId; use std::time::Duration; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::time::Instant; use std::{fmt, thread}; /// Executes tasks on the current thread @@ -98,6 +100,12 @@ struct Shared { /// This scheduler only has one worker. worker_metrics: WorkerMetrics, + + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant, } /// Thread-local context. @@ -158,6 +166,8 @@ impl CurrentThread { config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics, + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant::now(), }, driver: driver_handle, blocking_spawner, @@ -368,9 +378,12 @@ impl Context { #[cfg(tokio_unstable)] let task_meta = task.task_meta(); - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - core.metrics.start_poll(task.get_scheduled_at()); - #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + core.metrics.start_poll( + task.get_scheduled_at() + .map(|t| (self.handle.shared.started_at, t.get())), + ); + #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] core.metrics.start_poll(None); let (mut c, ()) = self.enter(core, || { @@ -675,14 +688,18 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config 
.metrics_schedule_latency_histogram .is_some() { - task.set_scheduled_at(std::time::Instant::now()); + // SAFETY: `.max(1)` ensures the value can never be 0. + let scheduled_at = unsafe { + NonZeroU64::new_unchecked(self.shared.started_at.elapsed().as_nanos().max(1) as u64) + }; + task.set_scheduled_at(scheduled_at); } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 840d88c66f0..ab3aa3400ba 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -113,7 +113,7 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8e3bd88db20..62831762ee4 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -75,6 +75,8 @@ use std::cell::RefCell; use std::task::Waker; use std::thread; use std::time::Duration; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::time::Instant; mod metrics; @@ -193,6 +195,12 @@ pub(crate) struct Shared { pub(super) worker_metrics: Box<[WorkerMetrics]>, + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant, + /// Only held to trigger some code on drop. This is used to get internal /// runtime metrics that can be useful when doing performance /// investigations. 
This does nothing (empty struct, no drop impl) unless @@ -318,6 +326,8 @@ pub(super) fn create( config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + started_at: Instant::now(), _counters: Counters, }, driver: driver_handle, @@ -629,9 +639,12 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - core.stats.start_poll(task.get_scheduled_at()); - #[cfg(not(all(tokio_unstable, target_has_atomic = "64")))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + core.stats.start_poll( + task.get_scheduled_at() + .map(|t| (self.worker.handle.shared.started_at, t.get())), + ); + #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] core.stats.start_poll(None); // Make the core available to the runtime context @@ -1274,14 +1287,20 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config .metrics_schedule_latency_histogram .is_some() { - task.set_scheduled_at(std::time::Instant::now()); + // SAFETY: `.max(1)` ensures the value can never be 0. 
+ let scheduled_at = unsafe { + std::num::NonZeroU64::new_unchecked( + self.shared.started_at.elapsed().as_nanos().max(1) as u64, + ) + }; + task.set_scheduled_at(scheduled_at); } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 72c35c1c7cb..9624e20cef7 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -32,8 +32,6 @@ use std::panic::Location; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; -#[cfg(all(tokio_unstable, target_has_atomic = "64"))] -use std::time::Instant; /// The task cell. Contains the components of the task. /// @@ -195,11 +193,12 @@ pub(crate) struct Header { pub(super) tracing_id: Option, /// The last time this task was scheduled. Used to measure schedule latency. + /// Stored as the number of nanoseconds since scheduler startup. /// - /// Only enabled when the target supports 64-bit atomics because the metric - /// that uses this field also requires 64-bit atomics. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) scheduled_at: UnsafeCell>, + /// Only enabled on 64-bit targets because this field extends the size of this + /// struct beyond the size of one cache line on 32-bit targets. + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) scheduled_at: UnsafeCell>, } unsafe impl Send for Header {} @@ -256,7 +255,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] scheduled_at: UnsafeCell::new(None), } } @@ -549,17 +548,23 @@ impl Header { /// Updates the last time this task was scheduled. Used to calculate /// the time elapsed between task scheduling and polling. /// + /// + /// # Arguments + /// + /// `nanos` is the number of nanoseconds elapsed since the scheduler + /// was started. 
+ /// /// # Safety /// /// The caller must guarantee exclusive access to this field. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) unsafe fn set_scheduled_at(&self, now: Instant) { - self.scheduled_at.with_mut(|ptr| *ptr = Some(now)); + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) unsafe fn set_scheduled_at(&self, nanos: NonZeroU64) { + self.scheduled_at.with_mut(|ptr| *ptr = Some(nanos)); } /// Gets the last time this task was scheduled. - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(super) fn get_scheduled_at(&self) -> Option { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(super) fn get_scheduled_at(&self) -> Option { // Safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_scheduled_at`. unsafe { self.scheduled_at.with(|ptr| *ptr) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index aa514a0b21f..dfce849561b 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -223,10 +223,10 @@ use crate::util::sharded_list; use crate::runtime::TaskCallback; use std::marker::PhantomData; +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +use std::num::NonZeroU64; use std::panic::Location; use std::ptr::NonNull; -#[cfg(all(tokio_unstable, target_has_atomic = "64"))] -use std::time::Instant; use std::{fmt, mem}; /// An owned handle to the task, tracked by ref count. @@ -250,13 +250,13 @@ impl Notified { self.0.task_meta() } - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) fn set_scheduled_at(&self, now: Instant) { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(crate) fn set_scheduled_at(&self, nanos: NonZeroU64) { // SAFETY: There are no concurrent writes because there is only ever one `Notified` // reference per task. There are no concurrent reads because this field is only read // when polling the task, which can only happen after it's scheduled. 
unsafe { - self.0.header().set_scheduled_at(now); + self.0.header().set_scheduled_at(nanos); } } } @@ -281,8 +281,8 @@ impl LocalNotified { self.task.task_meta() } - #[cfg(all(tokio_unstable, target_has_atomic = "64"))] - pub(crate) fn get_scheduled_at(&self) -> Option { + #[cfg(all(tokio_unstable, target_pointer_width = "64"))] + pub(crate) fn get_scheduled_at(&self) -> Option { self.task.header().get_scheduled_at() } } From cc3ba46d80197e0040345da2915096c5bf01913e Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Mon, 30 Mar 2026 10:21:43 -0400 Subject: [PATCH 07/12] Only run schedule latency test on 64-bit targets --- tokio/tests/rt_unstable_metrics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 05ef9f05fcb..a238b78ca02 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -800,6 +800,8 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +// Schedule latency tracking is only supported on 64-bit targets +#[cfg(target_pointer_width = "64")] #[test] fn schedule_latency_counts() { const N: u64 = 50; From 68522a1ca9aa09743b90355b0a3f87315a1eabcf Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 1 Apr 2026 15:07:41 -0400 Subject: [PATCH 08/12] Improve docs and fix naming --- tokio/src/runtime/builder.rs | 10 ++++++---- tokio/src/runtime/metrics/runtime.rs | 1 + 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 487bb82b1e8..9ff2ebadd30 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -133,8 +133,10 @@ pub struct Builder { /// Configures the task poll count histogram pub(super) metrics_poll_count_histogram: HistogramBuilder, - pub(super) metrics_schedule_latency_histogram_enable: bool, + /// When true, enables task schedule latency instrumentation. 
+ pub(super) metrics_schedule_latency_histogram_enabled: bool, + /// Configures the task schedule latency histogram. pub(super) metrics_schedule_latency_histogram: HistogramBuilder, #[cfg(tokio_unstable)] @@ -327,7 +329,7 @@ impl Builder { metrics_poll_count_histogram: HistogramBuilder::default(), - metrics_schedule_latency_histogram_enable: false, + metrics_schedule_latency_histogram_enabled: false, metrics_schedule_latency_histogram: HistogramBuilder::default(), @@ -1606,7 +1608,7 @@ impl Builder { /// [`LogHistogram`]: crate::runtime::LogHistogram /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { - self.metrics_schedule_latency_histogram_enable = true; + self.metrics_schedule_latency_histogram_enabled = true; self } @@ -1786,7 +1788,7 @@ impl Builder { } fn metrics_schedule_latency_histogram_builder(&self) -> Option { - if self.metrics_schedule_latency_histogram_enable { + if self.metrics_schedule_latency_histogram_enabled { Some(self.metrics_schedule_latency_histogram.clone()) } else { None diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index c96b70cc6e6..cab257fc304 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -674,6 +674,7 @@ impl RuntimeMetrics { /// ``` /// /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + #[track_caller] pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { self.handle .inner From 475cf18118b4ad4b8670d06ea7329d252899b653 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Tue, 7 Apr 2026 10:41:41 -0400 Subject: [PATCH 09/12] Consolidate schedule latency logic into ScheduleLatencyInstant --- tokio/src/runtime/metrics/batch.rs | 17 ++- .../runtime/scheduler/current_thread/mod.rs | 25 ++-- 
.../runtime/scheduler/multi_thread/stats.rs | 3 +- .../runtime/scheduler/multi_thread/worker.rs | 29 ++--- tokio/src/runtime/task/core.rs | 25 +--- tokio/src/runtime/task/mod.rs | 107 ++++++++++++++++-- 6 files changed, 136 insertions(+), 70 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index d6247c8120d..d84854cc066 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -4,6 +4,7 @@ cfg_unstable_metrics! { use crate::runtime::metrics::HistogramBatch; } +use crate::runtime::task::schedule_latency::ScheduleLatencyContext; use std::sync::atomic::Ordering::Relaxed; use std::time::{Duration, Instant}; @@ -222,27 +223,25 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option<(Instant, u64)>) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} }, unstable: { /// Start polling an individual task /// /// # Arguments /// - /// `task_scheduled_at` is an optional tuple containing the Instant the scheduler - /// was started and the number of nanoseconds elapsed between that instant and - /// the time the task being polled was scheduled. - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { + /// `task_scheduled_at` is used to calculate task schedule latency. + /// A `ScheduleLatencyContext` can be obtained by calling `prepare` on a task's + /// `ScheduleLatencyInstant`. 
+ pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } - if let Some((runtime_started_at, task_scheduled_at)) = task_scheduled_at { + if let Some(task_scheduled_at) = task_scheduled_at { if let Some(schedule_latencies) = &mut self.schedule_latencies { if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { - // `u64::MAX` as nanoseconds is equal to 584 years - let nanos_since_start = now.saturating_duration_since(runtime_started_at).as_nanos() as u64; - let elapsed = nanos_since_start.saturating_sub(task_scheduled_at); + let elapsed = task_scheduled_at.elapsed_nanos(now); schedule_latencies.measure(elapsed, 1); } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index f28154b7d7d..868f6145166 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -21,7 +21,6 @@ use std::task::Poll::{Pending, Ready}; use std::task::Waker; use std::thread::ThreadId; use std::time::Duration; -#[cfg(all(tokio_unstable, target_pointer_width = "64"))] use std::time::Instant; use std::{fmt, thread}; @@ -104,8 +103,7 @@ struct Shared { /// Startup time of this scheduler. /// /// This instant is used as the basis of task `scheduled_at` measurements. - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - started_at: Instant, + started_at: Option, } /// Thread-local context. 
@@ -150,6 +148,11 @@ impl CurrentThread { .global_queue_interval .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); + let started_at = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|_| Instant::now()); + let handle = Arc::new(Handle { task_hooks: TaskHooks { task_spawn_callback: config.before_spawn.clone(), @@ -166,8 +169,7 @@ impl CurrentThread { config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics, - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - started_at: Instant::now(), + started_at, }, driver: driver_handle, blocking_spawner, @@ -378,13 +380,10 @@ impl Context { #[cfg(tokio_unstable)] let task_meta = task.task_meta(); - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] core.metrics.start_poll( task.get_scheduled_at() - .map(|t| (self.handle.shared.started_at, t.get())), + .prepare(self.handle.shared.started_at), ); - #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] - core.metrics.start_poll(None); let (mut c, ()) = self.enter(core, || { crate::task::coop::budget(|| { @@ -664,6 +663,7 @@ cfg_unstable_metrics! { } } +use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; use std::num::NonZeroU64; impl Handle { @@ -688,18 +688,13 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config .metrics_schedule_latency_histogram .is_some() { - // SAFETY: `.max(1)` ensures the value can never be 0. 
- let scheduled_at = unsafe { - NonZeroU64::new_unchecked(self.shared.started_at.elapsed().as_nanos().max(1) as u64) - }; - task.set_scheduled_at(scheduled_at); + task.set_scheduled_at(ScheduleLatencyInstant::new(self.shared.started_at)); } context::with_scheduler(|maybe_cx| match maybe_cx { diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index ab3aa3400ba..4c41c0678c6 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -1,3 +1,4 @@ +use crate::runtime::task::schedule_latency::ScheduleLatencyContext; use crate::runtime::{Config, MetricsBatch, WorkerMetrics}; use std::time::{Duration, Instant}; @@ -113,7 +114,7 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self, task_scheduled_at: Option<(Instant, u64)>) { + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 62831762ee4..b0f67211397 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,9 +74,7 @@ use crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::task::Waker; use std::thread; -use std::time::Duration; -#[cfg(all(tokio_unstable, target_pointer_width = "64"))] -use std::time::Instant; +use std::time::{Duration, Instant}; mod metrics; @@ -96,6 +94,7 @@ use crate::runtime::time_alt; #[cfg(all(tokio_unstable, feature = "time"))] use crate::runtime::scheduler::util; +use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; /// A scheduler worker pub(super) struct Worker { @@ -198,8 +197,7 @@ pub(crate) struct Shared { /// Startup time of this scheduler. /// /// This instant is used as the basis of task `scheduled_at` measurements. 
- #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - started_at: Instant, + started_at: Option, /// Only held to trigger some code on drop. This is used to get internal /// runtime metrics that can be useful when doing performance @@ -306,6 +304,10 @@ pub(super) fn create( let (idle, idle_synced) = Idle::new(size); let (inject, inject_synced) = inject::Shared::new(); + let started_at = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|_| Instant::now()); let remotes_len = remotes.len(); let handle = Arc::new(Handle { @@ -326,8 +328,7 @@ pub(super) fn create( config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - started_at: Instant::now(), + started_at, _counters: Counters, }, driver: driver_handle, @@ -639,13 +640,10 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] core.stats.start_poll( task.get_scheduled_at() - .map(|t| (self.worker.handle.shared.started_at, t.get())), + .prepare(self.worker.handle.shared.started_at), ); - #[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] - core.stats.start_poll(None); // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -1287,20 +1285,13 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] if self .shared .config .metrics_schedule_latency_histogram .is_some() { - // SAFETY: `.max(1)` ensures the value can never be 0. 
- let scheduled_at = unsafe { - std::num::NonZeroU64::new_unchecked( - self.shared.started_at.elapsed().as_nanos().max(1) as u64, - ) - }; - task.set_scheduled_at(scheduled_at); + task.set_scheduled_at(ScheduleLatencyInstant::new(self.shared.started_at)); } with_current(|maybe_cx| { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 9624e20cef7..f52d085fc06 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -22,6 +22,7 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::context; use crate::runtime::task::raw::{self, Vtable}; +use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; use crate::runtime::task::state::State; use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks}; use crate::util::linked_list; @@ -193,12 +194,7 @@ pub(crate) struct Header { pub(super) tracing_id: Option, /// The last time this task was scheduled. Used to measure schedule latency. - /// Stored as the number of nanoseconds since scheduler startup. - /// - /// Only enabled on 64-bit targets because this field extends the size of this - /// struct beyond the size of one cache line on 32-bit targets. - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - pub(super) scheduled_at: UnsafeCell>, + pub(super) scheduled_at: UnsafeCell, } unsafe impl Send for Header {} @@ -255,8 +251,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - scheduled_at: UnsafeCell::new(None), + scheduled_at: UnsafeCell::new(ScheduleLatencyInstant::new(None)), } } @@ -548,23 +543,15 @@ impl Header { /// Updates the last time this task was scheduled. Used to calculate /// the time elapsed between task scheduling and polling. /// - /// - /// # Arguments - /// - /// `nanos` is the number of nanoseconds elapsed since the scheduler - /// was started. 
- /// /// # Safety /// /// The caller must guarantee exclusive access to this field. - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - pub(super) unsafe fn set_scheduled_at(&self, nanos: NonZeroU64) { - self.scheduled_at.with_mut(|ptr| *ptr = Some(nanos)); + pub(super) unsafe fn set_scheduled_at(&self, scheduled_at: ScheduleLatencyInstant) { + self.scheduled_at.with_mut(|ptr| *ptr = scheduled_at); } /// Gets the last time this task was scheduled. - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - pub(super) fn get_scheduled_at(&self) -> Option { + pub(super) fn get_scheduled_at(&self) -> ScheduleLatencyInstant { // Safety: If there are concurrent writes, then that write has violated // the safety requirements on `set_scheduled_at`. unsafe { self.scheduled_at.with(|ptr| *ptr) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index dfce849561b..a5f273fdaac 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -221,10 +221,9 @@ use crate::future::Future; use crate::util::linked_list; use crate::util::sharded_list; +use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; use crate::runtime::TaskCallback; use std::marker::PhantomData; -#[cfg(all(tokio_unstable, target_pointer_width = "64"))] -use std::num::NonZeroU64; use std::panic::Location; use std::ptr::NonNull; use std::{fmt, mem}; @@ -250,13 +249,12 @@ impl Notified { self.0.task_meta() } - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - pub(crate) fn set_scheduled_at(&self, nanos: NonZeroU64) { + pub(crate) fn set_scheduled_at(&self, scheduled_at: ScheduleLatencyInstant) { // SAFETY: There are no concurrent writes because there is only ever one `Notified` // reference per task. There are no concurrent reads because this field is only read // when polling the task, which can only happen after it's scheduled. 
unsafe { - self.0.header().set_scheduled_at(nanos); + self.0.header().set_scheduled_at(scheduled_at); } } } @@ -281,8 +279,7 @@ impl LocalNotified { self.task.task_meta() } - #[cfg(all(tokio_unstable, target_pointer_width = "64"))] - pub(crate) fn get_scheduled_at(&self) -> Option { + pub(crate) fn get_scheduled_at(&self) -> ScheduleLatencyInstant { self.task.header().get_scheduled_at() } } @@ -680,3 +677,99 @@ impl SpawnLocation { Self::from(Location::caller()) } } + +// Task schedule latency is only tracked on 64-bit targets to avoid increasing the size +// of the task Header beyond the bounds of one CPU cache line on 32-bit targets. +#[cfg(all(tokio_unstable, target_pointer_width = "64"))] +pub(crate) mod schedule_latency { + use std::num::NonZeroU64; + use std::time::Instant; + + /// ScheduleLatencyInstant tracks the time a task was scheduled. + /// + /// The time a task was scheduled is stored as the number of nanoseconds + /// since startup of the task's scheduler. + #[derive(Copy, Clone)] + pub(crate) struct ScheduleLatencyInstant(Option); + + impl ScheduleLatencyInstant { + /// Create a new ScheduleLatencyInstant using the provided scheduler startup Instant. + pub(crate) fn new(scheduler_start: Option) -> Self { + Self(scheduler_start.map(|scheduler_start| { + NonZeroU64::new(scheduler_start.elapsed().as_nanos() as u64) + .unwrap_or(NonZeroU64::MIN) + })) + } + + /// Prepare a context that can calculate the number of nanoseconds elapsed + /// since this task was scheduled. + pub(crate) fn prepare( + self, + scheduler_start: Option, + ) -> Option { + match (scheduler_start, self.0) { + (Some(scheduler_start), Some(scheduled_at_delta)) => Some(ScheduleLatencyContext { + scheduler_start, + scheduled_at_delta, + }), + _ => None, + } + } + } + + /// ScheduleLatencyContext contains all the data required to calculate the time elapsed + /// since a task was scheduled. + /// + /// `ScheduleLatencyInstant` on its own in insufficient because it only contains a delta. 
+ /// The scheduler startup time is required to convert the delta back into an actual time + /// but is omitted from `ScheduleLatencyInstant` to keep its memory size minimal. + pub(crate) struct ScheduleLatencyContext { + scheduler_start: Instant, + scheduled_at_delta: NonZeroU64, + } + + impl ScheduleLatencyContext { + /// Calculate how many nanoseconds have elapsed between `now` and when this task + /// was last scheduled. + pub(crate) fn elapsed_nanos(&self, now: Instant) -> u64 { + let nanos_since_start = now + .saturating_duration_since(self.scheduler_start) + .as_nanos() as u64; + nanos_since_start.saturating_sub(self.scheduled_at_delta.get()) + } + } +} + +#[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] +pub(crate) mod schedule_latency { + use std::time::Instant; + + #[derive(Copy, Clone)] + pub(crate) struct ScheduleLatencyInstant(); + + impl ScheduleLatencyInstant { + pub(crate) fn new(_runtime_start: Option) -> Self { + Self() + } + + pub(crate) fn prepare( + self, + _runtime_start: Option, + ) -> Option { + None + } + } + + pub(crate) struct ScheduleLatencyContext { + _private: (), + } + + impl ScheduleLatencyContext { + // This method is referenced (but never called) on 32-bit targets when + // `tokio_unstable` is enabled. 
+ #[allow(dead_code)] + pub(crate) fn elapsed_nanos(&self, _now: Instant) -> u64 { + unimplemented!("This should never be called because prepare() always returns None") + } + } +} From f97ad0c1dcd665f64cf681fa9bad168a47a30174 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Tue, 7 Apr 2026 14:27:46 -0400 Subject: [PATCH 10/12] fix spellcheck test --- tokio/src/runtime/task/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index a5f273fdaac..bc7b2e9720c 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -685,7 +685,7 @@ pub(crate) mod schedule_latency { use std::num::NonZeroU64; use std::time::Instant; - /// ScheduleLatencyInstant tracks the time a task was scheduled. + /// `ScheduleLatencyInstant` tracks the time a task was scheduled. /// /// The time a task was scheduled is stored as the number of nanoseconds /// since startup of the task's scheduler. @@ -693,7 +693,7 @@ pub(crate) mod schedule_latency { pub(crate) struct ScheduleLatencyInstant(Option); impl ScheduleLatencyInstant { - /// Create a new ScheduleLatencyInstant using the provided scheduler startup Instant. + /// Create a new `ScheduleLatencyInstant` using the provided scheduler startup Instant. pub(crate) fn new(scheduler_start: Option) -> Self { Self(scheduler_start.map(|scheduler_start| { NonZeroU64::new(scheduler_start.elapsed().as_nanos() as u64) @@ -717,7 +717,7 @@ pub(crate) mod schedule_latency { } } - /// ScheduleLatencyContext contains all the data required to calculate the time elapsed + /// `ScheduleLatencyContext` contains all the data required to calculate the time elapsed /// since a task was scheduled. /// /// `ScheduleLatencyInstant` on its own in insufficient because it only contains a delta. 
From ecf3b1345eb0392f732ab87b8016f78061157a4e Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 15 Apr 2026 10:51:12 -0400 Subject: [PATCH 11/12] Add schedule-latency crate feature --- tokio/Cargo.toml | 2 + tokio/src/lib.rs | 1 + tokio/src/macros/cfg.rs | 19 ++ tokio/src/runtime/builder.rs | 254 +++++++++--------- tokio/src/runtime/metrics/batch.rs | 2 +- tokio/src/runtime/metrics/mod.rs | 10 + tokio/src/runtime/metrics/runtime.rs | 3 +- tokio/src/runtime/metrics/schedule_latency.rs | 61 +++++ .../runtime/metrics/schedule_latency_mock.rs | 31 +++ .../runtime/scheduler/current_thread/mod.rs | 2 +- .../runtime/scheduler/multi_thread/stats.rs | 2 +- .../runtime/scheduler/multi_thread/worker.rs | 2 +- tokio/src/runtime/task/core.rs | 2 +- tokio/src/runtime/task/mod.rs | 98 +------ tokio/tests/rt_unstable_metrics.rs | 2 +- 15 files changed, 262 insertions(+), 229 deletions(-) create mode 100644 tokio/src/runtime/metrics/schedule_latency.rs create mode 100644 tokio/src/runtime/metrics/schedule_latency_mock.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index debfc0a047d..b3915b21aac 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -88,6 +88,8 @@ time = [] io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"] # Unstable feature. Requires `--cfg tokio_unstable` to enable. taskdump = ["dep:backtrace"] +# Unstable feature. Requires `--cfg tokio_unstable` to enable. +schedule-latency = [] [dependencies] tokio-macros = { version = "~2.6.0", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 7da7e21fd9c..de5621796a3 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -354,6 +354,7 @@ //! Some feature flags are only available when specifying the `tokio_unstable` flag: //! //! - `tracing`: Enables tracing events. +//! - `schedule-latency`: Allows measurement of task scheduling latencies. //! - `io-uring`: Enables `io-uring` (Linux only). //! - `taskdump`: Enables `taskdump` (Linux only). //! 
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9af23b01cbd..e53275e25d4 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -711,3 +711,22 @@ macro_rules! cfg_io_uring { )* }; } + +macro_rules! cfg_schedule_latency { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "schedule-latency", target_pointer_width = "64"))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "schedule-latency"))))] + $item + )* + }; +} + +macro_rules! cfg_not_schedule_latency { + ($($item:item)*) => { + $( + #[cfg(any(not(tokio_unstable), not(feature = "schedule-latency"), not(target_pointer_width = "64")))] + $item + )* + } +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 9ff2ebadd30..ba8cc15c5d7 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1566,131 +1566,6 @@ impl Builder { self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets); self } - - /// Enables tracking the distribution of task schedule latencies. Task - /// schedule latency is the time between when a task is scheduled for - /// execution and when it is polled. - /// - /// **This feature is only supported on 64-bit targets.** - /// - /// Task schedule latencies are not instrumented by default as doing - /// so requires calling [`Instant::now()`] when a task is scheduled - /// and when it is polled, which could add measurable overhead. Use - /// the [`Handle::metrics()`] to access the metrics data. - /// - /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. - /// This has an extremely low memory footprint, but may not provide enough granularity. For - /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] - /// to select [`LogHistogram`] instead. 
- /// - /// # Examples - /// - /// ``` - /// # #[cfg(not(target_family = "wasm"))] - /// # { - /// use tokio::runtime; - /// - /// let rt = runtime::Builder::new_multi_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .build() - /// .unwrap(); - /// # // Test default values here - /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } - /// # let m = rt.handle().metrics(); - /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); - /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); - /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); - /// # } - /// ``` - /// - /// [`Handle::metrics()`]: crate::runtime::Handle::metrics - /// [`Instant::now()`]: std::time::Instant::now - /// [`LogHistogram`]: crate::runtime::LogHistogram - /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration - pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { - self.metrics_schedule_latency_histogram_enabled = true; - self - } - - /// Configure the histogram for tracking task schedule latencies. - /// - /// Tracking of task schedule latencies must be enabled with - /// [`enable_metrics_schedule_latency_histogram()`] for this function - /// to have any effect. - /// - /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. - /// This has an extremely low memory footprint, but may not provide enough granularity. For - /// better granularity with low memory usage, use [`LogHistogram`] instead. 
- /// - /// # Examples - /// Configure a [`LogHistogram`] with [default configuration]: - /// ``` - /// # #[cfg(not(target_family = "wasm"))] - /// # { - /// use tokio::runtime; - /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; - /// - /// let rt = runtime::Builder::new_multi_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .metrics_schedule_latency_histogram_configuration( - /// HistogramConfiguration::log(LogHistogram::default()) - /// ) - /// .build() - /// .unwrap(); - /// # } - /// ``` - /// - /// Configure a linear histogram with 100 buckets, each 10μs wide - /// ``` - /// # #[cfg(not(target_family = "wasm"))] - /// # { - /// use tokio::runtime; - /// use std::time::Duration; - /// use tokio::runtime::HistogramConfiguration; - /// - /// let rt = runtime::Builder::new_multi_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .metrics_schedule_latency_histogram_configuration( - /// HistogramConfiguration::linear(Duration::from_micros(10), 100) - /// ) - /// .build() - /// .unwrap(); - /// # } - /// ``` - /// - /// Configure a [`LogHistogram`] with the following settings: - /// - Measure times from 100ns to 120s - /// - Max error of 0.1 - /// - No more than 1024 buckets - /// ``` - /// # #[cfg(not(target_family = "wasm"))] - /// # { - /// use std::time::Duration; - /// use tokio::runtime; - /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; - /// - /// let rt = runtime::Builder::new_multi_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .metrics_schedule_latency_histogram_configuration( - /// HistogramConfiguration::log(LogHistogram::builder() - /// .max_value(Duration::from_secs(120)) - /// .min_value(Duration::from_nanos(100)) - /// .max_error(0.1) - /// .max_buckets(1024) - /// .expect("configuration uses 488 buckets") - /// ) - /// ) - /// .build() - /// .unwrap(); - /// # } - /// ``` - /// - /// [`LogHistogram`]: crate::runtime::LogHistogram - /// 
[`enable_metrics_schedule_latency_histogram()`]: Builder::enable_metrics_schedule_latency_histogram - pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { - self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; - self - } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -1915,6 +1790,135 @@ cfg_test_util! { } } +cfg_schedule_latency! { + impl Builder { + /// Enables tracking the distribution of task schedule latencies. Task + /// schedule latency is the time between when a task is scheduled for + /// execution and when it is polled. + /// + /// **This feature is only supported on 64-bit targets.** + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] when a task is scheduled + /// and when it is polled, which could add measurable overhead. Use + /// the [`Handle::metrics()`] to access the metrics data. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] + /// to select [`LogHistogram`] instead. 
+ /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap(); + /// # // Test default values here + /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } + /// # let m = rt.handle().metrics(); + /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); + /// # } + /// ``` + /// + /// [`Handle::metrics()`]: crate::runtime::Handle::metrics + /// [`Instant::now()`]: std::time::Instant::now + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration + pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { + self.metrics_schedule_latency_histogram_enabled = true; + self + } + + /// Configure the histogram for tracking task schedule latencies. + /// + /// Tracking of task schedule latencies must be enabled with + /// [`enable_metrics_schedule_latency_histogram()`] for this function + /// to have any effect. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`LogHistogram`] instead. 
+ /// + /// # Examples + /// Configure a [`LogHistogram`] with [default configuration]: + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::default()) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a linear histogram with 100 buckets, each 10μs wide + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// use tokio::runtime::HistogramConfiguration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::linear(Duration::from_micros(10), 100) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a [`LogHistogram`] with the following settings: + /// - Measure times from 100ns to 120s + /// - Max error of 0.1 + /// - No more than 1024 buckets + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use std::time::Duration; + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::builder() + /// .max_value(Duration::from_secs(120)) + /// .min_value(Duration::from_nanos(100)) + /// .max_error(0.1) + /// .max_buckets(1024) + /// .expect("configuration uses 488 buckets") + /// ) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// 
[`enable_metrics_schedule_latency_histogram()`]: Builder::enable_metrics_schedule_latency_histogram + pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { + self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; + self + } + } +} + cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index d84854cc066..5be59b72b8a 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -4,7 +4,7 @@ cfg_unstable_metrics! { use crate::runtime::metrics::HistogramBatch; } -use crate::runtime::task::schedule_latency::ScheduleLatencyContext; +use crate::runtime::metrics::ScheduleLatencyContext; use std::sync::atomic::Ordering::Relaxed; use std::time::{Duration, Instant}; diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index b2772f1842a..2227fa508b6 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -38,3 +38,13 @@ cfg_not_unstable_metrics! { mod mock; pub(crate) use mock::{SchedulerMetrics, HistogramBuilder}; } + +cfg_schedule_latency! { + mod schedule_latency; + pub(crate) use schedule_latency::{ScheduleLatencyInstant, ScheduleLatencyContext}; +} + +cfg_not_schedule_latency! 
 {
+    mod schedule_latency_mock;
+    pub(crate) use schedule_latency_mock::{ScheduleLatencyInstant, ScheduleLatencyContext};
+}
diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs
index cab257fc304..07619336aab 100644
--- a/tokio/src/runtime/metrics/runtime.rs
+++ b/tokio/src/runtime/metrics/runtime.rs
@@ -1152,7 +1152,8 @@ impl RuntimeMetrics {
         #![all(
             tokio_unstable,
             target_has_atomic = "64",
-            target_pointer_width = "64"
+            target_pointer_width = "64",
+            feature = "schedule-latency"
         )]
         /// Returns the number of times the given worker polled tasks with a schedule
         /// latency within the given bucket's range.
diff --git a/tokio/src/runtime/metrics/schedule_latency.rs b/tokio/src/runtime/metrics/schedule_latency.rs
new file mode 100644
index 00000000000..85672664855
--- /dev/null
+++ b/tokio/src/runtime/metrics/schedule_latency.rs
@@ -0,0 +1,61 @@
+use std::num::NonZeroU64;
+use std::time::Instant;
+
+/// `ScheduleLatencyInstant` tracks the time a task was scheduled.
+///
+/// The time a task was scheduled is stored as the number of nanoseconds
+/// since startup of the task's scheduler.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[derive(Copy, Clone)]
+pub(crate) struct ScheduleLatencyInstant(Option<NonZeroU64>);
+
+impl ScheduleLatencyInstant {
+    /// Create a new `ScheduleLatencyInstant` using the provided scheduler startup Instant.
+    pub(crate) fn new(scheduler_start: Option<Instant>) -> Self {
+        Self(scheduler_start.map(|scheduler_start| {
+            NonZeroU64::new(scheduler_start.elapsed().as_nanos() as u64).unwrap_or(NonZeroU64::MIN)
+        }))
+    }
+
+    /// Prepare a context that can calculate the number of nanoseconds elapsed
+    /// since this task was scheduled.
+    pub(crate) fn prepare(
+        self,
+        scheduler_start: Option<Instant>,
+    ) -> Option<ScheduleLatencyContext> {
+        match (scheduler_start, self.0) {
+            (Some(scheduler_start), Some(scheduled_at_delta)) => Some(ScheduleLatencyContext {
+                scheduler_start,
+                scheduled_at_delta,
+            }),
+            _ => None,
+        }
+    }
+}
+
+/// `ScheduleLatencyContext` contains all the data required to calculate the time elapsed
+/// since a task was scheduled.
+///
+/// `ScheduleLatencyInstant` on its own is insufficient because it only contains a delta.
+/// The scheduler startup time is required to convert the delta back into an actual time
+/// but is omitted from `ScheduleLatencyInstant` to keep its memory size minimal.
+pub(crate) struct ScheduleLatencyContext {
+    scheduler_start: Instant,
+    scheduled_at_delta: NonZeroU64,
+}
+
+impl ScheduleLatencyContext {
+    /// Calculate how many nanoseconds have elapsed between `now` and when this task
+    /// was last scheduled.
+    pub(crate) fn elapsed_nanos(&self, now: Instant) -> u64 {
+        let nanos_since_start = now
+            .saturating_duration_since(self.scheduler_start)
+            .as_nanos() as u64;
+        nanos_since_start.saturating_sub(self.scheduled_at_delta.get())
+    }
+}
diff --git a/tokio/src/runtime/metrics/schedule_latency_mock.rs b/tokio/src/runtime/metrics/schedule_latency_mock.rs
new file mode 100644
index 00000000000..932fd798841
--- /dev/null
+++ b/tokio/src/runtime/metrics/schedule_latency_mock.rs
@@ -0,0 +1,31 @@
+//! A mock implementation of the types in `schedule_latency`. These types
+//! are zero-sized so that the size of types using them are not increased
+//! unless schedule latency tracking is explicitly enabled.
+
+use std::time::Instant;
+
+#[derive(Copy, Clone)]
+pub(crate) struct ScheduleLatencyInstant();
+
+impl ScheduleLatencyInstant {
+    pub(crate) fn new(_runtime_start: Option<Instant>) -> Self {
+        Self()
+    }
+
+    pub(crate) fn prepare(self, _runtime_start: Option<Instant>) -> Option<ScheduleLatencyContext> {
+        None
+    }
+}
+
+pub(crate) struct ScheduleLatencyContext {
+    _private: (),
+}
+
+impl ScheduleLatencyContext {
+    // This method is referenced (but never called) when the `schedule-latency`
+    // feature is disabled and `tokio_unstable` is enabled.
+    #[allow(dead_code)]
+    pub(crate) fn elapsed_nanos(&self, _now: Instant) -> u64 {
+        unimplemented!("This should never be called because prepare() always returns None")
+    }
+}
diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs
index 868f6145166..f3f1fdd9e6f 100644
--- a/tokio/src/runtime/scheduler/current_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -663,7 +663,7 @@ cfg_unstable_metrics! {
     }
 }
 
-use crate::runtime::task::schedule_latency::ScheduleLatencyInstant;
+use crate::runtime::metrics::ScheduleLatencyInstant;
 use std::num::NonZeroU64;
 
 impl Handle {
diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs
index 4c41c0678c6..a1f1f919f31 100644
--- a/tokio/src/runtime/scheduler/multi_thread/stats.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs
@@ -1,4 +1,4 @@
-use crate::runtime::task::schedule_latency::ScheduleLatencyContext;
+use crate::runtime::metrics::ScheduleLatencyContext;
 use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
 use std::time::{Duration, Instant};
 
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index b0f67211397..57d1c8f87e6 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -92,9 +92,9 @@ use crate::loom::sync::atomic::AtomicBool;
#[cfg(all(tokio_unstable, feature = "time"))] use crate::runtime::time_alt; +use crate::runtime::metrics::ScheduleLatencyInstant; #[cfg(all(tokio_unstable, feature = "time"))] use crate::runtime::scheduler::util; -use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; /// A scheduler worker pub(super) struct Worker { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index f52d085fc06..7be801c4700 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -21,8 +21,8 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::context; +use crate::runtime::metrics::ScheduleLatencyInstant; use crate::runtime::task::raw::{self, Vtable}; -use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; use crate::runtime::task::state::State; use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks}; use crate::util::linked_list; diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index bc7b2e9720c..569b53c109c 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -221,7 +221,7 @@ use crate::future::Future; use crate::util::linked_list; use crate::util::sharded_list; -use crate::runtime::task::schedule_latency::ScheduleLatencyInstant; +use crate::runtime::metrics::ScheduleLatencyInstant; use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; @@ -677,99 +677,3 @@ impl SpawnLocation { Self::from(Location::caller()) } } - -// Task schedule latency is only tracked on 64-bit targets to avoid increasing the size -// of the task Header beyond the bounds of one CPU cache line on 32-bit targets. -#[cfg(all(tokio_unstable, target_pointer_width = "64"))] -pub(crate) mod schedule_latency { - use std::num::NonZeroU64; - use std::time::Instant; - - /// `ScheduleLatencyInstant` tracks the time a task was scheduled. 
- /// - /// The time a task was scheduled is stored as the number of nanoseconds - /// since startup of the task's scheduler. - #[derive(Copy, Clone)] - pub(crate) struct ScheduleLatencyInstant(Option); - - impl ScheduleLatencyInstant { - /// Create a new `ScheduleLatencyInstant` using the provided scheduler startup Instant. - pub(crate) fn new(scheduler_start: Option) -> Self { - Self(scheduler_start.map(|scheduler_start| { - NonZeroU64::new(scheduler_start.elapsed().as_nanos() as u64) - .unwrap_or(NonZeroU64::MIN) - })) - } - - /// Prepare a context that can calculate the number of nanoseconds elapsed - /// since this task was scheduled. - pub(crate) fn prepare( - self, - scheduler_start: Option, - ) -> Option { - match (scheduler_start, self.0) { - (Some(scheduler_start), Some(scheduled_at_delta)) => Some(ScheduleLatencyContext { - scheduler_start, - scheduled_at_delta, - }), - _ => None, - } - } - } - - /// `ScheduleLatencyContext` contains all the data required to calculate the time elapsed - /// since a task was scheduled. - /// - /// `ScheduleLatencyInstant` on its own in insufficient because it only contains a delta. - /// The scheduler startup time is required to convert the delta back into an actual time - /// but is omitted from `ScheduleLatencyInstant` to keep its memory size minimal. - pub(crate) struct ScheduleLatencyContext { - scheduler_start: Instant, - scheduled_at_delta: NonZeroU64, - } - - impl ScheduleLatencyContext { - /// Calculate how many nanoseconds have elapsed between `now` and when this task - /// was last scheduled. 
- pub(crate) fn elapsed_nanos(&self, now: Instant) -> u64 { - let nanos_since_start = now - .saturating_duration_since(self.scheduler_start) - .as_nanos() as u64; - nanos_since_start.saturating_sub(self.scheduled_at_delta.get()) - } - } -} - -#[cfg(not(all(tokio_unstable, target_pointer_width = "64")))] -pub(crate) mod schedule_latency { - use std::time::Instant; - - #[derive(Copy, Clone)] - pub(crate) struct ScheduleLatencyInstant(); - - impl ScheduleLatencyInstant { - pub(crate) fn new(_runtime_start: Option) -> Self { - Self() - } - - pub(crate) fn prepare( - self, - _runtime_start: Option, - ) -> Option { - None - } - } - - pub(crate) struct ScheduleLatencyContext { - _private: (), - } - - impl ScheduleLatencyContext { - // This method is referenced (but never called) on 32-bit targets when - // `tokio_unstable` is enabled. - #[allow(dead_code)] - pub(crate) fn elapsed_nanos(&self, _now: Instant) -> u64 { - unimplemented!("This should never be called because prepare() always returns None") - } - } -} diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index a238b78ca02..f228bafab19 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -801,7 +801,7 @@ fn io_driver_ready_count() { } // Schedule latency tracking is only supported on 64-bit targets -#[cfg(target_pointer_width = "64")] +#[cfg(all(feature = "schedule-latency", target_pointer_width = "64"))] #[test] fn schedule_latency_counts() { const N: u64 = 50; From cb8383fa7bb39b0f68eba4a9d2e2fac592a17707 Mon Sep 17 00:00:00 2001 From: Robert Holt Date: Wed, 15 Apr 2026 11:03:14 -0400 Subject: [PATCH 12/12] Put schedule_latency_histogram methods behind feature too --- tokio/src/runtime/metrics/runtime.rs | 238 +++++++++++++-------------- 1 file changed, 119 insertions(+), 119 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 07619336aab..393ab430eca 100644 --- 
a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -571,125 +571,6 @@ impl RuntimeMetrics { pub fn blocking_queue_depth(&self) -> usize { self.handle.inner.blocking_queue_depth() } - - /// Returns `true` if the runtime is tracking the distribution of task - /// schedule latencies. - /// - /// Task schedule latencies are not instrumented by default as doing so - /// requires calling [`Instant::now()`] when a task is scheduled and when - /// it is polled. The feature is enabled by calling - /// [`enable_metrics_schedule_latency_histogram()`] when building the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let enabled = metrics.schedule_latency_histogram_enabled(); - /// - /// println!("Tracking task schedule latency distribution: {:?}", enabled); - /// }); - /// } - /// ``` - /// - /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram - /// [`Instant::now()`]: std::time::Instant::now - pub fn schedule_latency_histogram_enabled(&self) -> bool { - self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() - } - - /// Returns the number of histogram buckets tracking the distribution of - /// task schedule latencies. - /// - /// This value is configured by calling - /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.schedule_latency_histogram_num_buckets(); - /// - /// println!("Histogram buckets: {:?}", buckets); - /// }); - /// } - /// ``` - /// - /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration - pub fn schedule_latency_histogram_num_buckets(&self) -> usize { - self.handle - .inner - .worker_metrics(0) - .schedule_latency_histogram - .as_ref() - .map(|histogram| histogram.num_buckets()) - .unwrap_or_default() - } - - /// Returns the range of task schedule latencies tracked by the given bucket. - /// - /// This value is configured by calling - /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. - /// - /// # Panics - /// - /// The method panics if `bucket` represents an invalid bucket index, i.e. - /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. 
- /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::{self, Handle}; - /// - /// fn main() { - /// runtime::Builder::new_current_thread() - /// .enable_metrics_schedule_latency_histogram() - /// .build() - /// .unwrap() - /// .block_on(async { - /// let metrics = Handle::current().metrics(); - /// let buckets = metrics.schedule_latency_histogram_num_buckets(); - /// - /// for i in 0..buckets { - /// let range = metrics.schedule_latency_histogram_bucket_range(i); - /// println!("Histogram bucket {} range: {:?}", i, range); - /// } - /// }); - /// } - /// ``` - /// - /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration - #[track_caller] - pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { - self.handle - .inner - .worker_metrics(0) - .schedule_latency_histogram - .as_ref() - .map(|histogram| { - let range = histogram.bucket_range(bucket); - std::ops::Range { - start: Duration::from_nanos(range.start), - end: Duration::from_nanos(range.end), - } - }) - .unwrap_or_default() - } } feature! { @@ -1155,6 +1036,125 @@ impl RuntimeMetrics { target_pointer_width = "64", feature = "schedule-latency" )] + /// Returns `true` if the runtime is tracking the distribution of task + /// schedule latencies. + /// + /// Task schedule latencies are not instrumented by default as doing so + /// requires calling [`Instant::now()`] when a task is scheduled and when + /// it is polled. The feature is enabled by calling + /// [`enable_metrics_schedule_latency_histogram()`] when building the runtime. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.schedule_latency_histogram_enabled(); + /// + /// println!("Tracking task schedule latency distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn schedule_latency_histogram_enabled(&self) -> bool { + self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() + } + + /// Returns the number of histogram buckets tracking the distribution of + /// task schedule latencies. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } + + /// Returns the range of task schedule latencies tracked by the given bucket. 
+ /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + #[track_caller] + pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } + /// Returns the number of times the given worker polled tasks with a schedule /// latency within the given bucket's range. ///