Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

taskdump: implement task dumps for multi-thread runtime #5717

Merged
merged 4 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 20 additions & 12 deletions examples/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() {
use std::hint::black_box;

Expand All @@ -22,21 +22,29 @@ async fn main() {

#[inline(never)]
async fn c() {
black_box(tokio::task::yield_now()).await
loop {
tokio::task::yield_now().await;
}
}

tokio::spawn(a());
tokio::spawn(b());
tokio::spawn(c());
async fn dump() {
let handle = tokio::runtime::Handle::current();
let dump = handle.dump().await;

let handle = tokio::runtime::Handle::current();
let dump = handle.dump();

for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("task {i} trace:");
println!("{trace}");
for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("task {i} trace:");
println!("{trace}\n");
}
}

tokio::select!(
biased;
_ = tokio::spawn(a()) => {},
_ = tokio::spawn(b()) => {},
_ = tokio::spawn(c()) => {},
_ = dump() => {},
);
}

#[cfg(not(all(
Expand Down
217 changes: 217 additions & 0 deletions tokio/src/loom/std/barrier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//! A `Barrier` that provides `wait_timeout`.
//!
//! This implementation mirrors that of the Rust standard library.

use crate::loom::sync::{Condvar, Mutex};
use std::fmt;
use std::time::{Duration, Instant};

/// A barrier enables multiple threads to synchronize the beginning
/// of some computation.
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Barrier};
/// use std::thread;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in 0..10 {
/// let c = Arc::clone(&barrier);
/// // The same messages will be printed together.
/// // You will NOT see any interleaving.
/// handles.push(thread::spawn(move|| {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// }));
/// }
/// // Wait for other threads to finish.
/// for handle in handles {
/// handle.join().unwrap();
/// }
/// ```
pub(crate) struct Barrier {
lock: Mutex<BarrierState>,
cvar: Condvar,
num_threads: usize,
}

// The inner state of a double barrier
struct BarrierState {
count: usize,
generation_id: usize,
}

/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads
/// in the [`Barrier`] have rendezvoused.
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// ```
pub(crate) struct BarrierWaitResult(bool);

impl fmt::Debug for Barrier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Barrier").finish_non_exhaustive()
}
}

impl Barrier {
/// Creates a new barrier that can block a given number of threads.
///
/// A barrier will block `n`-1 threads which call [`wait()`] and then wake
/// up all threads at once when the `n`th thread calls [`wait()`].
///
/// [`wait()`]: Barrier::wait
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(10);
/// ```
#[must_use]
pub(crate) fn new(n: usize) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState {
count: 0,
generation_id: 0,
}),
cvar: Condvar::new(),
num_threads: n,
}
}

/// Blocks the current thread until all threads have rendezvoused here.
///
/// Barriers are re-usable after all threads have rendezvoused once, and can
/// be used continuously.
///
/// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
/// returns `true` from [`BarrierWaitResult::is_leader()`] when returning
/// from this function, and all other threads will receive a result that
/// will return `false` from [`BarrierWaitResult::is_leader()`].
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Barrier};
/// use std::thread;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in 0..10 {
/// let c = Arc::clone(&barrier);
/// // The same messages will be printed together.
/// // You will NOT see any interleaving.
/// handles.push(thread::spawn(move|| {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// }));
/// }
/// // Wait for other threads to finish.
/// for handle in handles {
/// handle.join().unwrap();
/// }
/// ```
pub(crate) fn wait(&self) -> BarrierWaitResult {
let mut lock = self.lock.lock();
let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_threads {
// We need a while loop to guard against spurious wakeups.
// https://en.wikipedia.org/wiki/Spurious_wakeup
while local_gen == lock.generation_id {
lock = self.cvar.wait(lock).unwrap();
}
BarrierWaitResult(false)
} else {
lock.count = 0;
lock.generation_id = lock.generation_id.wrapping_add(1);
self.cvar.notify_all();
BarrierWaitResult(true)
}
}

