Skip to content

Commit

Permalink
add new thread-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
nikomatsakis committed Apr 7, 2020
1 parent 23ab4f9 commit dc1f826
Show file tree
Hide file tree
Showing 7 changed files with 805 additions and 325 deletions.
4 changes: 2 additions & 2 deletions rayon-core/src/join/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::job::StackJob;
use crate::latch::{LatchProbe, SpinLatch};
use crate::latch::SpinLatch;
use crate::registry::{self, WorkerThread};
use crate::unwind;
use std::any::Any;
Expand Down Expand Up @@ -133,7 +133,7 @@ where
// Create virtual wrapper for task b; this all has to be
// done here so that the stack frame can keep it all live
// long enough.
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new());
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
worker_thread.push(job_b_ref);

Expand Down
220 changes: 147 additions & 73 deletions rayon-core/src/latch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::usize;

use crate::sleep::Sleep;
use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
Expand Down Expand Up @@ -30,43 +30,140 @@ use crate::sleep::Sleep;
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch: LatchProbe {
pub(super) trait Latch {
/// Set the latch, signalling others.
fn set(&self);
}

pub(super) trait LatchProbe {
/// Test if the latch is set.
fn probe(&self) -> bool;
pub(super) trait AsCoreLatch {
fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
#[derive(Debug)]
pub(super) struct CoreLatch {
state: AtomicUsize,
}

impl CoreLatch {
fn new() -> Self {
Self {
state: AtomicUsize::new(0),
}
}

/// Returns the address of this core latch as an integer. Used
/// for logging.
pub(super) fn addr(&self) -> usize {
self as *const CoreLatch as usize
}

/// Invoked by owning thread as it prepares to sleep. Returns true
/// if the owning thread may proceed to fall asleep, false if the
/// latch was set in the meantime.
pub(super) fn get_sleepy(&self) -> bool {
self
.state
.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}

/// Invoked by owning thread as it falls asleep sleep. Returns
/// true if the owning thread should block, or false if the latch
/// was set in the meantime.
pub(super) fn fall_asleep(&self) -> bool {
self
.state
.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}

/// Invoked by owning thread as it falls asleep sleep. Returns
/// true if the owning thread should block, or false if the latch
/// was set in the meantime.
pub(super) fn wake_up(&self) {
if !self.probe() {
let _ =
self
.state
.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
}
}

/// Set the latch. If this returns true, the owning thread was sleeping
/// and must be awoken.
///
/// This is private because, typically, setting a latch involves
/// doing some wakeups; those are encapsulated in the surrounding
/// latch code.
fn set(&self) -> bool {
let old_state = self.state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}

/// Test if this latch has been set.
pub(super) fn probe(&self) -> bool {
self.state.load(Ordering::Acquire) == SET
}
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch {
b: AtomicBool,
pub(super) struct SpinLatch<'r> {
core_latch: CoreLatch,
registry: &'r Registry,
target_worker_index: usize,
}

impl SpinLatch {
impl<'r> SpinLatch<'r> {
/// Creates a new spin latch that is owned by `thread`. This means
/// that `thread` is the only thread that should be blocking on
/// this latch -- it also means that when the latch is set, we
/// will wake `thread` if it is sleeping.
#[inline]
pub(super) fn new() -> SpinLatch {
pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch {
SpinLatch {
b: AtomicBool::new(false),
core_latch: CoreLatch::new(),
registry: thread.registry(),
target_worker_index: thread.index(),
}
}

pub(super) fn probe(&self) -> bool {
self.core_latch.probe()
}
}

impl LatchProbe for SpinLatch {
impl<'r> AsCoreLatch for SpinLatch<'r> {
#[inline]
fn probe(&self) -> bool {
self.b.load(Ordering::SeqCst)
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}

impl Latch for SpinLatch {
impl<'r> Latch for SpinLatch<'r> {
#[inline]
fn set(&self) {
self.b.store(true, Ordering::SeqCst);
if self.core_latch.set() {
self.registry.notify_worker_latch_is_set(self.target_worker_index);
}
}
}

Expand Down Expand Up @@ -104,15 +201,6 @@ impl LockLatch {
}
}

impl LatchProbe for LockLatch {
#[inline]
fn probe(&self) -> bool {
// Not particularly efficient, but we don't really use this operation
let guard = self.m.lock().unwrap();
*guard
}
}

impl Latch for LockLatch {
#[inline]
fn set(&self) {
Expand All @@ -127,81 +215,67 @@ impl Latch for LockLatch {
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count laches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
core_latch: CoreLatch,
counter: AtomicUsize,
}

impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
CountLatch {
core_latch: CoreLatch::new(),
counter: AtomicUsize::new(1),
}
}

#[inline]
pub(super) fn increment(&self) {
debug_assert!(!self.probe());
debug_assert!(!self.core_latch.probe());
self.counter.fetch_add(1, Ordering::Relaxed);
}
}

impl LatchProbe for CountLatch {
/// Decrements the latch counter by one. If this is the final
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set. This is an
/// internal operation, as it does not tickle, and to fail to
/// tickle would lead to deadlock.
#[inline]
fn probe(&self) -> bool {
// Need to acquire any memory reads before latch was set:
self.counter.load(Ordering::SeqCst) == 0
}
}

impl Latch for CountLatch {
/// Set the latch to true, releasing all threads who are waiting.
#[inline]
fn set(&self) {
self.counter.fetch_sub(1, Ordering::SeqCst);
}
}

/// A tickling latch wraps another latch type, and will also awaken a thread
/// pool when it is set. This is useful for jobs injected between thread pools,
/// so the source pool can continue processing its own work while waiting.
pub(super) struct TickleLatch<'a, L: Latch> {
inner: L,
sleep: &'a Sleep,
}

impl<'a, L: Latch> TickleLatch<'a, L> {
#[inline]
pub(super) fn new(latch: L, sleep: &'a Sleep) -> Self {
TickleLatch {
inner: latch,
sleep,
fn set(&self) -> bool {
if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
self.core_latch.set();
true
} else {
false
}
}
}

impl<'a, L: Latch> LatchProbe for TickleLatch<'a, L> {
/// Decrements the latch counter by one and possibly set it. If
/// the latch is set, then the specific worker thread is tickled,
/// which should be the one that owns this latch.
#[inline]
fn probe(&self) -> bool {
self.inner.probe()
pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
if self.set() {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}

impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
impl AsCoreLatch for CountLatch {
#[inline]
fn set(&self) {
self.inner.set();
self.sleep.tickle(usize::MAX);
}
}

impl<'a, L> LatchProbe for &'a L
where
L: LatchProbe,
{
fn probe(&self) -> bool {
L::probe(self)
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}

Expand Down

0 comments on commit dc1f826

Please sign in to comment.