Skip to content

Commit

Permalink
Merge #550
Browse files Browse the repository at this point in the history
550: add bridge from Iterator to ParallelIterator r=cuviper a=QuietMisdreavus

Half of #46

This started getting reviewed in QuietMisdreavus/polyester#6, but i decided to move my work to Rayon proper.

This PR adds a new trait, `AsParallel`, an implementation on `Iterator + Send`, and an iterator adapter `IterParallel` that implements `ParallelIterator` with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because `ParallelIterator` was implemented on `Range`, which is itself an `Iterator`.

The basic idea is that you would start with a quick sequential `Iterator`, call `.as_parallel()` on it, and be able to use `ParallelIterator` adapters after that point, to do more expensive processing in multiple threads.

The design of `IterParallel` is like this:

* `IterParallel` defers background work to `IterParallelProducer`, which implements `UnindexedProducer`.
* `IterParallelProducer` will split as many times as there are threads in the current pool. (I've been told that #492 is a better way to organize this, but until that's in, this is how i wrote it. `>_>`)
* When folding items, `IterParallelProducer` keeps a `Stealer` from `crossbeam-deque` (added as a dependency, but using the same version as `rayon-core`) to access a deque of items that have already been loaded from the iterator.
* If the `Stealer` is empty, a worker will attempt to lock the Mutex to access the source `Iterator` and the `Deque`.
  * If the Mutex is already locked, it will call `yield_now`. The implementation in polyester used a `synchronoise::SignalEvent` but i've been told that worker threads should not block. In lieu of #548, a regular spin-loop was chosen instead.
  * If the Mutex is available, the worker will load a number of items from the iterator (currently (number of threads * number of threads * 2)) before closing the Mutex and continuing.
  * (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? `>_>`)

