Skip to content

Commit

Permalink
io: add AsyncFd::async_io (#5542)
Browse files Browse the repository at this point in the history
  • Loading branch information
folkertdev committed Apr 16, 2023
1 parent 8497f37 commit 6037fae
Showing 1 changed file with 140 additions and 0 deletions.
140 changes: 140 additions & 0 deletions tokio/src/io/async_fd.rs
Expand Up @@ -508,6 +508,109 @@ impl<T: AsRawFd> AsyncFd<T> {
pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
self.readiness_mut(Interest::WRITABLE).await
}

/// Reads or writes from the file descriptor using a user-provided IO operation.
///
/// The `async_io` method is a convenience utility that waits for the file
/// descriptor to become ready, and then executes the provided IO operation.
/// Since file descriptors may be marked ready spuriously, the closure will
/// be called repeatedly until it returns something other than a
/// [`WouldBlock`] error. This is done using the following loop:
///
/// ```no_run
/// # use std::io::{self, Result};
/// # struct Dox<T> { inner: T }
/// # impl<T> Dox<T> {
/// # async fn writable(&self) -> Result<&Self> {
/// # Ok(self)
/// # }
/// # fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
/// # panic!()
/// # }
/// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
/// loop {
/// // or `readable` if called with the read interest.
/// let guard = self.writable().await?;
///
/// match guard.try_io(&mut f) {
/// Ok(result) => return result,
/// Err(_would_block) => continue,
/// }
/// }
/// }
/// # }
/// ```
///
/// The closure should only return a [`WouldBlock`] error if it has performed
/// an IO operation on the file descriptor that failed due to the file descriptor not being
/// ready. Returning a [`WouldBlock`] error in any other situation will
/// incorrectly clear the readiness flag, which can cause the file descriptor to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio [`AsyncFd`] type, as this will mess with the
/// readiness flag and can cause the file descriptor to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
///
/// # Examples
///
/// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
/// method waits for readiness, and retries if the send operation does block. This example
/// is equivalent to the one given for [`try_io`].
///
/// ```no_run
/// use tokio::io::{Interest, unix::AsyncFd};
///
/// use std::io;
/// use std::net::UdpSocket;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let socket = UdpSocket::bind("0.0.0.0:8080")?;
/// socket.set_nonblocking(true)?;
/// let async_fd = AsyncFd::new(socket)?;
///
/// let written = async_fd
/// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
/// .await?;
///
/// println!("wrote {written} bytes");
///
/// Ok(())
/// }
/// ```
///
/// [`try_io`]: AsyncFdReadyGuard::try_io
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut(&T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.async_io(interest, || f(self.get_ref()))
.await
}

/// Reads or writes from the file descriptor using a user-provided IO operation.
///
/// The behavior is the same as [`async_io`], except that the closure can mutate the inner
/// value of the [`AsyncFd`].
///
/// [`async_io`]: AsyncFd::async_io
pub async fn async_io_mut<R>(
&mut self,
interest: Interest,
mut f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.async_io(interest, || f(self.inner.as_mut().unwrap()))
.await
}
}

impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
Expand Down Expand Up @@ -578,6 +681,43 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
/// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
/// create this `AsyncFdReadyGuard`.
///
/// # Examples
///
/// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
/// for write-readiness and retrying when the send operation does block are explicit.
/// This example can be written more succinctly using [`AsyncFd::async_io`].
///
/// ```no_run
/// use tokio::io::unix::AsyncFd;
///
/// use std::io;
/// use std::net::UdpSocket;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let socket = UdpSocket::bind("0.0.0.0:8080")?;
/// socket.set_nonblocking(true)?;
/// let async_fd = AsyncFd::new(socket)?;
///
/// let written = loop {
/// let mut guard = async_fd.writable().await?;
/// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
/// Ok(result) => {
/// break result?;
/// }
/// Err(_would_block) => {
/// // try_io already cleared the file descriptor's readiness state
/// continue;
/// }
/// }
/// };
///
/// println!("wrote {written} bytes");
///
/// Ok(())
/// }
/// ```
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "with_io"))]
Expand Down

0 comments on commit 6037fae

Please sign in to comment.