Skip to content

Commit

Permalink
metrics: add metric for number of tasks (#5628)
Browse files Browse the repository at this point in the history
  • Loading branch information
matildasmeds committed Apr 27, 2023
1 parent 6a8f6f5 commit 1d785fd
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 4 deletions.
19 changes: 19 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ cfg_metrics! {
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ cfg_rt! {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.active_tasks_count(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.active_tasks_count(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ cfg_metrics! {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
16 changes: 12 additions & 4 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};

use std::marker::PhantomData;

Expand Down Expand Up @@ -54,9 +54,13 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<OwnedTasksInner<S>>,
inner: Mutex<CountedOwnedTasksInner<S>>,
id: u64,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}
pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
id: u64,
Expand All @@ -70,8 +74,8 @@ struct OwnedTasksInner<S: 'static> {
impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(OwnedTasksInner {
list: LinkedList::new(),
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
id: get_next_id(),
Expand Down Expand Up @@ -153,6 +157,10 @@ impl<S: 'static> OwnedTasks<S> {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
let task_id = task.header().get_owner_id();
if task_id == 0 {
Expand Down
67 changes: 67 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,53 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
}
}

// ===== impl CountedLinkedList ====

// Delegates operations to the base LinkedList implementation, and adds a counter to the elements
// in the list.
pub(crate) struct CountedLinkedList<L: Link, T> {
list: LinkedList<L, T>,
count: usize,
}

impl<L: Link> CountedLinkedList<L, L::Target> {
pub(crate) fn new() -> CountedLinkedList<L, L::Target> {
CountedLinkedList {
list: LinkedList::new(),
count: 0,
}
}

pub(crate) fn push_front(&mut self, val: L::Handle) {
self.list.push_front(val);
self.count += 1;
}

pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
let val = self.list.pop_back();
if val.is_some() {
self.count -= 1;
}
val
}

pub(crate) fn is_empty(&self) -> bool {
self.list.is_empty()
}

pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
let val = self.list.remove(node);
if val.is_some() {
self.count -= 1;
}
val
}

pub(crate) fn count(&self) -> usize {
self.count
}
}

#[cfg(any(
feature = "fs",
feature = "rt",
Expand Down Expand Up @@ -719,6 +766,26 @@ pub(crate) mod tests {
}
}

#[test]
fn count() {
let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
assert_eq!(0, list.count());

let a = entry(5);
let b = entry(7);
list.push_front(a.as_ref());
list.push_front(b.as_ref());
assert_eq!(2, list.count());

list.pop_back();
assert_eq!(1, list.count());

unsafe {
list.remove(ptr(&b));
}
assert_eq!(0, list.count());
}

/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
#[cfg(fuzzing)]
pub fn fuzz_linked_list(ops: &[u8]) {
Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ fn blocking_queue_depth() {
assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn active_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
}

#[test]
fn remote_schedule_count() {
use std::thread;
Expand Down

0 comments on commit 1d785fd

Please sign in to comment.