Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add async_io for AsyncFd #5542

Merged
merged 5 commits into from Apr 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
140 changes: 140 additions & 0 deletions tokio/src/io/async_fd.rs
Expand Up @@ -508,6 +508,109 @@ impl<T: AsRawFd> AsyncFd<T> {
pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
self.readiness_mut(Interest::WRITABLE).await
}

/// Reads or writes from the file descriptor using a user-provided IO operation.
///
/// The `async_io` method is a convenience utility that waits for the file
/// descriptor to become ready, and then executes the provided IO operation.
/// Since file descriptors may be marked ready spuriously, the closure will
/// be called repeatedly until it returns something other than a
/// [`WouldBlock`] error. This is done using the following loop:
Comment on lines +514 to +518
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation of a function should start with a short, single line description of what it does.

You can use the same one as the one on async_io_mut.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

///
/// ```no_run
/// # use std::io::{self, Result};
/// # struct Dox<T> { inner: T }
/// # impl<T> Dox<T> {
/// # async fn writable(&self) -> Result<&Self> {
/// # Ok(self)
/// # }
/// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
/// # panic!()
/// # }
/// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
/// loop {
/// // or `readable` if called with the read interest.
/// let guard = self.writable().await?;
///
/// match guard.try_io(&mut f) {
/// Ok(result) => return result,
/// Err(_would_block) => continue,
/// }
/// }
/// }
/// # }
/// ```
///
/// The closure should only return a [`WouldBlock`] error if it has performed
/// an IO operation on the file descriptor that failed due to the file descriptor not being
/// ready. Returning a [`WouldBlock`] error in any other situation will
/// incorrectly clear the readiness flag, which can cause the file descriptor to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio [`AsyncFd`] type, as this will mess with the
/// readiness flag and can cause the file descriptor to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
///
/// # Examples
///
/// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
/// method waits for readiness, and retries if the send operation does block. This example
/// is equivalent to the one given for [`try_io`].
///
/// ```no_run
/// use tokio::io::{Interest, unix::AsyncFd};
///
/// use std::io;
/// use std::net::UdpSocket;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let socket = UdpSocket::bind("0.0.0.0:8080")?;
/// socket.set_nonblocking(true)?;
/// let async_fd = AsyncFd::new(socket)?;
///
/// let written = async_fd
/// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
/// .await?;
///
/// println!("wrote {written} bytes");
///
/// Ok(())
/// }
/// ```
///
/// [`try_io`]: AsyncFdReadyGuard::try_io
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut(&T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.async_io(interest, || f(self.get_ref()))
.await
}

/// Reads or writes from the file descriptor using a user-provided IO operation.
///
/// The behavior is the same as [`async_io`], except that the closure can mutate the inner
/// value of the [`AsyncFd`].
///
/// [`async_io`]: AsyncFd::async_io
folkertdev marked this conversation as resolved.
Show resolved Hide resolved
pub async fn async_io_mut<R>(
&mut self,
interest: Interest,
mut f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.async_io(interest, || f(self.inner.as_mut().unwrap()))
.await
}
}

impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
Expand Down Expand Up @@ -578,6 +681,43 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
/// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
/// create this `AsyncFdReadyGuard`.
///
/// # Examples
///
/// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
/// for write-readiness and retrying when the send operation does block are explicit.
/// This example can be written more succinctly using [`AsyncFd::async_io`].
///
/// ```no_run
/// use tokio::io::unix::AsyncFd;
///
/// use std::io;
/// use std::net::UdpSocket;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let socket = UdpSocket::bind("0.0.0.0:8080")?;
/// socket.set_nonblocking(true)?;
/// let async_fd = AsyncFd::new(socket)?;
///
/// let written = loop {
/// let mut guard = async_fd.writable().await?;
/// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
/// Ok(result) => {
/// break result?;
/// }
/// Err(_would_block) => {
/// // try_io already cleared the file descriptor's readiness state
/// continue;
/// }
/// }
/// };
///
/// println!("wrote {written} bytes");
///
/// Ok(())
/// }
/// ```
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "with_io"))]
Expand Down