Skip to content

Commit

Permalink
core: Deal with drop(ThreadPool) from a local operation.
Browse files Browse the repository at this point in the history
From other injected jobs this is not an issue, we'll end up waiting for
ever till out of work.
  • Loading branch information
emilio committed Sep 2, 2023
1 parent e117ebd commit 8c840bd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
32 changes: 24 additions & 8 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,7 @@ impl Registry {
} else if (*worker_thread).registry().id() != self.id() {
self.in_worker_cross(&*worker_thread, op)
} else {
// Perfectly valid to give them a `&T`: this is the
// current thread, so we know the data structure won't be
// invalidated until we return.
op(&*worker_thread, false)
(*worker_thread).execute_local_op(op)
}
}
}
Expand Down Expand Up @@ -630,6 +627,9 @@ impl Registry {
if t.index != 0 {
return;
}
if t.in_local_op.get() {
return;
}
unsafe {
wait_until_out_of_work(t);
let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread);
Expand Down Expand Up @@ -698,6 +698,9 @@ pub(super) struct WorkerThread {
rng: XorShift64Star,

registry: Arc<Registry>,

/// Whether we're executing a local operation that must keep us alive.
in_local_op: Cell<bool>,
}

// This is a bit sketchy, but basically: the WorkerThread is
Expand All @@ -718,6 +721,7 @@ impl From<ThreadBuilder> for WorkerThread {
index: thread.index,
rng: XorShift64Star::new(),
registry: thread.registry,
in_local_op: Cell::new(false),
}
}
}
Expand Down Expand Up @@ -946,6 +950,21 @@ impl WorkerThread {
}
}
}

fn execute_local_op<OP, R>(&self, op: OP) -> R
where
OP: FnOnce(&WorkerThread, bool) -> R + Send,
R: Send,
{
let old_in_local_op = self.in_local_op.get();
self.in_local_op.set(true);

let r = op(self, false);

self.in_local_op.set(old_in_local_op);

r
}
}

/// ////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1017,10 +1036,7 @@ where
unsafe {
let owner_thread = WorkerThread::current();
if !owner_thread.is_null() {
// Perfectly valid to give them a `&T`: this is the
// current thread, so we know the data structure won't be
// invalidated until we return.
op(&*owner_thread, false)
(*owner_thread).execute_local_op(op)
} else {
global_registry().in_worker(op)
}
Expand Down
17 changes: 17 additions & 0 deletions rayon-core/tests/use_current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,20 @@ fn use_current_thread_basic() {
let _ = handle.join();
}
}

#[test]
fn use_current_thread_drop_from_pool() {
let pool = ThreadPoolBuilder::new()
.use_current_thread()
.build()
.unwrap();
assert_eq!(rayon_core::current_thread_index(), Some(0));

rayon_core::scope(|_| drop(pool));

assert_eq!(
rayon_core::current_thread_index(),
Some(0),
"Can't shut down current thread cleanly if in a local op"
);
}

0 comments on commit 8c840bd

Please sign in to comment.