/// Blocks the current thread until all threads have rendezvoused here for
/// at most `timeout` duration.
pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option<BarrierWaitResult> {
// This implementation mirrors `wait`, but with each blocking operation
// replaced by a timeout-amenable alternative.

let deadline = Instant::now() + timeout;

// Acquire `self.lock` with at most `timeout` duration.
let mut lock = loop {
if let Some(guard) = self.lock.try_lock() {
break guard;
} else if Instant::now() > deadline {
return None;
} else {
std::thread::yield_now();
}
};

// Shrink the `timeout` to account for the time taken to acquire `lock`.
let timeout = deadline.saturating_duration_since(Instant::now());

let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_threads {
// We need a while loop to guard against spurious wakeups.
// https://en.wikipedia.org/wiki/Spurious_wakeup
while local_gen == lock.generation_id {
let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap();
lock = guard;
if timeout_result.timed_out() {
return None;
}
}
Some(BarrierWaitResult(false))
} else {
lock.count = 0;
lock.generation_id = lock.generation_id.wrapping_add(1);
self.cvar.notify_all();
Some(BarrierWaitResult(true))
}
}
}

impl fmt::Debug for BarrierWaitResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BarrierWaitResult")
.field("is_leader", &self.is_leader())
.finish()
}
}

impl BarrierWaitResult {
/// Returns `true` if this thread is the "leader thread" for the call to
/// [`Barrier::wait()`].
///
/// Only one thread will have `true` returned from their result, all other
/// threads will have `false` returned.
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// println!("{:?}", barrier_wait_result.is_leader());
/// ```
#[must_use]
pub(crate) fn is_leader(&self) -> bool {
self.0
}
}
3 changes: 3 additions & 0 deletions tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod atomic_u16;
mod atomic_u32;
mod atomic_u64;
mod atomic_usize;
mod barrier;
mod mutex;
#[cfg(feature = "parking_lot")]
mod parking_lot;
Expand Down Expand Up @@ -76,6 +77,8 @@ pub(crate) mod sync {

pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering};
}

pub(crate) use super::barrier::Barrier;
}

pub(crate) mod sys {
Expand Down
31 changes: 28 additions & 3 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,40 @@ cfg_metrics! {
cfg_taskdump! {
impl Handle {
/// Capture a snapshot of this runtime's state.
pub fn dump(&self) -> crate::runtime::Dump {
pub async fn dump(&self) -> crate::runtime::Dump {
match &self.inner {
scheduler::Handle::CurrentThread(handle) => handle.dump(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) =>
unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
scheduler::Handle::MultiThread(handle) => {
// perform the trace in a separate thread so that the
// trace itself does not appear in the taskdump.
let handle = handle.clone();
spawn_thread(async {
let handle = handle;
handle.dump().await
}).await
},
}
}
}

cfg_rt_multi_thread! {
/// Spawn a new thread and asynchronously await on its result.
async fn spawn_thread<F>(f: F) -> <F as Future>::Output
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static
{
let (tx, rx) = crate::sync::oneshot::channel();
crate::loom::thread::spawn(|| {
let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let _ = tx.send(f.await);
});
});
rx.await.unwrap()
}
}
}

/// Error returned by `try_current` when no Runtime has been started
Expand Down
25 changes: 25 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,31 @@ cfg_metrics! {
}
}

cfg_taskdump! {
impl Handle {
pub(crate) async fn dump(&self) -> crate::runtime::Dump {
let trace_status = &self.shared.trace_status;

// If a dump is in progress, block.
trace_status.start_trace_request(&self).await;

let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
trace_status.result_ready.notified().await;
}
};

// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self).await;

result
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub(crate) mod queue;
mod worker;
pub(crate) use worker::{Context, Launch, Shared};

cfg_taskdump! {
pub(crate) use worker::Synced;
}

pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
Expand Down