Skip to content

Commit

Permalink
core: Remove clean_up_use_current thread as per PR discussion.
Browse files Browse the repository at this point in the history
  • Loading branch information
emilio committed Sep 4, 2023
1 parent 09125f5 commit d343448
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 117 deletions.
17 changes: 6 additions & 11 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ pub use self::scope::{in_place_scope, scope, Scope};
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index,
yield_local, yield_now, ThreadPool, Yield,
current_thread_has_pending_tasks, current_thread_index, yield_local, yield_now, ThreadPool,
Yield,
};

use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
Expand Down Expand Up @@ -537,22 +537,17 @@ impl<S> ThreadPoolBuilder<S> {
///
/// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
/// the thread-pool will generally not be picked up automatically by this thread unless you
/// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], [`scope()`], or
/// [`clean_up_use_current_thread()`].
/// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
///
/// # Panics
///
/// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've
/// called this function and the current thread is already part of another [`ThreadPool`].
///
/// # Cleaning up a local thread-pool
/// # Local thread-pools
///
/// In order to properly clean-up the worker thread state, for local thread-pools you should
/// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool.
/// See that function's documentation for more details.
///
/// This call is not required, but without it the registry will leak even if the pool is
/// otherwise terminated.
/// Using this in a local thread-pool means the registry will be leaked. In future versions
/// there might be a way of cleaning up the current-thread state.
pub fn use_current_thread(mut self) -> Self {
self.use_current_thread = true;
self
Expand Down
3 changes: 0 additions & 3 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,6 @@ impl Registry {
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
//
// For local pools, the caller is responsible of cleaning this up if they need to
// by using clean_up_use_current_thread.
let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));

unsafe {
Expand Down
33 changes: 0 additions & 33 deletions rayon-core/src/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,39 +461,6 @@ pub fn yield_local() -> Option<Yield> {
}
}

/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by
/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built
/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used.
///
/// Calling this function from a thread pool job will block indefinitely.
///
/// Calling this function before before the thread-pool has been dropped will cause the thread to
/// not return control flow to the caller until that happens (stealing work as necessary).
///
/// # Panics
///
/// If the calling thread is no the creator thread of a thread-pool, or not part of that
/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`].
pub fn clean_up_use_current_thread() {
unsafe {
let thread = WorkerThread::current()
.as_ref()
.expect("Should be called from a worker thread");
assert!(
thread.registry().used_creator_thread(),
"Should only be used to clean up the pool creator constructor thread"
);
assert_eq!(
thread.index(),
0,
"Should be called from the thread that created the pool"
);
crate::registry::wait_until_out_of_work(thread);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
}
assert!(WorkerThread::current().is_null());
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
Expand Down
70 changes: 0 additions & 70 deletions rayon-core/tests/use_current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,80 +38,10 @@ fn use_current_thread_basic() {
.wait_while(pair.0.lock().unwrap(), |ran| !*ran)
.unwrap();
std::mem::drop(pool); // Drop the pool.
rayon_core::clean_up_use_current_thread(); // Clean-up
assert_eq!(rayon_core::current_thread_index(), None);

// Wait until all threads have actually exited. This is not really needed, other than to
// reduce noise of leak-checking tools.
for handle in std::mem::take(&mut *JOIN_HANDLES.lock().unwrap()) {
let _ = handle.join();
}
}

#[test]
fn bogus_use_current_thread_cleanup_wrong_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = Arc::clone(&pair);
let pool = ThreadPoolBuilder::new()
.num_threads(2)
.use_current_thread()
.panic_handler(move |e| {
let &(ref panic_message, ref condvar) = &*pair2;
*panic_message.lock().unwrap() = Some(e.downcast::<String>().unwrap());
condvar.notify_one();
})
.build()
.unwrap();
assert_eq!(rayon_core::current_thread_index(), Some(0));

pool.spawn(move || {
rayon_core::clean_up_use_current_thread();
// Should never get here.
});

std::mem::drop(pool); // Drop the pool.
let panic_message = pair
.1
.wait_while(pair.0.lock().unwrap(), |panic_message| {
panic_message.is_none()
})
.unwrap();
let panic_message = panic_message.as_ref().unwrap();
assert!(
panic_message.contains("Should be called from the thread that created the pool"),
"{panic_message}"
);
}

#[test]
#[should_panic]
fn bogus_use_current_thread_cleanup_no_thread() {
assert_eq!(rayon_core::current_thread_index(), None);
rayon_core::clean_up_use_current_thread();
}

#[test]
#[ignore = "Hard to test things that should just infinitely block"]
fn bogus_use_current_thread_cleanup_reentrant() {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.use_current_thread()
.build()
.unwrap(),
);
assert_eq!(rayon_core::current_thread_index(), Some(0));
let pool_clone = Arc::clone(&pool);
let handle = std::thread::spawn(move || {
pool_clone.spawn(move || {
// This should not execute until we clean_up_use_current_thread, so it'll get called
// reentrantly. This shouldn't result in UB or a use-after-free (should just infinitely
// block).
rayon_core::clean_up_use_current_thread();
});
});
handle.join().unwrap();
std::mem::drop(pool);
rayon_core::clean_up_use_current_thread();
}

0 comments on commit d343448

Please sign in to comment.