Skip to content

Commit

Permalink
sync: move broadcast waiters into separate list before waking (#5925)
Browse files Browse the repository at this point in the history
Within `notify_rx`, looping while re-locking and re-reading from
`Shared.tail` as long as there are still available wakers causes a
quadratic slowdown as receivers which are looping receiving from the
channel are added. Instead of continually re-reading from the original
list, this commit modifies `notify_rx` to move the waiters into a
separate list immediately similar to how `Notify::notify_waiters` works,
using a new `WaitersList` struct modified after NotifyWaitersList.

Fixes #5923
  • Loading branch information
glittershark committed Aug 14, 2023
1 parent 2c92cad commit 3dd5f7a
Showing 1 changed file with 76 additions and 2 deletions.
78 changes: 76 additions & 2 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::util::linked_list::{self, LinkedList};
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;

use std::fmt;
Expand Down Expand Up @@ -366,6 +366,17 @@ struct Waiter {
_p: PhantomPinned,
}

impl Waiter {
fn new() -> Self {
Self {
queued: false,
waker: None,
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,
}
}
}

generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
Expand Down Expand Up @@ -817,12 +828,75 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
Receiver { shared, next }
}

/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
is_empty: bool,
shared: &'a Shared<T>,
}

impl<'a, T> Drop for WaitersList<'a, T> {
fn drop(&mut self) {
// If the list is not empty, we unlink all waiters from it.
// We do not wake the waiters to avoid double panics.
if !self.is_empty {
let _lock_guard = self.shared.tail.lock();
while self.list.pop_back().is_some() {}
}
}
}

impl<'a, T> WaitersList<'a, T> {
fn new(
unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
guard: Pin<&'a Waiter>,
shared: &'a Shared<T>,
) -> Self {
let guard_ptr = NonNull::from(guard.get_ref());
let list = unguarded_list.into_guarded(guard_ptr);
WaitersList {
list,
is_empty: false,
shared,
}
}

/// Removes the last element from the guarded list. Modifying this list
/// requires an exclusive access to the main list in `Notify`.
fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
let result = self.list.pop_back();
if result.is_none() {
// Save information about emptiness to avoid waiting for lock
// in the destructor.
self.is_empty = true;
}
result
}
}

impl<T> Shared<T> {
fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
// It is critical for `GuardedLinkedList` safety that the guard node is
// pinned in memory and is not dropped until the guarded list is dropped.
let guard = Waiter::new();
pin!(guard);

// We move all waiters to a secondary list. It uses a `GuardedLinkedList`
// underneath to allow every waiter to safely remove itself from it.
//
// * This list will be still guarded by the `waiters` lock.
// `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
// * This wrapper will empty the list on drop. It is critical for safety
// that we will not leave any list entry with a pointer to the local
// guard node after this function returns / panics.
let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

let mut wakers = WakeList::new();
'outer: loop {
while wakers.can_push() {
match tail.waiters.pop_back() {
match list.pop_back_locked(&mut tail) {
Some(mut waiter) => {
// Safety: `tail` lock is still held.
let waiter = unsafe { waiter.as_mut() };
Expand Down

0 comments on commit 3dd5f7a

Please sign in to comment.