Skip to content

Commit

Permalink
Use crossbeam_deque::Injector instead of crossbeam_queue::SegQueue
Browse files Browse the repository at this point in the history
`Injector` and `SegQueue` are _almost_ identical, down to the very same
comments in their implementations. One difference is that `Injector`
allocates its first block as soon as it's created, but `SegQueue` waits
until its first `push`, which complicates it to allow being `null`.
`Injector` also has methods to steal batches into a deque `Worker`,
which might be useful to us.

At the very least, this lets us trim a dependency.
  • Loading branch information
cuviper committed Aug 17, 2020
1 parent 09428ec commit c7d963a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 27 deletions.
12 changes: 0 additions & 12 deletions ci/compat-Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ num_cpus = "1.2"
lazy_static = "1"
crossbeam-channel = "0.4.2"
crossbeam-deque = "0.7.2"
crossbeam-queue = "0.2"
crossbeam-utils = "0.7"

[dev-dependencies]
Expand Down
14 changes: 10 additions & 4 deletions rayon-core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::latch::Latch;
use crate::unwind;
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
Expand Down Expand Up @@ -184,13 +184,13 @@ impl<T> JobResult<T> {

/// Indirect queue to provide FIFO job priority.
pub(super) struct JobFifo {
inner: SegQueue<JobRef>,
inner: Injector<JobRef>,
}

impl JobFifo {
pub(super) fn new() -> Self {
JobFifo {
inner: SegQueue::new(),
inner: Injector::new(),
}
}

Expand All @@ -206,6 +206,12 @@ impl JobFifo {
impl Job for JobFifo {
unsafe fn execute(this: *const Self) {
// We "execute" a queue by executing its first job, FIFO.
(*this).inner.pop().expect("job in fifo queue").execute()
loop {
match (*this).inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
}
}
}
}
24 changes: 14 additions & 10 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::util::leak;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -136,7 +135,7 @@ pub(super) struct Registry {
logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: SegQueue<JobRef>,
injected_jobs: Injector<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
Expand Down Expand Up @@ -240,7 +239,7 @@ impl Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
injected_jobs: SegQueue::new(),
injected_jobs: Injector::new(),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
Expand Down Expand Up @@ -413,13 +412,18 @@ impl Registry {
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
let job = self.injected_jobs.pop().ok();
if job.is_some() {
self.log(|| JobUninjected {
worker: worker_index,
});
loop {
match self.injected_jobs.steal() {
Steal::Success(job) => {
self.log(|| JobUninjected {
worker: worker_index,
});
return Some(job);
}
Steal::Empty => return None,
Steal::Retry => {}
}
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down

0 comments on commit c7d963a

Please sign in to comment.