Skip to content

Commit

Permalink
Bug 1836948 - Patch rayon-core with proposed use_current_thread API. …
Browse files Browse the repository at this point in the history
…r=smaug,jnicol,supply-chain-reviewers

This applies rayon-rs/rayon#1063 to our
rayon-core. I'm hopeful it can be merged upstream soon, but meanwhile
this seems worth having on its own.

Differential Revision: https://phabricator.services.mozilla.com/D186722
  • Loading branch information
emilio committed Aug 28, 2023
1 parent 793c42d commit 8affede
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 32 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -169,6 +169,9 @@ web-sys = { path = "build/rust/dummy-web/web-sys" }
# Overrides to allow easier use of common internal crates.
moz_asserts = { path = "mozglue/static/rust/moz_asserts" }

# Patch rayon core to import https://github.com/rayon-rs/rayon/pull/1063
rayon-core = { path = "third_party/rust/rayon-core" }

# Patch `rure` to disable building the cdylib and staticlib targets
# Cargo has no way to disable building targets your dependencies provide which
# you don't depend on, and linking the cdylib breaks during instrumentation
Expand Down
4 changes: 4 additions & 0 deletions supply-chain/config.toml
Expand Up @@ -177,6 +177,10 @@ notes = "This is a first-party crate which is entirely unrelated to the crates.i
audit-as-crates-io = true
notes = "This is a first-party crate which is also published to crates.io, but we should publish audits for it for the benefit of the ecosystem."

[policy.rayon-core]
audit-as-crates-io = true
notes = "Identical to upstream, with a Mozilla-authored PR, see Cargo.toml comment for details"

[policy.rure]
audit-as-crates-io = true
notes = "Identical to upstream, but with cdylib and staticlib targets disabled to avoid unnecessary build artifacts and linker errors."
Expand Down
43 changes: 39 additions & 4 deletions third_party/rust/rayon-core/src/lib.rs
Expand Up @@ -99,10 +99,10 @@ pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
pub use self::thread_pool::{yield_local, yield_now, Yield};
pub use self::thread_pool::{
clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index,
yield_local, yield_now, ThreadPool, Yield,
};

use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};

Expand Down Expand Up @@ -175,6 +175,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
num_threads: usize,

/// The thread we're building *from* will also be part of the pool.
use_current_thread: bool,

