Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: use internal ThreadId implementation #5184

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .cirrus.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
freebsd_instance:
image: freebsd-12-3-release-amd64
env:
RUST_BACKTRACE: 1
RUST_STABLE: stable
RUST_NIGHTLY: nightly-2022-10-25
RUSTFLAGS: -D warnings
Expand Down
22 changes: 21 additions & 1 deletion tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ cfg_rt! {
}

struct Context {
/// Uniquely identifies the current thread
#[cfg(feature = "rt")]
thread_id: Cell<Option<ThreadId>>,

/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,
Expand All @@ -38,6 +42,9 @@ struct Context {
tokio_thread_local! {
static CONTEXT: Context = {
Context {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),

/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
#[cfg(feature = "rt")]
Expand Down Expand Up @@ -69,10 +76,23 @@ pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, A
}

cfg_rt! {
use crate::runtime::TryCurrentError;
use crate::runtime::{ThreadId, TryCurrentError};

use std::fmt;

pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
CONTEXT.try_with(|ctx| {
match ctx.thread_id.get() {
Some(id) => id,
None => {
let id = ThreadId::new();
ctx.thread_id.set(Some(id));
id
}
}
})
}

#[derive(Debug, Clone, Copy)]
#[must_use]
pub(crate) enum EnterRuntime {
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ cfg_rt! {
mod runtime;
pub use runtime::{Runtime, RuntimeFlavor};

mod thread_id;
pub(crate) use thread_id::ThreadId;

cfg_metrics! {
mod metrics;
pub use metrics::RuntimeMetrics;
Expand Down
46 changes: 46 additions & 0 deletions tokio/src/runtime/thread_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::num::NonZeroU64;

#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
pub(crate) struct ThreadId(NonZeroU64);

impl ThreadId {
cfg_has_atomic_u64! {
pub(crate) fn new() -> ThreadId {
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

static COUNTER: AtomicU64 = AtomicU64::new(0);

let mut last = COUNTER.load(Relaxed);
loop {
let id = match last.checked_add(1) {
Some(id) => id,
None => exhausted(),
};

match COUNTER.compare_exchange_weak(last, id, Relaxed, Relaxed) {
Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()),
Err(id) => last = id,
}
}
}
}
Comment on lines +7 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that doing the same as in list.rs would be sufficient here.

// The id from the module below is used to verify whether a given task is stored
// in this OwnedTasks, or some other task. The counter starts at one so we can
// use zero for tasks not owned by any list.
//
// The safety checks in this file can technically be violated if the counter is
// overflown, but the checks are not supposed to ever fail unless there is a
// bug in Tokio, so we accept that certain bugs would not be caught if the two
// mixed up runtimes happen to have the same id.
cfg_has_atomic_u64! {
use std::sync::atomic::{AtomicU64, Ordering};
static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
fn get_next_id() -> u64 {
loop {
let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
if id != 0 {
return id;
}
}
}
}
cfg_not_has_atomic_u64! {
use std::sync::atomic::{AtomicU32, Ordering};
static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
fn get_next_id() -> u64 {
loop {
let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
if id != 0 {
return u64::from(id);
}
}
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do use the thread ID for correctness on top of the debug checks. It seems riskier, in that case, to fall back to u32.

Copy link
Member

@hawkw hawkw Nov 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC std's thread IDs use a mutex on 32-bit targets, but I'd have to double check to make sure...

edit: oh, i see that that's not actually what this conversation was about, my bad.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we use it for correctness?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


cfg_not_has_atomic_u64! {
cfg_has_const_mutex_new! {
pub(crate) fn new() -> ThreadId {
todo!();
}
}

cfg_not_has_const_mutex_new! {
pub(crate) fn new() -> ThreadId {
todo!();
}
}
}
}

#[cold]
fn exhausted() -> ! {
panic!("failed to generate unique thread ID: bitspace exhausted")
}
36 changes: 7 additions & 29 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Runs `!Send` futures on the current thread.
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::{Arc, Mutex};
use crate::loom::thread::{self, ThreadId};
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::runtime::{context, ThreadId};
use crate::sync::AtomicWaker;
use crate::util::RcCell;

Expand Down Expand Up @@ -277,12 +277,10 @@ pin_project! {
}

tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
thread_id: Cell::new(None),
ctx: RcCell::new(),
} });

struct LocalData {
thread_id: Cell<Option<ThreadId>>,
ctx: RcCell<Context>,
}

Expand Down Expand Up @@ -379,12 +377,13 @@ impl fmt::Debug for LocalEnterGuard {
impl LocalSet {
/// Returns a new local task set.
pub fn new() -> LocalSet {
let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
LocalSet {
tick: Cell::new(0),
context: Rc::new(Context {
shared: Arc::new(Shared {
local_state: LocalState {
owner: thread_id().expect("cannot create LocalSet during thread shutdown"),
owner,
owned: LocalOwnedTasks::new(),
local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
},
Expand Down Expand Up @@ -949,7 +948,7 @@ impl Shared {

// We are on the thread that owns the `LocalSet`, so we can
// wake to the local queue.
_ if localdata.get_id() == Some(self.local_state.owner) => {
_ if context::thread_id().ok() == Some(self.local_state.owner) => {
unsafe {
// Safety: we just checked that the thread ID matches
// the localset's owner, so this is safe.
Expand Down Expand Up @@ -1086,14 +1085,13 @@ impl LocalState {

#[track_caller]
fn assert_called_from_owner_thread(&self) {
// FreeBSD has some weirdness around thread-local destruction.
// TODO: remove this hack when thread id is cleaned up
#[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
debug_assert!(
// if we couldn't get the thread ID because we're dropping the local
// data, skip the assertion --- the `Drop` impl is not going to be
// called from another thread, because `LocalSet` is `!Send`
thread_id().map(|id| id == self.owner).unwrap_or(true),
context::thread_id()
.map(|id| id == self.owner)
.unwrap_or(true),
"`LocalSet`'s local run queue must not be accessed by another thread!"
);
}
Expand All @@ -1103,26 +1101,6 @@ impl LocalState {
// ensure they are on the same thread that owns the `LocalSet`.
unsafe impl Send for LocalState {}

impl LocalData {
fn get_id(&self) -> Option<ThreadId> {
self.thread_id.get()
}

fn get_or_insert_id(&self) -> ThreadId {
self.thread_id.get().unwrap_or_else(|| {
let id = thread::current().id();
self.thread_id.set(Some(id));
id
})
}
}

fn thread_id() -> Option<ThreadId> {
CURRENT
.try_with(|localdata| localdata.get_or_insert_id())
.ok()
}

#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
Expand Down