Skip to content

Commit

Permalink
runtime: fix possible starvation when using lifo slot (#5686)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 15, 2023
1 parent dd9471d commit 70364b7
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
11 changes: 11 additions & 0 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ cfg_rt_multi_thread! {
pub(crate) fn set(budget: Budget) {
let _ = context::budget(|cell| cell.set(budget));
}

/// Consume one unit of progress from the current task's budget.
pub(crate) fn consume_one() {
let _ = context::budget(|cell| {
let mut budget = cell.get();
if let Some(ref mut counter) = budget.0 {
*counter = counter.saturating_sub(1);
}
cell.set(budget);
});
}
}

cfg_rt! {
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ impl Context {
None => return Ok(core),
};

// Polling a task doesn't necessarily consume any budget, if it
// doesn't use any Tokio leaf futures. To prevent such tasks
// from using the lifo slot in an infinite loop, we consume an
// extra unit of budget between each iteration of the loop.
coop::consume_one();

if coop::has_budget_remaining() {
// Run the LIFO task, then loop
core.metrics.incr_poll_count();
Expand Down
29 changes: 28 additions & 1 deletion tokio/tests/rt_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ fn single_thread() {
let _ = runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build();
.build()
.unwrap();
}

#[test]
Expand Down Expand Up @@ -160,6 +161,32 @@ fn many_multishot_futures() {
}
}

#[test]
fn lifo_slot_budget() {
async fn my_fn() {
spawn_another();
}

fn spawn_another() {
tokio::spawn(my_fn());
}

let rt = runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap();

let (send, recv) = oneshot::channel();

rt.spawn(async move {
tokio::spawn(my_fn());
let _ = send.send(());
});

let _ = rt.block_on(recv);
}

#[test]
fn spawn_shutdown() {
let rt = rt();
Expand Down

0 comments on commit 70364b7

Please sign in to comment.