/// Custom closure, if any, to handle a panic that we cannot propagate
/// anywhere else.
panic_handler: Option<Box<PanicHandler>>,
Expand Down Expand Up @@ -228,6 +231,7 @@ impl Default for ThreadPoolBuilder {
fn default() -> Self {
ThreadPoolBuilder {
num_threads: 0,
use_current_thread: false,
panic_handler: None,
get_thread_name: None,
stack_size: None,
Expand Down Expand Up @@ -445,6 +449,7 @@ impl<S> ThreadPoolBuilder<S> {
spawn_handler: CustomSpawn::new(spawn),
// ..self
num_threads: self.num_threads,
use_current_thread: self.use_current_thread,
panic_handler: self.panic_handler,
get_thread_name: self.get_thread_name,
stack_size: self.stack_size,
Expand Down Expand Up @@ -532,6 +537,34 @@ impl<S> ThreadPoolBuilder<S> {
self
}

/// Use the current thread as one of the threads in the pool.
///
/// The current thread is guaranteed to be at index 0, and since the thread is not managed by
/// rayon, the spawn and exit handlers do not run for that thread.
///
/// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
/// the thread-pool will generally not be picked up automatically by this thread unless you
/// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], [`scope()`], or
/// [`clean_up_use_current_thread()`].
///
/// # Panics
///
/// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've
/// called this function and the current thread is already part of another [`ThreadPool`].
///
/// # Cleaning up a local thread-pool
///
/// In order to properly clean-up the worker thread state, for local thread-pools you should
/// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool.
/// See that function's documentation for more details.
///
/// This call is not required, but without it the registry will leak even if the pool is
/// otherwise terminated.
pub fn use_current_thread(mut self) -> Self {
self.use_current_thread = true;
self
}

/// Returns a copy of the current panic handler.
fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
self.panic_handler.take()
Expand Down Expand Up @@ -771,6 +804,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ThreadPoolBuilder {
ref num_threads,
ref use_current_thread,
ref get_thread_name,
ref panic_handler,
ref stack_size,
Expand All @@ -795,6 +829,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {

f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
.field("use_current_thread", use_current_thread)
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.field("stack_size", &stack_size)
Expand Down
68 changes: 42 additions & 26 deletions third_party/rust/rayon-core/src/registry.rs
Expand Up @@ -139,6 +139,9 @@ pub(super) struct Registry {
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,

/// Whether the first thread of our pool is the creator of the thread pool.
used_creator_thread: bool,

// When this latch reaches 0, it means that all work on this
// registry must be complete. This is ensured in the following ways:
//
Expand Down Expand Up @@ -210,26 +213,7 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
// is stubbed out, and we won't have to change anything if they do add real threading.
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
if unsupported && WorkerThread::current().is_null() {
let builder = ThreadPoolBuilder::new()
.num_threads(1)
.spawn_handler(|thread| {
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
let registry = &*worker_thread.registry;
let index = worker_thread.index;

unsafe {
WorkerThread::set_current(worker_thread);

// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
}

Ok(())
});

let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
let fallback_result = Registry::new(builder);
if fallback_result.is_ok() {
return fallback_result;
Expand Down Expand Up @@ -291,6 +275,7 @@ impl Registry {
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
exit_handler: builder.take_exit_handler(),
used_creator_thread: builder.use_current_thread,
});

// If we return early or panic, make sure to terminate existing threads.
Expand All @@ -305,6 +290,23 @@ impl Registry {
stealer,
index,
};

if index == 0 && builder.use_current_thread {
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
//
// For local pools, the caller is responsible of cleaning this up if they need to
// by using clean_up_use_current_thread.
let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));

unsafe {
WorkerThread::set_current(worker_thread);
Latch::set(&registry.thread_infos[index].primed);
}
continue;
}

if let Err(e) = builder.get_spawn_handler().spawn(thread) {
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
}
Expand Down Expand Up @@ -619,6 +621,10 @@ impl Registry {
/// Inform the sleep machinery that a latch the worker at
/// `target_worker_index` may be waiting on has been set, so that worker can
/// be woken if it went to sleep. Pure delegation to `self.sleep`.
pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
    self.sleep.notify_worker_latch_is_set(target_worker_index);
}

/// Whether the thread at index 0 of this pool is the thread that created
/// the pool, i.e. whether `ThreadPoolBuilder::use_current_thread()` was
/// requested when the registry was built.
pub(super) fn used_creator_thread(&self) -> bool {
    self.used_creator_thread
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand Down Expand Up @@ -945,13 +951,8 @@ unsafe fn main_loop(thread: ThreadBuilder) {
worker: index,
terminate_addr: my_terminate_latch.as_core_latch().addr(),
});
worker_thread.wait_until(my_terminate_latch);

// Should not be any work left in our queue.
debug_assert!(worker_thread.take_local_job().is_none());

// let registry know we are done
Latch::set(&registry.thread_infos[index].stopped);
wait_until_out_of_work(worker_thread);

// Normal termination, do not abort.
mem::forget(abort_guard);
Expand All @@ -965,6 +966,21 @@ unsafe fn main_loop(thread: ThreadBuilder) {
}
}

/// Block the given worker until its registry signals termination, then mark
/// the worker as stopped. Shared by `main_loop` and
/// `clean_up_use_current_thread()`.
///
/// # Safety
///
/// `worker_thread` must be the `WorkerThread` registered as current on the
/// calling thread (checked by the `debug_assert_eq!` below).
pub(crate) unsafe fn wait_until_out_of_work(worker_thread: &WorkerThread) {
    debug_assert_eq!(worker_thread as *const _, WorkerThread::current());

    let registry = &*worker_thread.registry;
    let my_info = &registry.thread_infos[worker_thread.index];

    // Block until this worker's terminate latch is set.
    worker_thread.wait_until(&my_info.terminate);

    // By the time termination is signalled, our local deque must be drained.
    debug_assert!(worker_thread.take_local_job().is_none());

    // Report back to the registry that this worker has fully stopped.
    Latch::set(&my_info.stopped);
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
Expand Down
33 changes: 33 additions & 0 deletions third_party/rust/rayon-core/src/thread_pool/mod.rs
Expand Up @@ -461,6 +461,39 @@ pub fn yield_local() -> Option<Yield> {
}
}

/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by
/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built
/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used.
///
/// Calling this function from a thread pool job will block indefinitely.
///
/// Calling this function before the thread-pool has been dropped will cause the thread to
/// not return control flow to the caller until that happens (stealing work as necessary).
///
/// # Panics
///
/// If the calling thread is not the creator thread of a thread-pool, or not part of that
/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`].
pub fn clean_up_use_current_thread() {
    unsafe {
        let thread = WorkerThread::current()
            .as_ref()
            .expect("Should be called from a worker thread");
        assert!(
            thread.registry().used_creator_thread(),
            "Should only be used to clean up the pool creator constructor thread"
        );
        assert_eq!(
            thread.index(),
            0,
            "Should be called from the thread that created the pool"
        );
        // Block (executing pool work as needed) until the registry terminates
        // and this worker is marked stopped.
        crate::registry::wait_until_out_of_work(thread);
        // Reclaim the WorkerThread that Registry::new leaked via Box::into_raw
        // when it took over this thread.
        let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
    }
    // NOTE(review): dropping the WorkerThread is expected to reset the
    // thread-local "current" pointer; asserted here — confirm in Drop impl.
    assert!(WorkerThread::current().is_null());
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
Expand Down

0 comments on commit 8affede

Please sign in to comment.