Skip to content

Commit

Permalink
Move has_waker boolean to Registry
Browse files Browse the repository at this point in the history
And share it properly between all Registries
  • Loading branch information
Thomasdezeeuw committed Aug 6, 2023
1 parent 236fc31 commit fb8211b
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 61 deletions.
26 changes: 20 additions & 6 deletions src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use crate::{event, sys, Events, Interest, Token};
#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};

use crate::{event, sys, Events, Interest, Token};

/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of [`event::Source`]s,
Expand Down Expand Up @@ -252,6 +257,9 @@ pub struct Poll {
/// Registers I/O resources.
pub struct Registry {
selector: sys::Selector,
/// Whether this selector currently has an associated waker.
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc<AtomicBool>,
}

impl Poll {
Expand Down Expand Up @@ -298,7 +306,11 @@ impl Poll {
/// ```
pub fn new() -> io::Result<Poll> {
sys::Selector::new().map(|selector| Poll {
registry: Registry { selector },
registry: Registry {
selector,
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc::new(AtomicBool::new(false)),
},
})
}
}
Expand Down Expand Up @@ -668,17 +680,19 @@ impl Registry {
/// Event sources registered with this `Registry` will be registered with
/// the original `Registry` and `Poll` instance.
pub fn try_clone(&self) -> io::Result<Registry> {
self.selector
.try_clone()
.map(|selector| Registry { selector })
self.selector.try_clone().map(|selector| Registry {
selector,
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
has_waker: Arc::clone(&self.has_waker),
})
}

/// Internal check to ensure only a single `Waker` is active per [`Poll`]
/// instance.
#[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub(crate) fn register_waker(&self) {
assert!(
!self.selector.register_waker(),
!self.has_waker.swap(true, Ordering::AcqRel),
"Only a single `Waker` can be active per `Poll` instance"
);
}
Expand Down
5 changes: 0 additions & 5 deletions src/sys/shell/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ impl Selector {
pub fn select(&self, _: &mut Events, _: Option<Duration>) -> io::Result<()> {
os_required!();
}

#[cfg(all(debug_assertions, not(target_os = "wasi")))]
pub fn register_waker(&self) -> bool {
os_required!();
}
}

#[cfg(unix)]
Expand Down
13 changes: 1 addition & 12 deletions src/sys/unix/selector/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{Interest, Token};
use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, i32, io, ptr};

Expand All @@ -16,8 +16,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
ep: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand Down Expand Up @@ -60,8 +58,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -71,8 +67,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -138,11 +132,6 @@ impl Selector {
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}
}

cfg_io_source! {
Expand Down
13 changes: 1 addition & 12 deletions src/sys/unix/selector/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::mem::{self, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};

Expand Down Expand Up @@ -66,8 +66,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
kq: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -77,8 +75,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
};

syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
Expand All @@ -91,8 +87,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -213,11 +207,6 @@ impl Selector {
kevent_register(self.kq, &mut changes, &[libc::ENOENT as i64])
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

// Used by `Waker`.
#[cfg(any(
target_os = "freebsd",
Expand Down
16 changes: 1 addition & 15 deletions src/sys/unix/selector/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
#[derive(Debug)]
pub struct Selector {
state: Arc<SelectorState>,
/// Whether this selector currently has an associated waker.
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -33,8 +30,6 @@ impl Selector {

Ok(Selector {
state: Arc::new(state),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -44,11 +39,7 @@ impl Selector {

let state = self.state.clone();

Ok(Selector {
state,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
Ok(Selector { state })
}

pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
Expand Down Expand Up @@ -77,11 +68,6 @@ impl Selector {
self.state.deregister(fd)
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

pub fn wake(&self, token: Token) -> io::Result<()> {
self.state.wake(token)
}
Expand Down
11 changes: 0 additions & 11 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
pub(super) inner: Arc<SelectorInner>,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -341,8 +339,6 @@ impl Selector {
#[cfg(debug_assertions)]
id,
inner: Arc::new(inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
}
})
}
Expand All @@ -352,8 +348,6 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
inner: Arc::clone(&self.inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand All @@ -365,11 +359,6 @@ impl Selector {
self.inner.select(events, timeout)
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
self.inner.cp.clone()
}
Expand Down
24 changes: 24 additions & 0 deletions tests/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,30 @@ fn using_multiple_wakers_panics() {
drop(waker2);
}

#[test]
#[cfg_attr(
not(debug_assertions),
ignore = "only works with debug_assertions enabled"
)]
#[should_panic = "Only a single `Waker` can be active per `Poll` instance"]
fn using_multiple_wakers_panics_different_cloned_registries() {
init();

let poll = Poll::new().expect("unable to create new Poll instance");
let registry1 = poll.registry().try_clone().unwrap();
let registry2 = poll.registry().try_clone().unwrap();

let token1 = Token(10);
let token2 = Token(11);

let waker1 = Waker::new(&registry1, token1).expect("unable to first waker");
// This should panic.
let waker2 = Waker::new(&registry2, token2).unwrap();

drop(waker1);
drop(waker2);
}

fn expect_waker_event(poll: &mut Poll, events: &mut Events, token: Token) {
poll.poll(events, Some(Duration::from_millis(100))).unwrap();
assert!(!events.is_empty());
Expand Down

0 comments on commit fb8211b

Please sign in to comment.