Skip to content

Commit

Permalink
taskdump: instrument JoinHandle and tokio::fs (#5676)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 9, 2023
1 parent 56239a9 commit 7430865
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 60 deletions.
19 changes: 17 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ impl AsyncRead for File {
cx: &mut Context<'_>,
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
let me = self.get_mut();
let inner = me.inner.get_mut();

Expand Down Expand Up @@ -594,6 +595,7 @@ impl AsyncSeek for File {
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
ready!(crate::trace::trace_leaf(cx));
let inner = self.inner.get_mut();

loop {
Expand Down Expand Up @@ -629,6 +631,7 @@ impl AsyncWrite for File {
cx: &mut Context<'_>,
src: &[u8],
) -> Poll<io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
let me = self.get_mut();
let inner = me.inner.get_mut();

Expand Down Expand Up @@ -695,11 +698,13 @@ impl AsyncWrite for File {
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
let inner = self.inner.get_mut();
inner.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
self.poll_flush(cx)
}
}
Expand Down Expand Up @@ -774,8 +779,18 @@ impl Inner {
async fn complete_inflight(&mut self) {
use crate::future::poll_fn;

if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());
poll_fn(|cx| self.poll_complete_inflight(cx)).await
}

fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
ready!(crate::trace::trace_leaf(cx));
match self.poll_flush(cx) {
Poll::Ready(Err(e)) => {
self.last_write_err = Some(e.kind());
Poll::Ready(())
}
Poll::Ready(Ok(())) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}

Expand Down
14 changes: 14 additions & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,20 @@ cfg_time! {
pub mod time;
}

mod trace {
cfg_taskdump! {
pub(crate) use crate::runtime::task::trace::trace_leaf;
}

cfg_not_taskdump! {
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn trace_leaf(_: &mut std::task::Context<'_>) -> std::task::Poll<()> {
std::task::Poll::Ready(())
}
}
}

mod util;

/// Due to the `Stream` trait's inclusion in `std` landing later than Tokio's 1.0
Expand Down
12 changes: 10 additions & 2 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,15 @@ macro_rules! cfg_taskdump {
target_arch = "x86_64"
)
))]
#[cfg_attr(docsrs, doc(cfg(all(
$item
)*
};
}

macro_rules! cfg_not_taskdump {
($($item:item)*) => {
$(
#[cfg(not(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
Expand All @@ -397,7 +405,7 @@ macro_rules! cfg_taskdump {
target_arch = "x86",
target_arch = "x86_64"
)
))))]
)))]
$item
)*
};
Expand Down
15 changes: 13 additions & 2 deletions tokio/src/runtime/defer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ impl Defer {
}
}

pub(crate) fn defer(&mut self, waker: Waker) {
self.deferred.push(waker);
pub(crate) fn defer(&mut self, waker: &Waker) {
// If the same task adds itself a bunch of times, then only add it once.
if let Some(last) = self.deferred.last() {
if last.will_wake(waker) {
return;
}
}
self.deferred.push(waker.clone());
}

pub(crate) fn is_empty(&self) -> bool {
Expand All @@ -24,4 +30,9 @@ impl Defer {
waker.wake();
}
}

#[cfg(tokio_taskdump)]
pub(crate) fn take_deferred(&mut self) -> Vec<Waker> {
std::mem::take(&mut self.deferred)
}
}
15 changes: 15 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ fn wake_deferred_tasks() {
context::with_defer(|deferred| deferred.wake());
}

#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free() {
let wakers = context::with_defer(|deferred| deferred.take_deferred());
if let Some(wakers) = wakers {
for waker in wakers {
waker.wake();
}
}
}

// ===== impl Context =====

impl Context {
Expand Down Expand Up @@ -419,6 +429,11 @@ impl Handle {
.collect();
});

// Taking a taskdump could wakes every task, but we probably don't want
// the `yield_now` vector to be that large under normal circumstances.
// Therefore, we free its allocation.
wake_deferred_tasks_and_free();

