Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5417 expose async io #5512

Merged
merged 17 commits into from Mar 9, 2023
36 changes: 36 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -1016,6 +1016,42 @@ impl TcpStream {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `TcpStream` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -1319,6 +1319,42 @@ impl UdpSocket {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UdpSocket` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -1260,6 +1260,42 @@ impl UnixDatagram {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UnixDatagram` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Returns the local address that this socket is bound to.
///
/// # Examples
Expand Down
36 changes: 36 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -706,6 +706,42 @@ impl UnixStream {
.try_io(interest, || self.io.try_io(f))
}

/// Reads or writes from the socket using a user-provided IO operation.
///
/// The readiness of the socket is awaited and when the socket is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the socket by manually calling the appropriate syscall.
/// If the operation fails because the socket is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the socket readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `UnixStream` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io
.registration()
.async_io(interest, || self.io.try_io(&mut f))
.await
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
66 changes: 66 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -851,6 +851,39 @@ impl NamedPipeServer {
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Reads or writes from the pipe using a user-provided IO operation.
///
/// The readiness of the pipe is awaited and when the pipe is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the pipe by manually calling the appropriate syscall.
/// If the operation fails because the pipe is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the pipe readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the pipe that failed due to the pipe not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the pipe to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `NamedPipeServer` type, as this will mess with the
/// readiness flag and can cause the pipe to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().async_io(interest, f).await
}
}

impl AsyncRead for NamedPipeServer {
Expand Down Expand Up @@ -1601,6 +1634,39 @@ impl NamedPipeClient {
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Reads or writes from the pipe using a user-provided IO operation.
///
/// The readiness of the pipe is awaited and when the pipe is ready,
/// the provided closure is called. The closure should attempt to perform
/// IO operation on the pipe by manually calling the appropriate syscall.
/// If the operation fails because the pipe is not actually ready,
/// then the closure should return a `WouldBlock` error. In such case the
/// readiness flag is cleared and the pipe readiness is awaited again.
/// This loop is repeated until the closure returns an `Ok` or an error
/// other than `WouldBlock`.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the pipe that failed due to the pipe not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the pipe to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `NamedPipeClient` type, as this will mess with the
/// readiness flag and can cause the pipe to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub async fn async_io<R>(
&self,
interest: Interest,
f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().async_io(interest, f).await
}
}

impl AsyncRead for NamedPipeClient {
Expand Down