Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

taskdump: instrument the remaining leaf futures #5708

Merged
merged 9 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions tokio/src/io/util/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl AsyncRead for Empty {
cx: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}
Expand All @@ -61,6 +62,7 @@ impl AsyncRead for Empty {
impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(&[]))
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl AsyncRead for Pipe {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_read_internal(cx, buf);
Expand All @@ -249,6 +250,7 @@ impl AsyncRead for Pipe {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
self.poll_read_internal(cx, buf)
}
}
Expand All @@ -261,6 +263,7 @@ impl AsyncWrite for Pipe {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_write_internal(cx, buf);
Expand All @@ -277,6 +280,7 @@ impl AsyncWrite for Pipe {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
self.poll_write_internal(cx, buf)
}
}
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ cfg_time! {
}

mod trace {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_taskdump! {
pub(crate) use crate::runtime::task::trace::trace_leaf;
}
Expand All @@ -579,6 +583,22 @@ mod trace {
std::task::Poll::Ready(())
}
}

#[cfg_attr(not(feature = "sync"), allow(dead_code))]
pub(crate) fn async_trace_leaf() -> impl Future<Output = ()> {
struct Trace;

impl Future for Trace {
type Output = ();

#[inline(always)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
trace_leaf(cx)
}
}

Trace
}
}

mod util;
Expand Down
1 change: 1 addition & 0 deletions tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ where
type Output = Result<T, E>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ cfg_rt! {
cfg_taskdump! {
/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> R {
CONTEXT.with(|c| f(&c.trace))
pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> Option<R> {
CONTEXT.try_with(|c| f(&c.trace)).ok()
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Registration {
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
Expand Down
18 changes: 14 additions & 4 deletions tokio/src/runtime/task/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ pin_project_lite::pin_project! {
}
}

const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
as part of shutting down the curren \
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
thread, so collecting a taskdump is not \
possible.";

impl Context {
pub(crate) const fn new() -> Self {
Context {
Expand All @@ -70,7 +75,7 @@ impl Context {

/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
unsafe fn with_current<F, R>(f: F) -> R
unsafe fn try_with_current<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Self) -> R,
{
Expand All @@ -81,14 +86,18 @@ impl Context {
where
F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
{
Self::with_current(|context| f(&context.active_frame))
Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
}

fn with_current_collector<F, R>(f: F) -> R
where
F: FnOnce(&Cell<Option<Trace>>) -> R,
{
unsafe { Self::with_current(|context| f(&context.collector)) }
// SAFETY: This call can only access the collector field, so it cannot
// break the trace frame linked list.
unsafe {
Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
}
}
}

Expand Down Expand Up @@ -132,7 +141,7 @@ impl Trace {
pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
// Safety: We don't manipulate the current context's active frame.
let did_trace = unsafe {
Context::with_current(|context_cell| {
Context::try_with_current(|context_cell| {
if let Some(mut collector) = context_cell.collector.take() {
let mut frames = vec![];
let mut above_leaf = false;
Expand Down Expand Up @@ -164,6 +173,7 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
false
}
})
.unwrap_or(false)
};

if did_trace {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ impl Barrier {
return self.wait_internal().await;
}
async fn wait_internal(&self) -> BarrierWaitResult {
crate::trace::async_trace_leaf().await;

// NOTE: we are taking a _synchronous_ lock here.
// It is okay to do so because the critical section is fast and never yields, so it cannot
// deadlock even if another future is concurrently holding the lock.
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,8 @@ where
type Output = Result<T, RecvError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
ready!(crate::trace::trace_leaf(cx));

let (receiver, waiter) = self.project();

let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,8 @@ impl<T> Sender<T> {
}

async fn reserve_inner(&self) -> Result<(), SendError<()>> {
crate::trace::async_trace_leaf().await;

match self.chan.semaphore().semaphore.acquire(1).await {
Ok(_) => Ok(()),
Err(_) => Err(SendError(())),
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ impl<T, S: Semaphore> Rx<T, S> {
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;

ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ impl<T: ?Sized> Mutex<T> {
}

async fn acquire(&self) {
crate::trace::async_trace_leaf().await;

self.s.acquire(1).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and
// we own it exclusively, which means that this can never happen.
Expand Down
19 changes: 15 additions & 4 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ impl Notified<'_> {

let (notify, state, notify_waiters_calls, waiter) = self.project();

loop {
'outer_loop: loop {
match *state {
Init => {
let curr = notify.state.load(SeqCst);
Expand All @@ -901,7 +901,7 @@ impl Notified<'_> {
if res.is_ok() {
// Acquired the notification
*state = Done;
return Poll::Ready(());
continue 'outer_loop;
}

// Clone the waker before locking, a waker clone can be
Expand All @@ -919,7 +919,7 @@ impl Notified<'_> {
// was created, then we are done
if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
*state = Done;
return Poll::Ready(());
continue 'outer_loop;
}

// Transition the state to WAITING.
Expand Down Expand Up @@ -955,7 +955,7 @@ impl Notified<'_> {
Ok(_) => {
// Acquired the notification
*state = Done;
return Poll::Ready(());
continue 'outer_loop;
}
Err(actual) => {
assert_eq!(get_state(actual), EMPTY);
Expand Down Expand Up @@ -990,6 +990,12 @@ impl Notified<'_> {
return Poll::Pending;
}
Waiting => {
#[cfg(tokio_taskdump)]
if let Some(waker) = waker {
let mut ctx = Context::from_waker(waker);
ready!(crate::trace::trace_leaf(&mut ctx));
}

if waiter.notification.load(Acquire).is_some() {
// Safety: waiter is already unlinked and will not be shared again,
// so we have an exclusive access to `waker`.
Expand Down Expand Up @@ -1078,6 +1084,11 @@ impl Notified<'_> {
drop(old_waker);
}
Done => {
#[cfg(tokio_taskdump)]
if let Some(waker) = waker {
let mut ctx = Context::from_waker(waker);
ready!(crate::trace::trace_leaf(&mut ctx));
}
return Poll::Ready(());
}
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/sync/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ impl<T> OnceCell<T> {
F: FnOnce() -> Fut,
Fut: Future<Output = T>,
{
crate::trace::async_trace_leaf().await;

if self.initialized() {
// SAFETY: The OnceCell has been fully initialized.
unsafe { self.get_unchecked() }
Expand Down Expand Up @@ -349,6 +351,8 @@ impl<T> OnceCell<T> {
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
crate::trace::async_trace_leaf().await;

if self.initialized() {
// SAFETY: The OnceCell has been fully initialized.
unsafe { Ok(self.get_unchecked()) }
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,8 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

Expand Down Expand Up @@ -1130,6 +1132,7 @@ impl<T> Inner<T> {
}

fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,8 @@ async fn changed_impl<T>(
shared: &Shared<T>,
version: &mut Version,
) -> Result<(), error::RecvError> {
crate::trace::async_trace_leaf().await;

loop {
// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
Expand Down Expand Up @@ -1038,6 +1040,8 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
crate::trace::async_trace_leaf().await;

while self.receiver_count() > 0 {
let notified = self.shared.notify_tx.notified();

Expand Down
1 change: 1 addition & 0 deletions tokio/src/task/consume_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub async fn consume_budget() {
let mut status = Poll::Pending;

crate::future::poll_fn(move |cx| {
ready!(crate::trace::trace_leaf(cx));
if status.is_ready() {
return status;
}
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ impl Sleep {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();

ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
Expand Down