dump::Dump::new(traces)
}

Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/task/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl<T> Future for JoinHandle<T> {
type Output = super::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(crate::trace::trace_leaf(cx));
let mut ret = Poll::Pending;

// Keep track of task budget
Expand Down
105 changes: 60 additions & 45 deletions tokio/src/runtime/task/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct Frame {
/// An tree execution trace.
///
/// Traces are captured with [`Trace::capture`], rooted with [`Trace::root`]
/// and leaved with [`Trace::leaf`].
/// and leaved with [`trace_leaf`].
#[derive(Clone, Debug)]
pub(crate) struct Trace {
// The linear backtraces that comprise this trace. These linear traces can
Expand Down Expand Up @@ -93,7 +93,7 @@ impl Context {

impl Trace {
/// Invokes `f`, returning both its result and the collection of backtraces
/// captured at each sub-invocation of [`Trace::leaf`].
/// captured at each sub-invocation of [`trace_leaf`].
#[inline(never)]
pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
where
Expand All @@ -116,51 +116,66 @@ impl Trace {
pub(crate) fn root<F>(future: F) -> Root<F> {
Root { future }
}
}

/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
///
/// The captured backtrace will be returned by [`Trace::capture`].
///
/// Invoking this function does nothing when it is not a sub-invocation
/// [`Trace::capture`].
// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
// backtrace, below which frames should not be included in the backtrace (since they reflect the
// internal implementation details of this crate).
#[inline(never)]
pub(crate) fn leaf() {
// Safety: We don't manipulate the current context's active frame.
unsafe {
Context::with_current(|context_cell| {
if let Some(mut collector) = context_cell.collector.take() {
let mut frames = vec![];
let mut above_leaf = false;

if let Some(active_frame) = context_cell.active_frame.get() {
let active_frame = active_frame.as_ref();

backtrace::trace(|frame| {
let below_root =
!ptr::eq(frame.symbol_address(), active_frame.inner_addr);

// only capture frames above `Trace::leaf` and below
// `Trace::root`.
if above_leaf && below_root {
frames.push(frame.to_owned().into());
}

if ptr::eq(frame.symbol_address(), Self::leaf as *const _) {
above_leaf = true;
}

// only continue unwinding if we're below `Trace::root`
below_root
});
}
collector.backtraces.push(frames);
context_cell.collector.set(Some(collector));
/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
///
/// The captured backtrace will be returned by [`Trace::capture`].
///
/// Invoking this function does nothing when it is not a sub-invocation
/// [`Trace::capture`].
// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
// backtrace, below which frames should not be included in the backtrace (since they reflect the
// internal implementation details of this crate).
#[inline(never)]
pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
// Safety: We don't manipulate the current context's active frame.
let did_trace = unsafe {
Context::with_current(|context_cell| {
if let Some(mut collector) = context_cell.collector.take() {
let mut frames = vec![];
let mut above_leaf = false;

if let Some(active_frame) = context_cell.active_frame.get() {
let active_frame = active_frame.as_ref();

backtrace::trace(|frame| {
let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);

// only capture frames above `Trace::leaf` and below
// `Trace::root`.
if above_leaf && below_root {
frames.push(frame.to_owned().into());
}

if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
above_leaf = true;
}

// only continue unwinding if we're below `Trace::root`
below_root
});
}
});
}
collector.backtraces.push(frames);
context_cell.collector.set(Some(collector));
true
} else {
false
}
})
};

if did_trace {
// Use the same logic that `yield_now` uses to send out wakeups after
// the task yields.
let defer = crate::runtime::context::with_defer(|rt| {
rt.defer(cx.waker());
});
debug_assert!(defer.is_some());

Poll::Pending
} else {
Poll::Ready(())
}
}

Expand Down
11 changes: 2 additions & 9 deletions tokio/src/task/yield_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,7 @@ pub async fn yield_now() {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
crate::runtime::task::trace::Trace::leaf();
ready!(crate::trace::trace_leaf(cx));

if self.yielded {
return Poll::Ready(());
Expand All @@ -62,7 +55,7 @@ pub async fn yield_now() {
self.yielded = true;

let defer = context::with_defer(|rt| {
rt.defer(cx.waker().clone());
rt.defer(cx.waker());
});

if defer.is_none() {
Expand Down

0 comments on commit 7430865

Please sign in to comment.