Skip to content

Commit

Permalink
io: add async_io helper method to sockets (#5512)
Browse files Browse the repository at this point in the history
  • Loading branch information
amab8901 committed Mar 9, 2023
1 parent 002f4a2 commit 2b7b1a0
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 0 deletions.
36 changes: 36 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -1016,6 +1016,42 @@ impl TcpStream {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `TcpStream` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -1319,6 +1319,42 @@ impl UdpSocket {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UdpSocket` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -1260,6 +1260,42 @@ impl UnixDatagram {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UnixDatagram` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Returns the local address that this socket is bound to.
///
/// # Examples
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -706,6 +706,42 @@ impl UnixStream {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UnixStream` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
66 changes: 66 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -851,6 +851,39 @@ impl NamedPipeServer {
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Reads or writes from the pipe using a user-provided IO operation.
///
/// The readiness of the pipe is awaited and when the pipe is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the pipe by manually calling the appropriate syscall.
/// If the operation fails because the pipe is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the pipe readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the pipe that failed due to the pipe not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the pipe to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `NamedPipeServer` type, as this will mess with the
/// readiness flag and can cause the pipe to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().async_io(interest, f).await
}
}

impl AsyncRead for NamedPipeServer {
Expand Down Expand Up @@ -1601,6 +1634,39 @@ impl NamedPipeClient {
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Reads or writes from the pipe using a user-provided IO operation.
///
/// The readiness of the pipe is awaited and when the pipe is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the pipe by manually calling the appropriate syscall.
/// If the operation fails because the pipe is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the pipe readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the pipe that failed due to the pipe not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the pipe to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `NamedPipeClient` type, as this will mess with the
/// readiness flag and can cause the pipe to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().async_io(interest, f).await
}
}

impl AsyncRead for NamedPipeClient {
Expand Down

0 comments on commit 2b7b1a0

Please sign in to comment.