Skip to content

Commit

Permalink
tokio: optimize taskdumps when waiting for dump result
Browse files Browse the repository at this point in the history
Don't just repeatedly yield the thread; wait for it to be notified.

ref: tokio-rs#5717 (comment)
  • Loading branch information
jswrenn committed May 31, 2023
1 parent 4d97e1c commit d636b84
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
15 changes: 9 additions & 6 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,24 +349,27 @@ cfg_taskdump! {
// perform the trace in a separate thread so that the
// trace itself does not appear in the taskdump.
let handle = handle.clone();
spawn_thread(|| {
spawn_thread(async {
let handle = handle;
handle.dump()
handle.dump().await
}).await
},
}
}
}

/// Spawn a new thread and asynchronously await on its result.
async fn spawn_thread<F, T>(f: F) -> T
async fn spawn_thread<F>(f: F) -> <F as Future>::Output
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static
{
let (tx, rx) = crate::sync::oneshot::channel();
crate::loom::thread::spawn(|| {
let _ = tx.send(f());
let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let _ = tx.send(f.await);
});
});
rx.await.unwrap()
}
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,23 @@ cfg_metrics! {

cfg_taskdump! {
impl Handle {
pub(crate) fn dump(&self) -> crate::runtime::Dump {
pub(crate) async fn dump(&self) -> crate::runtime::Dump {
let trace_status = &self.shared.trace_status;

// If a dump is in progress, block.
trace_status.start_trace_request(&self);
trace_status.start_trace_request(&self).await;

let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
crate::loom::thread::yield_now();
trace_status.result_ready.notified().await;
}
};

// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self);
trace_status.end_trace_request(&self).await;

result
}
Expand Down
17 changes: 12 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ cfg_taskdump! {
pub(super) trace_requested: AtomicBool,
trace_start: Barrier,
trace_end: Barrier,
pub(super) result_ready: crate::sync::Notify,
pub(super) trace_result: Mutex<Option<Dump>>,
}

Expand All @@ -223,6 +224,7 @@ cfg_taskdump! {
trace_requested: AtomicBool::new(false),
trace_start: Barrier::new(remotes_len),
trace_end: Barrier::new(remotes_len),
result_ready: crate::sync::Notify::new(),
trace_result: Mutex::new(None),
}
}
Expand All @@ -231,29 +233,34 @@ cfg_taskdump! {
self.trace_requested.load(Ordering::Relaxed)
}

pub(super) fn start_trace_request(&self, handle: &Handle) {
pub(super) async fn start_trace_request(&self, handle: &Handle) {
while self.trace_requested.compare_exchange(false,
true,
Ordering::Acquire,
Ordering::Relaxed).is_err()
{
handle.notify_all();
crate::loom::thread::yield_now();
crate::task::yield_now().await;
}
}

fn stash_result(&self, dump: Dump) {
let _ = self.trace_result.lock().insert(dump);
self.result_ready.notify_one();
}

pub(super) fn take_result(&self) -> Option<Dump> {
self.trace_result.lock().take()
}

pub(super) fn end_trace_request(&self, handle: &Handle) {
pub(super) async fn end_trace_request(&self, handle: &Handle) {
while self.trace_requested.compare_exchange(true,
false,
Ordering::Acquire,
Ordering::Relaxed).is_err()
{
handle.notify_all();
crate::loom::thread::yield_now();
crate::task::yield_now().await;
}
}
}
Expand Down Expand Up @@ -1121,7 +1128,7 @@ impl Handle {
let result = dump::Dump::new(traces);

// stash the result
let _ = self.shared.trace_status.trace_result.lock().insert(result);
self.shared.trace_status.stash_result(result);

// allow other workers to proceed
self.shared.trace_status.trace_end.wait();
Expand Down

0 comments on commit d636b84

Please sign in to comment.