Skip to content

Commit

Permalink
rt: instrument task poll times with a histogram (#5685)
Browse files Browse the repository at this point in the history
Adds support for instrumenting the poll times of all spawned tasks. Data is tracked in a histogram; the user must specify the histogram scale and bucket ranges. Implementation-wise, this follows the same strategy as the runtime's existing metrics: plain atomic counters. Because instrumenting each poll duration results in frequent calls to `Instant::now()`, the metric is opt-in.
  • Loading branch information
carllerche committed May 15, 2023
1 parent a883fd4 commit c84d0a1
Show file tree
Hide file tree
Showing 14 changed files with 1,105 additions and 40 deletions.
149 changes: 148 additions & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, Runtime};
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use std::fmt;
Expand Down Expand Up @@ -95,6 +95,12 @@ pub struct Builder {
/// Specify a random number generator seed to provide deterministic results
pub(super) seed_generator: RngSeedGenerator,

/// When true, enables task poll count histogram instrumentation.
pub(super) metrics_poll_count_histogram_enable: bool,

/// Configures the task poll count histogram
pub(super) metrics_poll_count_histogram: HistogramBuilder,

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,
}
Expand Down Expand Up @@ -268,6 +274,10 @@ impl Builder {
#[cfg(tokio_unstable)]
unhandled_panic: UnhandledPanic::Ignore,

metrics_poll_count_histogram_enable: false,

metrics_poll_count_histogram: Default::default(),

disable_lifo_slot: false,
}
}
Expand Down Expand Up @@ -877,6 +887,133 @@ impl Builder {
}
}

cfg_metrics! {
/// Turns on instrumentation of task poll durations.
///
/// This is disabled by default because timing a poll requires calling
/// [`Instant::now()`] twice per poll, which can add measurable
/// overhead. The collected data is exposed through
/// [`Handle::metrics()`].
///
/// Bucket boundaries are fixed up front — they do not adapt to the
/// observed values. Tune them with the other
/// `metrics_poll_count_histogram_*` builder methods.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
///     .enable_metrics_poll_count_histogram()
///     .build()
///     .unwrap();
/// # // Test default values here
/// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
/// # let m = rt.handle().metrics();
/// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
/// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
/// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
/// ```
///
/// [`Handle::metrics()`]: crate::runtime::Handle::metrics
/// [`Instant::now()`]: std::time::Instant::now
pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
    self.metrics_poll_count_histogram_enable = true;
    self
}

/// Selects the scale (linear or log) used by the poll-time histogram
/// buckets.
///
/// With a linear scale, every bucket spans an equal range of poll
/// durations. With a log scale, each successive bucket spans a range
/// twice as large as the bucket before it.
///
/// **Default:** linear scale.
///
/// # Examples
///
/// ```
/// use tokio::runtime::{self, HistogramScale};
///
/// let rt = runtime::Builder::new_multi_thread()
///     .enable_metrics_poll_count_histogram()
///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
///     .build()
///     .unwrap();
/// ```
pub fn metrics_poll_count_histogram_scale(
    &mut self,
    histogram_scale: crate::runtime::HistogramScale,
) -> &mut Self {
    self.metrics_poll_count_histogram.scale = histogram_scale;
    self
}

/// Sets the histogram resolution for tracking the distribution of task
/// poll times.
///
/// The resolution is the histogram's first bucket's range. When using a
/// linear histogram scale, each bucket will cover the same range. When
/// using a log scale, each bucket will cover a range twice as big as
/// the previous bucket. In the log case, the resolution represents the
/// smallest bucket range.
///
/// Note that, when using log scale, the resolution is rounded up to the
/// nearest power of 2 in nanoseconds.
///
/// **Default:** 100 microseconds.
///
/// # Panics
///
/// Panics if `resolution` is zero or greater than one second.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
/// use std::time::Duration;
///
/// let rt = runtime::Builder::new_multi_thread()
///     .enable_metrics_poll_count_histogram()
///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
///     .build()
///     .unwrap();
/// ```
pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
    assert!(
        resolution > Duration::from_secs(0),
        "histogram resolution must be non-zero"
    );
    // Sanity check the argument and also make the cast below safe: a
    // duration of at most one second always fits in a u64 nanosecond count.
    assert!(
        resolution <= Duration::from_secs(1),
        "histogram resolution must not exceed one second"
    );

    let resolution = resolution.as_nanos() as u64;
    self.metrics_poll_count_histogram.resolution = resolution;
    self
}

