Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: batch pop from injection queue when idle #5705

Merged
merged 14 commits into from
May 23, 2023
22 changes: 14 additions & 8 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
scope:
- --skip loom_pool
- loom_pool::group_a
- loom_pool::group_b
- loom_pool::group_c
- loom_pool::group_d
- time::driver
include:
- scope: --skip loom_pool
max_preemptions: 2
- scope: loom_pool::group_a
max_preemptions: 1
- scope: loom_pool::group_b
max_preemptions: 2
- scope: loom_pool::group_c
max_preemptions: 1
- scope: loom_pool::group_d
max_preemptions: 1
- scope: time::driver
max_preemptions: 2
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
Expand All @@ -43,6 +49,6 @@ jobs:
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
LOOM_MAX_PREEMPTIONS: 2
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
LOOM_MAX_BRANCHES: 10000
SCOPE: ${{ matrix.scope }}
63 changes: 59 additions & 4 deletions benches/rt_multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};

fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;
const NUM_WORKERS: usize = 4;
const NUM_SPAWN: usize = 10_000;

fn spawn_many_local(b: &mut Bencher) {
let rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
Expand All @@ -38,6 +39,52 @@ fn spawn_many(b: &mut Bencher) {
});
}

fn spawn_many_remote_idle(b: &mut Bencher) {
let rt = rt();

let mut handles = Vec::with_capacity(NUM_SPAWN);

b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt.spawn(async {}));
}

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
});
}

fn spawn_many_remote_busy(b: &mut Bencher) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);

// Spawn some tasks to keep the runtimes busy
for _ in 0..(2 * NUM_WORKERS) {
rt.spawn(async {
loop {
tokio::task::yield_now().await;
std::thread::sleep(std::time::Duration::from_micros(10));
}
});
}

b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
});
}

fn yield_many(b: &mut Bencher) {
const NUM_YIELD: usize = 1_000;
const TASKS: usize = 200;
Expand Down Expand Up @@ -140,12 +187,20 @@ fn chained_spawn(b: &mut Bencher) {

fn rt() -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(4)
.worker_threads(NUM_WORKERS)
.enable_all()
.build()
.unwrap()
}

benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);
benchmark_group!(
scheduler,
spawn_many_local,
spawn_many_remote_idle,
spawn_many_remote_busy,
ping_pong,
yield_many,
chained_spawn,
);

benchmark_main!(scheduler);
42 changes: 41 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ impl<T> Local<T> {
!self.inner.is_empty()
}

/// How many tasks can be pushed into the queue
pub(crate) fn capacity(&self) -> usize {
self.inner.capacity()
}

pub(crate) fn max_capacity(&self) -> usize {
LOCAL_QUEUE_CAPACITY
}

/// Returns false if there are any entries in the queue
///
/// Separate to is_stealable so that refactors of is_stealable to "protect"
Expand All @@ -118,8 +127,27 @@ impl<T> Local<T> {
!self.inner.is_empty()
}

pub(crate) fn push_back(&mut self, task: task::Notified<T>) -> Result<(), task::Notified<T>> {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
let head = self.inner.head.load(Acquire);
let (steal, _) = unpack(head);

// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };

if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
// Yes, this if condition is structured a bit weird (first block
// does nothing, second returns an error). It is this way to match
// `push_back_or_overflow`.
} else {
return Err(task);
}

self.push_back_finish(task, tail);
Ok(())
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
carllerche marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn push_back(
pub(crate) fn push_back_or_overflow(
&mut self,
mut task: task::Notified<T>,
inject: &Inject<T>,
Expand Down Expand Up @@ -153,6 +181,11 @@ impl<T> Local<T> {
}
};

self.push_back_finish(task, tail);
}

// Second half of `push_back`
fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
// Map the position to a slot index.
let idx = tail as usize & MASK;

Expand Down Expand Up @@ -501,6 +534,13 @@ impl<T> Drop for Local<T> {
}

impl<T> Inner<T> {
fn capacity(&self) -> usize {
let (steal, _) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);

LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
}
carllerche marked this conversation as resolved.
Show resolved Hide resolved

