Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: Add a metric that counts currently active tasks #5628

19 changes: 19 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ cfg_metrics! {
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ cfg_rt! {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.active_tasks_count(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.active_tasks_count(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ cfg_metrics! {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
16 changes: 12 additions & 4 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};

use std::marker::PhantomData;

Expand Down Expand Up @@ -54,9 +54,13 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<OwnedTasksInner<S>>,
inner: Mutex<CountedOwnedTasksInner<S>>,
id: u64,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to also duplicate this one. For this particular case, I think it's fine to just keep the counter in both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it is not necessary for the code to function correctly, I believe using different types for OwnedTasks and LocalOwnedTasks helps convey our intention more clearly. If we use CountedLinkedList for LocalOwnedTasks, the reader may think we could be counting those too.

While it does add a few lines of code, I believe it is worth it to ensure clarity and readability. Looking forward to hearing your thoughts on this one!

closed: bool,
}
pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
id: u64,
Expand All @@ -70,8 +74,8 @@ struct OwnedTasksInner<S: 'static> {
impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(OwnedTasksInner {
list: LinkedList::new(),
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
id: get_next_id(),
Expand Down Expand Up @@ -153,6 +157,10 @@ impl<S: 'static> OwnedTasks<S> {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
let task_id = task.header().get_owner_id();
if task_id == 0 {
Expand Down
67 changes: 67 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,53 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
}
}

// ===== impl CountedLinkedList ====

// Delegates operations to the base LinkedList implementation, and adds a counter to the elements
// in the list.
pub(crate) struct CountedLinkedList<L: Link, T> {
list: LinkedList<L, T>,
count: usize,
}

impl<L: Link> CountedLinkedList<L, L::Target> {
pub(crate) fn new() -> CountedLinkedList<L, L::Target> {
CountedLinkedList {
list: LinkedList::new(),
count: 0,
}
}

pub(crate) fn push_front(&mut self, val: L::Handle) {
self.list.push_front(val);
self.count += 1;
}

pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
let val = self.list.pop_back();
if val.is_some() {
self.count -= 1;
}
val
}

pub(crate) fn is_empty(&self) -> bool {
self.list.is_empty()
}

pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
let val = self.list.remove(node);
if val.is_some() {
self.count -= 1;
}
val
}

pub(crate) fn count(&self) -> usize {
self.count
}
}

#[cfg(any(
feature = "fs",
feature = "rt",
Expand Down Expand Up @@ -719,6 +766,26 @@ pub(crate) mod tests {
}
}

#[test]
fn count() {
let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
assert_eq!(0, list.count());

let a = entry(5);
let b = entry(7);
list.push_front(a.as_ref());
list.push_front(b.as_ref());
assert_eq!(2, list.count());

list.pop_back();
assert_eq!(1, list.count());

unsafe {
list.remove(ptr(&b));
}
assert_eq!(0, list.count());
}

/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
#[cfg(fuzzing)]
pub fn fuzz_linked_list(ops: &[u8]) {
Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ fn blocking_queue_depth() {
assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn active_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
}

#[test]
fn remote_schedule_count() {
use std::thread;
Expand Down