
Add ThreadPool::broadcast #492

Merged: 6 commits merged into rayon-rs:master on Nov 16, 2022

Conversation

@cuviper (Member) commented Dec 14, 2017

A broadcast runs the closure on every thread in the pool, then collects
the results. It's scheduled somewhat like a very soft interrupt -- it
won't preempt a thread's local work, but will run before it goes to
steal from any other threads.

This can be used when you want to precisely split your work per-thread,
or to set or retrieve some thread-local data in the pool, e.g. #483.
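For illustration, here is a minimal usage sketch of the per-thread splitting idea, written against the BroadcastContext-based API that eventually landed (summarized near the end of this thread); the chunking scheme is just an example:

```rust
fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let data: Vec<u64> = (0..1_000u64).collect();

    // The closure runs exactly once on each worker thread; each thread
    // sums its own contiguous chunk of the input.
    let partial_sums: Vec<u64> = pool.broadcast(|ctx| {
        let chunk = data.len() / ctx.num_threads() + 1;
        data.chunks(chunk)
            .nth(ctx.index())
            .map_or(0, |c| c.iter().sum())
    });

    let total: u64 = partial_sums.iter().sum();
    assert_eq!(total, (0..1_000u64).sum::<u64>());
}
```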

@nikomatsakis (Member) left a comment


I like this overall. In terms of what we are committing to publicly, I suppose it's this:

You have the ability to schedule a closure to run exactly once on each thread. The priority is sort of undefined (as is typical for Rayon).

That doesn't seem like too much of a commitment. The main question is whether that behavior is well-defined if the size of the threadpool were to ever become dynamic, but I am more and more dubious of such a thing ever happening, and certainly not without some form of opt-in.

I wonder if it'd be useful to give the closure the number of threads as well? I guess that is readily accessible from the pool, so that's why we don't, right?

@cuviper (Member, Author) commented Dec 20, 2017

> In terms of what we are committing to publicly, I suppose it's this:
>
> You have the ability to schedule a closure to run exactly once on each thread. The priority is sort of undefined (as is typical for Rayon).
>
> That doesn't seem like too much of a commitment.

One thing to strengthen this is that racing broadcasts will be consistently ordered. That is, given simultaneous broadcasts A and B, then if one thread sees A before B, they all will, and vice versa. So for example, if both are setting the same TLS, every thread will set them in the same order.

It doesn't guarantee that every A will complete before any B starts, but you could use a Barrier in A if needed. Consistent ordering means you won't have to worry about a barrier in B also waiting, interleaved with A, and deadlocking.

We don't have to promise this, but I think it's powerful if we do.
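As a concrete illustration of the Barrier idea (a sketch only, written against the ThreadPool::broadcast API that eventually landed): sizing a Barrier to the pool makes every invocation of broadcast A rendezvous before any thread moves past it.

```rust
use std::sync::Barrier;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    // One slot per worker: every thread's invocation of "A" waits here,
    // so all of A's per-thread setup finishes before any thread goes on
    // to other work (including a later broadcast B).
    let barrier = Barrier::new(pool.current_num_threads());
    pool.broadcast(|_ctx| {
        // ... per-thread setup for broadcast "A" ...
        barrier.wait();
    });
}
```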

> The main question is whether that behavior is well-defined if the size of the threadpool were to ever become dynamic, but I am more and more dubious of such a thing ever happening, and certainly not without some form of opt-in.

I'm also skeptical of this happening. Internally, we could at least make sure that threads are never removed while they have a broadcast waiting, but I guess new threads would just be left out.

> I wonder if it'd be useful to give the closure the number of threads as well? I guess that is readily accessible from the pool, so that's why we don't, right?

Yeah, it's easy to get, although that could be racy if there are dynamic threads. We could supply that value to indicate the number of times the broadcast will be called -- the number we're actually queuing -- independent of any new threads that may pop up.

@nikomatsakis (Member) commented, quoting @cuviper:

> Yeah, it's easy to get, although that could be racy if there are dynamic threads. We could supply that value to indicate the number of times the broadcast will be called -- the number we're actually queuing -- independent of any new threads that may pop up.

So actually, I think this is a case where it would make sense for us to supply a &BroadcastContext (or BroadcastContext) so that we can easily add more contextual information later. If we really want to future proof, it'd probably be &BroadcastContext<'_>, where the lifetime is currently just phantom.
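A hedged sketch of what such a context type could look like (field names here are illustrative, not the actual implementation); the phantom lifetime leaves room to later hold real borrows from the registry:

```rust
use std::marker::PhantomData;

// Illustrative only: a context handed to each broadcast invocation.
// The lifetime is currently phantom, so borrowed fields (e.g. into the
// pool's registry) could be added later without changing the signature.
pub struct BroadcastContext<'a> {
    index: usize,       // which worker thread is running this call
    num_threads: usize, // how many invocations were queued in total
    _marker: PhantomData<&'a ()>,
}

impl<'a> BroadcastContext<'a> {
    pub fn index(&self) -> usize {
        self.index
    }

    pub fn num_threads(&self) -> usize {
        self.num_threads
    }
}
```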

bors bot added a commit that referenced this pull request Jun 6, 2018
550: add bridge from Iterator to ParallelIterator r=cuviper a=QuietMisdreavus

Half of #46

This started getting reviewed in QuietMisdreavus/polyester#6, but I decided to move my work to Rayon proper.

This PR adds a new trait, `AsParallel`, an implementation on `Iterator + Send`, and an iterator adapter `IterParallel` that implements `ParallelIterator` with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because `ParallelIterator` was implemented on `Range`, which is itself an `Iterator`.

The basic idea is that you would start with a quick sequential `Iterator`, call `.as_parallel()` on it, and be able to use `ParallelIterator` adapters after that point, to do more expensive processing in multiple threads.

The design of `IterParallel` is like this:

* `IterParallel` defers background work to `IterParallelProducer`, which implements `UnindexedProducer`.
* `IterParallelProducer` will split as many times as there are threads in the current pool. (I've been told that #492 is a better way to organize this, but until that's in, this is how I wrote it. `>_>`)
* When folding items, `IterParallelProducer` keeps a `Stealer` from `crossbeam-deque` (added as a dependency, but using the same version as `rayon-core`) to access a deque of items that have already been loaded from the iterator.
* If the `Stealer` is empty, a worker will attempt to lock the Mutex to access the source `Iterator` and the `Deque`.
  * If the Mutex is already locked, it will call `yield_now`. The implementation in polyester used a `synchronoise::SignalEvent` but I've been told that worker threads should not block. In lieu of #548, a regular spin-loop was chosen instead.
  * If the Mutex is available, the worker will load a number of items from the iterator (currently the number of threads squared, times two) before releasing the Mutex and continuing.
  * (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? `>_>`)

This design is effectively a first brush, has [the same caveats as polyester](https://docs.rs/polyester/0.1.0/polyester/trait.Polyester.html#implementation-note), probably needs some extra features in rayon-core, and needs some higher-level docs before I'm willing to let it go. However, I'm putting it here because it was not in the right place when I talked to @cuviper about it last time.

Co-authored-by: QuietMisdreavus <grey@quietmisdreavus.net>
Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
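For context, a hedged sketch of the usage pattern this referenced PR describes, written against the bridge adapter that rayon ultimately shipped as `par_bridge()` (rather than the `AsParallel`/`as_parallel` names proposed above): a cheap sequential Iterator feeds a ParallelIterator that does the expensive work on the pool.

```rust
use rayon::prelude::*;

fn expensive(n: u64) -> u64 {
    // stand-in for real per-item work
    n.wrapping_mul(n).rotate_left(7)
}

fn main() {
    let total: u64 = (0..1_000u64)
        .filter(|n| n % 3 != 0) // cheap sequential stage
        .par_bridge()           // hand remaining items to the pool
        .map(expensive)         // expensive parallel stage
        .sum();
    println!("{total}");
}
```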
@cuviper force-pushed the broadcast branch 2 times, most recently from fd4cad2 to 4e05307 on October 3, 2018
@cuviper (Member, Author) commented Oct 3, 2018

I've rebased and added a context type. Let me know what you think!

@Zoxc commented Apr 12, 2019

I'd like to use this in rustc to collect thread-local data (it complements my WorkerLocal type well).
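A minimal sketch of that kind of thread-local collection, assuming the broadcast API from this PR (the counter here is just a stand-in for whatever per-thread data rustc would accumulate):

```rust
use std::cell::Cell;

thread_local! {
    // Stand-in for real per-thread data accumulated by ordinary jobs.
    static LOCAL_COUNT: Cell<u64> = Cell::new(0);
}

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();

    pool.install(|| {
        // ... ordinary parallel work that bumps LOCAL_COUNT on whichever
        // worker happens to run each job ...
    });

    // broadcast runs once on every worker, so it can read each thread's
    // local value and return all of them for aggregation.
    let counts: Vec<u64> = pool.broadcast(|_ctx| LOCAL_COUNT.with(|c| c.get()));
    let total: u64 = counts.iter().sum();
    println!("total = {total}");
}
```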

@cuviper (Member, Author) commented Apr 24, 2019

@Zoxc I've rebased this again, if you'd like to try it out with rustc.

@Zoxc commented Nov 1, 2020

There are two variations on this that may be useful too: spawn_broadcast, which would spawn tasks on all threads without waiting for them, and broadcast on a Scope, which would spawn the tasks on the Scope and could be considered a more general form of this PR.

I wonder if there's room for some generic code here. spawn_broadcast would wait until 'static ends before blocking (a.k.a. never), Scope::broadcast would wait until 'scope ends before blocking, and the broadcast in this PR would wait on some internal lifetime before blocking. If you consider join as spawning two tasks, you can draw similar parallels between spawn, Scope::spawn, and join.

@cuviper (Member, Author) commented Nov 2, 2020

Those variations make sense to me. The Scope::broadcast idea should be a pretty easy extension, just adding to the scope counter so they're part of the blocking set. For the static/unblocked version, I guess we could wrap the closure in an Arc and basically just spawn it out.

They could all use the same injection queues, at least.
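A rough user-level sketch of that Arc idea (not rayon internals; the real implementation would queue one job per worker rather than relying on plain spawn):

```rust
use std::sync::Arc;

// Clone one Arc'd closure per thread and spawn it without waiting,
// roughly the shape a fire-and-forget spawn_broadcast needs.
fn spawn_broadcast_sketch<F>(pool: &rayon::ThreadPool, op: F)
where
    F: Fn(usize) + Send + Sync + 'static,
{
    let op = Arc::new(op);
    for i in 0..pool.current_num_threads() {
        let op = Arc::clone(&op);
        // Note: plain spawn() does not pin the job to worker `i`; a real
        // broadcast queues exactly one job per worker instead.
        pool.spawn(move || op(i));
    }
}
```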

@cuviper force-pushed the broadcast branch 5 times, most recently from 6ec01f6 to b6f72f4 on June 14, 2022
@cuviper (Member, Author) commented Jun 14, 2022

I rebased again and added spawn_broadcast, including a scoped version. I need more docs/examples on that, but otherwise I think the code is in good shape here.

The last thing I'm thinking about is a specification of when a broadcast will run. Currently, it runs when the local deque is empty, before looking for jobs elsewhere. I can think of multiple options, from high to low "priority":

  • ASAP -- if we've blocked for any reason, run broadcasts first even once the blocked latch is ready.
  • After seeing that the latch is still blocked, but before popping from the local deque.
  • After the local deque, but before stealing from other threads. This is what we have now.
  • After thread stealing, but before popping from the global injector.
  • After the global injector, when there are no other sources of pending work.
  • After the whole pool is otherwise blocked/idle.

I'm not certain that the current choice is the best. These could all be supported if we kept distinct queues, and perhaps an enum argument on the broadcast methods to indicate the user's choice, but maybe that's overkill.
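Purely as illustration of what such an enum argument could look like (hypothetical; nothing like this was added), the variants mirror the priority list above, highest first:

```rust
// Hypothetical sketch only -- not part of the rayon API.
pub enum BroadcastPriority {
    Immediate,        // run ASAP, even once a blocked latch is ready
    BeforeLocalDeque, // after re-checking the latch, before local jobs
    BeforeStealing,   // after the local deque; the current behavior
    BeforeInjector,   // after stealing, before the global injector
    WhenThreadIdle,   // only when this thread has no other work
    WhenPoolIdle,     // only when the whole pool is otherwise idle
}
```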

@LoganDark commented Jun 18, 2022

> These could all be supported if we kept distinct queues, and perhaps an enum argument on the broadcast methods to indicate the user's choice, but maybe that's overkill.

It sounds like something that could be tuned, but I agree that it sounds like overkill because people won't know what to pick to get it to Just Work.

You could support an enum, or not, but the most important thing is that it's as fast as possible both when it's the only thing executing on the thread pool and when the thread pool is juggling lots and lots of work. Basically, even if you do have an enum for user choice, please have (and recommend) an API where the user does not have to choose, so that they can benefit from a tried and tested compromise.

"as fast as possible" may mean many things. Broadcasts completing before other work, or a compromise between timely execution and not ruining the efficiency of other tasks running on the thread pool. I'd personally advocate for the latter as a sane default.

@cuviper (Member, Author) commented Nov 16, 2022

Here's the current API summary:

pub fn broadcast<OP, R>(op: OP) -> Vec<R>
where
    OP: Fn(BroadcastContext<'_>) -> R + Sync,
    R: Send;

pub fn spawn_broadcast<OP>(op: OP)
where
    OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static;

pub struct BroadcastContext<'a> { .. }

impl<'a> BroadcastContext<'a> {
    pub fn index(&self) -> usize;
    pub fn num_threads(&self) -> usize;
}

impl<'a> fmt::Debug for BroadcastContext<'a> { .. }

impl<'scope> Scope<'scope> {
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope;
}

impl<'scope> ScopeFifo<'scope> {
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope;
}

impl ThreadPool {
    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
    where
        OP: Fn(BroadcastContext<'_>) -> R + Sync,
        R: Send;

    pub fn spawn_broadcast<OP>(&self, op: OP)
    where
        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static;
}

I think that's pretty safe, and the current "priority" (before remote work-stealing) still feels like a reasonable default.
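For reference, a hedged usage sketch against that summary, exercising both the blocking and the fire-and-forget forms:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    // Blocking form: one result per thread, collected into a Vec.
    let labels: Vec<String> = pool.broadcast(|ctx| {
        format!("thread {} of {}", ctx.index(), ctx.num_threads())
    });
    assert_eq!(labels.len(), 4);

    // Fire-and-forget form: runs once per thread, nothing is collected.
    let hits = Arc::new(AtomicUsize::new(0));
    let hits_clone = Arc::clone(&hits);
    pool.spawn_broadcast(move |_ctx| {
        hits_clone.fetch_add(1, Ordering::Relaxed);
    });
}
```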

@cuviper (Member, Author) commented Nov 16, 2022

bors r+

bors bot merged commit 911d6d0 into rayon-rs:master on Nov 16, 2022
@cuviper deleted the broadcast branch on February 25, 2023