Skip to content

Commit

Permalink
metrics: stabilize RuntimeMetrics::worker_count
Browse files Browse the repository at this point in the history
This PR stabilizes a single metric API to start the process of stabilizing metrics.
Future work will continue to stabilize more metrics.

Refs: tokio-rs#6546
  • Loading branch information
rcoh committed May 14, 2024
1 parent 6fcd9c0 commit 2ec8720
Show file tree
Hide file tree
Showing 18 changed files with 124 additions and 88 deletions.
1 change: 1 addition & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ compile_error!("The `tokio_taskdump` feature requires `--cfg tokio_unstable`.");

#[cfg(all(
tokio_taskdump,
not(doc),
not(all(
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
Expand Down
24 changes: 21 additions & 3 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,40 @@ macro_rules! cfg_macros {
}
}

// need to disable metrics on loom
macro_rules! cfg_metrics {
($($item:item)*) => {
$(
// For now, metrics is only disabled in loom tests.
// When stabilized, it might have a dedicated feature flag.
#[cfg(not(loom))]
$item
)*
}
}

macro_rules! cfg_unstable_metrics {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, not(loom)))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
$item
)*
}
}

macro_rules! cfg_not_unstable_metrics {
($($item:item)*) => {
$(
#[cfg(any(not(tokio_unstable), loom))]
$item
)*
}

}

macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, not(loom))))]
#[cfg(loom)]
$item
)*
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl SpawnerMetrics {
self.num_idle_threads.load(Ordering::Relaxed)
}

cfg_metrics! {
cfg_unstable_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
Expand Down Expand Up @@ -474,7 +474,7 @@ impl Spawner {
}
}

cfg_metrics! {
cfg_unstable_metrics! {
impl Spawner {
pub(crate) fn num_threads(&self) -> usize {
self.inner.metrics.num_threads()
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ impl Builder {
}
}

cfg_metrics! {
cfg_unstable_metrics! {
/// Enables tracking the distribution of task poll times.
///
/// Task poll times are not instrumented by default as doing so requires
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ cfg_coop! {
}

cfg_rt! {
cfg_metrics! {
cfg_unstable_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {
let _ = context::with_current(|handle| {
Expand All @@ -206,7 +206,7 @@ cfg_coop! {
}
}

cfg_not_metrics! {
cfg_not_unstable_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/io/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cfg_not_rt_and_metrics_and_net! {

cfg_net! {
cfg_rt! {
cfg_metrics! {
cfg_unstable_metrics! {
pub(crate) use crate::runtime::IoDriverMetrics;
}
}
Expand Down
39 changes: 23 additions & 16 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,34 @@
#![allow(clippy::module_inception)]

cfg_metrics! {
mod batch;
pub(crate) use batch::MetricsBatch;

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::HistogramScale;

mod runtime;
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use runtime::RuntimeMetrics;

mod scheduler;
pub(crate) use scheduler::SchedulerMetrics;
cfg_unstable_metrics! {
mod batch;
pub(crate) use batch::MetricsBatch;

mod histogram;
pub(crate) use histogram::{HistogramBatch, HistogramBuilder, Histogram};

mod scheduler;
pub(crate) use scheduler::SchedulerMetrics;

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
}
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::HistogramScale;
}

mod worker;
pub(crate) use worker::WorkerMetrics;
cfg_not_unstable_metrics! {
mod mock;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
}
}

Expand Down
17 changes: 10 additions & 7 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use crate::runtime::Handle;

use std::ops::Range;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;

/// Handle to the runtime's metrics.
///
/// This handle is internally reference-counted and can be freely cloned. A
Expand All @@ -15,6 +11,12 @@ pub struct RuntimeMetrics {
handle: Handle,
}

cfg_unstable_metrics! {
use std::time::Duration;
use std::sync::atomic::Ordering::Relaxed;
use std::ops::Range;
}

impl RuntimeMetrics {
pub(crate) fn new(handle: Handle) -> RuntimeMetrics {
RuntimeMetrics { handle }
Expand Down Expand Up @@ -43,6 +45,8 @@ impl RuntimeMetrics {
self.handle.inner.num_workers()
}

cfg_unstable_metrics! {

/// Returns the number of additional threads spawned by the runtime.
///
/// The number of workers is set by configuring `max_blocking_threads` on
Expand Down Expand Up @@ -833,10 +837,8 @@ impl RuntimeMetrics {
pub fn blocking_queue_depth(&self) -> usize {
self.handle.inner.blocking_queue_depth()
}
}

cfg_net! {
impl RuntimeMetrics {
cfg_net! {
/// Returns the number of file descriptors that have been registered with the
/// runtime's I/O driver.
///
Expand Down Expand Up @@ -921,4 +923,5 @@ cfg_net! {
.unwrap_or(0)
}
}
}
}
20 changes: 11 additions & 9 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,21 +388,23 @@ cfg_rt! {
mod thread_id;
pub(crate) use thread_id::ThreadId;

pub(crate) mod metrics;
cfg_metrics! {
mod metrics;
pub use metrics::{RuntimeMetrics, HistogramScale};
pub use metrics::RuntimeMetrics;

pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder};

cfg_net! {
pub(crate) use metrics::IoDriverMetrics;
cfg_unstable_metrics! {
pub use metrics::HistogramScale;


cfg_net! {
pub(crate) use metrics::IoDriverMetrics;
}
}
}

cfg_not_metrics! {
pub(crate) mod metrics;
pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
}
pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder};


/// After thread starts / before thread stops
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
Expand Down
18 changes: 9 additions & 9 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,14 @@ impl Runtime {
pub fn shutdown_background(self) {
self.shutdown_timeout(Duration::from_nanos(0));
}

cfg_metrics! {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
self.handle.metrics()
}
}
}

#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
Expand Down Expand Up @@ -487,12 +495,4 @@ impl std::panic::UnwindSafe for Runtime {}

impl std::panic::RefUnwindSafe for Runtime {}

cfg_metrics! {
impl Runtime {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
self.handle.metrics()
}
}
}
impl Runtime {}
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl Handle {
}
}

cfg_metrics! {
cfg_unstable_metrics! {
impl Handle {
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/inject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cfg_rt_multi_thread! {
mod rt_multi_thread;
}

cfg_metrics! {
cfg_unstable_metrics! {
mod metrics;
}

Expand Down
9 changes: 7 additions & 2 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ cfg_rt! {
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
match self {
Expand All @@ -176,6 +174,12 @@ cfg_rt! {
Handle::MultiThreadAlt(handle) => handle.num_workers(),
}
}
}

cfg_unstable_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
impl Handle {


pub(crate) fn num_blocking_threads(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
Expand Down Expand Up @@ -210,6 +214,7 @@ cfg_rt! {
}
}
}
}

impl Context {
#[track_caller]
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::util::RngSeedGenerator;

use std::fmt;

cfg_metrics! {
mod metrics;
}
mod metrics;

cfg_taskdump! {
mod taskdump;
Expand Down
52 changes: 27 additions & 25 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
use super::Handle;

use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.shared.worker_metrics.len()
}
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
cfg_unstable_metrics! {
impl Handle {
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
pub(crate) fn scheduler_metrics(&self) -> &crate::runtime::SchedulerMetrics {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &crate::runtime::WorkerMetrics {
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

0 comments on commit 2ec8720

Please sign in to comment.