Skip to content

Commit

Permalink
rt(threaded): basic self-tuning of injection queue
Browse files Browse the repository at this point in the history
Each multi-threaded runtime worker prioritizes pulling tasks off of its
local queue. Every so often, it checks the injection (global) queue for
work submitted there. Previously, "every so often," was a constant
"number of tasks polled" value. Tokio sets a default of 61, but allows
users to configure this value.

If workers are under load with tasks that are slow to poll, the
injection queue can be starved. To prevent starvation in this case, this
commit implements some basic self-tuning. The multi-threaded scheduler
tracks the mean task poll time using an exponentially-weighted moving
average. It then uses this value to pick an interval at which to check
the injection queue.

This commit is a first pass at adding self-tuning to the scheduler.
There are other values in the scheduler that could benefit from
self-tuning (e.g. the maintenance interval). Additionally, the
current-thread scheduler could also benfit from self-tuning. However, we
have reached the point where we should start investigating ways to unify
logic in both schedulers. Adding self-tuning to the current-thread
scheduler will be punted until after this unification.
  • Loading branch information
carllerche committed May 24, 2023
1 parent 9eb3f5b commit 9b49b59
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 73 deletions.
65 changes: 59 additions & 6 deletions benches/rt_multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;

use bencher::{benchmark_group, benchmark_main, Bencher};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};

const NUM_WORKERS: usize = 4;
const NUM_SPAWN: usize = 10_000;
const STALL_DUR: Duration = Duration::from_micros(10);

fn spawn_many_local(b: &mut Bencher) {
let rt = rt();
Expand Down Expand Up @@ -57,19 +59,60 @@ fn spawn_many_remote_idle(b: &mut Bencher) {
});
}

fn spawn_many_remote_busy(b: &mut Bencher) {
fn spawn_many_remote_busy1(b: &mut Bencher) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
let flag = Arc::new(AtomicBool::new(true));

// Spawn some tasks to keep the runtimes busy
for _ in 0..(2 * NUM_WORKERS) {
rt.spawn(async {
loop {
let flag = flag.clone();
rt.spawn(async move {
while flag.load(Relaxed) {
tokio::task::yield_now().await;
std::thread::sleep(std::time::Duration::from_micros(10));
stall();
}
});
}

b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
});

flag.store(false, Relaxed);
}

fn spawn_many_remote_busy2(b: &mut Bencher) {
const NUM_SPAWN: usize = 1_000;

let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
let flag = Arc::new(AtomicBool::new(true));

// Spawn some tasks to keep the runtimes busy
for _ in 0..(NUM_WORKERS) {
let flag = flag.clone();
fn iter(flag: Arc<AtomicBool>) {
tokio::spawn(async {
if flag.load(Relaxed) {
stall();
iter(flag);
}
});
}
rt.spawn(async {
iter(flag);
});
}

b.iter(|| {
Expand All @@ -83,6 +126,8 @@ fn spawn_many_remote_busy(b: &mut Bencher) {
}
});
});

flag.store(false, Relaxed);
}

