Skip to content

Commit

Permalink
fs: reduce blocking ops in fs::read_dir (#5653)
Browse files Browse the repository at this point in the history
  • Loading branch information
icedrocket committed Apr 28, 2023
1 parent f478ff4 commit 52bc6b6
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions tokio/src/fs/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ const CHUNK_SIZE: usize = 32;
pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned();
asyncify(|| -> io::Result<ReadDir> {
let mut std = std::fs::read_dir(path)?.fuse();
let mut std = std::fs::read_dir(path)?;
let mut buf = VecDeque::with_capacity(CHUNK_SIZE);
ReadDir::next_chunk(&mut buf, &mut std);
let remain = ReadDir::next_chunk(&mut buf, &mut std);

Ok(ReadDir(State::Idle(Some((buf, std)))))
Ok(ReadDir(State::Idle(Some((buf, std, remain)))))
})
.await
}
Expand All @@ -64,12 +64,10 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
#[must_use = "streams do nothing unless polled"]
pub struct ReadDir(State);

type StdReadDir = std::iter::Fuse<std::fs::ReadDir>;

#[derive(Debug)]
enum State {
Idle(Option<(VecDeque<io::Result<DirEntry>>, StdReadDir)>),
Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, StdReadDir)>),
Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>),
Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>),
}

impl ReadDir {
Expand Down Expand Up @@ -105,38 +103,35 @@ impl ReadDir {
loop {
match self.0 {
State::Idle(ref mut data) => {
let (buf, _) = data.as_mut().unwrap();
let (buf, _, ref remain) = data.as_mut().unwrap();

if let Some(ent) = buf.pop_front() {
return Poll::Ready(ent.map(Some));
};
} else if !remain {
return Poll::Ready(Ok(None));
}

let (mut buf, mut std) = data.take().unwrap();
let (mut buf, mut std, _) = data.take().unwrap();

self.0 = State::Pending(spawn_blocking(move || {
ReadDir::next_chunk(&mut buf, &mut std);
(buf, std)
let remain = ReadDir::next_chunk(&mut buf, &mut std);
(buf, std, remain)
}));
}
State::Pending(ref mut rx) => {
let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?;

let ret = match buf.pop_front() {
Some(Ok(x)) => Ok(Some(x)),
Some(Err(e)) => Err(e),
None => Ok(None),
};

self.0 = State::Idle(Some((buf, std)));

return Poll::Ready(ret);
self.0 = State::Idle(Some(ready!(Pin::new(rx).poll(cx))?));
}
}
}
}

fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut StdReadDir) {
for ret in std.by_ref().take(CHUNK_SIZE) {
fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) -> bool {
for _ in 0..CHUNK_SIZE {
let ret = match std.next() {
Some(ret) => ret,
None => return false,
};

let success = ret.is_ok();

buf.push_back(ret.map(|std| DirEntry {
Expand All @@ -154,6 +149,8 @@ impl ReadDir {
break;
}
}

true
}
}

Expand Down

0 comments on commit 52bc6b6

Please sign in to comment.