Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move has_waker boolean to Registry #1706

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/poll.rs
@@ -1,9 +1,14 @@
use crate::{event, sys, Events, Interest, Token};
#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};

use crate::{event, sys, Events, Interest, Token};

/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of [`event::Source`]s,
Expand Down Expand Up @@ -252,6 +257,9 @@ pub struct Poll {
/// Registers I/O resources.
pub struct Registry {
selector: sys::Selector,
/// Whether this selector currently has an associated waker.
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc<AtomicBool>,
}

impl Poll {
Expand Down Expand Up @@ -298,7 +306,11 @@ impl Poll {
/// ```
pub fn new() -> io::Result<Poll> {
sys::Selector::new().map(|selector| Poll {
registry: Registry { selector },
registry: Registry {
selector,
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc::new(AtomicBool::new(false)),
},
})
}
}
Expand Down Expand Up @@ -668,17 +680,19 @@ impl Registry {
/// Event sources registered with this `Registry` will be registered with
/// the original `Registry` and `Poll` instance.
pub fn try_clone(&self) -> io::Result<Registry> {
self.selector
.try_clone()
.map(|selector| Registry { selector })
self.selector.try_clone().map(|selector| Registry {
selector,
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc::clone(&self.has_waker),
})
}

/// Internal check to ensure only a single `Waker` is active per [`Poll`]
/// instance.
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub(crate) fn register_waker(&self) {
assert!(
!self.selector.register_waker(),
!self.has_waker.swap(true, Ordering::AcqRel),
"Only a single `Waker` can be active per `Poll` instance"
);
}
Expand Down
5 changes: 0 additions & 5 deletions src/sys/shell/selector.rs
Expand Up @@ -18,11 +18,6 @@ impl Selector {
pub fn select(&self, _: &mut Events, _: Option<Duration>) -> io::Result<()> {
os_required!();
}

#[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub fn register_waker(&self) -> bool {
os_required!();
}
}

#[cfg(unix)]
Expand Down
13 changes: 1 addition & 12 deletions src/sys/unix/selector/epoll.rs
Expand Up @@ -3,7 +3,7 @@ use crate::{Interest, Token};
use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, i32, io, ptr};

Expand All @@ -16,8 +16,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
ep: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand Down Expand Up @@ -60,8 +58,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -71,8 +67,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -138,11 +132,6 @@ impl Selector {
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}
}

cfg_io_source! {
Expand Down
13 changes: 1 addition & 12 deletions src/sys/unix/selector/kqueue.rs
Expand Up @@ -3,7 +3,7 @@ use std::mem::{self, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};

Expand Down Expand Up @@ -66,8 +66,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
kq: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -77,8 +75,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
};

syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
Expand All @@ -91,8 +87,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -213,11 +207,6 @@ impl Selector {
kevent_register(self.kq, &mut changes, &[libc::ENOENT as i64])
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

// Used by `Waker`.
#[cfg(any(
target_os = "freebsd",
Expand Down
16 changes: 1 addition & 15 deletions src/sys/unix/selector/poll.rs
Expand Up @@ -22,9 +22,6 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
#[derive(Debug)]
pub struct Selector {
state: Arc<SelectorState>,
/// Whether this selector currently has an associated waker.
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -33,8 +30,6 @@ impl Selector {

Ok(Selector {
state: Arc::new(state),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -44,11 +39,7 @@ impl Selector {

let state = self.state.clone();

Ok(Selector {
state,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
Ok(Selector { state })
}

pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
Expand Down Expand Up @@ -77,11 +68,6 @@ impl Selector {
self.state.deregister(fd)
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

pub fn wake(&self, token: Token) -> io::Result<()> {
self.state.wake(token)
}
Expand Down
11 changes: 0 additions & 11 deletions src/sys/windows/selector.rs
Expand Up @@ -328,8 +328,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
pub(super) inner: Arc<SelectorInner>,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -341,8 +339,6 @@ impl Selector {
#[cfg(debug_assertions)]
id,
inner: Arc::new(inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
}
})
}
Expand All @@ -352,8 +348,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
inner: Arc::clone(&self.inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand All @@ -365,11 +359,6 @@ impl Selector {
self.inner.select(events, timeout)
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
self.inner.cp.clone()
}
Expand Down
24 changes: 24 additions & 0 deletions tests/waker.rs
Expand Up @@ -128,6 +128,30 @@ fn using_multiple_wakers_panics() {
drop(waker2);
}

#[test]
#[cfg_attr(
not(debug_assertions),
ignore = "only works with debug_assertions enabled"
)]
#[should_panic = "Only a single `Waker` can be active per `Poll` instance"]
fn using_multiple_wakers_panics_different_cloned_registries() {
init();

let poll = Poll::new().expect("unable to create new Poll instance");
let registry1 = poll.registry().try_clone().unwrap();
let registry2 = poll.registry().try_clone().unwrap();

let token1 = Token(10);
let token2 = Token(11);

let waker1 = Waker::new(&registry1, token1).expect("unable to first waker");
// This should panic.
let waker2 = Waker::new(&registry2, token2).unwrap();

drop(waker1);
drop(waker2);
}

fn expect_waker_event(poll: &mut Poll, events: &mut Events, token: Token) {
poll.poll(events, Some(Duration::from_millis(100))).unwrap();
assert!(!events.is_empty());
Expand Down