Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: implement try_new and try_with_interest for AsyncFd #6345

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 107 additions & 7 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::runtime::io::{ReadyEvent, Registration};
use crate::runtime::scheduler;

use mio::unix::SourceFd;
use std::error::Error;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::{task::Context, task::Poll};
Expand Down Expand Up @@ -249,15 +251,69 @@ impl<T: AsRawFd> AsyncFd<T> {
handle: scheduler::Handle,
interest: Interest,
) -> io::Result<Self> {
let fd = inner.as_raw_fd();
Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
}

let registration =
Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle)?;
/// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
/// implementing [`AsRawFd`]. The backing file descriptor is cached at the
/// time of creation.
///
/// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
/// control, use [`AsyncFd::try_with_interest`].
///
/// This method must be called in the context of a tokio runtime.
///
/// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
/// passed to this function.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[inline]
#[track_caller]
pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
where
T: AsRawFd,
{
Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
}

Ok(AsyncFd {
registration,
inner: Some(inner),
})
/// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
/// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
/// file descriptor is cached at the time of creation.
///
/// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
/// passed to this function.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[inline]
#[track_caller]
pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
where
T: AsRawFd,
{
Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
}

#[track_caller]
pub(crate) fn try_new_with_handle_and_interest(
inner: T,
handle: scheduler::Handle,
interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
let fd = inner.as_raw_fd();

match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
Ok(registration) => Ok(AsyncFd {
registration,
inner: Some(inner),
}),
Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
}
}

/// Returns a shared reference to the backing object of this [`AsyncFd`].
Expand Down Expand Up @@ -1257,3 +1313,47 @@ impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
pub struct TryIoError(());

/// Error returned by [`try_new`] or [`try_with_interest`].
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
pub struct AsyncFdTryNewError<T> {
inner: T,
cause: io::Error,
}

impl<T> AsyncFdTryNewError<T> {
/// Returns the original object passed to [`try_new`] or [`try_with_interest`]
/// alongside the error that caused these functions to fail.
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
pub fn into_parts(self) -> (T, io::Error) {
(self.inner, self.cause)
}
}

impl<T> fmt::Display for AsyncFdTryNewError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.cause, f)
}
}

impl<T> fmt::Debug for AsyncFdTryNewError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}

impl<T> Error for AsyncFdTryNewError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.cause)
}
}

impl<T> From<AsyncFdTryNewError<T>> for io::Error {
fn from(value: AsyncFdTryNewError<T>) -> Self {
value.cause
}
}
2 changes: 1 addition & 1 deletion tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ cfg_net_unix! {

pub mod unix {
//! Asynchronous IO structures specific to Unix-like operating systems.
pub use super::async_fd::{AsyncFd, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError};
pub use super::async_fd::{AsyncFd, AsyncFdTryNewError, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError};
}
}

Expand Down
30 changes: 30 additions & 0 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use nix::unistd::{close, read, write};
use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio::io::Interest;
use tokio_test::{assert_err, assert_pending};

struct TestWaker {
Expand Down Expand Up @@ -834,3 +835,32 @@ async fn await_error_readiness_invalid_address() {
let guard = fd.ready(Interest::ERROR).await.unwrap();
assert_eq!(guard.ready(), Ready::ERROR);
}

#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
fn as_raw_fd(&self) -> RawFd {
-1
}
}

#[tokio::test]
async fn try_new() {
let original = Arc::new(InvalidSource);

let error = AsyncFd::try_new(original.clone()).unwrap_err();
let (returned, _cause) = error.into_parts();

assert!(Arc::ptr_eq(&original, &returned));
}

#[tokio::test]
async fn try_with_interest() {
let original = Arc::new(InvalidSource);

let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
let (returned, _cause) = error.into_parts();

assert!(Arc::ptr_eq(&original, &returned));
}
45 changes: 45 additions & 0 deletions tokio/tests/io_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,48 @@ fn async_fd_with_interest_panic_caller() -> Result<(), Box<dyn Error>> {

Ok(())
}

#[test]
#[cfg(unix)]
fn async_fd_try_new_panic_caller() -> Result<(), Box<dyn Error>> {
use tokio::io::unix::AsyncFd;
use tokio::runtime::Builder;

let panic_location_file = test_panic(|| {
// Runtime without `enable_io` so it has no IO driver set.
let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let fd = unix::MockFd;

let _ = AsyncFd::try_new(fd);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
#[cfg(unix)]
fn async_fd_try_with_interest_panic_caller() -> Result<(), Box<dyn Error>> {
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::runtime::Builder;

let panic_location_file = test_panic(|| {
// Runtime without `enable_io` so it has no IO driver set.
let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let fd = unix::MockFd;

let _ = AsyncFd::try_with_interest(fd, Interest::READABLE);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}