Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: skip notified tasks during taskdumps #6194

Merged
merged 2 commits into from Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 11 additions & 4 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -363,10 +363,17 @@ impl<S: 'static> Task<S> {
}

cfg_taskdump! {
pub(super) fn notify_for_tracing(&self) -> Notified<S> {
self.as_raw().state().transition_to_notified_for_tracing();
// SAFETY: `transition_to_notified_for_tracing` increments the refcount.
unsafe { Notified(Task::new(self.raw)) }
/// Notify the task for task dumping.
///
/// Returns `None` if the task has already been notified.
pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
if self.as_raw().state().transition_to_notified_for_tracing() {
// SAFETY: `transition_to_notified_for_tracing` increments the
// refcount.
Some(unsafe { Notified(Task::new(self.raw)) })
} else {
None
}
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions tokio/src/runtime/task/state.rs
Expand Up @@ -270,20 +270,28 @@ impl State {
})
}

/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
/// count.
///
/// Returns `true` if the notified bit was transitioned from `0` to `1`;
/// otherwise `false.`
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
pub(super) fn transition_to_notified_for_tracing(&self) {
pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
self.fetch_update_action(|mut snapshot| {
snapshot.set_notified();
snapshot.ref_inc();
((), Some(snapshot))
});
if snapshot.is_notified() {
(false, None)
} else {
snapshot.set_notified();
snapshot.ref_inc();
(true, Some(snapshot))
}
})
}

/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
Expand Down
39 changes: 26 additions & 13 deletions tokio/src/runtime/task/trace/mod.rs
Expand Up @@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread(
injection: &Inject<Arc<current_thread::Handle>>,
) -> Vec<Trace> {
// clear the local and injection queues
local.clear();

let mut dequeued = Vec::new();

while let Some(task) = local.pop_back() {
dequeued.push(task);
}

while let Some(task) = injection.pop() {
drop(task);
dequeued.push(task);
}

// precondition: We have drained the tasks from the injection queue.
trace_owned(owned)
trace_owned(owned, dequeued)
}

cfg_rt_multi_thread! {
Expand All @@ -299,22 +304,24 @@ cfg_rt_multi_thread! {
synced: &Mutex<Synced>,
injection: &Shared<Arc<multi_thread::Handle>>,
) -> Vec<Trace> {
let mut dequeued = Vec::new();

// clear the local queue
while let Some(notified) = local.pop() {
drop(notified);
dequeued.push(notified);
}

// clear the injection queue
let mut synced = synced.lock();
while let Some(notified) = injection.pop(&mut synced.inject) {
drop(notified);
dequeued.push(notified);
}

drop(synced);

// precondition: we have drained the tasks from the local and injection
// queues.
trace_owned(owned)
trace_owned(owned, dequeued)
}
}

Expand All @@ -324,14 +331,20 @@ cfg_rt_multi_thread! {
///
/// This helper presumes exclusive access to each task. The tasks must not exist
/// in any other queue.
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>) -> Vec<Trace> {
// notify each task
let mut tasks = vec![];
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<Trace> {
let mut tasks = dequeued;
// Notify and trace all un-notified tasks. The dequeued tasks are already
// notified and so do not need to be re-notified.
owned.for_each(|task| {
// notify the task (and thus make it poll-able) and stash it
tasks.push(task.notify_for_tracing());
// we do not poll it here since we hold a lock on `owned` and the task
// may complete and need to remove itself from `owned`.
// Notify the task (and thus make it poll-able) and stash it. This fails
// if the task is already notified. In these cases, we skip tracing the
// task.
if let Some(notified) = task.notify_for_tracing() {
tasks.push(notified);
}
// We do not poll tasks here, since we hold a lock on `owned` and the
// task may complete and need to remove itself from `owned`. Polling
// such a task here would result in a deadlock.
});

tasks
Expand Down
41 changes: 41 additions & 0 deletions tokio/tests/dump.rs
Expand Up @@ -147,10 +147,51 @@ mod future_completes_during_trace {
async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
tokio::task::yield_now().await;
}

rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}
}

/// Regression test for #6051.
///
/// This test ensures that tasks notified outside of a worker will not be
/// traced, since doing so will un-set their notified bit prior to them being
/// run and panic.
#[test]
fn notified_during_tracing() {
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(3)
.build()
.unwrap();

let timeout = async {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
};

let timer = rt.spawn(async {
loop {
tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
}
});

let dump = async {
loop {
let handle = Handle::current();
let _dump = handle.dump().await;
}
};

rt.block_on(async {
tokio::select!(
biased;
_ = timeout => {},
_ = timer => {},
_ = dump => {},
);
});
}