fn yield_many(b: &mut Bencher) {
Expand Down Expand Up @@ -193,11 +238,19 @@ fn rt() -> Runtime {
.unwrap()
}

fn stall() {
let now = Instant::now();
while now.elapsed() < STALL_DUR {
std::thread::yield_now();
}
}

benchmark_group!(
scheduler,
spawn_many_local,
spawn_many_remote_idle,
spawn_many_remote_busy,
spawn_many_remote_busy1,
spawn_many_remote_busy2,
ping_pong,
yield_many,
chained_spawn,
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct Builder {
pub(super) keep_alive: Option<Duration>,

/// How many ticks before pulling a task from the global/remote queue?
pub(super) global_queue_interval: u32,
pub(super) global_queue_interval: Option<u32>,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Builder {
#[cfg(not(loom))]
const EVENT_INTERVAL: u32 = 61;

Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
}

cfg_not_wasi! {
Expand All @@ -222,15 +222,15 @@ impl Builder {
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
Builder::new(Kind::MultiThread, 61)
}
}

/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder {
pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
Builder {
kind,

Expand Down Expand Up @@ -266,7 +266,7 @@ impl Builder {

// Defaults for these values depend on the scheduler kind, so we get them
// as parameters.
global_queue_interval,
global_queue_interval: None,
event_interval,

seed_generator: RngSeedGenerator::new(RngSeed::new()),
Expand Down Expand Up @@ -716,7 +716,7 @@ impl Builder {
/// # }
/// ```
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
self.global_queue_interval = val;
self.global_queue_interval = Some(val);
self
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::util::RngSeedGenerator;

pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,
pub(crate) global_queue_interval: Option<u32>,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,
Expand Down
20 changes: 13 additions & 7 deletions tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) struct MetricsBatch {
busy_duration_total: u64,

/// Instant at which work last resumed (continued after park).
last_resume_time: Instant,
processing_scheduled_tasks_started_at: Instant,

/// If `Some`, tracks poll times in nanoseconds
poll_timer: Option<PollTimer>,
Expand Down Expand Up @@ -62,7 +62,7 @@ impl MetricsBatch {
local_schedule_count: 0,
overflow_count: 0,
busy_duration_total: 0,
last_resume_time: now,
processing_scheduled_tasks_started_at: now,
poll_timer: worker_metrics
.poll_count_histogram
.as_ref()
Expand Down Expand Up @@ -106,11 +106,20 @@ impl MetricsBatch {
} else {
self.poll_count_on_last_park = self.poll_count;
}
}

/// Start processing a batch of tasks
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.processing_scheduled_tasks_started_at = Instant::now();
}

let busy_duration = self.last_resume_time.elapsed();
/// Stop processing a batch of tasks
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
self.busy_duration_total += duration_as_u64(busy_duration);
}

/// Start polling an individual task
pub(crate) fn start_poll(&mut self) {
self.poll_count += 1;

Expand All @@ -119,17 +128,14 @@ impl MetricsBatch {
}
}

/// Stop polling an individual task
pub(crate) fn end_poll(&mut self) {
if let Some(poll_timer) = &mut self.poll_timer {
let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
poll_timer.poll_counts.measure(elapsed, 1);
}
}

pub(crate) fn returned_from_park(&mut self) {
self.last_resume_time = Instant::now();
}

pub(crate) fn inc_local_schedule_count(&mut self) {
self.local_schedule_count += 1;
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ impl MetricsBatch {

pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn returned_from_park(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
pub(crate) fn start_poll(&mut self) {}
pub(crate) fn end_poll(&mut self) {}
}
Expand Down
27 changes: 25 additions & 2 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct Core {
/// Metrics batch
metrics: MetricsBatch,

/// How often to check the global queue
global_queue_interval: u32,

/// True if a task panicked without being handled and the runtime is
/// configured to shutdown on unhandled panic.
unhandled_panic: bool,
Expand Down Expand Up @@ -100,6 +103,11 @@ type Notified = task::Notified<Arc<Handle>>;
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 61;

// Tracks the current CurrentThread.
scoped_thread_local!(static CURRENT: Context);

Expand All @@ -113,6 +121,11 @@ impl CurrentThread {
) -> (CurrentThread, Arc<Handle>) {
let worker_metrics = WorkerMetrics::from_config(&config);

// Get the configured global queue interval, or use the default.
let global_queue_interval = config
.global_queue_interval
.unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

let handle = Arc::new(Handle {
shared: Shared {
inject: Inject::new(),
Expand All @@ -132,6 +145,7 @@ impl CurrentThread {
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(&handle.shared.worker_metrics),
global_queue_interval,
unhandled_panic: false,
})));

Expand Down Expand Up @@ -255,7 +269,7 @@ impl Core {
}

fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
if self.tick % handle.shared.config.global_queue_interval == 0 {
if self.tick % self.global_queue_interval == 0 {
handle
.next_remote_task()
.or_else(|| self.next_local_task(handle))
Expand Down Expand Up @@ -344,7 +358,6 @@ impl Context {
});

core = c;
core.metrics.returned_from_park();
}

if let Some(f) = &handle.shared.config.after_unpark {
Expand Down Expand Up @@ -603,6 +616,8 @@ impl CoreGuard<'_> {

pin!(future);

core.metrics.start_processing_scheduled_tasks();

'outer: loop {
let handle = &context.handle;

Expand Down Expand Up @@ -631,12 +646,16 @@ impl CoreGuard<'_> {
let task = match entry {
Some(entry) => entry,
None => {
core.metrics.end_processing_scheduled_tasks();

core = if did_defer_tasks() {
context.park_yield(core, handle)
} else {
context.park(core, handle)
};

core.metrics.start_processing_scheduled_tasks();

// Try polling the `block_on` future next
continue 'outer;
}
Expand All @@ -651,9 +670,13 @@ impl CoreGuard<'_> {
core = c;
}

core.metrics.end_processing_scheduled_tasks();

// Yield to the driver, this drives the timer and pulls any
// pending I/O events.
core = context.park_yield(core, handle);

core.metrics.start_processing_scheduled_tasks();
}
});

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub(crate) use handle::Handle;
mod idle;
use self::idle::Idle;

mod stats;
pub(crate) use stats::Stats;

mod park;
pub(crate) use park::{Parker, Unparker};

Expand Down

0 comments on commit 9b49b59

Please sign in to comment.