/// Sets how many buckets the poll-time histogram uses.
///
/// The final bucket is a catch-all for every value beyond the other
/// bucket ranges. For example, with a linear scale, a resolution of
/// 50ms, and 10 buckets, the 10th bucket covers every task poll that
/// takes longer than 450ms.
///
/// **Default:** 10
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_multi_thread()
///     .enable_metrics_poll_count_histogram()
///     .metrics_poll_count_histogram_buckets(15)
///     .build()
///     .unwrap();
/// ```
pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
    self.metrics_poll_count_histogram.num_buckets = buckets;
    self
}
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
Expand Down Expand Up @@ -909,6 +1046,7 @@ impl Builder {
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

Expand All @@ -922,6 +1060,14 @@ impl Builder {
blocking_pool,
))
}

/// Returns the poll-time histogram configuration, or `None` when the
/// instrumentation has not been enabled on this builder.
fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
    if !self.metrics_poll_count_histogram_enable {
        return None;
    }

    Some(self.metrics_poll_count_histogram.clone())
}
}

cfg_io_driver! {
Expand Down Expand Up @@ -1050,6 +1196,7 @@ cfg_rt_multi_thread! {
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(crate) struct Config {
/// deterministic way.
pub(crate) seed_generator: RngSeedGenerator,

/// How to build poll time histograms
pub(crate) metrics_poll_count_histogram: Option<crate::runtime::HistogramBuilder>,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
Expand Down
61 changes: 51 additions & 10 deletions tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::runtime::WorkerMetrics;
use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};

use std::sync::atomic::Ordering::Relaxed;
use std::time::Instant;
use std::time::{Duration, Instant};

pub(crate) struct MetricsBatch {
/// Number of times the worker parked.
Expand Down Expand Up @@ -32,11 +32,26 @@ pub(crate) struct MetricsBatch {

/// The total busy duration in nanoseconds.
busy_duration_total: u64,

/// Instant at which work last resumed (continued after park).
last_resume_time: Instant,

/// If `Some`, tracks poll times in nanoseconds
poll_timer: Option<PollTimer>,
}

/// Per-worker state for timing task polls; only allocated when the
/// poll-time histogram instrumentation is enabled.
struct PollTimer {
    /// Histogram of poll counts within each band.
    poll_counts: HistogramBatch,

    /// Instant when the most recent task started polling.
    poll_started_at: Instant,
}

impl MetricsBatch {
pub(crate) fn new() -> MetricsBatch {
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
let now = Instant::now();

MetricsBatch {
park_count: 0,
noop_count: 0,
Expand All @@ -47,7 +62,14 @@ impl MetricsBatch {
local_schedule_count: 0,
overflow_count: 0,
busy_duration_total: 0,
last_resume_time: Instant::now(),
last_resume_time: now,
poll_timer: worker_metrics
.poll_count_histogram
.as_ref()
.map(|worker_poll_counts| PollTimer {
poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
poll_started_at: now,
}),
}
}

Expand All @@ -68,6 +90,11 @@ impl MetricsBatch {
.local_schedule_count
.store(self.local_schedule_count, Relaxed);
worker.overflow_count.store(self.overflow_count, Relaxed);

if let Some(poll_timer) = &self.poll_timer {
let dst = worker.poll_count_histogram.as_ref().unwrap();
poll_timer.poll_counts.submit(dst);
}
}

/// The worker is about to park.
Expand All @@ -81,8 +108,22 @@ impl MetricsBatch {
}

let busy_duration = self.last_resume_time.elapsed();
let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX);
self.busy_duration_total += busy_duration;
self.busy_duration_total += duration_as_u64(busy_duration);
}

/// Records that a task poll is beginning: captures the start timestamp
/// (when instrumentation is enabled) and bumps the batched poll count.
pub(crate) fn start_poll(&mut self) {
    if let Some(timer) = self.poll_timer.as_mut() {
        timer.poll_started_at = Instant::now();
    }

    self.poll_count += 1;
}

/// Records that a task poll finished, folding the elapsed nanoseconds
/// into the batched histogram. A no-op when instrumentation is off.
pub(crate) fn end_poll(&mut self) {
    if let Some(timer) = self.poll_timer.as_mut() {
        let nanos = duration_as_u64(timer.poll_started_at.elapsed());
        timer.poll_counts.measure(nanos, 1);
    }
}

pub(crate) fn returned_from_park(&mut self) {
Expand All @@ -92,10 +133,6 @@ impl MetricsBatch {
/// Bumps the batched count of tasks scheduled from this worker.
pub(crate) fn inc_local_schedule_count(&mut self) {
    self.local_schedule_count += 1;
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}
}

cfg_rt_multi_thread! {
Expand All @@ -113,3 +150,7 @@ cfg_rt_multi_thread! {
}
}
}

/// Converts a `Duration` to whole nanoseconds as a `u64`, saturating at
/// `u64::MAX` when the value does not fit.
fn duration_as_u64(dur: Duration) -> u64 {
    dur.as_nanos().min(u128::from(u64::MAX)) as u64
}

0 comments on commit c84d0a1

Please sign in to comment.