diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b0bedbed9c4..64f3a6916f3 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -88,6 +88,8 @@ time = [] io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"] # Unstable feature. Requires `--cfg tokio_unstable` to enable. taskdump = ["dep:backtrace"] +# Unstable feature. Requires `--cfg tokio_unstable` to enable. +schedule-latency = [] [dependencies] tokio-macros = { version = "~2.7.0", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 7da7e21fd9c..de5621796a3 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -354,6 +354,7 @@ //! Some feature flags are only available when specifying the `tokio_unstable` flag: //! //! - `tracing`: Enables tracing events. +//! - `schedule-latency`: Allows measurement of task scheduling latencies. //! - `io-uring`: Enables `io-uring` (Linux only). //! - `taskdump`: Enables `taskdump` (Linux only). //! diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index efbd163c7f4..58a011e8a7a 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -746,3 +746,22 @@ macro_rules! cfg_io_uring { )* }; } + +macro_rules! cfg_schedule_latency { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "schedule-latency", target_pointer_width = "64"))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "schedule-latency"))))] + $item + )* + }; +} + +macro_rules! 
cfg_not_schedule_latency { + ($($item:item)*) => { + $( + #[cfg(any(not(tokio_unstable), not(feature = "schedule-latency"), not(target_pointer_width = "64")))] + $item + )* + } +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 6032f2c604f..f3cf01fdd0f 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -138,6 +138,12 @@ pub struct Builder { /// Configures the task poll count histogram pub(super) metrics_poll_count_histogram: HistogramBuilder, + /// When true, enables task schedule latency instrumentation. + pub(super) metrics_schedule_latency_histogram_enabled: bool, + + /// Configures the task schedule latency histogram. + pub(super) metrics_schedule_latency_histogram: HistogramBuilder, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -335,6 +341,10 @@ impl Builder { metrics_poll_count_histogram: HistogramBuilder::default(), + metrics_schedule_latency_histogram_enabled: false, + + metrics_schedule_latency_histogram: HistogramBuilder::default(), + disable_lifo_slot: false, timer_flavor: TimerFlavor::Traditional, @@ -1708,6 +1718,8 @@ impl Builder { enable_eager_driver_handoff: false, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self + .metrics_schedule_latency_histogram_builder(), }, local_tid, self.name.clone(), @@ -1727,6 +1739,14 @@ impl Builder { None } } + + fn metrics_schedule_latency_histogram_builder(&self) -> Option { + if self.metrics_schedule_latency_histogram_enabled { + Some(self.metrics_schedule_latency_histogram.clone()) + } else { + None + } + } } cfg_io_driver! { @@ -1848,6 +1868,135 @@ cfg_test_util! { } } +cfg_schedule_latency! { + impl Builder { + /// Enables tracking the distribution of task schedule latencies. Task + /// schedule latency is the time between when a task is scheduled for + /// execution and when it is polled. 
+ /// + /// **This feature is only supported on 64-bit targets.** + /// + /// Task schedule latencies are not instrumented by default as doing + /// so requires calling [`Instant::now()`] when a task is scheduled + /// and when it is polled, which could add measurable overhead. Use + /// the [`Handle::metrics()`] to access the metrics data. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`metrics_schedule_latency_histogram_configuration()`] + /// to select [`LogHistogram`] instead. + /// + /// # Examples + /// + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap(); + /// # // Test default values here + /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } + /// # let m = rt.handle().metrics(); + /// # assert_eq!(m.schedule_latency_histogram_num_buckets(), 10); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(0), us(0)..us(100)); + /// # assert_eq!(m.schedule_latency_histogram_bucket_range(1), us(100)..us(200)); + /// # } + /// ``` + /// + /// [`Handle::metrics()`]: crate::runtime::Handle::metrics + /// [`Instant::now()`]: std::time::Instant::now + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`metrics_schedule_latency_histogram_configuration()`]: Builder::metrics_schedule_latency_histogram_configuration + pub fn enable_metrics_schedule_latency_histogram(&mut self) -> &mut Self { + self.metrics_schedule_latency_histogram_enabled = true; + self + } + + /// Configure the histogram for tracking task schedule latencies. 
+ /// + /// Tracking of task schedule latencies must be enabled with + /// [`enable_metrics_schedule_latency_histogram()`] for this function + /// to have any effect. + /// + /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used. + /// This has an extremely low memory footprint, but may not provide enough granularity. For + /// better granularity with low memory usage, use [`LogHistogram`] instead. + /// + /// # Examples + /// Configure a [`LogHistogram`] with [default configuration]: + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::default()) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a linear histogram with 100 buckets, each 10μs wide + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use tokio::runtime; + /// use std::time::Duration; + /// use tokio::runtime::HistogramConfiguration; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::linear(Duration::from_micros(10), 100) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// Configure a [`LogHistogram`] with the following settings: + /// - Measure times from 100ns to 120s + /// - Max error of 0.1 + /// - No more than 1024 buckets + /// ``` + /// # #[cfg(not(target_family = "wasm"))] + /// # { + /// use std::time::Duration; + /// use tokio::runtime; + /// use tokio::runtime::{HistogramConfiguration, LogHistogram}; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_metrics_schedule_latency_histogram() + /// 
.metrics_schedule_latency_histogram_configuration( + /// HistogramConfiguration::log(LogHistogram::builder() + /// .max_value(Duration::from_secs(120)) + /// .min_value(Duration::from_nanos(100)) + /// .max_error(0.1) + /// .max_buckets(1024) + /// .expect("configuration uses 488 buckets") + /// ) + /// ) + /// .build() + /// .unwrap(); + /// # } + /// ``` + /// + /// [`LogHistogram`]: crate::runtime::LogHistogram + /// [`enable_metrics_schedule_latency_histogram()`]: Builder::enable_metrics_schedule_latency_histogram + pub fn metrics_schedule_latency_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self { + self.metrics_schedule_latency_histogram.histogram_type = configuration.inner; + self + } + } +} + cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { @@ -1891,6 +2040,7 @@ cfg_rt_multi_thread! { enable_eager_driver_handoff: self.enable_eager_driver_handoff, seed_generator: seed_generator_1, metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), + metrics_schedule_latency_histogram: self.metrics_schedule_latency_histogram_builder(), }, self.timer_flavor, self.name.clone(), diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index ad25eb32642..efb5deed19c 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -50,6 +50,9 @@ pub(crate) struct Config { /// How to build poll time histograms pub(crate) metrics_poll_count_histogram: Option, + /// How to build schedule latency histograms + pub(crate) metrics_schedule_latency_histogram: Option, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index fe2f4a9da4e..5be59b72b8a 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -4,6 +4,7 @@ cfg_unstable_metrics! 
{ use crate::runtime::metrics::HistogramBatch; } +use crate::runtime::metrics::ScheduleLatencyContext; use std::sync::atomic::Ordering::Relaxed; use std::time::{Duration, Instant}; @@ -53,6 +54,9 @@ pub(crate) struct MetricsBatch { #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, + + #[cfg(tokio_unstable)] + schedule_latencies: Option, } cfg_unstable_metrics! { @@ -95,6 +99,13 @@ impl MetricsBatch { poll_started_at: now, }) }); + // Schedule latencies cannot be tracked if `Instant::now()` is unavailable + let schedule_latencies = maybe_now.and_then(|_| { + worker_metrics + .schedule_latency_histogram + .as_ref() + .map(HistogramBatch::from_histogram) + }); MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -108,6 +119,7 @@ impl MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: maybe_now, poll_timer, + schedule_latencies, } } } @@ -155,6 +167,11 @@ impl MetricsBatch { let dst = worker.poll_count_histogram.as_ref().unwrap(); poll_timer.poll_counts.submit(dst); } + + if let Some(schedule_latencies) = &self.schedule_latencies { + let dst = worker.schedule_latency_histogram.as_ref().unwrap(); + schedule_latencies.submit(dst); + } } } } @@ -206,15 +223,29 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) {} + pub(crate) fn start_poll(&mut self, _task_scheduled_at: Option) {} }, unstable: { /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { + /// + /// # Arguments + /// + /// `task_scheduled_at` is used to calculate task schedule latency. + /// A `ScheduleLatencyContext` can be obtained by calling `prepare` on a task's + /// `ScheduleLatencyInstant`. 
+ pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); } + if let Some(task_scheduled_at) = task_scheduled_at { + if let Some(schedule_latencies) = &mut self.schedule_latencies { + if let Some(now) = self.poll_timer.as_ref().map(|p| p.poll_started_at).or_else(now) { + let elapsed = task_scheduled_at.elapsed_nanos(now); + schedule_latencies.measure(elapsed, 1); + } + } + } } } } diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index b2772f1842a..2227fa508b6 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -38,3 +38,13 @@ cfg_not_unstable_metrics! { mod mock; pub(crate) use mock::{SchedulerMetrics, HistogramBuilder}; } + +cfg_schedule_latency! { + mod schedule_latency; + pub(crate) use schedule_latency::{ScheduleLatencyInstant, ScheduleLatencyContext}; +} + +cfg_not_schedule_latency! { + mod schedule_latency_mock; + pub(crate) use schedule_latency_mock::{ScheduleLatencyInstant, ScheduleLatencyContext}; +} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8aeb608bd02..393ab430eca 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1029,6 +1029,199 @@ impl RuntimeMetrics { } } + feature! { + #![all( + tokio_unstable, + target_has_atomic = "64", + target_pointer_width = "64", + feature = "schedule-latency" + )] + /// Returns `true` if the runtime is tracking the distribution of task + /// schedule latencies. + /// + /// Task schedule latencies are not instrumented by default as doing so + /// requires calling [`Instant::now()`] when a task is scheduled and when + /// it is polled. The feature is enabled by calling + /// [`enable_metrics_schedule_latency_histogram()`] when building the runtime. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let enabled = metrics.schedule_latency_histogram_enabled(); + /// + /// println!("Tracking task schedule latency distribution: {:?}", enabled); + /// }); + /// } + /// ``` + /// + /// [`enable_metrics_schedule_latency_histogram()`]: crate::runtime::Builder::enable_metrics_schedule_latency_histogram + /// [`Instant::now()`]: std::time::Instant::now + pub fn schedule_latency_histogram_enabled(&self) -> bool { + self.handle.inner.worker_metrics(0).schedule_latency_histogram.is_some() + } + + /// Returns the number of histogram buckets tracking the distribution of + /// task schedule latencies. + /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// println!("Histogram buckets: {:?}", buckets); + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + pub fn schedule_latency_histogram_num_buckets(&self) -> usize { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.num_buckets()) + .unwrap_or_default() + } + + /// Returns the range of task schedule latencies tracked by the given bucket. 
+ /// + /// This value is configured by calling + /// [`metrics_schedule_latency_histogram_configuration()`] when building the runtime. + /// + /// # Panics + /// + /// The method panics if `bucket` represents an invalid bucket index, i.e. + /// is greater than or equal to `schedule_latency_histogram_num_buckets()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// println!("Histogram bucket {} range: {:?}", i, range); + /// } + /// }); + /// } + /// ``` + /// + /// [`metrics_schedule_latency_histogram_configuration()`]: crate::runtime::Builder::metrics_schedule_latency_histogram_configuration + #[track_caller] + pub fn schedule_latency_histogram_bucket_range(&self, bucket: usize) -> Range { + self.handle + .inner + .worker_metrics(0) + .schedule_latency_histogram + .as_ref() + .map(|histogram| { + let range = histogram.bucket_range(bucket); + std::ops::Range { + start: Duration::from_nanos(range.start), + end: Duration::from_nanos(range.end), + } + }) + .unwrap_or_default() + } + + /// Returns the number of times the given worker polled tasks with a schedule + /// latency within the given bucket's range. + /// + /// Each worker maintains its own histogram and the counts for each bucket + /// starts at zero when the runtime is created. Each time the worker polls a + /// task, it tracks the time elapsed between when the task was scheduled and + /// when it was polled and increments the associated bucket by 1. + /// + /// Each bucket is a monotonically increasing counter. It is never + /// decremented or reset to zero. 
+ /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// `bucket` is the index of the bucket being queried. The bucket is scoped + /// to the worker. The range represented by the bucket can be queried by + /// calling [`schedule_latency_histogram_bucket_range()`]. Each worker maintains + /// identical bucket ranges. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()` or if `bucket` represents an + /// invalid bucket. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, Handle}; + /// + /// fn main() { + /// runtime::Builder::new_current_thread() + /// .enable_metrics_schedule_latency_histogram() + /// .build() + /// .unwrap() + /// .block_on(async { + /// let metrics = Handle::current().metrics(); + /// let buckets = metrics.schedule_latency_histogram_num_buckets(); + /// + /// for worker in 0..metrics.num_workers() { + /// for i in 0..buckets { + /// let range = metrics.schedule_latency_histogram_bucket_range(i); + /// let count = metrics.schedule_latency_histogram_bucket_count(worker, i); + /// println!("{} tasks encountered a scheduling latency between {}us and {}us", count, range.start.as_micros(), range.end.as_micros()); + /// } + /// } + /// }); + /// } + /// ``` + /// + /// [`schedule_latency_histogram_bucket_range()`]: crate::runtime::RuntimeMetrics::schedule_latency_histogram_bucket_range + #[track_caller] + pub fn schedule_latency_histogram_bucket_count(&self, worker: usize, bucket: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .schedule_latency_histogram + .as_ref() + .map(|histogram| histogram.get(bucket)) + .unwrap_or_default() + } + } + feature! 
{
        #![all(
            tokio_unstable,
diff --git a/tokio/src/runtime/metrics/schedule_latency.rs b/tokio/src/runtime/metrics/schedule_latency.rs
new file mode 100644
index 00000000000..85672664855
--- /dev/null
+++ b/tokio/src/runtime/metrics/schedule_latency.rs
@@ -0,0 +1,61 @@
+use std::num::NonZeroU64;
+use std::time::Instant;
+
+/// `ScheduleLatencyInstant` tracks the time a task was scheduled.
+///
+/// The time a task was scheduled is stored as the number of nanoseconds
+/// since startup of the task's scheduler.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[derive(Copy, Clone)]
+pub(crate) struct ScheduleLatencyInstant(Option);
+
+impl ScheduleLatencyInstant {
+    /// Create a new `ScheduleLatencyInstant` using the provided scheduler startup Instant.
+    pub(crate) fn new(scheduler_start: Option) -> Self {
+        Self(scheduler_start.map(|scheduler_start| {
+            NonZeroU64::new(scheduler_start.elapsed().as_nanos() as u64).unwrap_or(NonZeroU64::MIN)
+        }))
+    }
+
+    /// Prepare a context that can calculate the number of nanoseconds elapsed
+    /// since this task was scheduled.
+    pub(crate) fn prepare(
+        self,
+        scheduler_start: Option,
+    ) -> Option {
+        match (scheduler_start, self.0) {
+            (Some(scheduler_start), Some(scheduled_at_delta)) => Some(ScheduleLatencyContext {
+                scheduler_start,
+                scheduled_at_delta,
+            }),
+            _ => None,
+        }
+    }
+}
+
+/// `ScheduleLatencyContext` contains all the data required to calculate the time elapsed
+/// since a task was scheduled.
+///
+/// `ScheduleLatencyInstant` on its own is insufficient because it only contains a delta.
+/// The scheduler startup time is required to convert the delta back into an actual time
+/// but is omitted from `ScheduleLatencyInstant` to keep its memory size minimal.
+pub(crate) struct ScheduleLatencyContext { + scheduler_start: Instant, + scheduled_at_delta: NonZeroU64, +} + +impl ScheduleLatencyContext { + /// Calculate how many nanoseconds have elapsed between `now` and when this task + /// was last scheduled. + pub(crate) fn elapsed_nanos(&self, now: Instant) -> u64 { + let nanos_since_start = now + .saturating_duration_since(self.scheduler_start) + .as_nanos() as u64; + nanos_since_start.saturating_sub(self.scheduled_at_delta.get()) + } +} diff --git a/tokio/src/runtime/metrics/schedule_latency_mock.rs b/tokio/src/runtime/metrics/schedule_latency_mock.rs new file mode 100644 index 00000000000..932fd798841 --- /dev/null +++ b/tokio/src/runtime/metrics/schedule_latency_mock.rs @@ -0,0 +1,31 @@ +//! A mock implementation of the types in `schedule_latency`. These types +//! are zero-sized so that the size of types using them are not increased +//! unless schedule latency tracking is explicitly enabled. + +use std::time::Instant; + +#[derive(Copy, Clone)] +pub(crate) struct ScheduleLatencyInstant(); + +impl ScheduleLatencyInstant { + pub(crate) fn new(_runtime_start: Option) -> Self { + Self() + } + + pub(crate) fn prepare(self, _runtime_start: Option) -> Option { + None + } +} + +pub(crate) struct ScheduleLatencyContext { + _private: (), +} + +impl ScheduleLatencyContext { + // This method is referenced (but never called) when the `schedule-latency` + // feature is disabled and `tokio_unstable` is enabled. + #[allow(dead_code)] + pub(crate) fn elapsed_nanos(&self, _now: Instant) -> u64 { + unimplemented!("This should never be called because prepare() always returns None") + } +} diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 30926b2a6c2..e65c6e335de 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -65,6 +65,10 @@ pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] /// If `Some`, tracks the number of polls by duration range. 
pub(super) poll_count_histogram: Option, + + #[cfg(tokio_unstable)] + /// If `Some`, tracks the number of times tasks were scheduled by duration range. + pub(super) schedule_latency_histogram: Option, } impl WorkerMetrics { @@ -93,6 +97,10 @@ impl WorkerMetrics { .metrics_poll_count_histogram .as_ref() .map(|histogram_builder| histogram_builder.build()); + worker_metrics.schedule_latency_histogram = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); worker_metrics } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index f0b072d577e..536c3d758f1 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -3,7 +3,8 @@ use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; use crate::runtime::scheduler::{self, Defer, Inject}; use crate::runtime::task::{ - self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks, + self, JoinHandle, LocalNotified, OwnedTasks, Schedule, SpawnLocation, Task, + TaskHarnessScheduleHooks, }; use crate::runtime::{ blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics, @@ -20,6 +21,7 @@ use std::task::Poll::{Pending, Ready}; use std::task::Waker; use std::thread::ThreadId; use std::time::Duration; +use std::time::Instant; use std::{fmt, thread}; /// Executes tasks on the current thread @@ -100,6 +102,11 @@ struct Shared { /// This scheduler only has one worker. worker_metrics: WorkerMetrics, + + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + started_at: Option, } /// Thread-local context. 
@@ -145,6 +152,11 @@ impl CurrentThread { .global_queue_interval .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); + let started_at = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|_| Instant::now()); + let handle = Arc::new(Handle { name, task_hooks: TaskHooks { @@ -162,6 +174,7 @@ impl CurrentThread { config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics, + started_at, }, driver: driver_handle, blocking_spawner, @@ -368,11 +381,28 @@ fn wake_deferred_tasks_and_free(context: &Context) { impl Context { /// Execute the closure with the given scheduler core stored in the /// thread-local context. - fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { - core.metrics.start_poll(); - let mut ret = self.enter(core, || crate::task::coop::budget(f)); - ret.0.metrics.end_poll(); - ret + fn run_task(&self, task: LocalNotified>, mut core: Box) -> Box { + #[cfg(tokio_unstable)] + let task_meta = task.task_meta(); + + core.metrics.start_poll( + task.get_scheduled_at() + .prepare(self.handle.shared.started_at), + ); + + let (mut c, ()) = self.enter(core, || { + crate::task::coop::budget(|| { + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_start_callback(&task_meta); + + task.run(); + + #[cfg(tokio_unstable)] + self.handle.task_hooks.poll_stop_callback(&task_meta); + }) + }); + c.metrics.end_poll(); + c } /// Blocks the current thread until an event is received by the driver, @@ -638,6 +668,7 @@ cfg_unstable_metrics! 
{ } } +use crate::runtime::metrics::ScheduleLatencyInstant; use std::num::NonZeroU64; impl Handle { @@ -666,6 +697,15 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { use scheduler::Context::CurrentThread; + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + task.set_scheduled_at(ScheduleLatencyInstant::new(self.shared.started_at)); + } + context::with_scheduler(|maybe_cx| match maybe_cx { Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { let mut core = cx.core.borrow_mut(); @@ -815,18 +855,7 @@ impl CoreGuard<'_> { let task = context.handle.shared.owned.assert_owner(task); - #[cfg(tokio_unstable)] - let task_meta = task.task_meta(); - - let (c, ()) = context.run_task(core, || { - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_start_callback(&task_meta); - - task.run(); - - #[cfg(tokio_unstable)] - context.handle.task_hooks.poll_stop_callback(&task_meta); - }); + let c = context.run_task(task, core); core = c; } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index c59d4373ab8..a1f1f919f31 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -1,3 +1,4 @@ +use crate::runtime::metrics::ScheduleLatencyContext; use crate::runtime::{Config, MetricsBatch, WorkerMetrics}; use std::time::{Duration, Instant}; @@ -113,8 +114,8 @@ impl Stats { } } - pub(crate) fn start_poll(&mut self) { - self.batch.start_poll(); + pub(crate) fn start_poll(&mut self, task_scheduled_at: Option) { + self.batch.start_poll(task_scheduled_at); self.tasks_polled_in_batch += 1; } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index ab3b38c3fd8..1c19e4cefcb 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,7 +74,7 @@ use 
crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::task::Waker; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; mod metrics; @@ -92,6 +92,7 @@ use crate::loom::sync::atomic::AtomicBool; #[cfg(all(tokio_unstable, feature = "time"))] use crate::runtime::time_alt; +use crate::runtime::metrics::ScheduleLatencyInstant; #[cfg(all(tokio_unstable, feature = "time"))] use crate::runtime::scheduler::util; @@ -195,6 +196,11 @@ pub(crate) struct Shared { pub(super) worker_metrics: Box<[WorkerMetrics]>, + /// Startup time of this scheduler. + /// + /// This instant is used as the basis of task `scheduled_at` measurements. + started_at: Option, + /// Only held to trigger some code on drop. This is used to get internal /// runtime metrics that can be useful when doing performance /// investigations. This does nothing (empty struct, no drop impl) unless @@ -303,6 +309,10 @@ pub(super) fn create( let (idle, idle_synced) = Idle::new(size); let (inject, inject_synced) = inject::Shared::new(); + let started_at = config + .metrics_schedule_latency_histogram + .as_ref() + .map(|_| Instant::now()); let remotes_len = remotes.len(); let handle = Arc::new(Handle { @@ -324,6 +334,7 @@ pub(super) fn create( config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), + started_at, _counters: Counters, }, driver: driver_handle, @@ -658,7 +669,10 @@ impl Context { // tasks under this measurement. In this case, the tasks came from the // LIFO slot and are considered part of the current task for scheduling // purposes. These tasks inherent the "parent"'s limits. 
- core.stats.start_poll(); + core.stats.start_poll( + task.get_scheduled_at() + .prepare(self.worker.handle.shared.started_at), + ); // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); @@ -1317,6 +1331,15 @@ impl Worker { impl Handle { pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { + if self + .shared + .config + .metrics_schedule_latency_histogram + .is_some() + { + task.set_scheduled_at(ScheduleLatencyInstant::new(self.shared.started_at)); + } + with_current(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index aa3f61a2217..7be801c4700 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -21,6 +21,7 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::context; +use crate::runtime::metrics::ScheduleLatencyInstant; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks}; @@ -191,6 +192,9 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] pub(super) tracing_id: Option, + + /// The last time this task was scheduled. Used to measure schedule latency. + pub(super) scheduled_at: UnsafeCell, } unsafe impl Send for Header {} @@ -247,6 +251,7 @@ impl Cell { owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, + scheduled_at: UnsafeCell::new(ScheduleLatencyInstant::new(None)), } } @@ -534,6 +539,23 @@ impl Header { pub(super) unsafe fn get_tracing_id(me: &NonNull
) -> Option<&tracing::Id> { me.as_ref().tracing_id.as_ref() } + + /// Updates the last time this task was scheduled. Used to calculate + /// the time elapsed between task scheduling and polling. + /// + /// # Safety + /// + /// The caller must guarantee exclusive access to this field. + pub(super) unsafe fn set_scheduled_at(&self, scheduled_at: ScheduleLatencyInstant) { + self.scheduled_at.with_mut(|ptr| *ptr = scheduled_at); + } + + /// Gets the last time this task was scheduled. + pub(super) fn get_scheduled_at(&self) -> ScheduleLatencyInstant { + // Safety: If there are concurrent writes, then that write has violated + // the safety requirements on `set_scheduled_at`. + unsafe { self.scheduled_at.with(|ptr| *ptr) } + } } impl Trailer { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 7740c8824a6..c9061c1f0c8 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -226,6 +226,7 @@ use crate::future::Future; use crate::util::linked_list; use crate::util::sharded_list; +use crate::runtime::metrics::ScheduleLatencyInstant; use crate::runtime::TaskCallback; use std::marker::PhantomData; use std::panic::Location; @@ -252,6 +253,15 @@ impl Notified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.0.task_meta() } + + pub(crate) fn set_scheduled_at(&self, scheduled_at: ScheduleLatencyInstant) { + // SAFETY: There are no concurrent writes because there is only ever one `Notified` + // reference per task. There are no concurrent reads because this field is only read + // when polling the task, which can only happen after it's scheduled. 
+ unsafe { + self.0.header().set_scheduled_at(scheduled_at); + } + } } // safety: This type cannot be used to touch the task without first verifying @@ -273,6 +283,10 @@ impl LocalNotified { pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> { self.task.task_meta() } + + pub(crate) fn get_scheduled_at(&self) -> ScheduleLatencyInstant { + self.task.header().get_scheduled_at() + } } /// A task that is not owned by any `OwnedTasks`. Used for blocking tasks. diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 37fbb2ad288..b088e5a4291 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -804,6 +804,56 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +// Schedule latency tracking is only supported on 64-bit targets +#[cfg(all(feature = "schedule-latency", target_pointer_width = "64"))] +#[test] +fn schedule_latency_counts() { + const N: u64 = 50; + let rts = [ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .enable_metrics_schedule_latency_histogram() + .metrics_schedule_latency_histogram_configuration(HistogramConfiguration::linear( + Duration::from_millis(50), + 3, + )) + .build() + .unwrap(), + ]; + + for rt in rts { + let metrics = rt.metrics(); + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async {}).await.unwrap(); + } + }); + drop(rt); + + let num_workers = metrics.num_workers(); + let num_buckets = metrics.schedule_latency_histogram_num_buckets(); + + assert!(metrics.schedule_latency_histogram_enabled()); + assert_eq!(num_buckets, 3); + + let n = (0..num_workers) + .flat_map(|i| (0..num_buckets).map(move |j| (i, j))) + .map(|(worker, 
bucket)| metrics.schedule_latency_histogram_bucket_count(worker, bucket)) + .sum(); + assert_eq!(N, n); + } +} + async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { // We use a blocking channel to synchronize the tasks. let (tx, rx) = mpsc::channel();