Skip to content

Commit

Permalink
core: Simplify use_current_thread clean-up.
Browse files Browse the repository at this point in the history
Do it from Registry::terminate() instead of exposing a new API.
  • Loading branch information
emilio committed Aug 23, 2023
1 parent 9561b27 commit 38c411b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 114 deletions.
11 changes: 4 additions & 7 deletions rayon-core/src/lib.rs
Expand Up @@ -100,8 +100,8 @@ pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index,
yield_local, yield_now, ThreadPool, Yield,
current_thread_has_pending_tasks, current_thread_index, yield_local, yield_now, ThreadPool,
Yield,
};

use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
Expand Down Expand Up @@ -548,11 +548,8 @@ impl<S> ThreadPoolBuilder<S> {
/// # Cleaning up a local thread-pool
///
/// In order to properly clean-up the worker thread state, for local thread-pools you should
/// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool.
/// See that function's documentation for more details.
///
/// This call is not required, but without it the registry will leak even if the pool is
/// otherwise terminated.
/// drop the thread pool from the same thread it was created on. Otherwise the registry will
/// leak (even if the pool is otherwise terminated).
pub fn use_current_thread(mut self) -> Self {
self.use_current_thread = true;
self
Expand Down
26 changes: 21 additions & 5 deletions rayon-core/src/registry.rs
Expand Up @@ -227,7 +227,8 @@ struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
fn drop(&mut self) {
self.0.terminate()
self.0.terminate();
self.0.clean_up_creator_thread_if_needed();
}
}

Expand Down Expand Up @@ -617,14 +618,29 @@ impl Registry {
}
}

/// Cleans-up the creator thread from the pool if needed.
pub(super) fn clean_up_creator_thread_if_needed(&self) {
if !self.used_creator_thread {
return;
}
let t = match self.current_thread() {
Some(t) => t,
None => return,
};
if t.index != 0 {
return;
}
unsafe {
wait_until_out_of_work(t);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}

/// Notify the worker that the latch they are sleeping on has been "set".
pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
self.sleep.notify_worker_latch_is_set(target_worker_index);
}

pub(super) fn used_creator_thread(&self) -> bool {
self.used_creator_thread
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
34 changes: 1 addition & 33 deletions rayon-core/src/thread_pool/mod.rs
Expand Up @@ -368,6 +368,7 @@ impl ThreadPool {
impl Drop for ThreadPool {
fn drop(&mut self) {
self.registry.terminate();
self.registry.clean_up_creator_thread_if_needed();
}
}

Expand Down Expand Up @@ -461,39 +462,6 @@ pub fn yield_local() -> Option<Yield> {
}
}

/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by
/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built
/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used.
///
/// Calling this function from a thread pool job will block indefinitely.
///
/// Calling this function before before the thread-pool has been dropped will cause the thread to
/// not return control flow to the caller until that happens (stealing work as necessary).
///
/// # Panics
///
/// If the calling thread is no the creator thread of a thread-pool, or not part of that
/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`].
pub fn clean_up_use_current_thread() {
unsafe {
let thread = WorkerThread::current()
.as_ref()
.expect("Should be called from a worker thread");
assert!(
thread.registry().used_creator_thread(),
"Should only be used to clean up the pool creator constructor thread"
);
assert_eq!(
thread.index(),
0,
"Should be called from the thread that created the pool"
);
crate::registry::wait_until_out_of_work(thread);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
Expand Down
69 changes: 0 additions & 69 deletions rayon-core/tests/use_current_thread.rs
Expand Up @@ -38,7 +38,6 @@ fn use_current_thread_basic() {
.wait_while(pair.0.lock().unwrap(), |ran| !*ran)
.unwrap();
std::mem::drop(pool); // Drop the pool.
rayon_core::clean_up_use_current_thread(); // Clean-up
assert_eq!(rayon_core::current_thread_index(), None);

// Wait until all threads have actually exited. This is not really needed, other than to
Expand All @@ -47,71 +46,3 @@ fn use_current_thread_basic() {
let _ = handle.join();
}
}

#[test]
fn bogus_use_current_thread_cleanup_wrong_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = Arc::clone(&pair);
let pool = ThreadPoolBuilder::new()
.num_threads(2)
.use_current_thread()
.panic_handler(move |e| {
let &(ref panic_message, ref condvar) = &*pair2;
*panic_message.lock().unwrap() = Some(e.downcast::<String>().unwrap());
condvar.notify_one();
})
.build()
.unwrap();
assert_eq!(rayon_core::current_thread_index(), Some(0));

pool.spawn(move || {
rayon_core::clean_up_use_current_thread();
// Should never get here.
});

std::mem::drop(pool); // Drop the pool.
let panic_message = pair
.1
.wait_while(pair.0.lock().unwrap(), |panic_message| {
panic_message.is_none()
})
.unwrap();
let panic_message = panic_message.as_ref().unwrap();
assert!(
panic_message.contains("Should be called from the thread that created the pool"),
"{panic_message}"
);
}

#[test]
#[should_panic]
fn bogus_use_current_thread_cleanup_no_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
rayon_core::clean_up_use_current_thread();
}

#[test]
#[ignore = "Hard to test things that should just infinitely block"]
fn bogus_use_current_thread_cleanup_reentrant() {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.use_current_thread()
.build()
.unwrap(),
);
assert_eq!(rayon_core::current_thread_index(), Some(0));
let pool_clone = Arc::clone(&pool);
let handle = std::thread::spawn(move || {
pool_clone.spawn(move || {
// This should not execute until we clean_up_use_current_thread, so it'll get called
// reentrantly. This shouldn't result in UB or a use-after-free (should just infinitely
// block).
rayon_core::clean_up_use_current_thread();
});
});
handle.join().unwrap();
std::mem::drop(pool);
rayon_core::clean_up_use_current_thread();
}

0 comments on commit 38c411b

Please sign in to comment.