Skip to content

Commit

Permalink
use fast thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Nov 26, 2019
1 parent 6100d9a commit ac385d5
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
36 changes: 20 additions & 16 deletions rayoff/src/rayoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,30 @@ extern crate sys_info;

use job::Job;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use std::sync::{Mutex, Arc};
use std::thread::{spawn};

#[derive(Debug)]
pub struct Pool {
senders: Vec<Sender<Arc<Job>>>,
threads: Vec<JoinHandle<()>>,
senders: Mutex<Vec<Sender<Arc<Job>>>>,
}

impl Default for Pool {
fn default() -> Self {
let num_threads = sys_info::cpu_num().unwrap_or(16) - 1;
let mut pool = Self {
senders: vec![],
threads: vec![],
};
let mut senders = vec![];
(0..num_threads).for_each(|_| {
let (sender, recvr): (Sender<Arc<Job>>, Receiver<Arc<Job>>) = channel();
let t = spawn(move || {
let _ = spawn(move || {
for job in recvr.iter() {
job.execute()
}
});
pool.senders.push(sender);
pool.threads.push(t);
senders.push(sender);
});
pool
Self {
senders: Mutex::new(senders),
}
}
}

Expand All @@ -39,11 +37,9 @@ impl Pool {
// Job must be destroyed in the frame that its created
let job = unsafe { Job::new(elems, func) };
let job = Arc::new(job);
for s in &self.senders {
s.send(job.clone()).expect("send should never fail");
}
let len = self.notify_all(job.clone());
job.execute();
job.wait(self.senders.len() + 1);
job.wait(len + 1);
}
pub fn map<F, A, B>(&self, inputs: &[A], func: F) -> Vec<B>
where
Expand All @@ -58,6 +54,14 @@ impl Pool {
});
outs
}
fn notify_all(&self, job: Arc<Job>) -> usize {
let senders = self.senders.lock().unwrap();
let len = senders.len();
for s in senders.iter() {
s.send(job.clone()).expect("send should never fail");
}
len
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ impl Accounts {
/// returns only the latest/current version of B for this slot
fn scan_slot<F, B>(&self, slot: Slot, func: F) -> Vec<B>
where
F: Fn(&StoredAccount) -> Option<B> + 'static,
B: Clone + Default,
F: Fn(&StoredAccount) -> Option<B> + Send + Sync,
B: Clone + Default + Send,
{
let accumulator: Vec<Vec<(Pubkey, u64, B)>> = self.accounts_db.scan_account_storage(
slot,
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,8 @@ impl AccountsDB {
// PERF: Sequentially read each storage entry in parallel
pub fn scan_account_storage<F, B>(&self, slot_id: Slot, scan_func: F) -> Vec<B>
where
F: Fn(&StoredAccount, AppendVecId, &mut B) -> () + 'static,
B: Default + Clone,
F: Fn(&StoredAccount, AppendVecId, &mut B) -> () + Send + Sync,
B: Default + Clone + Send,
{
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
Expand Down

0 comments on commit ac385d5

Please sign in to comment.