Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: Add a metric that counts currently active tasks #5628

19 changes: 19 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ cfg_metrics! {
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ cfg_rt! {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.active_tasks_count(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.active_tasks_count(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ cfg_metrics! {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
18 changes: 13 additions & 5 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};

use std::marker::PhantomData;

Expand Down Expand Up @@ -54,9 +54,13 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<OwnedTasksInner<S>>,
inner: Mutex<CountedOwnedTasksInner<S>>,
id: u64,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
Comment on lines +60 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to also duplicate this one. For this particular case, I think it's fine to just keep the counter in both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it is not necessary for the code to function correctly, I believe using different types for OwnedTasks and LocalOwnedTasks helps convey our intention more clearly. If we use CountedLinkedList for LocalOwnedTasks, the reader may think we could be counting those too.

While it does add a few lines of code, I believe it is worth it to ensure clarity and readability. Looking forward to hearing your thoughts on this one!

closed: bool,
}
pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
id: u64,
Expand All @@ -70,8 +74,8 @@ struct OwnedTasksInner<S: 'static> {
impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(OwnedTasksInner {
list: LinkedList::new(),
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
id: get_next_id(),
Expand Down Expand Up @@ -153,6 +157,10 @@ impl<S: 'static> OwnedTasks<S> {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
let task_id = task.header().get_owner_id();
if task_id == 0 {
Expand Down Expand Up @@ -296,4 +304,4 @@ mod tests {
last_id = next_id;
}
}
}
}
68 changes: 68 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::fmt;
use core::marker::{PhantomData, PhantomPinned};
use core::mem::ManuallyDrop;
use core::ptr::{self, NonNull};
use std::sync::atomic::{AtomicUsize, Ordering};

/// An intrusive linked list.
///
Expand Down Expand Up @@ -228,6 +229,53 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
}
}

// ===== impl CountedLinkedList ====

// Delegates operations to the base LinkedList implementation, and adds a counter to the elements
// in the list.
pub(crate) struct CountedLinkedList<L: Link, T> {
_list: LinkedList<L, T>,
_count: AtomicUsize,
matildasmeds marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a mutable reference any time you modify it, so you don't need an atomic.

Suggested change
_list: LinkedList<L, T>,
_count: AtomicUsize,
_list: LinkedList<L, T>,
_count: usize,

If you want an atomic, then you should place it outside of the mutex. In that case, the logic would need to go in OwnedTasks rather than in the linked list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to have marked this as resolved, but it does not appear to have been addressed.

The point of using an atomic is to avoid locking the mutex when accessing the counter. However, because the atomic is inside the mutex, you are still locking the mutex when accessing the counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My apologies marking this resolved without actually implementing the changes you had requested. It was not intentional, and I regret any inconvenience this may have caused. I will make sure to be more careful in the future to avoid similar mistakes. Thank you for your time and understanding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it looks better now.

There's no need to apologize for something like this. I was not offended or unhappy about it — I just wanted to clarify what I meant in case my review comment was unclear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, all good! 😄

}

impl<L: Link> CountedLinkedList<L, L::Target> {
pub(crate) const fn new() -> CountedLinkedList<L, L::Target> {
CountedLinkedList {
_list: LinkedList::new(),
_count: AtomicUsize::new(0),
}
}

pub(crate) fn push_front(&mut self, val: L::Handle) {
self._list.push_front(val);
self._count.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
let val = self._list.pop_back();
if val.is_some() {
self._count.fetch_sub(1, Ordering::Relaxed);
}
val
}

pub(crate) fn is_empty(&self) -> bool {
self._list.is_empty()
}

pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
let val = self._list.remove(node);
if val.is_some() {
self._count.fetch_sub(1, Ordering::Relaxed);
}
val
}

pub(crate) fn count(&self) -> usize {
self._count.load(Ordering::Relaxed)
}
}

#[cfg(any(
feature = "fs",
feature = "rt",
Expand Down Expand Up @@ -719,6 +767,26 @@ pub(crate) mod tests {
}
}

#[test]
fn count() {
let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
assert_eq!(0, list.count());

let a = entry(5);
let b = entry(7);
list.push_front(a.as_ref());
list.push_front(b.as_ref());
assert_eq!(2, list.count());

list.pop_back();
assert_eq!(1, list.count());

unsafe {
list.remove(ptr(&b));
}
assert_eq!(0, list.count());
}

/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
#[cfg(fuzzing)]
pub fn fuzz_linked_list(ops: &[u8]) {
Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ fn blocking_queue_depth() {
assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn active_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
}

#[test]
fn remote_schedule_count() {
use std::thread;
Expand Down