move get_spawn_concurrency_level from builder to task/list.rs
wathenjiang committed Nov 24, 2023
1 parent ed27f70 commit db58076
Showing 6 changed files with 22 additions and 35 deletions.
25 changes: 0 additions & 25 deletions tokio/src/runtime/builder.rs
@@ -1231,12 +1231,6 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
// Shrink the size of spawn_concurrency_level when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of time.
#[cfg(loom)]
let spawn_concurrency_level = 4;
#[cfg(not(loom))]
let spawn_concurrency_level = Self::get_spawn_concurrency_level(core_threads);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

@@ -1255,7 +1249,6 @@
driver_handle,
blocking_spawner,
seed_generator_2,
spawn_concurrency_level,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
@@ -1286,14 +1279,6 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

// Shrink the size of spawn_concurrency_level when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of time.
#[cfg(loom)]
let spawn_concurrency_level = 4;
#[cfg(not(loom))]
let spawn_concurrency_level = Self::get_spawn_concurrency_level(core_threads);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
@@ -1311,7 +1296,6 @@
driver_handle,
blocking_spawner,
seed_generator_2,
spawn_concurrency_level,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
@@ -1329,15 +1313,6 @@
Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}

fn get_spawn_concurrency_level(core_threads: usize) -> usize {
const MAX_SPAWN_CONCURRENCY_LEVEL: usize = 1 << 16;
let mut size = 1;
while size / 4 < core_threads && size < MAX_SPAWN_CONCURRENCY_LEVEL {
size <<= 1;
}
size.min(MAX_SPAWN_CONCURRENCY_LEVEL)
}
}
}

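For reference, the helper removed here returns the smallest power of two that is at least four times core_threads, capped at 1 << 16. A minimal standalone sketch of that arithmetic with a few worked values (the free function and main are illustrative, not tokio API):

const MAX_SPAWN_CONCURRENCY_LEVEL: usize = 1 << 16;

// Smallest power of two with size / 4 >= core_threads, capped at 2^16.
fn spawn_concurrency_level(core_threads: usize) -> usize {
    let mut size = 1;
    while size / 4 < core_threads && size < MAX_SPAWN_CONCURRENCY_LEVEL {
        size <<= 1;
    }
    size.min(MAX_SPAWN_CONCURRENCY_LEVEL)
}

fn main() {
    assert_eq!(spawn_concurrency_level(1), 4);   // 4 / 4 == 1
    assert_eq!(spawn_concurrency_level(8), 32);  // 32 / 4 == 8
    assert_eq!(spawn_concurrency_level(12), 64); // next power of two past 48
    assert_eq!(spawn_concurrency_level(1 << 20), 1 << 16); // cap applies
}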
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -60,7 +60,6 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, Arc<Handle>, Launch) {
let parker = Parker::new(driver);
@@ -70,7 +69,6 @@
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

3 changes: 1 addition & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -245,7 +245,6 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
@@ -288,7 +287,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(spawn_concurrency_level as u32),
owned: OwnedTasks::new(size),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/mod.rs
@@ -49,7 +49,6 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, runtime::Handle) {
let handle = worker::create(
@@ -58,7 +57,6 @@
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

3 changes: 1 addition & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -259,7 +259,6 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> runtime::Handle {
let mut num_workers = num_cores;
@@ -308,7 +307,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(spawn_concurrency_level as u32),
owned: OwnedTasks::new(num_cores),
synced: Mutex::new(Synced {
assigned_cores: (0..num_workers).map(|_| None).collect(),
shutdown_cores: Vec::with_capacity(num_cores),
22 changes: 20 additions & 2 deletions tokio/src/runtime/task/list.rs
@@ -68,15 +68,17 @@ pub(crate) struct LocalOwnedTasks<S: 'static> {
pub(crate) id: NonZeroU64,
_not_send_or_sync: PhantomData<*const ()>,
}

struct OwnedTasksInner<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new(concurrency_level: u32) -> Self {
pub(crate) fn new(num_cores: usize) -> Self {
let shard_size = Self::gen_shared_list_size(num_cores);
Self {
list: List::new(concurrency_level as usize),
list: List::new(shard_size),
closed: AtomicBool::new(false),
id: get_next_id(),
}
@@ -183,6 +185,22 @@ impl<S: 'static> OwnedTasks<S> {
pub(crate) fn is_empty(&self) -> bool {
self.list.is_empty()
}

fn gen_shared_list_size(num_cores: usize) -> usize {
// Shrink the shared list size when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of time.
#[cfg(loom)]
return 4;
#[cfg(not(loom))]
{
const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
let mut size = 1;
while size / 4 < num_cores && size < MAX_SHARED_LIST_SIZE {
size <<= 1;
}
size.min(MAX_SHARED_LIST_SIZE)
}
}
}

cfg_taskdump! {
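The new sizing keeps the shard count a power of two (at least four shards per core, capped at 1 << 16). One plausible reason, sketched below as an assumption: a power-of-two shard count lets a sharded list pick a shard with a bit mask rather than a modulo. ShardedList here is an illustrative stand-in, not the List type constructed above:

use std::sync::Mutex;

// Illustrative stand-in: with a power-of-two shard count, shard selection
// is a single mask operation (key & (shard_count - 1)).
struct ShardedList<T> {
    shards: Vec<Mutex<Vec<T>>>,
    mask: usize, // shard_count - 1
}

impl<T> ShardedList<T> {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count.is_power_of_two());
        ShardedList {
            shards: (0..shard_count).map(|_| Mutex::new(Vec::new())).collect(),
            mask: shard_count - 1,
        }
    }

    fn push(&self, key: u64, value: T) {
        // Keys hashing to different shards contend on different locks.
        let shard = (key as usize) & self.mask;
        self.shards[shard].lock().unwrap().push(value);
    }
}

fn main() {
    let list: ShardedList<&str> = ShardedList::new(8);
    list.push(42, "task-a"); // 42 & 7 == 2 -> shard 2
    list.push(7, "task-b");  // 7 & 7 == 7 -> shard 7
}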
