From adfdc52c627a29346b2366ccd3e06418e034c43f Mon Sep 17 00:00:00 2001 From: Folkert Date: Mon, 13 Mar 2023 12:01:16 +0100 Subject: [PATCH 1/5] add async_io for AsyncFd --- tokio/src/io/async_fd.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index c73227bfbbc..f827ce05fb6 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -508,6 +508,39 @@ impl AsyncFd { pub async fn writable_mut<'a>(&'a mut self) -> io::Result> { self.readiness_mut(Interest::WRITABLE).await } + + /// Reads or writes from the file descriptor using a user-provided IO operation. + /// + /// The readiness of the file descriptor is awaited and when the file descriptor is ready, + /// the provided closure is called. The closure should attempt to perform + /// IO operation on the file descriptor by manually calling the appropriate syscall. + /// If the operation fails because the file descriptor is not actually ready, + /// then the closure should return a `WouldBlock` error. In such case the + /// readiness flag is cleared and the file descriptor readiness is awaited again. + /// This loop is repeated until the closure returns an `Ok` or an error + /// other than `WouldBlock`. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the file descriptor that failed due to the file descriptor not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the file descriptor to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `AsyncFd` type, as this will mess with the + /// readiness flag and can cause the file descriptor to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + pub async fn async_io( + &self, + interest: Interest, + f: impl FnMut() -> io::Result, + ) -> io::Result { + self.registration.async_io(interest, f).await + } } impl AsRawFd for AsyncFd { From efac5515baa6a7d5a1ef9c83362c09b333518b2a Mon Sep 17 00:00:00 2001 From: Folkert Date: Wed, 15 Mar 2023 18:52:19 +0100 Subject: [PATCH 2/5] add example to the doc comment --- tokio/src/io/async_fd.rs | 69 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index f827ce05fb6..65b02959571 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -511,29 +511,88 @@ impl AsyncFd { /// Reads or writes from the file descriptor using a user-provided IO operation. /// + /// This method is useful to retry an operation that might block (by returning + /// a [`WouldBlock`] error) until it doesn't block any more: + /// + /// ```no_run + /// use tokio::io::unix::AsyncFd; + /// + /// use std::io; + /// use std::net::UdpSocket; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("0.0.0.0:8080")?; + /// socket.set_nonblocking(true)?; + /// let async_fd = AsyncFd::new(socket)?; + /// + /// let written = loop { + /// let mut guard = async_fd.writable().await?; + /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) { + /// Ok(result) => { + /// break result?; + /// } + /// Err(_would_block) => { + /// continue; + /// } + /// } + /// }; + /// + /// println!("wrote {written} bytes"); + /// + /// Ok(()) + /// } + /// ``` + /// + /// Using this method, this logic can instead be written as: + /// + /// ```no_run + /// use tokio::io::{Interest, unix::AsyncFd}; + /// + /// use std::io; + /// use std::net::UdpSocket; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("0.0.0.0:8080")?; + /// socket.set_nonblocking(true)?; + /// let async_fd = AsyncFd::new(socket)?; + /// + /// let written = async_fd + /// .async_io(Interest::WRITABLE, || async_fd.get_ref().send(&[1, 2])) + /// .await?; + /// + /// println!("wrote {written} bytes"); + /// + /// Ok(()) + /// } + /// ``` + /// /// The readiness of the file descriptor is awaited and when the file descriptor is ready, /// the provided closure is called. The closure should attempt to perform /// IO operation on the file descriptor by manually calling the appropriate syscall. /// If the operation fails because the file descriptor is not actually ready, - /// then the closure should return a `WouldBlock` error. In such case the + /// then the closure should return a [`WouldBlock`] error. In such case the /// readiness flag is cleared and the file descriptor readiness is awaited again. /// This loop is repeated until the closure returns an `Ok` or an error - /// other than `WouldBlock`. + /// other than [`WouldBlock`]. /// - /// The closure should only return a `WouldBlock` error if it has performed + /// The closure should only return a [`WouldBlock`] error if it has performed /// an IO operation on the file descriptor that failed due to the file descriptor not being - /// ready. Returning a `WouldBlock` error in any other situation will + /// ready. Returning a [`WouldBlock`] error in any other situation will /// incorrectly clear the readiness flag, which can cause the file descriptor to /// behave incorrectly. /// /// The closure should not perform the IO operation using any of the methods - /// defined on the Tokio `AsyncFd` type, as this will mess with the + /// defined on the Tokio [`AsyncFd`] type, as this will mess with the /// readiness flag and can cause the file descriptor to behave incorrectly. /// /// This method is not intended to be used with combined interests. /// The closure should perform only one type of IO operation, so it should not /// require more than one ready state. This method may panic or sleep forever /// if it is called with a combined interest. + /// + /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock pub async fn async_io( &self, interest: Interest, From c5b0a4c6747de352aeb2339b05ef5e71d74d06aa Mon Sep 17 00:00:00 2001 From: Folkert Date: Tue, 21 Mar 2023 11:14:51 +0100 Subject: [PATCH 3/5] make API more ergonomic --- tokio/src/io/async_fd.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 65b02959571..6f698273451 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -559,7 +559,7 @@ impl AsyncFd { /// let async_fd = AsyncFd::new(socket)?; /// /// let written = async_fd - /// .async_io(Interest::WRITABLE, || async_fd.get_ref().send(&[1, 2])) + /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2])) /// .await?; /// /// println!("wrote {written} bytes"); @@ -596,9 +596,11 @@ impl AsyncFd { pub async fn async_io( &self, interest: Interest, - f: impl FnMut() -> io::Result, + mut f: impl FnMut(&T) -> io::Result, ) -> io::Result { - self.registration.async_io(interest, f).await + self.registration + .async_io(interest, || f(self.get_ref())) + .await } } From 2fed1776d6e7f4f3b46068f4b7fd6ae543489fbf Mon Sep 17 00:00:00 2001 From: Folkert Date: Wed, 22 Mar 2023 15:52:12 +0100 Subject: [PATCH 4/5] Revert "make API more ergonomic" This reverts commit 87c864f28113b21a18f7f166661850186469cdc8. --- tokio/src/io/async_fd.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 6f698273451..65b02959571 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -559,7 +559,7 @@ impl AsyncFd { /// let async_fd = AsyncFd::new(socket)?; /// /// let written = async_fd - /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2])) + /// .async_io(Interest::WRITABLE, || async_fd.get_ref().send(&[1, 2])) /// .await?; /// /// println!("wrote {written} bytes"); @@ -596,11 +596,9 @@ impl AsyncFd { pub async fn async_io( &self, interest: Interest, - mut f: impl FnMut(&T) -> io::Result, + f: impl FnMut() -> io::Result, ) -> io::Result { - self.registration - .async_io(interest, || f(self.get_ref())) - .await + self.registration.async_io(interest, f).await } } From 7fd397494e33595133456cb6642ef3e182504c7f Mon Sep 17 00:00:00 2001 From: Folkert Date: Sat, 8 Apr 2023 20:10:37 +0200 Subject: [PATCH 5/5] add `AsyncFd::async_io_mut` --- tokio/src/io/async_fd.rs | 156 +++++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 54 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 65b02959571..e1e8a95c740 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -511,40 +511,56 @@ impl AsyncFd { /// Reads or writes from the file descriptor using a user-provided IO operation. /// - /// This method is useful to retry an operation that might block (by returning - /// a [`WouldBlock`] error) until it doesn't block any more: + /// The `async_io` method is a convenience utility that waits for the file + /// descriptor to become ready, and then executes the provided IO operation. + /// Since file descriptors may be marked ready spuriously, the closure will + /// be called repeatedly until it returns something other than a + /// [`WouldBlock`] error. This is done using the following loop: /// /// ```no_run - /// use tokio::io::unix::AsyncFd; - /// - /// use std::io; - /// use std::net::UdpSocket; + /// # use std::io::{self, Result}; + /// # struct Dox { inner: T } + /// # impl Dox { + /// # async fn writable(&self) -> Result<&Self> { + /// # Ok(self) + /// # } + /// # fn try_io(&self, _: impl FnMut(&T) -> Result) -> Result> { + /// # panic!() + /// # } + /// async fn async_io(&self, mut f: impl FnMut(&T) -> io::Result) -> io::Result { + /// loop { + /// // or `readable` if called with the read interest. + /// let guard = self.writable().await?; + /// + /// match guard.try_io(&mut f) { + /// Ok(result) => return result, + /// Err(_would_block) => continue, + /// } + /// } + /// } + /// # } + /// ``` /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let socket = UdpSocket::bind("0.0.0.0:8080")?; - /// socket.set_nonblocking(true)?; - /// let async_fd = AsyncFd::new(socket)?; + /// The closure should only return a [`WouldBlock`] error if it has performed + /// an IO operation on the file descriptor that failed due to the file descriptor not being + /// ready. Returning a [`WouldBlock`] error in any other situation will + /// incorrectly clear the readiness flag, which can cause the file descriptor to + /// behave incorrectly. /// - /// let written = loop { - /// let mut guard = async_fd.writable().await?; - /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) { - /// Ok(result) => { - /// break result?; - /// } - /// Err(_would_block) => { - /// continue; - /// } - /// } - /// }; + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio [`AsyncFd`] type, as this will mess with the + /// readiness flag and can cause the file descriptor to behave incorrectly. /// - /// println!("wrote {written} bytes"); + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. /// - /// Ok(()) - /// } - /// ``` + /// # Examples /// - /// Using this method, this logic can instead be written as: + /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io` + /// method waits for readiness, and retries if the send operation does block. This example + /// is equivalent to the one given for [`try_io`]. /// /// ```no_run /// use tokio::io::{Interest, unix::AsyncFd}; @@ -559,7 +575,7 @@ impl AsyncFd { /// let async_fd = AsyncFd::new(socket)?; /// /// let written = async_fd - /// .async_io(Interest::WRITABLE, || async_fd.get_ref().send(&[1, 2])) + /// .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2])) /// .await?; /// /// println!("wrote {written} bytes"); @@ -568,37 +584,32 @@ impl AsyncFd { /// } /// ``` /// - /// The readiness of the file descriptor is awaited and when the file descriptor is ready, - /// the provided closure is called. The closure should attempt to perform - /// IO operation on the file descriptor by manually calling the appropriate syscall. - /// If the operation fails because the file descriptor is not actually ready, - /// then the closure should return a [`WouldBlock`] error. In such case the - /// readiness flag is cleared and the file descriptor readiness is awaited again. - /// This loop is repeated until the closure returns an `Ok` or an error - /// other than [`WouldBlock`]. - /// - /// The closure should only return a [`WouldBlock`] error if it has performed - /// an IO operation on the file descriptor that failed due to the file descriptor not being - /// ready. Returning a [`WouldBlock`] error in any other situation will - /// incorrectly clear the readiness flag, which can cause the file descriptor to - /// behave incorrectly. - /// - /// The closure should not perform the IO operation using any of the methods - /// defined on the Tokio [`AsyncFd`] type, as this will mess with the - /// readiness flag and can cause the file descriptor to behave incorrectly. - /// - /// This method is not intended to be used with combined interests. - /// The closure should perform only one type of IO operation, so it should not - /// require more than one ready state. This method may panic or sleep forever - /// if it is called with a combined interest. - /// + /// [`try_io`]: AsyncFdReadyGuard::try_io /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock pub async fn async_io( &self, interest: Interest, - f: impl FnMut() -> io::Result, + mut f: impl FnMut(&T) -> io::Result, + ) -> io::Result { + self.registration + .async_io(interest, || f(self.get_ref())) + .await + } + + /// Reads or writes from the file descriptor using a user-provided IO operation. + /// + /// The behavior is the same as [`async_io`], except that the closure can mutate the inner + /// value of the [`AsyncFd`]. + /// + /// [`async_io`]: AsyncFd::async_io + pub async fn async_io_mut( + &mut self, + interest: Interest, + mut f: impl FnMut(&mut T) -> io::Result, ) -> io::Result { - self.registration.async_io(interest, f).await + self.registration + .async_io(interest, || f(self.inner.as_mut().unwrap())) + .await } } @@ -670,6 +681,43 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to /// create this `AsyncFdReadyGuard`. /// + /// # Examples + /// + /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting + /// for write-readiness and retrying when the send operation does block are explicit. + /// This example can be written more succinctly using [`AsyncFd::async_io`]. + /// + /// ```no_run + /// use tokio::io::unix::AsyncFd; + /// + /// use std::io; + /// use std::net::UdpSocket; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("0.0.0.0:8080")?; + /// socket.set_nonblocking(true)?; + /// let async_fd = AsyncFd::new(socket)?; + /// + /// let written = loop { + /// let mut guard = async_fd.writable().await?; + /// match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) { + /// Ok(result) => { + /// break result?; + /// } + /// Err(_would_block) => { + /// // try_io already cleared the file descriptor's readiness state + /// continue; + /// } + /// } + /// }; + /// + /// println!("wrote {written} bytes"); + /// + /// Ok(()) + /// } + /// ``` + /// /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "with_io"))]