Skip to content

Commit

Permalink
transition to even/odd scheme
Browse files Browse the repository at this point in the history
This avoids the need to increment the JEC on every new job while still,
I believe, avoiding deadlock.
  • Loading branch information
nikomatsakis committed May 15, 2020
1 parent 8c5fb89 commit 15d1fbf
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 55 deletions.
125 changes: 83 additions & 42 deletions rayon-core/src/sleep/README.md
Expand Up @@ -10,7 +10,7 @@ extracted from the RFC and meant to be kept up to date.
[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5
[video walkthrough]: https://youtu.be/HvmQsE5M4cY

## The `Sleep` struct
# The `Sleep` struct

The `Sleep` struct is embedded into each registry. It performs several functions:

Expand All @@ -21,7 +21,7 @@ The `Sleep` struct is embedded into each registry. It performs several functions
events occur, and it will go and wake the appropriate threads if
they are sleeping.

## Thread states
# Thread states

There are three main thread states:

Expand All @@ -35,7 +35,7 @@ We sometimes refer to the final two states collectively as **inactive**.
Threads begin as idle but transition to idle and finally sleeping when
they're unable to find work to do.

### Sleepy threads
## Sleepy threads

There is one other special state worth mentioning. During the idle state,
threads can get **sleepy**. A sleepy thread is still idle, in that it is still
Expand All @@ -48,7 +48,7 @@ not guaranteed) see that the counter has changed and elect not to sleep, but
instead to search again. See the section on the **jobs event counter** for more
details.

## The counters
# The counters

One of the key structs in the sleep module is `AtomicCounters`, found in
`counters.rs`. It packs three counters into one atomically managed value:
Expand All @@ -57,7 +57,7 @@ One of the key structs in the sleep module is `AtomicCounters`, found in
* The **jobs event counter**, which is used to signal when new work is available.
It (sort of) tracks the number of jobs posted, but not quite, and it can rollover.

### Thread counters
## Thread counters

There are two thread counters, one that tracks **inactive** threads and one that
tracks **sleeping** threads. From this, one can deduce the number of threads
Expand All @@ -81,36 +81,50 @@ These counters are adjusted as follows:
* When a thread finds work, exiting the idle state: decrement the inactive
thread counter.

### Jobs event counter
## Jobs event counter

The final counter is the **jobs event counter**. The role of this counter is to
help sleepy threads detect when new work is posted in a lightweight fashion. The
counter is incremented every time a new job is posted -- naturally, it can also
rollover if there have been enough jobs posted.
help sleepy threads detect when new work is posted in a lightweight fashion. In
its simplest form, we would simply have a counter that gets incremented each
time a new job is posted. This way, when a thread gets sleepy, it could read the
counter, and then compare to see if the value has changed before it actually
goes to sleep. But this [turns out to be too expensive] in practice, so we use a
somewhat more complex scheme.

