From a8577e890e47857d0a0ac8d23933ae792942341b Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Wed, 27 Sep 2023 18:35:12 +0000 Subject: [PATCH 1/8] rt: do not trace tasks while locking `OwnedTasks` If a polled task completes while `OwnedTasks` is locked, it will not be able to remove itself from `OwnedTasks`, resulting in a deadlock. Fixes #6035. --- tokio/src/runtime/task/trace/mod.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index af2c5644b1a..fa74ebca874 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -275,6 +275,8 @@ pub(in crate::runtime) fn trace_current_thread( task.as_raw().state().transition_to_notified_for_tracing(); // store the raw tasks into a vec tasks.push(task.as_raw()); + // do NOT poll `task` here, since we hold a lock on `owned` and the task + // may complete and need to remove itself from `owned`. }); tasks @@ -317,20 +319,28 @@ cfg_rt_multi_thread! { drop(synced); // notify each task - let mut traces = vec![]; + let mut tasks = vec![]; owned.for_each(|task| { // set the notified bit task.as_raw().state().transition_to_notified_for_tracing(); + // store the raw tasks into a vec + tasks.push(task.as_raw()); + // do NOT poll `task` here, since we hold a lock on `owned` and the task + // may complete and need to remove itself from `owned`. 
+ }); - // trace the task - let ((), trace) = Trace::capture(|| task.as_raw().poll()); - traces.push(trace); + tasks + .into_iter() + .map(|task| { + // trace the task + let ((), trace) = Trace::capture(|| task.poll()); - // reschedule the task - let _ = task.as_raw().state().transition_to_notified_by_ref(); - task.as_raw().schedule(); - }); + // reschedule the task + let _ = task.state().transition_to_notified_by_ref(); + task.schedule(); - traces + trace + }) + .collect() } } From db49d5575b241b0334f46c0d71ad24b87a016a17 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Fri, 29 Sep 2023 16:06:27 +0000 Subject: [PATCH 2/8] remove needless re-scheduling and add regression tests --- tokio/src/runtime/handle.rs | 8 ++++ tokio/src/runtime/task/trace/mod.rs | 15 +++++--- tokio/tests/dump.rs | 57 +++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0c6fd0d2f2c..999352d6f18 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -543,6 +543,14 @@ cfg_taskdump! { scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"), } } + + /// Produces `true` if the current task is being traced for a dump; + /// otherwise false. This function is only public for integration + /// testing purposes. Do not rely on it. + #[doc(hidden)] + pub fn is_tracing() -> bool { + super::task::trace::Context::is_tracing() + } } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index fa74ebca874..5fb9e71474e 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -100,6 +100,16 @@ impl Context { Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL) } } + + /// Produces `true` if the current task is being traced; otherwise false. 
+ pub(crate) fn is_tracing() -> bool { + Self::with_current_collector(|maybe_collector| { + let collector = maybe_collector.take(); + let result = collector.is_some(); + maybe_collector.set(collector); + result + }) + } } impl Trace { @@ -334,11 +344,6 @@ cfg_rt_multi_thread! { .map(|task| { // trace the task let ((), trace) = Trace::capture(|| task.poll()); - - // reschedule the task - let _ = task.state().transition_to_notified_by_ref(); - task.schedule(); - trace }) .collect() diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 658ee4b9bfc..4da0c9e8e18 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -97,3 +97,60 @@ fn multi_thread() { ); }); } + +/// Regression tests for #6035. +/// +/// These tests ensure that dumping will not deadlock if a future completes +/// during a trace. +mod future_completes_during_trace { + use super::*; + + use core::future::{poll_fn, Future}; + + /// A future that completes only during a trace. + fn complete_during_trace() -> impl Future + Send { + use std::task::Poll; + poll_fn(|cx| { + if Handle::is_tracing() { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + } + + #[test] + fn current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let _dump = handle.dump().await; + } + + rt.block_on(async { + let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump()); + }); + } + + #[test] + fn multi_thread() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let _dump = handle.dump().await; + } + + rt.block_on(async { + let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump()); + }); + } +} From 2849fb3b374210cae446dabe71e24fb861cc8618 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Mon, 2 Oct 2023 17:27:54 +0000 Subject: [PATCH 3/8] rt: don't persist raw tasks --- 
tokio/src/runtime/task/mod.rs | 10 +++++ tokio/src/runtime/task/trace/mod.rs | 69 +++++++++++++---------------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index abf7cc266e7..75157625252 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -444,6 +444,16 @@ impl UnownedTask { } } +impl Clone for Task { + fn clone(&self) -> Task { + // SAFETY: We increment the ref count. + unsafe { + self.raw.ref_inc(); + Task::new(self.raw) + } + } +} + impl Drop for Task { fn drop(&mut self) { // Decrement the ref count diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 5fb9e71474e..f6f1d9ef72f 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -278,24 +278,8 @@ pub(in crate::runtime) fn trace_current_thread( drop(task); } - // notify each task - let mut tasks = vec![]; - owned.for_each(|task| { - // set the notified bit - task.as_raw().state().transition_to_notified_for_tracing(); - // store the raw tasks into a vec - tasks.push(task.as_raw()); - // do NOT poll `task` here, since we hold a lock on `owned` and the task - // may complete and need to remove itself from `owned`. - }); - - tasks - .into_iter() - .map(|task| { - let ((), trace) = Trace::capture(|| task.poll()); - trace - }) - .collect() + // precondition: We have drained the tasks from the injection queue. + trace_owned(owned) } cfg_rt_multi_thread! { @@ -328,24 +312,35 @@ cfg_rt_multi_thread! { drop(synced); - // notify each task - let mut tasks = vec![]; - owned.for_each(|task| { - // set the notified bit - task.as_raw().state().transition_to_notified_for_tracing(); - // store the raw tasks into a vec - tasks.push(task.as_raw()); - // do NOT poll `task` here, since we hold a lock on `owned` and the task - // may complete and need to remove itself from `owned`. 
- }); - - tasks - .into_iter() - .map(|task| { - // trace the task - let ((), trace) = Trace::capture(|| task.poll()); - trace - }) - .collect() + // precondition: we have drained the tasks from the local and injection + // queues. + trace_owned(owned) } } + +/// Trace the `OwnedTasks`. +/// +/// # Preconditions +/// +/// This helper presumes exclusive access to each task. The tasks must not exist +/// in any other queue. +fn trace_owned(owned: &OwnedTasks>) -> Vec { + // notify each task + let mut tasks = vec![]; + owned.for_each(|task| { + // set the notified bit + task.as_raw().state().transition_to_notified_for_tracing(); + // store the tasks into a vec + tasks.push(task.clone()); + // do NOT poll `task` here, since we hold a lock on `owned` and the task + // may complete and need to remove itself from `owned`. + }); + + tasks + .into_iter() + .map(|task| { + let ((), trace) = Trace::capture(|| task.as_raw().poll()); + trace + }) + .collect() +} From fe4b3a85487f22ea24fa9c2d2d5dc5f1e5ffad3d Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Wed, 4 Oct 2023 20:33:41 +0000 Subject: [PATCH 4/8] encapsulate ref-count management for tracing See https://github.com/tokio-rs/tokio/pull/6036#discussion_r1345723108 --- tokio/src/runtime/task/mod.rs | 7 +++++++ tokio/src/runtime/task/raw.rs | 3 ++- tokio/src/runtime/task/trace/mod.rs | 15 +++++++-------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 75157625252..66a57c75b1a 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -361,6 +361,13 @@ impl Task { fn header_ptr(&self) -> NonNull
{ self.raw.header_ptr() } + + cfg_taskdump! { + pub(super) fn notify_for_tracing(&self) -> Notified { + self.as_raw().state().transition_to_notified_for_tracing(); + Notified(self.clone()) + } + } } impl Notified { diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 8078859285d..e6f46a65358 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -236,7 +236,8 @@ impl RawTask { /// Increment the task's reference count. /// - /// Currently, this is used only when creating an `AbortHandle`. + /// Currently, this is used only when creating an `AbortHandle`, + /// and when cloning a `Task`. pub(super) fn ref_inc(self) { self.header().state.ref_inc(); } diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index f6f1d9ef72f..185d682a47c 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -18,7 +18,7 @@ mod tree; use symbol::Symbol; use tree::Tree; -use super::{Notified, OwnedTasks}; +use super::{Notified, OwnedTasks, Schedule}; type Backtrace = Vec; type SymbolTrace = Vec; @@ -324,22 +324,21 @@ cfg_rt_multi_thread! { /// /// This helper presumes exclusive access to each task. The tasks must not exist /// in any other queue. -fn trace_owned(owned: &OwnedTasks>) -> Vec { +fn trace_owned(owned: &OwnedTasks) -> Vec { // notify each task let mut tasks = vec![]; owned.for_each(|task| { - // set the notified bit - task.as_raw().state().transition_to_notified_for_tracing(); - // store the tasks into a vec - tasks.push(task.clone()); - // do NOT poll `task` here, since we hold a lock on `owned` and the task + // notify the task (and thus make it poll-able) and stash it + tasks.push(task.notify_for_tracing()); + // we do not poll it here since we hold a lock on `owned` and the task // may complete and need to remove itself from `owned`. 
}); tasks .into_iter() .map(|task| { - let ((), trace) = Trace::capture(|| task.as_raw().poll()); + let local_notified = owned.assert_owner(task); + let ((), trace) = Trace::capture(|| local_notified.run()); trace }) .collect() From 1a904c8eae0c4a98b7355606a0b425abe423ab78 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Fri, 6 Oct 2023 12:47:37 +0000 Subject: [PATCH 5/8] don't increment refcount in `transition_to_notified_for_tracing` `notify_for_tracing` already increments refcount with its `clone`, and `LocalNotified::run` forgets (not drops) the underlying task. --- tokio/src/runtime/task/state.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 12f54491813..81209abb9d9 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -281,7 +281,6 @@ impl State { pub(super) fn transition_to_notified_for_tracing(&self) { self.fetch_update_action(|mut snapshot| { snapshot.set_notified(); - snapshot.ref_inc(); ((), Some(snapshot)) }); } From 4264f61442b5bb55ece0da6671e3c48b494838d3 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Fri, 6 Oct 2023 18:25:18 +0000 Subject: [PATCH 6/8] don't clone in `notify_for_tracing` --- tokio/src/runtime/task/mod.rs | 4 ++-- tokio/src/runtime/task/trace/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 66a57c75b1a..6204d0597f2 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -363,9 +363,9 @@ impl Task { } cfg_taskdump! 
{ - pub(super) fn notify_for_tracing(&self) -> Notified { + pub(super) fn notify_for_tracing(self) -> Notified { self.as_raw().state().transition_to_notified_for_tracing(); - Notified(self.clone()) + Notified(self) } } } diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 185d682a47c..cc6b95d2b63 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -329,7 +329,7 @@ fn trace_owned(owned: &OwnedTasks) -> Vec { let mut tasks = vec![]; owned.for_each(|task| { // notify the task (and thus make it poll-able) and stash it - tasks.push(task.notify_for_tracing()); + tasks.push(task.clone().notify_for_tracing()); // we do not poll it here since we hold a lock on `owned` and the task // may complete and need to remove itself from `owned`. }); From f7706733635f1c2df155556a0e5d134609badb2d Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Fri, 6 Oct 2023 18:56:14 +0000 Subject: [PATCH 7/8] don't clone in `trace_owned` --- tokio/src/runtime/task/mod.rs | 5 +++-- tokio/src/runtime/task/state.rs | 1 + tokio/src/runtime/task/trace/mod.rs | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 6204d0597f2..ae7f967fb05 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -363,9 +363,10 @@ impl Task { } cfg_taskdump! { - pub(super) fn notify_for_tracing(self) -> Notified { + pub(super) fn notify_for_tracing(&self) -> Notified { self.as_raw().state().transition_to_notified_for_tracing(); - Notified(self) + // SAFETY: `transition_to_notified_for_tracing` increments the refcount. 
+ unsafe { Notified(Task::new(self.raw)) } } } } diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 81209abb9d9..12f54491813 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -281,6 +281,7 @@ impl State { pub(super) fn transition_to_notified_for_tracing(&self) { self.fetch_update_action(|mut snapshot| { snapshot.set_notified(); + snapshot.ref_inc(); ((), Some(snapshot)) }); } diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index cc6b95d2b63..185d682a47c 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -329,7 +329,7 @@ fn trace_owned(owned: &OwnedTasks) -> Vec { let mut tasks = vec![]; owned.for_each(|task| { // notify the task (and thus make it poll-able) and stash it - tasks.push(task.clone().notify_for_tracing()); + tasks.push(task.notify_for_tracing()); // we do not poll it here since we hold a lock on `owned` and the task // may complete and need to remove itself from `owned`. }); From 05a342256809b981721b130423627936dbbc0498 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Fri, 6 Oct 2023 19:21:52 +0000 Subject: [PATCH 8/8] remove dead `Clone` impl for `Task` --- tokio/src/runtime/task/mod.rs | 10 ---------- tokio/src/runtime/task/raw.rs | 3 +-- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index ae7f967fb05..e73ad93d4fa 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -452,16 +452,6 @@ impl UnownedTask { } } -impl Clone for Task { - fn clone(&self) -> Task { - // SAFETY: We increment the ref count. 
- unsafe { - self.raw.ref_inc(); - Task::new(self.raw) - } - } -} - impl Drop for Task { fn drop(&mut self) { // Decrement the ref count diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index e6f46a65358..8078859285d 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -236,8 +236,7 @@ impl RawTask { /// Increment the task's reference count. /// - /// Currently, this is used only when creating an `AbortHandle`, - /// and when cloning a `Task`. + /// Currently, this is used only when creating an `AbortHandle`. pub(super) fn ref_inc(self) { self.header().state.ref_inc(); }