Skip to content

Commit

Permalink
replicate the tcp combined interest example
Browse files Browse the repository at this point in the history
  • Loading branch information
folkertdev committed May 29, 2023
1 parent 395fe37 commit 81cb2bb
Showing 1 changed file with 198 additions and 0 deletions.
198 changes: 198 additions & 0 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,72 @@ impl<T: AsRawFd> AsyncFd<T> {
///
/// This method takes `&mut self`, so it is possible to access the inner IO
/// resource mutably when handling the [`AsyncFdReadyMutGuard`].
///
/// # Examples
///
/// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
/// splitting.
///
/// ```no_run
/// use std::error::Error;
/// use std::io;
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use tokio::io::unix::AsyncFd;
/// use tokio::io::{Interest, Ready};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080")?;
/// stream.set_nonblocking(true)?;
/// let mut stream = AsyncFd::new(stream)?;
///
/// loop {
/// let mut guard = stream
/// .ready_mut(Interest::READABLE | Interest::WRITABLE)
/// .await?;
///
/// if guard.ready().is_readable() {
/// let mut data = vec![0; 1024];
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match guard.get_inner_mut().read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a read has blocked, but a write might still succeed.
/// // clear only the read readiness.
/// guard.clear_ready_matching(Ready::READABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// if guard.ready().is_writable() {
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match guard.get_inner_mut().write(b"hello world") {
/// Ok(n) => {
/// println!("write {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a write has blocked, but a read might still succeed.
/// // clear only the write readiness.
/// guard.clear_ready_matching(Ready::WRITABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
/// }
/// }
/// ```
pub async fn ready_mut(
&mut self,
interest: Interest,
Expand Down Expand Up @@ -783,6 +849,72 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
/// when a read is observed to block. Only clear the specific readiness that is observed to
/// block. For example when a read blocks when using a combined interest,
/// only clear `Ready::READABLE`.
///
/// # Examples
///
/// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
/// splitting.
///
/// ```no_run
/// use std::error::Error;
/// use std::io;
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use tokio::io::unix::AsyncFd;
/// use tokio::io::{Interest, Ready};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080")?;
/// stream.set_nonblocking(true)?;
/// let stream = AsyncFd::new(stream)?;
///
/// loop {
/// let mut guard = stream
/// .ready(Interest::READABLE | Interest::WRITABLE)
/// .await?;
///
/// if guard.ready().is_readable() {
/// let mut data = vec![0; 1024];
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.get_ref().read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a read has blocked, but a write might still succeed.
/// // clear only the read readiness.
/// guard.clear_ready_matching(Ready::READABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// if guard.ready().is_writable() {
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.get_ref().write(b"hello world") {
/// Ok(n) => {
/// println!("write {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a write has blocked, but a read might still succeed.
/// // clear only the write readiness.
/// guard.clear_ready_matching(Ready::WRITABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
/// }
/// }
/// ```
pub fn clear_ready_matching(&mut self, ready: Ready) {
if let Some(mut event) = self.event.take() {
self.async_fd
Expand Down Expand Up @@ -936,6 +1068,72 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
/// when a read is observed to block. Only clear the specific readiness that is observed to
/// block. For example when a read blocks when using a combined interest,
/// only clear `Ready::READABLE`.
///
/// # Examples
///
/// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
/// splitting.
///
/// ```no_run
/// use std::error::Error;
/// use std::io;
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use tokio::io::unix::AsyncFd;
/// use tokio::io::{Interest, Ready};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080")?;
/// stream.set_nonblocking(true)?;
/// let mut stream = AsyncFd::new(stream)?;
///
/// loop {
/// let mut guard = stream
/// .ready_mut(Interest::READABLE | Interest::WRITABLE)
/// .await?;
///
/// if guard.ready().is_readable() {
/// let mut data = vec![0; 1024];
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match guard.get_inner_mut().read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a read has blocked, but a write might still succeed.
/// // clear only the read readiness.
/// guard.clear_ready_matching(Ready::READABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// if guard.ready().is_writable() {
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match guard.get_inner_mut().write(b"hello world") {
/// Ok(n) => {
/// println!("write {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// // a write has blocked, but a read might still succeed.
/// // clear only the write readiness.
/// guard.clear_ready_matching(Ready::WRITABLE);
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
/// }
/// }
/// ```
pub fn clear_ready_matching(&mut self, ready: Ready) {
if let Some(mut event) = self.event.take() {
self.async_fd
Expand Down

0 comments on commit 81cb2bb

Please sign in to comment.