reduce the lock contention in task spawn. #6001

Merged: 104 commits from reduce-lock-contention into master on Dec 7, 2023
Commits (104)
895958f
reduce the lock contention in task spawn.
wathenjiang Sep 11, 2023
ae64dfe
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 12, 2023
5ffbf01
rm extra criterion in dependencies
wathenjiang Sep 12, 2023
8e2c0b2
restart a ci
wathenjiang Sep 12, 2023
33ab489
fix for each
wathenjiang Sep 12, 2023
ab2452c
fix for_each
wathenjiang Sep 12, 2023
a500a79
reduce the size of header
wathenjiang Sep 12, 2023
632a8d3
code refactor in list.rs
wathenjiang Sep 13, 2023
6453017
change ordering of atomic
wathenjiang Sep 13, 2023
9bfb4f1
fix iterate in close_and_shutdown
wathenjiang Sep 13, 2023
41ee62a
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 13, 2023
b4ac885
fix iterate in close_and_shutdown
wathenjiang Sep 13, 2023
4b386c5
fix atomic method
wathenjiang Sep 13, 2023
9ded74b
rm type CountedOwnedTasksInner
wathenjiang Sep 20, 2023
191edf6
refactor for_each
wathenjiang Sep 20, 2023
06a675c
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 20, 2023
2be70c4
use atomic type in loom instead
wathenjiang Sep 20, 2023
17b0be9
fix: put read closed flag after got the lock for avoid concurrency pr…
wathenjiang Sep 20, 2023
3bb484e
introduce random make shutdown faster
wathenjiang Sep 20, 2023
e325825
use grain instead
wathenjiang Sep 20, 2023
a11f80c
fix: dead lock
wathenjiang Sep 20, 2023
259582e
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 20, 2023
cab1f58
fix: use list_inner to offer lock of list
wathenjiang Sep 21, 2023
389e6b9
fix: use segment_size instead of grain
wathenjiang Sep 21, 2023
833377c
clippy
wathenjiang Sep 21, 2023
5422895
feat: let spawn_concurrency_level configurable
wathenjiang Sep 21, 2023
b90101a
feat: update benchmark
wathenjiang Sep 21, 2023
68af71a
feat: add benchmarks for spawn_concurrency_level
wathenjiang Sep 21, 2023
8e4716a
change spawn_concurrency_level to be 4 times the number of worker thr…
wathenjiang Sep 22, 2023
4a3ff7a
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 22, 2023
7bbc2e4
change benchmark tests name from shutdown_parallel_multi_thread to sh…
wathenjiang Sep 22, 2023
c15c0bd
fix the comments on spawn_concurrency_level
wathenjiang Sep 22, 2023
6706a35
add comments for parameter in OwnedTasks.close_and_shutdown_all
wathenjiang Sep 22, 2023
d2c7668
fix comments
wathenjiang Sep 22, 2023
5105610
style nit: simplify code, apply suggestions from hawkw
wathenjiang Sep 24, 2023
a30df11
make spawn_concurrency_level is constant 4 in loom test
wathenjiang Sep 25, 2023
df4ab61
use Header::get_id to get task_id
wathenjiang Sep 28, 2023
63a7679
change owned_id to u64 back
wathenjiang Sep 28, 2023
b2010d7
refactor: use local_staic in loom
wathenjiang Sep 28, 2023
052e141
fix: OwnedTasks get all locks first
wathenjiang Sep 29, 2023
65670eb
fix: rm segment_size field of OwnedTasks, use method to return it ins…
wathenjiang Sep 29, 2023
08d7d0c
feat: make spawn_concurrency_level to be a unstable api
wathenjiang Oct 1, 2023
26621d4
feat: rm shutdown flag
wathenjiang Oct 1, 2023
5b010bc
rm benches/spawn_concurrent.rs because it is unstable now in tokio
wathenjiang Oct 1, 2023
66fa190
use get_unchecked to get segment lock
wathenjiang Oct 3, 2023
2ac0b96
feat: drop lock promptly and explicitly
wathenjiang Oct 15, 2023
47820b3
feat: move the atomic operations of count into the lock
wathenjiang Oct 15, 2023
d0acd70
Merge branch 'master' into reduce-lock-contention
wathenjiang Oct 17, 2023
7ef0265
first commit
wathenjiang Oct 23, 2023
01da1ed
add Safety
wathenjiang Oct 23, 2023
6f5eaa2
mutable ref to immutable ref
wathenjiang Oct 23, 2023
3257cb7
use std AtomicUsize
wathenjiang Oct 23, 2023
7b101ee
fix: count sub in pop_back
wathenjiang Oct 23, 2023
608d2c6
refactor doube check closed flag in bind_inner
wathenjiang Oct 23, 2023
1c214e0
cast task_id to usize
wathenjiang Oct 23, 2023
e24db6d
use ShardGuard
wathenjiang Oct 23, 2023
3c90918
update comments
wathenjiang Oct 23, 2023
cd5fb20
update comments
wathenjiang Oct 23, 2023
38c9eba
fix: remove needless reference
wathenjiang Oct 23, 2023
7968b51
fix: release lock as far as possible
wathenjiang Oct 23, 2023
5d3da9e
Update tokio/src/runtime/task/list.rs
wathenjiang Nov 6, 2023
1d1a7a3
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
13c4b93
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
ee53e23
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
01afd7b
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
36c2355
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
d6606b8
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
c9f32d2
Update tokio/src/runtime/task/list.rs
wathenjiang Nov 6, 2023
bbefb70
fix: accept ownedship of closue in method for_each of OwnedTasks
wathenjiang Nov 6, 2023
f97748c
Merge branch 'master' into reduce-lock-contention
wathenjiang Nov 6, 2023
8baf79e
Apply suggestions from code review
wathenjiang Nov 7, 2023
87e70c3
Apply suggestions from code review
wathenjiang Nov 7, 2023
af92f20
Apply suggestions from code review
wathenjiang Nov 7, 2023
60104b8
Apply suggestions from code review
wathenjiang Nov 7, 2023
bb4458b
rm unused push method
wathenjiang Nov 7, 2023
1adad70
rename get_sharded_id to get_shard_id
wathenjiang Nov 7, 2023
ef8d2b7
Apply suggestions from code review
wathenjiang Nov 7, 2023
2d4fbf6
rename get_sharded_id to get_shard_id
wathenjiang Nov 7, 2023
777d97e
add sentence in comments
wathenjiang Nov 7, 2023
5406a7e
rm dead_code attr
wathenjiang Nov 7, 2023
c9b05ee
Merge branch 'master' into reduce-lock-contention
wathenjiang Nov 7, 2023
680848e
move spawn_concurrent_level size from shardedList to builder
wathenjiang Nov 7, 2023
6fb70b1
update comments
wathenjiang Nov 7, 2023
0ba87db
update comments
wathenjiang Nov 7, 2023
8bb106f
update comments
wathenjiang Nov 7, 2023
e0fb9e2
rm loop in ShardedList::new
wathenjiang Nov 7, 2023
57133a5
fix rustfmt
wathenjiang Nov 7, 2023
71e8983
fix
wathenjiang Nov 7, 2023
d1ce613
fix spawn_concurrency_level
wathenjiang Nov 7, 2023
e6b8db1
fix spawn_concurrency_level
wathenjiang Nov 7, 2023
592f432
rm get_spawn_concurrency_level to cfg_rt_multi_thread
wathenjiang Nov 7, 2023
f083757
add allow(dead_code))]
wathenjiang Nov 7, 2023
3105af7
rm dead_code attr
wathenjiang Nov 8, 2023
8e189cf
make spawn_concurrency_level unconfigurable
wathenjiang Nov 22, 2023
75d3081
Apply suggestions from code review
wathenjiang Nov 24, 2023
ed27f70
apply suggestions from core review
wathenjiang Nov 24, 2023
db58076
move get_spawn_concurrency_level from builder to task/list.rs
wathenjiang Nov 24, 2023
06656b9
feat: add comments and rm loom cfg
wathenjiang Nov 24, 2023
caa74c9
feat: update comments for gen_shared_list_size
wathenjiang Nov 24, 2023
865de08
feat: update comments for gen_shared_list_size
wathenjiang Nov 24, 2023
7a76ad5
Apply suggestions from code review
wathenjiang Nov 26, 2023
78d1dea
fix: fmt and typo fix
wathenjiang Nov 26, 2023
3844bd3
Update tokio/src/util/sharded_list.rs
Darksonn Dec 7, 2023
038650f
Merge branch 'master' into reduce-lock-contention
Darksonn Dec 7, 2023
Files changed
1 change: 1 addition & 0 deletions benches/Cargo.toml
@@ -12,6 +12,7 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
criterion = "0.5.1"
rand = "0.8"
rand_chacha = "0.3"
num_cpus = "1.16.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
1 change: 0 additions & 1 deletion tokio/src/runtime/builder.rs
@@ -1279,7 +1279,6 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
8 changes: 7 additions & 1 deletion tokio/src/runtime/id.rs
@@ -1,5 +1,5 @@
use std::fmt;
use std::num::NonZeroU64;
use std::num::{NonZeroU32, NonZeroU64};

/// An opaque ID that uniquely identifies a runtime relative to all other currently
/// running runtimes.
@@ -39,6 +39,12 @@ impl From<NonZeroU64> for Id {
}
}

impl From<NonZeroU32> for Id {
fn from(value: NonZeroU32) -> Self {
Id(value.into())
}
}

impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -132,7 +132,7 @@ impl CurrentThread {
let handle = Arc::new(Handle {
shared: Shared {
inject: Inject::new(),
owned: OwnedTasks::new(),
owned: OwnedTasks::new(1),
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
@@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();
handle.shared.owned.close_and_shutdown_all(0);

// Drain local queue
// We already shut down every task, so we just need to drop the task.
@@ -614,7 +614,7 @@ impl Schedule for Arc<Handle> {
// If `None`, the runtime is shutting down, so there is no need to signal shutdown
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.shared.owned.close_and_shutdown_all();
self.shared.owned.close_and_shutdown_all(0);
}
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
13 changes: 10 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -287,7 +287,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(size),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
@@ -548,7 +548,6 @@ impl Context {
}

core.pre_shutdown(&self.worker);

// Signal shutdown
self.worker.handle.shutdown_core(core);
Err(())
@@ -955,8 +954,16 @@ impl Core {
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Start from a random inner list
let start = self
.rand
.fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
// Signal to all tasks to shut down.
worker.handle.shared.owned.close_and_shutdown_all();
worker
.handle
.shared
.owned
.close_and_shutdown_all(start as usize);

self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
6 changes: 4 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -307,7 +307,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(num_cores),
synced: Mutex::new(Synced {
assigned_cores: (0..num_workers).map(|_| None).collect(),
shutdown_cores: Vec::with_capacity(num_cores),
@@ -1460,7 +1460,9 @@ impl Shared {
}

pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
self.owned.close_and_shutdown_all();
// Start from a random inner list
let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
self.owned.close_and_shutdown_all(start as usize);

core.stats.submit(&self.worker_metrics[core.index]);

19 changes: 15 additions & 4 deletions tokio/src/runtime/task/id.rs
@@ -24,7 +24,7 @@ use std::fmt;
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);
pub struct Id(pub(crate) u64);

/// Returns the [`Id`] of the currently running task.
///
@@ -74,11 +74,22 @@ impl fmt::Display for Id {

impl Id {
pub(crate) fn next() -> Self {
use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::StaticAtomicU64;

static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
#[cfg(all(test, loom))]
{
crate::loom::lazy_static! {
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}
Self(NEXT_ID.fetch_add(1, Relaxed))
}

Self(NEXT_ID.fetch_add(1, Relaxed))
#[cfg(not(all(test, loom)))]
{
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}
}

pub(crate) fn as_u64(&self) -> u64 {
110 changes: 63 additions & 47 deletions tokio/src/runtime/task/list.rs
@@ -8,10 +8,11 @@

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::sharded_list;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use std::marker::PhantomData;
use std::num::NonZeroU64;

@@ -25,7 +26,7 @@ use std::num::NonZeroU64;
// mixed up runtimes happen to have the same id.

cfg_has_atomic_u64! {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;

static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);

@@ -40,7 +41,7 @@ cfg_has_atomic_u64! {
}

cfg_not_has_atomic_u64! {
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::AtomicU32;

static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);

@@ -55,30 +56,30 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<CountedOwnedTasksInner<S>>,
list: List<S>,
pub(crate) id: NonZeroU64,
closed: AtomicBool,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;

pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
pub(crate) id: NonZeroU64,
_not_send_or_sync: PhantomData<*const ()>,
}

struct OwnedTasksInner<S: 'static> {
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
pub(crate) fn new(num_cores: usize) -> Self {
let shard_size = Self::gen_shared_list_size(num_cores);
Self {
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
list: List::new(shard_size),
closed: AtomicBool::new(false),
id: get_next_id(),
}
}
@@ -112,24 +113,23 @@ impl<S: 'static> OwnedTasks<S> {
task.header().set_owner_id(self.id);
}

let mut lock = self.inner.lock();
if lock.closed {
drop(lock);
drop(notified);
let shard = self.list.lock_shard(&task);
// Check the closed flag while holding the shard lock to ensure that all
// tasks are shut down once the OwnedTasks has been closed.
if self.closed.load(Ordering::Acquire) {
drop(shard);
task.shutdown();
None
} else {
lock.list.push_front(task);
Some(notified)
return None;
}
shard.push(task);
Some(notified)
}

/// Asserts that the given task is owned by this OwnedTasks and converts it to
/// a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
debug_assert_eq!(task.header().get_owner_id(), Some(self.id));

// safety: All tasks bound to this OwnedTasks are Send, so it is safe
// to poll it on this thread no matter what thread we are on.
LocalNotified {
@@ -140,34 +140,34 @@

/// Shuts down all tasks in the collection. This call also closes the
/// collection, preventing new items from being added.
pub(crate) fn close_and_shutdown_all(&self)
///
/// The parameter start determines which shard this method will start at.
/// Using different values for each worker thread reduces contention.
pub(crate) fn close_and_shutdown_all(&self, start: usize)
where
S: Schedule,
{
// The first iteration of the loop was unrolled so it can set the
// closed bool.
let first_task = {
let mut lock = self.inner.lock();
lock.closed = true;
lock.list.pop_back()
};
match first_task {
Some(task) => task.shutdown(),
None => return,
self.closed.store(true, Ordering::Release);
for i in start..self.get_shard_size() + start {
loop {
let task = self.list.pop_back(i);
match task {
Some(task) => {
task.shutdown();
}
None => break,
}
}
}
}

loop {
let task = match self.inner.lock().list.pop_back() {
Some(task) => task,
None => return,
};

task.shutdown();
}
#[inline]
pub(crate) fn get_shard_size(&self) -> usize {
self.list.shard_size()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
self.list.len()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
@@ -179,11 +179,27 @@ impl<S: 'static> OwnedTasks<S> {

// safety: We just checked that the provided task is not in some other
// linked list.
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
unsafe { self.list.remove(task.header_ptr()) }
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.lock().list.is_empty()
self.list.is_empty()
}

/// Generates the size of the sharded list based on the number of worker threads.
///
/// The sharded lock design can effectively alleviate
/// lock contention performance problems caused by high concurrency.
///
/// However, as the number of shards increases, the memory continuity between
/// nodes in the intrusive linked list will diminish. Furthermore,
/// the construction time of the sharded list will also increase with a higher number of shards.
///
/// Due to the above reasons, we set a maximum value for the shared list size,
/// denoted as `MAX_SHARED_LIST_SIZE`.
fn gen_shared_list_size(num_cores: usize) -> usize {
const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
Contributor:
This maximum seems really large to me. Thoughts?

Contributor Author (wathenjiang):
I just used the settings of the previous spawn_concurrency_level function. That function supported user configuration, so the value was set high. If you want a smaller value, I'm ok with that.

As far as I know, some current server CPUs come close to 200 cores (or hyperthreads); the AMD EPYC 9654, for example, has 192 threads. Maybe we can set the value to 256 (4 times this value is 1 << 10)?

Contributor Author (wathenjiang):
I'd like to retract my previous point. I believe this is a memory-bound scenario, so we only need to ensure a certain number of mutexes. The number of mutexes should not depend on the number of worker threads; it actually depends on the level of concurrency we can expect.

Below, I describe the concurrency level that is likely to occur in practice, rather than what occurs in the benchmark.

In real applications that use Tokio, multiple threads may call tokio::spawn concurrently, but having very many threads spawning at the same time is usually a poor design choice in the higher-level application architecture. It is more likely that multiple threads will remove tasks concurrently; however, removing a task from the list takes up only a minimal fraction of a task's life cycle, so the concurrency level of removals is usually not a significant concern.

Therefore, I agree with your idea; it is reasonable to set the upper limit to a small value. Setting this to 64 or 128 might make sense.

Contributor:
Alright. What do you think about multiplying by 4?

Contributor Author (wathenjiang):
The total amount of random memory work is held constant while the number of mutexes varies. Each thread obtains one of the locks round-robin and performs its random memory accesses only while holding that lock. I got the following results (seconds; rows are the number of mutexes, columns the number of threads):

| mutexes \ threads | 1 | 2 | 4 | 8 | 12 | 16 | 24 | 32 | 48 | 64 | 128 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 5.980172444 | 2.899437975 | 1.447906311 | 0.828731566 | 0.689618066 | 0.612355429 | 0.589401394 | 0.587380871 | 0.525477567 | 0.578456362 | 0.552132325 |
| 1 | 7.970250774 | 17.29034894 | 22.60164692 | 25.97284605 | 28.12352579 | 33.31359697 | 31.18786342 | 31.61139126 | 29.23225856 | 30.94094675 | 31.59191497 |
| 2 | 7.883931727 | 15.97845738 | 16.11107368 | 18.73377898 | 20.34614133 | 23.02624802 | 22.69439808 | 23.15802647 | 21.80570219 | 22.48815498 | 22.98585238 |
| 4 | 7.975676415 | 10.25364766 | 11.88074538 | 15.40198137 | 15.51024255 | 16.35328034 | 15.46874828 | 15.7982897 | 15.48703267 | 15.67227903 | 15.35829948 |
| 8 | 8.058803258 | 8.138193999 | 7.619081588 | 7.936418179 | 7.654288652 | 7.901945312 | 7.642439744 | 7.861542054 | 7.730389506 | 7.821229611 | 7.748344488 |
| 16 | 9.797308994 | 6.213334839 | 4.455407945 | 4.496371955 | 4.291254249 | 4.130849346 | 4.347601475 | 4.294096757 | 3.990391527 | 4.028562691 | 4.059085994 |
| 32 | 8.742854719 | 4.847656612 | 3.301780829 | 2.578327826 | 2.480488617 | 2.331294827 | 2.388718271 | 2.306257478 | 2.421350161 | 2.278177495 | 2.26569423 |
| 64 | 8.042672888 | 4.963568223 | 3.012473492 | 2.08243512 | 1.828237002 | 1.653421053 | 1.550811454 | 1.536452054 | 1.519761769 | 1.618966043 | 1.48010674 |
| 128 | 8.62801309 | 4.978525185 | 2.637936755 | 1.777546296 | 1.549096849 | 1.359814529 | 1.43875245 | 1.385468038 | 1.238832309 | 1.249940559 | 1.248131329 |
| 256 | 8.584906215 | 4.591742459 | 2.441556366 | 1.504790937 | 1.335449235 | 1.169191715 | 1.115906268 | 1.230570609 | 1.075581823 | 1.048285585 | 1.02977064 |
| 512 | 8.171549127 | 4.182283461 | 2.37535305 | 1.54202412 | 1.1690348 | 1.054650104 | 1.015366906 | 1.153238581 | 0.993319168 | 0.998864737 | 0.981392837 |
| 1024 | 8.533398132 | 4.175120792 | 2.209645233 | 1.412410651 | 1.055442085 | 0.938202817 | 1.122801927 | 0.940661156 | 0.888767412 | 0.914867532 | 0.92237305 |

This is mainly because, when the number of worker threads is relatively small, using 4x as many locks yields a significant performance improvement over 2x.

Contributor Author (wathenjiang):
For the complete test, please refer to https://gist.github.com/wathenjiang/30b689a7ef20b4ea667a2e8f358c321d. I think this performance test can serve as a reference for how many mutexes OwnedTasks needs.

usize::min(MAX_SHARED_LIST_SIZE, num_cores.next_power_of_two() * 4)
}
}
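
For concreteness, the merged formula gives min(1 << 16, 8 * 4) = 32 shards for 8 worker threads (8 is already a power of two) and 16 * 4 = 64 shards for 12 workers. The round-robin multi-mutex experiment described in the review thread above can be sketched roughly as below. This is not the code from the linked gist; the operation count, shard payload size, and the thread/mutex grids are assumptions made for illustration.

```rust
// Illustrative sketch of the experiment from the review thread, not the gist's code.
// TOTAL_OPS, SLOTS_PER_SHARD and the thread/mutex grids below are assumptions.
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

const TOTAL_OPS: usize = 1 << 22; // fixed total amount of "random memory work"
const SLOTS_PER_SHARD: usize = 1 << 16;

fn run(num_threads: usize, num_mutexes: usize) -> f64 {
    // One mutex per shard, each guarding its own chunk of memory.
    let shards: Arc<Vec<Mutex<Vec<usize>>>> = Arc::new(
        (0..num_mutexes)
            .map(|_| Mutex::new(vec![0usize; SLOTS_PER_SHARD]))
            .collect(),
    );
    let ops_per_thread = TOTAL_OPS / num_threads;
    let start = Instant::now();
    let handles: Vec<_> = (0..num_threads)
        .map(|t| {
            let shards = Arc::clone(&shards);
            thread::spawn(move || {
                // Cheap xorshift RNG so the accesses are effectively random.
                let mut rng = (t as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
                for i in 0..ops_per_thread {
                    // Round robin over the available mutexes.
                    let shard = &shards[(t + i) % num_mutexes];
                    let mut slots = shard.lock().unwrap();
                    rng ^= rng << 13;
                    rng ^= rng >> 7;
                    rng ^= rng << 17;
                    let idx = (rng as usize) % slots.len();
                    // The random memory access happens only while the lock is held.
                    slots[idx] = slots[idx].wrapping_add(1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    start.elapsed().as_secs_f64()
}

fn main() {
    for &threads in &[1usize, 4, 16, 64] {
        for &mutexes in &[1usize, 4, 16, 64, 256] {
            let secs = run(threads, mutexes);
            println!("threads={threads:>3} mutexes={mutexes:>4} -> {secs:.3}s");
        }
    }
}
```

As the table above suggests, a single shared mutex serializes the threads, while increasing the number of mutexes moves the wall time back toward the unlocked (0-mutex) baseline.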

@@ -192,9 +208,9 @@ cfg_taskdump! {
/// Locks the tasks, and calls `f` on an iterator over them.
pub(crate) fn for_each<F>(&self, f: F)
where
F: FnMut(&Task<S>)
F: FnMut(&Task<S>),
{
self.inner.lock().list.for_each(f)
self.list.for_each(f);
}
}
}
14 changes: 14 additions & 0 deletions tokio/src/runtime/task/mod.rs
@@ -208,6 +208,7 @@ cfg_taskdump! {

use crate::future::Future;
use crate::util::linked_list;
use crate::util::sharded_list;

use std::marker::PhantomData;
use std::ptr::NonNull;
@@ -503,3 +504,16 @@ unsafe impl<S> linked_list::Link for Task<S> {
self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}

/// # Safety
///
/// The id of a task is never changed after creation of the task, so the return value of
/// `get_shard_id` will not change. (The cast may throw away the upper 32 bits of the task id, but
/// the shard id still won't change from call to call.)
unsafe impl<S> sharded_list::ShardedListItem for Task<S> {
unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
// SAFETY: The caller guarantees that `target` points at a valid task.
let task_id = unsafe { Header::get_id(target) };
task_id.0 as usize
}
}
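
The new `util/sharded_list.rs` itself is not part of this excerpt, but the idea that `OwnedTasks` relies on above can be modeled with a deliberately simplified, non-intrusive sketch. The real implementation is an intrusive linked list that also tracks its length; the types and method names below are illustrative assumptions, not tokio's actual API.

```rust
// A minimal, simplified model of a sharded list: each shard has its own mutex,
// and an item's key decides which shard (and therefore which lock) it lives in.
// Illustration of the idea only, not tokio's util/sharded_list.rs.
use std::collections::VecDeque;
use std::sync::Mutex;

pub trait ShardedItem {
    /// A stable key; the same item must always report the same key.
    fn shard_key(&self) -> usize;
}

pub struct ShardedList<T> {
    shards: Box<[Mutex<VecDeque<T>>]>,
    mask: usize, // shard count is a power of two, so `& mask` replaces `%`
}

impl<T: ShardedItem> ShardedList<T> {
    pub fn new(shard_count: usize) -> Self {
        let shard_count = shard_count.next_power_of_two();
        let shards = (0..shard_count)
            .map(|_| Mutex::new(VecDeque::new()))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        Self { shards, mask: shard_count - 1 }
    }

    pub fn shard_size(&self) -> usize {
        self.shards.len()
    }

    /// Only the shard owning `item` is locked, so inserts of unrelated items
    /// (in different shards) do not contend with each other.
    pub fn push(&self, item: T) {
        let shard = item.shard_key() & self.mask;
        self.shards[shard].lock().unwrap().push_front(item);
    }

    /// Pops from one specific shard; a caller such as shutdown iterates over
    /// all shards, each worker starting from a different index to spread contention.
    pub fn pop_back(&self, shard: usize) -> Option<T> {
        self.shards[shard & self.mask].lock().unwrap().pop_back()
    }

    pub fn len(&self) -> usize {
        self.shards.iter().map(|s| s.lock().unwrap().len()).sum()
    }
}
```

In the PR itself the shard key is the task id (see the `ShardedListItem` impl directly above), which is why the id must never change after the task is created. Note also that `close_and_shutdown_all` passes shard indices from `start` up to `start + shard_size`, so the list is expected to wrap the index modulo the shard count, as the `& self.mask` does in this sketch.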