diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0c6fd0d2f2c..999352d6f18 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -543,6 +543,14 @@ cfg_taskdump! { scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"), } } + + /// Produces `true` if the current task is being traced for a dump; + /// otherwise false. This function is only public for integration + /// testing purposes. Do not rely on it. + #[doc(hidden)] + pub fn is_tracing() -> bool { + super::task::trace::Context::is_tracing() + } } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index abf7cc266e7..e73ad93d4fa 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -361,6 +361,14 @@ impl Task { fn header_ptr(&self) -> NonNull
{ self.raw.header_ptr() } + + cfg_taskdump! { + pub(super) fn notify_for_tracing(&self) -> Notified { + self.as_raw().state().transition_to_notified_for_tracing(); + // SAFETY: `transition_to_notified_for_tracing` increments the refcount. + unsafe { Notified(Task::new(self.raw)) } + } + } } impl Notified { diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index af2c5644b1a..185d682a47c 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -18,7 +18,7 @@ mod tree; use symbol::Symbol; use tree::Tree; -use super::{Notified, OwnedTasks}; +use super::{Notified, OwnedTasks, Schedule}; type Backtrace = Vec; type SymbolTrace = Vec; @@ -100,6 +100,16 @@ impl Context { Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL) } } + + /// Produces `true` if the current task is being traced; otherwise false. + pub(crate) fn is_tracing() -> bool { + Self::with_current_collector(|maybe_collector| { + let collector = maybe_collector.take(); + let result = collector.is_some(); + maybe_collector.set(collector); + result + }) + } } impl Trace { @@ -268,22 +278,8 @@ pub(in crate::runtime) fn trace_current_thread( drop(task); } - // notify each task - let mut tasks = vec![]; - owned.for_each(|task| { - // set the notified bit - task.as_raw().state().transition_to_notified_for_tracing(); - // store the raw tasks into a vec - tasks.push(task.as_raw()); - }); - - tasks - .into_iter() - .map(|task| { - let ((), trace) = Trace::capture(|| task.poll()); - trace - }) - .collect() + // precondition: We have drained the tasks from the injection queue. + trace_owned(owned) } cfg_rt_multi_thread! { @@ -316,21 +312,34 @@ cfg_rt_multi_thread! { drop(synced); - // notify each task - let mut traces = vec![]; - owned.for_each(|task| { - // set the notified bit - task.as_raw().state().transition_to_notified_for_tracing(); - - // trace the task - let ((), trace) = Trace::capture(|| task.as_raw().poll()); - traces.push(trace); + // precondition: we have drained the tasks from the local and injection + // queues. + trace_owned(owned) + } +} - // reschedule the task - let _ = task.as_raw().state().transition_to_notified_by_ref(); - task.as_raw().schedule(); - }); +/// Trace the `OwnedTasks`. +/// +/// # Preconditions +/// +/// This helper presumes exclusive access to each task. The tasks must not exist +/// in any other queue. +fn trace_owned(owned: &OwnedTasks) -> Vec { + // notify each task + let mut tasks = vec![]; + owned.for_each(|task| { + // notify the task (and thus make it poll-able) and stash it + tasks.push(task.notify_for_tracing()); + // we do not poll it here since we hold a lock on `owned` and the task + // may complete and need to remove itself from `owned`. + }); - traces - } + tasks + .into_iter() + .map(|task| { + let local_notified = owned.assert_owner(task); + let ((), trace) = Trace::capture(|| local_notified.run()); + trace + }) + .collect() } diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 658ee4b9bfc..4da0c9e8e18 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -97,3 +97,60 @@ fn multi_thread() { ); }); } + +/// Regression tests for #6035. +/// +/// These tests ensure that dumping will not deadlock if a future completes +/// during a trace. +mod future_completes_during_trace { + use super::*; + + use core::future::{poll_fn, Future}; + + /// A future that completes only during a trace. + fn complete_during_trace() -> impl Future + Send { + use std::task::Poll; + poll_fn(|cx| { + if Handle::is_tracing() { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + } + + #[test] + fn current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let _dump = handle.dump().await; + } + + rt.block_on(async { + let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump()); + }); + } + + #[test] + fn multi_thread() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let _dump = handle.dump().await; + } + + rt.block_on(async { + let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump()); + }); + } +}