Skip to content

Commit

Permalink
sync: use Acquire/Release instead of SeqCst in watch (#6018)
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Sep 24, 2023
1 parent e76c06b commit 453c720
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions tokio/src/sync/watch.rs
Expand Up @@ -357,7 +357,7 @@ mod big_notify {
use self::state::{AtomicState, Version};
mod state {
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::SeqCst;
use crate::loom::sync::atomic::Ordering;

const CLOSED_BIT: usize = 1;

Expand All @@ -377,6 +377,11 @@ mod state {
pub(super) struct StateSnapshot(usize);

/// The state stored in an atomic integer.
///
/// The `Sender` uses `Release` ordering for storing a new state
/// and the `Receiver`s use `Acquire` ordering for loading the
/// current state. This ensures that written values are seen by
/// the `Receiver`s for a proper handover.
#[derive(Debug)]
pub(super) struct AtomicState(AtomicUsize);

Expand Down Expand Up @@ -406,24 +411,35 @@ mod state {

impl AtomicState {
/// Create a new `AtomicState` that is not closed and which has the
/// version set to `Version::initial()`.
/// version set to `Version::INITIAL`.
pub(super) fn new() -> Self {
AtomicState(AtomicUsize::new(Version::INITIAL.0))
}

/// Load the current value of the state.
///
/// Only used by the receiver and for debugging purposes.
///
/// The receiver side (read-only) uses `Acquire` ordering for a proper handover
/// of the shared value with the sender side (single writer). The state is always
/// updated after modifying and before releasing the (exclusive) lock on the
/// shared value.
pub(super) fn load(&self) -> StateSnapshot {
StateSnapshot(self.0.load(SeqCst))
StateSnapshot(self.0.load(Ordering::Acquire))
}

/// Increment the version counter.
pub(super) fn increment_version(&self) {
self.0.fetch_add(STEP_SIZE, SeqCst);
pub(super) fn increment_version_while_locked(&self) {
// Use `Release` ordering to ensure that the shared value
// has been written before updating the version. The shared
// value is still protected by an exclusive lock during this
// method.
self.0.fetch_add(STEP_SIZE, Ordering::Release);
}

/// Set the closed bit in the state.
pub(super) fn set_closed(&self) {
self.0.fetch_or(CLOSED_BIT, SeqCst);
self.0.fetch_or(CLOSED_BIT, Ordering::Release);
}
}
}
Expand Down Expand Up @@ -1061,7 +1077,7 @@ impl<T> Sender<T> {
}
};

self.shared.state.increment_version();
self.shared.state.increment_version_while_locked();

// Release the write lock.
//
Expand Down

0 comments on commit 453c720

Please sign in to comment.