Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

named-pipes: fix receiving IOCP events after deregister #1761

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/sys/windows/event.rs
Expand Up @@ -41,6 +41,14 @@ impl Event {
pub(super) fn to_completion_status(&self) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut())
}

#[cfg(feature = "os-ext")]
pub(super) fn to_completion_status_with_overlapped(
&self,
overlapped: *mut super::Overlapped,
) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, overlapped)
}
}

pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE
Expand Down
4 changes: 3 additions & 1 deletion src/sys/windows/mod.rs
Expand Up @@ -13,7 +13,7 @@ mod overlapped;
use overlapped::Overlapped;

mod selector;
pub use selector::{Selector, SelectorInner, SockState};
pub use selector::Selector;

// Macros must be defined before the modules that use them
cfg_net! {
Expand All @@ -35,6 +35,8 @@ cfg_net! {

pub(crate) mod tcp;
pub(crate) mod udp;

pub use selector::{SelectorInner, SockState};
}

cfg_os_ext! {
Expand Down
83 changes: 71 additions & 12 deletions src/sys/windows/named_pipe.rs
Expand Up @@ -84,6 +84,7 @@ struct Inner {
connect: Overlapped,
read: Overlapped,
write: Overlapped,
event: Overlapped,
// END NOTE.
handle: Handle,
connecting: AtomicBool,
Expand All @@ -110,10 +111,16 @@ impl Inner {

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `read` is after `connect: Overlapped` and `read: Overlapped`.
// `write` is after `connect: Overlapped` and `read: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
}

/// Same as [`ptr_from_conn_overlapped`] but for `Inner.event`.
unsafe fn ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
// `event` is after `connect: Overlapped`, `read: Overlapped`, and `write: Overlapped`.
(ptr as *mut Overlapped).wrapping_sub(3) as *const Inner
}

/// Issue a connection request with the specified overlapped operation.
///
/// This function will issue a request to connect a client to this server,
Expand Down Expand Up @@ -478,6 +485,7 @@ impl FromRawHandle for NamedPipe {
connecting: AtomicBool::new(false),
read: Overlapped::new(read_done),
write: Overlapped::new(write_done),
event: Overlapped::new(event_done),
io: Mutex::new(Io {
cp: None,
token: None,
Expand Down Expand Up @@ -724,7 +732,7 @@ impl Inner {
// out the error.
Err(e) => {
io.read = State::Err(e);
io.notify_readable(events);
io.notify_readable(me, events);
true
}
}
Expand Down Expand Up @@ -787,7 +795,7 @@ impl Inner {
Ok(None) => (),
Err(e) => {
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand All @@ -797,7 +805,7 @@ impl Inner {
#[allow(clippy::needless_option_as_deref)]
if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
if let State::None = io.write {
io.notify_writable(events);
io.notify_writable(me, events);
}
}
}
Expand Down Expand Up @@ -877,7 +885,7 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
}

// Flag our readiness that we've got data.
io.notify_readable(events);
io.notify_readable(&me, events);
}

fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
Expand All @@ -895,7 +903,7 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// `Ok` here means, that the operation was completed immediately
// `bytes_transferred` is already reported to a client
State::Ok(..) => {
io.notify_writable(events);
io.notify_writable(&me, events);
return;
}
State::Pending(buf, pos) => (buf, pos),
Expand All @@ -909,20 +917,46 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let new_pos = pos + (status.bytes_transferred() as usize);
if new_pos == buf.len() {
me.put_buffer(buf);
io.notify_writable(events);
io.notify_writable(&me, events);
} else {
Inner::schedule_write(&me, buf, new_pos, &mut io, events);
}
}
Err(e) => {
debug_assert_eq!(status.bytes_transferred(), 0);
io.write = State::Err(e);
io.notify_writable(events);
io.notify_writable(&me, events);
}
}
}
}

fn event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
let status = CompletionStatus::from_entry(status);

// Acquire the `Arc<Inner>`. Note that we should be guaranteed that
// the refcount is available to us due to the `mem::forget` in
// `schedule_write` above.
let me = unsafe { Arc::from_raw(Inner::ptr_from_event_overlapped(status.overlapped())) };

let io = me.io.lock().unwrap();

// Make sure the I/O handle is still registered with the selector
if io.token.is_some() {
// This method is also called during `Selector::drop` to perform
// cleanup. In this case, `events` is `None` and we don't need to track
// the event.
if let Some(events) = events {
let mut ev = Event::from_completion_status(&status);
// Reverse the `.data` alteration done in `schedule_event`. This
// alteration was done so the selector recognized the event as one from
// a named pipe.
ev.data >>= 1;
events.push(ev);
}
}
}

impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
Expand All @@ -938,28 +972,53 @@ impl Io {
}
}

fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
fn notify_readable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_readable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
fn notify_writable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
if let Some(token) = self.token {
let mut ev = Event::new(token);
ev.set_writable();

if let Some(events) = events {
events.push(ev);
} else {
let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
self.schedule_event(me, ev);
}
}
}

fn schedule_event(&self, me: &Arc<Inner>, mut event: Event) {
// Alter the token so that the selector will identify the IOCP event as
// one for a named pipe. This will be reversed in `event_done`
//
// `data` for named pipes is an auto-incrementing counter. Because
// `data` is `u64` we do not risk losing the most-significant bit
// (unless a user creates 2^62 named pipes during the lifetime of the
// process).
event.data <<= 1;
event.data += 1;

let completion_status =
event.to_completion_status_with_overlapped(me.event.as_ptr() as *mut _);

match self.cp.as_ref().unwrap().post(completion_status) {
Ok(_) => {
// Increase the ref count of `Inner` for the completion event.
mem::forget(me.clone());
}
Err(_) => {
// Nothing to do here
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/poll.rs
Expand Up @@ -581,7 +581,7 @@ fn poll_registration() {
let interests = Interest::READABLE;
registry.register(&mut source, token, interests).unwrap();
assert_eq!(source.registrations.len(), 1);
assert_eq!(source.registrations.get(0), Some(&(token, interests)));
assert_eq!(source.registrations.first(), Some(&(token, interests)));
assert!(source.reregistrations.is_empty());
assert_eq!(source.deregister_count, 0);

Expand All @@ -593,7 +593,7 @@ fn poll_registration() {
assert_eq!(source.registrations.len(), 1);
assert_eq!(source.reregistrations.len(), 1);
assert_eq!(
source.reregistrations.get(0),
source.reregistrations.first(),
Some(&(re_token, re_interests))
);
assert_eq!(source.deregister_count, 0);
Expand Down