From 15d1fbfe3e98e3abe62c61462b568b86385c8595 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Wed, 13 May 2020 10:37:02 +0000 Subject: [PATCH] transition to even/odd scheme This avoids the need to increment the JEC on every new job while still, I believe, avoiding deadlock. --- rayon-core/src/sleep/README.md | 125 ++++++++++++++++++++----------- rayon-core/src/sleep/counters.rs | 50 ++++++++++--- rayon-core/src/sleep/mod.rs | 16 +++- 3 files changed, 136 insertions(+), 55 deletions(-) diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md index d2aebe06b..c62c3975d 100644 --- a/rayon-core/src/sleep/README.md +++ b/rayon-core/src/sleep/README.md @@ -10,7 +10,7 @@ extracted from the RFC and meant to be kept up to date. [Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5 [video walkthrough]: https://youtu.be/HvmQsE5M4cY -## The `Sleep` struct +# The `Sleep` struct The `Sleep` struct is embedded into each registry. It performs several functions: @@ -21,7 +21,7 @@ The `Sleep` struct is embedded into each registry. It performs several functions events occur, and it will go and wake the appropriate threads if they are sleeping. -## Thread states +# Thread states There are three main thread states: @@ -35,7 +35,7 @@ We sometimes refer to the final two states collectively as **inactive**. Threads begin as idle but transition to idle and finally sleeping when they're unable to find work to do. -### Sleepy threads +## Sleepy threads There is one other special state worth mentioning. During the idle state, threads can get **sleepy**. A sleepy thread is still idle, in that it is still @@ -48,7 +48,7 @@ not guaranteed) see that the counter has changed and elect not to sleep, but instead to search again. See the section on the **jobs event counter** for more details. -## The counters +# The counters One of the key structs in the sleep module is `AtomicCounters`, found in `counters.rs`. 
It packs three counters into one atomically managed value: @@ -57,7 +57,7 @@ One of the key structs in the sleep module is `AtomicCounters`, found in * The **jobs event counter**, which is used to signal when new work is available. It (sort of) tracks the number of jobs posted, but not quite, and it can rollover. -### Thread counters +## Thread counters There are two thread counters, one that tracks **inactive** threads and one that tracks **sleeping** threads. From this, one can deduce the number of threads @@ -81,36 +81,50 @@ These counters are adjusted as follows: * When a thread finds work, exiting the idle state: decrement the inactive thread counter. -### Jobs event counter +## Jobs event counter The final counter is the **jobs event counter**. The role of this counter is to -help sleepy threads detect when new work is posted in a lightweight fashion. The -counter is incremented every time a new job is posted -- naturally, it can also -rollover if there have been enough jobs posted. +help sleepy threads detect when new work is posted in a lightweight fashion. In +its simplest form, we would simply have a counter that gets incremented each +time a new job is posted. This way, when a thread gets sleepy, it could read the +counter, and then compare to see if the value has changed before it actually +goes to sleep. But this [turns out to be too expensive] in practice, so we use a +somewhat more complex scheme. -The counter is used as follows: +[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747 -* When a thread gets **sleepy**, it reads the current value of the counter. -* Later, before it goes to sleep, it checks if the counter has changed. - If it has, that indicates that work was posted but we somehow missed it - while searching. We'll go back and search again. 
+The idea is that the counter toggles between two states, depending on whether +its value is even or odd (or, equivalently, on the value of its low bit): -Assuming no rollover, this protocol serves to prevent a race condition -like so: +* Even -- If the low bit is zero, then it means that there has been no new work + since the last thread got sleepy. +* Odd -- If the low bit is one, then it means that new work was posted since + the last thread got sleepy. -* Thread A gets sleepy. -* Thread A searches for work, finds nothing, and decides to go to sleep. -* Thread B posts work, but sees no sleeping threads, and hence no one to wake up. -* Thread A goes to sleep, incrementing the sleeping thread counter. +### New work is posted -However, because of rollover, the race condition cannot be completely thwarted. -It is possible, if exceedingly unlikely, that Thread A will get sleepy and read -a value of the JEC. And then, in between, there will be *just enough* activity -from other threads to roll the JEC back over to precisely that old value. We -have an extra check in the protocol to prevent deadlock in that (rather -unlikely) case. +When new work is posted, we check the value of the counter: if it is even, +then we increment it by one, so that it becomes odd. -## Protocol for a worker thread to fall asleep +### Worker thread gets sleepy + +When a worker thread gets sleepy, it will read the value of the counter. If the +counter is odd, it will increment the counter so that it is even. Either way, it +remembers the final value of the counter. The final value will be used later, +when the thread is going to sleep. If at that time the counter has not changed, +then we can assume no new jobs have been posted (though note the remote +possibility of rollover, discussed in detail below). 
+ +# Protocol for a worker thread to post work + +The full protocol for a thread to post work is as follows: + +* If the work is posted into the injection queue, then execute a seq-cst fence (see below). +* Load the counters, incrementing the JEC if it is even so that it is odd. +* Check if there are idle threads available to handle this new job. If not, + and there are sleeping threads, then wake one or more threads. + +# Protocol for a worker thread to fall asleep The full protocol for a thread to fall asleep is as follows: @@ -119,19 +133,20 @@ The full protocol for a thread to fall asleep is as follows: it searches all other work threads' queues, plus the 'injector queue' for work injected from the outside. If work is found in this search, the thread becomes active again and hence restarts this protocol from the top. -* After a certain number of rounds, the thread "gets sleepy" and reads the JEC. - It does one more search for work. +* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy` + above, remembering the `final_value` of the JEC. It does one more search for work. * If no work is found, the thread atomically: - * Checks the JEC to see that it hasn't changed. - * If it has, then the thread returns to *just before* the "sleepy state" to - search again (i.e., it won't search for a full set of rounds, just a few - more times). + * Checks the JEC to see that it has not changed from `final_value`. + * If it has, then the thread goes back to searching for work. We reset to + just before we got sleepy, so that we will do one more search + before attempting to sleep again (rather than searching for many rounds). * Increments the number of sleeping threads by 1. +* The thread then executes a seq-cst fence operation (see below). * The thread then does one final check for injected jobs (see below). If any are available, it returns to the 'pre-sleepy' state as if the JEC had changed. * The thread waits to be signaled. 
Once signaled, it returns to the idle state. -### The jobs event counter and deadlock +# The jobs event counter and deadlock As described in the section on the JEC, the main concern around going to sleep is avoiding a race condition wherein: @@ -156,23 +171,49 @@ asleep. Note that this final check occurs **after** the number of sleeping threads has been incremented. We are not concerned therefore with races against injections that occur after that increment, only before. +Unfortunately, there is one rather subtle point concerning this final check: +we wish to avoid the possibility that: + +* work is pushed into the injection queue by an outside thread X, +* the sleepy thread S sees the JEC but it has rolled over and is equal +* the sleepy thread S reads the injection queue but does not see the work posted by X. + +This is possible because the C++ memory model typically offers guarantees of the +form "if you see the access A, then you must see those other accesses" -- but it +doesn't guarantee that you will see the access A (i.e., if you think of +processors with independent caches, you may be operating on very out of date +cache state). + +## Using seq-cst fences to prevent deadlock + +To overcome this problem, we have inserted two sequentially consistent fence +operations into the protocols above: + +* One fence occurs after work is posted into the injection queue, but before the + counters are read (including the number of sleeping threads). + * Note that no fence is needed for work posted to internal queues, since it is ok + to overlook work in that case. +* One fence occurs after the number of sleeping threads is incremented, but + before the injection queue is read. + +### Proof sketch + What follows is a "proof sketch" that the protocol is deadlock free. We model two relevant bits of memory, the job injector queue J and the atomic counters C. 
Consider the actions of the injecting thread: * PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics. -* IncJec: The JEC is incremented, which can be modeled as an atomic exchange to C with acquire-release semantics. +* PushFence: A sequentially consistent fence is executed. +* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first). Meanwhile, the sleepy thread does the following: -* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C with acquire-release semantics. +* IncSleepers: The number of sleeping threads is incremented, which is an atomic exchange to C. +* SleepFence: A sequentially consistent fence is executed. * ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics. -Both IncJec and IncSleepers modify the same memory location, and hence they must be fully ordered. +Either PushFence or SleepFence must come first: -* If IncSleepers came first, there is no problem, because the injecting thread - knows that everyone is asleep and it will wake up a thread. -* If IncJec came first, then it "synchronizes with" IncSleepers. - * Therefore, PushJob "happens before" ReadJob, and so the write will be visible during - this final check, and the thread will not go to sleep. +* If PushFence comes first, then PushJob must be visible to ReadJob, so the sleepy thread sees the job in its final check and does not go to sleep. +* If SleepFence comes first, then IncSleepers is visible to ReadSleepers, so the injecting thread sees the sleeping thread and wakes it. \ No newline at end of file diff --git a/rayon-core/src/sleep/counters.rs b/rayon-core/src/sleep/counters.rs index 5c94bc6b1..1d4d8d1d3 100644 --- a/rayon-core/src/sleep/counters.rs +++ b/rayon-core/src/sleep/counters.rs @@ -32,6 +32,21 @@ impl JobsEventCounter { pub(super) fn as_usize(self) -> usize { self.0 } + + /// The JEC "is sleepy" if the last thread to increment it was in the + /// process of becoming sleepy. This is indicated by its value being *even*. 
+ /// When new jobs are posted, they check if the JEC is sleepy, and if so + /// they increment it. + pub(super) fn is_sleepy(self) -> bool { + (self.as_usize() & 1) == 0 + } + + /// The JEC "is active" if the last thread to increment it was posting new + /// work. This is indicated by its value being *odd*. When threads get + /// sleepy, they will check if the JEC is active, and increment it. + pub(super) fn is_active(self) -> bool { + !self.is_sleepy() + } } /// Number of bits used for the thread counters. @@ -97,16 +112,25 @@ impl AtomicCounters { self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); } - /// Attempts to increment the jobs event counter by one, returning true - /// if it succeeded. This can be used for two purposes: - /// - /// * If a thread is getting sleepy, and the JEC is even, then it will attempt - /// to increment to an odd value. - /// * If a thread is publishing work, and the JEC is odd, then it will attempt - /// to increment to an event value. - #[inline] - pub(super) fn increment_and_read_jobs_event_counter(&self) -> Counters { - Counters::new(self.value.fetch_add(ONE_JEC, Ordering::SeqCst)) + /// Increments the jobs event counter if `increment_when`, when applied to + /// the current value, is true. Used to toggle the JEC from even (sleepy) to + /// odd (active) or vice versa. Returns the final value of the counters, for + /// which `increment_when` is guaranteed to return false. + pub(super) fn increment_jobs_event_counter_if( + &self, + increment_when: impl Fn(JobsEventCounter) -> bool, + ) -> Counters { + loop { + let old_value = self.load(Ordering::SeqCst); + if increment_when(old_value.jobs_counter()) { + let new_value = old_value.plus(ONE_JEC); + if self.try_exchange(old_value, new_value, Ordering::SeqCst) { + return new_value; + } + } else { + return old_value; + } + } } /// Subtracts an inactive thread. This cannot fail. 
It is invoked @@ -193,6 +217,12 @@ impl Counters { Counters { word } } + fn plus(self, word: usize) -> Counters { + Counters { + word: self.word + word, + } + } + #[inline] pub(super) fn jobs_counter(self) -> JobsEventCounter { JobsEventCounter(select_jec(self.word)) diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs index 4db30c58a..2aa262c51 100644 --- a/rayon-core/src/sleep/mod.rs +++ b/rayon-core/src/sleep/mod.rs @@ -123,7 +123,9 @@ impl Sleep { #[cold] fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter { - let counters = self.counters.load(Ordering::SeqCst); + let counters = self + .counters + .increment_jobs_event_counter_if(JobsEventCounter::is_active); let jobs_counter = counters.jobs_counter(); self.logger.log(|| ThreadSleepy { worker: worker_index, @@ -170,6 +172,7 @@ impl Sleep { let counters = self.counters.load(Ordering::SeqCst); // Check if the JEC has changed since we got sleepy. + debug_assert!(idle_state.jobs_counter.is_sleepy()); if counters.jobs_counter() != idle_state.jobs_counter { // JEC has changed, so a new job was posted, but for some reason // we didn't see it. We should return to just before the SLEEPY @@ -203,6 +206,7 @@ impl Sleep { // - an external job is being injected while we are sleepy // - that job triggers the rollover over the JEC such that we don't see it // - we are the last active worker thread + std::sync::atomic::fence(Ordering::SeqCst); if has_injected_jobs() { // If we see an externally injected job, then we have to 'wake // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by @@ -260,6 +264,11 @@ impl Sleep { num_jobs: u32, queue_was_empty: bool, ) { + // This fence is needed to guarantee that threads + // as they are about to fall asleep, observe any + // new jobs that may have been injected. 
+ std::sync::atomic::fence(Ordering::SeqCst); + self.new_jobs(source_worker_index, num_jobs, queue_was_empty) } @@ -294,8 +303,9 @@ impl Sleep { // Read the counters and -- if sleepy workers have announced themselves // -- announce that there is now work available. The final value of `counters` // with which we exit the loop thus corresponds to a state when - let counters = self.counters.increment_and_read_jobs_event_counter(); - + let counters = self + .counters + .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); let num_awake_but_idle = counters.awake_but_idle_threads(); let num_sleepers = counters.sleeping_threads();
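
Note on the even/odd toggle: the following is a minimal standalone sketch of the scheme this patch introduces, not the code in `counters.rs`. It assumes a bare `AtomicUsize` in place of the packed `AtomicCounters`, and the names `ToyJec`, `increment_if`, `get_sleepy`, `post_work`, and `unchanged_since` are hypothetical, chosen only for illustration. It leaves out the thread counters and the seq-cst fences entirely, so it shows only how the low bit toggles and how a sleepy thread detects that work was posted.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the jobs event counter (JEC). The patch packs
/// the real counter into `AtomicCounters`; this sketch uses a bare atomic.
pub struct ToyJec {
    value: AtomicUsize,
}

/// Even low bit: no new work has been posted since the last thread got sleepy.
pub fn is_sleepy(jec: usize) -> bool {
    (jec & 1) == 0
}

/// Odd low bit: new work was posted since the last thread got sleepy.
pub fn is_active(jec: usize) -> bool {
    !is_sleepy(jec)
}

impl ToyJec {
    pub fn new() -> Self {
        ToyJec { value: AtomicUsize::new(0) }
    }

    /// Increments the counter by one if `increment_when(current)` is true,
    /// retrying on contention. Returns the final value, for which
    /// `increment_when` returns false.
    pub fn increment_if(&self, increment_when: impl Fn(usize) -> bool) -> usize {
        loop {
            let old = self.value.load(Ordering::SeqCst);
            if !increment_when(old) {
                return old;
            }
            // Wrapping add models the rollover the README discusses.
            let new = old.wrapping_add(1);
            if self
                .value
                .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return new;
            }
        }
    }

    /// Worker side: force the counter to an even value and remember it.
    pub fn get_sleepy(&self) -> usize {
        self.increment_if(is_active)
    }

    /// Poster side: force the counter to an odd value.
    pub fn post_work(&self) -> usize {
        self.increment_if(is_sleepy)
    }

    /// Worker side, just before sleeping: has anything changed since
    /// `get_sleepy` returned `remembered`?
    pub fn unchanged_since(&self, remembered: usize) -> bool {
        self.value.load(Ordering::SeqCst) == remembered
    }
}
```

In this sketch, repeated calls to `post_work` while the counter is already odd do not write to it at all, which is the saving the commit message describes: the JEC no longer has to be incremented on every new job.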