This design is effectively a first brush, has [the same caveats as polyester](https://docs.rs/polyester/0.1.0/polyester/trait.Polyester.html#implementation-note), probably needs some extra features in rayon-core, and needs some higher-level docs before i'm willing to let it go. However, i'm putting it here because it was not in the right place when i talked to @cuviper about it last time.

Co-authored-by: QuietMisdreavus <grey@quietmisdreavus.net>
Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
  • Loading branch information
3 people committed Jun 6, 2018
2 parents 11bd211 + d488733 commit efed926
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 78 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -19,6 +19,7 @@ exclude = ["ci"]

[dependencies]
rayon-core = { version = "1.4", path = "rayon-core" }
crossbeam-deque = "0.2.0"

# This is a public dependency!
[dependencies.either]
Expand Down
5 changes: 5 additions & 0 deletions rayon-demo/src/life/bench.rs
Expand Up @@ -9,3 +9,8 @@ fn generations(b: &mut ::test::Bencher) {
fn parallel_generations(b: &mut ::test::Bencher) {
b.iter(|| super::parallel_generations(Board::new(200, 200).random(), 100));
}

#[bench]
fn as_parallel_generations(b: &mut ::test::Bencher) {
b.iter(|| super::par_bridge_generations(Board::new(200, 200).random(), 100));
}
19 changes: 19 additions & 0 deletions rayon-demo/src/life/mod.rs
Expand Up @@ -20,6 +20,7 @@ use time;

use docopt::Docopt;
use rayon::prelude::*;
use rayon::iter::ParallelBridge;

#[cfg(test)]
mod bench;
Expand Down Expand Up @@ -93,6 +94,15 @@ impl Board {
self.next_board(new_brd)
}

pub fn par_bridge_next_generation(&self) -> Board {
let new_brd = (0..self.len())
.par_bridge()
.map(|cell| self.successor_cell(cell))
.collect();

self.next_board(new_brd)
}

fn cell_live(&self, x: usize, y: usize) -> bool {
!(x >= self.cols || y >= self.rows) && self.board[y * self.cols + x]
}
Expand Down Expand Up @@ -145,6 +155,11 @@ fn parallel_generations(board: Board, gens: usize) {
for _ in 0..gens { brd = brd.parallel_next_generation(); }
}

fn par_bridge_generations(board: Board, gens: usize) {
let mut brd = board;
for _ in 0..gens { brd = brd.par_bridge_next_generation(); }
}

fn measure(f: fn(Board, usize) -> (), args: &Args) -> u64 {
let (n, gens) = (args.flag_size, args.flag_gens);
let brd = Board::new(n, n).random();
Expand All @@ -168,5 +183,9 @@ pub fn main(args: &[String]) {
let parallel = measure(parallel_generations, &args);
println!("parallel: {:10} ns -> {:.2}x speedup", parallel,
serial as f64 / parallel as f64);

let par_bridge = measure(par_bridge_generations, &args);
println!("par_bridge: {:10} ns -> {:.2}x speedup", par_bridge,
serial as f64 / par_bridge as f64);
}
}
5 changes: 5 additions & 0 deletions rayon-demo/src/nbody/bench.rs
Expand Up @@ -30,6 +30,11 @@ fn nbody_par(b: &mut ::test::Bencher) {
nbody_bench(b, |n| { n.tick_par(); });
}

#[bench]
fn nbody_par_bridge(b: &mut ::test::Bencher) {
nbody_bench(b, |n| { n.tick_par_bridge(); });
}

#[bench]
fn nbody_parreduce(b: &mut ::test::Bencher) {
nbody_bench(b, |n| { n.tick_par_reduce(); });
Expand Down
195 changes: 117 additions & 78 deletions rayon-demo/src/nbody/nbody.rs
Expand Up @@ -30,8 +30,10 @@
// [1]: https://github.com/IntelLabs/RiverTrail/blob/master/examples/nbody-webgl/NBody.js

use cgmath::{InnerSpace, Point3, Vector3, Zero};
use rayon::prelude::*;
use rand::{Rand, Rng};
use rayon::prelude::*;
#[cfg(test)]
use rayon::iter::ParallelBridge;
use std::f64::consts::PI;

const INITIAL_VELOCITY: f64 = 8.0; // set to 0.0 to turn off.
Expand All @@ -50,8 +52,7 @@ pub struct Body {

impl NBodyBenchmark {
pub fn new<R: Rng>(num_bodies: usize, rng: &mut R) -> NBodyBenchmark {
let bodies0: Vec<_> =
(0..num_bodies)
let bodies0: Vec<_> = (0..num_bodies)
.map(|_| {
let position = Point3 {
x: f64::rand(rng).floor() * 40_000.0,
Expand All @@ -71,7 +72,11 @@ impl NBodyBenchmark {
z: f64::rand(rng) * INITIAL_VELOCITY,
};

Body { position: position, velocity: velocity, velocity2: velocity2 }
Body {
position: position,
velocity: velocity,
velocity2: velocity2,
}
})
.collect();

Expand All @@ -91,16 +96,44 @@ impl NBodyBenchmark {
};

let time = self.time;
out_bodies.par_iter_mut()
.zip(&in_bodies[..])
.for_each(|(out, prev)| {
let (vel, vel2) = next_velocity(time, prev, in_bodies);
out.velocity = vel;
out.velocity2 = vel2;
out_bodies
.par_iter_mut()
.zip(&in_bodies[..])
.for_each(|(out, prev)| {
let (vel, vel2) = next_velocity(time, prev, in_bodies);
out.velocity = vel;
out.velocity2 = vel2;

let next_velocity = vel - vel2;
out.position = prev.position + next_velocity;
});

let next_velocity = vel - vel2;
out.position = prev.position + next_velocity;
});
self.time += 1;

out_bodies
}

#[cfg(test)]
pub fn tick_par_bridge(&mut self) -> &[Body] {
let (in_bodies, out_bodies) = if (self.time & 1) == 0 {
(&self.bodies.0, &mut self.bodies.1)
} else {
(&self.bodies.1, &mut self.bodies.0)
};

let time = self.time;
out_bodies
.iter_mut()
.zip(&in_bodies[..])
.par_bridge()
.for_each(|(out, prev)| {
let (vel, vel2) = next_velocity(time, prev, in_bodies);
out.velocity = vel;
out.velocity2 = vel2;

let next_velocity = vel - vel2;
out.position = prev.position + next_velocity;
});

self.time += 1;

Expand All @@ -115,16 +148,17 @@ impl NBodyBenchmark {
};

let time = self.time;
out_bodies.par_iter_mut()
.zip(&in_bodies[..])
.for_each(|(out, prev)| {
let (vel, vel2) = next_velocity_par(time, prev, in_bodies);
out.velocity = vel;
out.velocity2 = vel2;

let next_velocity = vel - vel2;
out.position = prev.position + next_velocity;
});
out_bodies
.par_iter_mut()
.zip(&in_bodies[..])
.for_each(|(out, prev)| {
let (vel, vel2) = next_velocity_par(time, prev, in_bodies);
out.velocity = vel;
out.velocity2 = vel2;

let next_velocity = vel - vel2;
out.position = prev.position + next_velocity;
});

self.time += 1;

Expand Down Expand Up @@ -207,57 +241,57 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Ve
let zero: Vector3<f64> = Vector3::zero();
let (diff, diff2) = bodies
.iter()
.fold(
(zero, zero),
|(mut diff, mut diff2), body| {
let r = body.position - prev.position;

// make sure we are not testing the particle against its own position
let are_same = r == Vector3::zero();

let dist_sqrd = r.magnitude2();

if dist_sqrd < zone_sqrd && !are_same {
let length = dist_sqrd.sqrt();
let percent = dist_sqrd / zone_sqrd;

if dist_sqrd < repel {
let f = (repel / percent - 1.0) * 0.025;
let normal = (r / length) * f;
diff += normal;
diff2 += normal;
} else if dist_sqrd < align {
let thresh_delta = align - repel;
let adjusted_percent = (percent - repel) / thresh_delta;
let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9;

// normalize vel2 and multiply by factor
let vel2_length = body.velocity2.magnitude();
let vel2 = (body.velocity2 / vel2_length) * q;

// normalize own velocity
let vel_length = prev.velocity.magnitude();
let vel = (prev.velocity / vel_length) * q;
.fold((zero, zero), |(mut diff, mut diff2), body| {
let r = body.position - prev.position;

// make sure we are not testing the particle against its own position
let are_same = r == Vector3::zero();

let dist_sqrd = r.magnitude2();

if dist_sqrd < zone_sqrd && !are_same {
let length = dist_sqrd.sqrt();
let percent = dist_sqrd / zone_sqrd;

if dist_sqrd < repel {
let f = (repel / percent - 1.0) * 0.025;
let normal = (r / length) * f;
diff += normal;
diff2 += normal;
} else if dist_sqrd < align {
let thresh_delta = align - repel;
let adjusted_percent = (percent - repel) / thresh_delta;
let q = (0.5 - (adjusted_percent * PI * 2.0).cos() * 0.5 + 0.5) * 100.9;

// normalize vel2 and multiply by factor
let vel2_length = body.velocity2.magnitude();
let vel2 = (body.velocity2 / vel2_length) * q;

// normalize own velocity
let vel_length = prev.velocity.magnitude();
let vel = (prev.velocity / vel_length) * q;

diff += vel2;
diff2 += vel;
}

diff += vel2;
diff2 += vel;
}
if dist_sqrd > attract {
// attract
let thresh_delta2 = 1.0 - attract;
let adjusted_percent2 = (percent - attract) / thresh_delta2;
let c =
(1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;

if dist_sqrd > attract { // attract
let thresh_delta2 = 1.0 - attract;
let adjusted_percent2 = (percent - attract) / thresh_delta2;
let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;
// normalize the distance vector
let d = (r / length) * c;

// normalize the distance vector
let d = (r / length) * c;

diff += d;
diff2 -= d;
}
diff += d;
diff2 -= d;
}
}

(diff, diff2)
});
(diff, diff2)
});

acc += diff;
acc2 += diff2;
Expand Down Expand Up @@ -294,8 +328,7 @@ fn next_velocity(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Ve
}

/// Compute next velocity of `prev`
fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
-> (Vector3<f64>, Vector3<f64>) {
fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body]) -> (Vector3<f64>, Vector3<f64>) {
let time = time as f64;
let center = Point3 {
x: (time / 22.0).cos() * -4200.0,
Expand Down Expand Up @@ -344,8 +377,9 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])

let (diff, diff2) = bodies
.par_iter()
.fold(|| (Vector3::zero(), Vector3::zero()),
|(mut diff, mut diff2), body| {
.fold(
|| (Vector3::zero(), Vector3::zero()),
|(mut diff, mut diff2), body| {
let r = body.position - prev.position;

// make sure we are not testing the particle against its own position
Expand Down Expand Up @@ -379,10 +413,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
diff2 += vel;
}

if dist_sqrd > attract { // attract
if dist_sqrd > attract {
// attract
let thresh_delta2 = 1.0 - attract;
let adjusted_percent2 = (percent - attract) / thresh_delta2;
let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5)) * attract_power;
let c = (1.0 - ((adjusted_percent2 * PI * 2.0).cos() * 0.5 + 0.5))
* attract_power;

// normalize the distance vector
let d = (r / length) * c;
Expand All @@ -393,9 +429,12 @@ fn next_velocity_par(time: usize, prev: &Body, bodies: &[Body])
}

(diff, diff2)
})
.reduce(|| (Vector3::zero(), Vector3::zero()),
|(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b));
},
)
.reduce(
|| (Vector3::zero(), Vector3::zero()),
|(diffa, diff2a), (diffb, diff2b)| (diffa + diffb, diff2a + diff2b),
);

acc += diff;
acc2 += diff2;
Expand Down
3 changes: 3 additions & 0 deletions src/iter/mod.rs
Expand Up @@ -84,6 +84,9 @@ use self::plumbing::*;
// e.g. `find::find()`, are always used **prefixed**, so that they
// can be readily distinguished.

mod par_bridge;
pub use self::par_bridge::{ParallelBridge, IterBridge};

mod find;
mod find_first_last;
mod chain;
Expand Down

0 comments on commit efed926

Please sign in to comment.