fn len(&self) -> UnsignedShort {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
Expand Down
36 changes: 31 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::cmp;
use std::time::Duration;

/// A scheduler worker
Expand Down Expand Up @@ -509,8 +510,11 @@ impl Context {
} else {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue
.push_back(task, self.worker.inject(), &mut core.metrics);
core.run_queue.push_back_or_overflow(
task,
self.worker.inject(),
&mut core.metrics,
);
return Ok(core);
}
}
Expand Down Expand Up @@ -612,7 +616,29 @@ impl Core {
if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
let maybe_task = self.next_local_task();

if maybe_task.is_some() {
return maybe_task;
}
Comment on lines +618 to +622
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit: why not just

Suggested change
let maybe_task = self.next_local_task();
if maybe_task.is_some() {
return maybe_task;
}
if let task @ Some(_) = self.next_local_task() {
return task;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I'm not too fond of that style. I find @ syntax there harder to read

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could also be:

Suggested change
let maybe_task = self.next_local_task();
if maybe_task.is_some() {
return maybe_task;
}
if let Some(task) = self.next_local_task() {
return Some(task);
}

this isn't super important to me, but the reason i'm commenting on it is that the current code had me searching for the place where maybe_task was referenced again outside of the if, but it never actually is referenced again. not a big deal, but i found that slightly weird. 🤷‍♀️


let cap = self.run_queue.capacity();
carllerche marked this conversation as resolved.
Show resolved Hide resolved

// The worker is currently idle, pull a batch of work from the
// injection queue. We don't want to pull *all* the work so other
// workers can also get some.
let n = cmp::min(
worker.inject().len() / worker.handle.shared.remotes.len() + 1,
carllerche marked this conversation as resolved.
Show resolved Hide resolved
cap,
);

let n = cmp::min(n, self.run_queue.max_capacity() / 2);
carllerche marked this conversation as resolved.
Show resolved Hide resolved

worker.inject().pop_n(n, |task| {
if self.run_queue.push_back(task).is_err() {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
panic!("[internal Tokio bug] pushing to local queue failed.");
}
})
}
}

Expand Down Expand Up @@ -808,7 +834,7 @@ impl Handle {
// flexibility and the task may go to the front of the queue.
let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
core.run_queue
.push_back(task, &self.shared.inject, &mut core.metrics);
.push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
true
} else {
// Push to the LIFO slot
Expand All @@ -817,7 +843,7 @@ impl Handle {

if let Some(prev) = prev {
core.run_queue
.push_back(prev, &self.shared.inject, &mut core.metrics);
.push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics);
}

core.lifo_slot = Some(task);
Expand Down
50 changes: 38 additions & 12 deletions tokio/src/runtime/task/inject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,43 @@ impl<T: 'static> Inject<T> {
}

pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
self.pop_n(1, |_| {})
}

pub(crate) fn pop_n<F>(&self, n: usize, mut f: F) -> Option<task::Notified<T>>
carllerche marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnMut(task::Notified<T>),
{
use std::cmp;

// Fast path, if len == 0, then there are no values
if self.is_empty() {
return None;
}

// Lock the queue
let mut p = self.pointers.lock();

// It is possible to hit null here if another thread popped the last
// task between us checking `len` and acquiring the lock.
let task = p.head?;
let n = cmp::min(n, unsafe { self.len.unsync_load() });

p.head = get_next(task);

if p.head.is_none() {
p.tail = None;
if n == 0 {
return None;
}

set_next(task, None);

// Decrement the count.
//
// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
self.len
.store(unsafe { self.len.unsync_load() } - 1, Release);
.store(unsafe { self.len.unsync_load() } - n, Release);

// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
let ret = p.pop().unwrap();

for _ in 1..n {
f(p.pop().unwrap());
}

Some(ret)
}
}

Expand Down Expand Up @@ -215,6 +224,23 @@ impl<T: 'static> Drop for Inject<T> {
}
}

impl Pointers {
fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
let task = self.head?;

self.head = get_next(task);

if self.head.is_none() {
self.tail = None;
}

set_next(task, None);

// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}
}

fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}
Expand Down