The counter is used as follows:
[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747

* When a thread gets **sleepy**, it reads the current value of the counter.
* Later, before it goes to sleep, it checks if the counter has changed.
If it has, that indicates that work was posted but we somehow missed it
while searching. We'll go back and search again.
The idea is that the counter toggles between two states, depending on whether
its value is even or odd (or, equivalently, on the value of its low bit):

Assuming no rollover, this protocol serves to prevent a race condition
like so:
* Even -- If the low bit is zero, then it means that there has been no new work
since the last thread got sleepy.
* Odd -- If the low bit is one, then it means that new work was posted since
the last thread got sleepy.

* Thread A gets sleepy.
* Thread A searches for work, finds nothing, and decides to go to sleep.
* Thread B posts work, but sees no sleeping threads, and hence no one to wake up.
* Thread A goes to sleep, incrementing the sleeping thread counter.
### New work is posted

However, because of rollover, the race condition cannot be completely thwarted.
It is possible, if exceedingly unlikely, that Thread A will get sleepy and read
a value of the JEC. And then, in between, there will be *just enough* activity
from other threads to roll the JEC back over to precisely that old value. We
have an extra check in the protocol to prevent deadlock in that (rather
unlikely) case.
When new work is posted, we check the value of the counter: if it is even,
then we increment it by one, so that it becomes odd.

## Protocol for a worker thread to fall asleep
### Worker thread gets sleepy

When a worker thread gets sleepy, it will read the value of the counter. If the
counter is odd, it will increment the counter so that it is even. Either way, it
remembers the final value of the counter. The final value will be used later,
when the thread is going to sleep. If at that time the counter has not changed,
then we can assume no new jobs have been posted (though note the remote
possibility of rollover, discussed in detail below).

# Protocol for a worker thread to post work

The full protocol for a thread to post work is as follows

* If the work is posted into the injection queue, then execute a seq-cst fence (see below).
* Load the counters, incrementing the JEC if it is even so that it is odd.
* Check if there are idle threads available to handle this new job. If not,
and there are sleeping threads, then wake one or more threads.

# Protocol for a worker thread to fall asleep

The full protocol for a thread to fall asleep is as follows:

Expand All @@ -119,19 +133,20 @@ The full protocol for a thread to fall asleep is as follows:
it searches all other work threads' queues, plus the 'injector queue' for
work injected from the outside. If work is found in this search, the thread
becomes active again and hence restarts this protocol from the top.
* After a certain number of rounds, the thread "gets sleepy" and reads the JEC.
It does one more search for work.
* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy`
above, remembering the `final_value` of the JEC. It does one more search for work.
* If no work is found, the thread atomically:
* Checks the JEC to see that it hasn't changed.
* If it has, then the thread returns to *just before* the "sleepy state" to
search again (i.e., it won't search for a full set of rounds, just a few
more times).
* Checks the JEC to see that it has not changed from `final_value`.
* If it has, then the thread goes back to searchin for work. We reset to
just before we got sleepy, so that we will do one more search
before attending to sleep again (rather than searching for many rounds).
* Increments the number of sleeping threads by 1.
* The thread then executes a seq-cst fence operation (see below).
* The thread then does one final check for injected jobs (see below). If any
are available, it returns to the 'pre-sleepy' state as if the JEC had changed.
* The thread waits to be signaled. Once signaled, it returns to the idle state.

### The jobs event counter and deadlock
# The jobs event counter and deadlock

As described in the section on the JEC, the main concern around going to sleep
is avoiding a race condition wherein:
Expand All @@ -156,23 +171,49 @@ asleep. Note that this final check occurs **after** the number of sleeping
threads has been incremented. We are not concerned therefore with races against
injections that occur after that increment, only before.

Unfortunately, there is one rather subtle point concerning this final check:
we wish to avoid the possibility that:

* work is pushed into the injection queue by an outside thread X,
* the sleepy thread S sees the JEC but it has rolled over and is equal
* the sleepy thread S reads the injection queue but does not see the work posted by X.

This is possible because the C++ memory model typically offers guarantees of the
form "if you see the access A, then you must see those other accesses" -- but it
doesn't guarantee that you will see the access A (i.e., if you think of
processors with independent caches, you may be operating on very out of date
cache state).

## Using seq-cst fences to prevent deadlock

To overcome this problem, we have inserted two sequentially consistent fence
operations into the protocols above:

* One fence occurs after work is posted into the injection queue, but before the
counters are read (including the number of sleeping threads).
* Note that no fence is needed for work posted to internal queues, since it is ok
to overlook work in that case.
* One fence occurs after the number of sleeping threads is incremented, but
before the injection queue is read.

### Proof sketch

What follows is a "proof sketch" that the protocol is deadlock free. We model
two relevant bits of memory, the job injector queue J and the atomic counters C.

Consider the actions of the injecting thread:

* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics.
* IncJec: The JEC is incremented, which can be modeled as an atomic exchange to C with acquire-release semantics.
* PushFence: A sequentially consistent fence is executed.
* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first).

Meanwhile, the sleepy thread does the following:

* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C with acquire-release semantics.
* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C.
* SleepFence: A sequentially consistent fence is executed.
* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics.

Both IncJec and IncSleepers modify the same memory location, and hence they must be fully ordered.
Either PushFence or SleepFence must come first:

* If IncSleepers came first, there is no problem, because the injecting thread
knows that everyone is asleep and it will wake up a thread.
* If IncJec came first, then it "synchronizes with" IncSleepers.
* Therefore, PushJob "happens before" ReadJob, and so the write will be visible during
this final check, and the thread will not go to sleep.
* If PushFence comes first, then PushJob must be visible to ReadJob.
* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
50 changes: 40 additions & 10 deletions rayon-core/src/sleep/counters.rs
Expand Up @@ -32,6 +32,21 @@ impl JobsEventCounter {
pub(super) fn as_usize(self) -> usize {
self.0
}

/// The JEC "is sleepy" if the last thread to increment it was in the
/// process of becoming sleepy. This is indicated by its value being *even*.
/// When new jobs are posted, they check if the JEC is sleepy, and if so
/// they incremented it.
pub(super) fn is_sleepy(self) -> bool {
(self.as_usize() & 0) == 0
}

/// The JEC "is active" if the last thread to increment it was posting new
/// work. This is indicated by its value being *odd*. When threads get
/// sleepy, they will check if the JEC is active, and increment it.
pub(super) fn is_active(self) -> bool {
!self.is_sleepy()
}
}

/// Number of bits used for the thread counters.
Expand Down Expand Up @@ -97,16 +112,25 @@ impl AtomicCounters {
self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
}

/// Attempts to increment the jobs event counter by one, returning true
/// if it succeeded. This can be used for two purposes:
///
/// * If a thread is getting sleepy, and the JEC is even, then it will attempt
/// to increment to an odd value.
/// * If a thread is publishing work, and the JEC is odd, then it will attempt
/// to increment to an event value.
#[inline]
pub(super) fn increment_and_read_jobs_event_counter(&self) -> Counters {
Counters::new(self.value.fetch_add(ONE_JEC, Ordering::SeqCst))
/// Increments the jobs event counter if `increment_when`, when applied to
/// the current value, is true. Used to toggle the JEC from even (sleepy) to
/// odd (active) or vice versa. Returns the final value of the counters, for
/// which `increment_when` is guaranteed to return false.
pub(super) fn increment_jobs_event_counter_if(
&self,
increment_when: impl Fn(JobsEventCounter) -> bool,
) -> Counters {
loop {
let old_value = self.load(Ordering::SeqCst);
if increment_when(old_value.jobs_counter()) {
let new_value = old_value.plus(ONE_JEC);
if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
return new_value;
}
} else {
return old_value;
}
}
}

/// Subtracts an inactive thread. This cannot fail. It is invoked
Expand Down Expand Up @@ -193,6 +217,12 @@ impl Counters {
Counters { word }
}

fn plus(self, word: usize) -> Counters {
Counters {
word: self.word + word,
}
}

#[inline]
pub(super) fn jobs_counter(self) -> JobsEventCounter {
JobsEventCounter(select_jec(self.word))
Expand Down
16 changes: 13 additions & 3 deletions rayon-core/src/sleep/mod.rs
Expand Up @@ -123,7 +123,9 @@ impl Sleep {

#[cold]
fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
let counters = self.counters.load(Ordering::SeqCst);
let counters = self
.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_active);
let jobs_counter = counters.jobs_counter();
self.logger.log(|| ThreadSleepy {
worker: worker_index,
Expand Down Expand Up @@ -170,6 +172,7 @@ impl Sleep {
let counters = self.counters.load(Ordering::SeqCst);

// Check if the JEC has changed since we got sleepy.
debug_assert!(idle_state.jobs_counter.is_sleepy());
if counters.jobs_counter() != idle_state.jobs_counter {
// JEC has changed, so a new job was posted, but for some reason
// we didn't see it. We should return to just before the SLEEPY
Expand Down Expand Up @@ -203,6 +206,7 @@ impl Sleep {
// - an external job is being injected while we are sleepy
// - that job triggers the rollover over the JEC such that we don't see it
// - we are the last active worker thread
std::sync::atomic::fence(Ordering::SeqCst);
if has_injected_jobs() {
// If we see an externally injected job, then we have to 'wake
// ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
Expand Down Expand Up @@ -260,6 +264,11 @@ impl Sleep {
num_jobs: u32,
queue_was_empty: bool,
) {
// This fence is needed to guarantee that threads
// as they are about to fall asleep, observe any
// new jobs that may have been injected.
std::sync::atomic::fence(Ordering::SeqCst);

self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
}

Expand Down Expand Up @@ -294,8 +303,9 @@ impl Sleep {
// Read the counters and -- if sleepy workers have announced themselves
// -- announce that there is now work available. The final value of `counters`
// with which we exit the loop thus corresponds to a state when
let counters = self.counters.increment_and_read_jobs_event_counter();

let counters = self
.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
let num_awake_but_idle = counters.awake_but_idle_threads();
let num_sleepers = counters.sleeping_threads();

Expand Down

0 comments on commit 15d1fbf

Please sign in to comment.