From 94b539231c82caeeade982481c0a3bc09b4b0d6d Mon Sep 17 00:00:00 2001 From: Nicola Fiorella Date: Sun, 10 Sep 2023 17:40:21 +0200 Subject: [PATCH 1/5] tokio: add builder function for watch::Sender Added a method that allows to build a Sender without specifying any Receiver. --- tokio/src/sync/watch.rs | 52 ++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 5a46670eeeb..00344de71dc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -377,13 +377,6 @@ mod state { #[derive(Debug)] pub(super) struct AtomicState(AtomicUsize); - impl Version { - /// Get the initial version when creating the channel. - pub(super) fn initial() -> Self { - Version(0) - } - } - impl StateSnapshot { /// Extract the version from the state. pub(super) fn version(self) -> Version { @@ -458,23 +451,8 @@ mod state { /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver pub fn channel(init: T) -> (Sender, Receiver) { - let shared = Arc::new(Shared { - value: RwLock::new(init), - state: AtomicState::new(), - ref_count_rx: AtomicUsize::new(1), - notify_rx: big_notify::BigNotify::new(), - notify_tx: Notify::new(), - }); - - let tx = Sender { - shared: shared.clone(), - }; - - let rx = Receiver { - shared, - version: Version::initial(), - }; - + let tx = Sender::new(init); + let rx = tx.subscribe(); (tx, rx) } @@ -854,6 +832,32 @@ impl Drop for Receiver { } impl Sender { + /// Creates the sending-half of the [`watch`] channel. + /// + /// See documentation of [`watch::channel`] for errors when calling this function. + /// Beware that attempting to send a value when no one subscribed to the channel will + /// return an error. + /// + /// [`watch`]: crate::sync::watch + /// [`watch::channel`]: crate::sync::watch + /// + /// # Examples + /// ``` + /// let sender = tokio::sync::watch::Sender::new(0u8); + /// assert!(sender.send(3).is_err()) + /// let _rec = sender.subscribe() + /// assert!(sender.send(4).is_ok()) + /// ``` + pub fn new(init: T) -> Self { + let shared = Arc::new(Shared { + value: RwLock::new(init), + state: AtomicState::new(), + ref_count_rx: AtomicUsize::new(0), + notify_rx: big_notify::BigNotify::new(), + notify_tx: Notify::new(), + }); + Sender { shared } + } /// Sends a new value via the channel, notifying all receivers. /// /// This method fails if the channel is closed, which is the case when From 1a9ebf2c9b6501a904278c0a43beabd66195ba0c Mon Sep 17 00:00:00 2001 From: Nicola Fiorella Date: Sun, 10 Sep 2023 18:58:47 +0200 Subject: [PATCH 2/5] Revert "tokio: add builder function for watch::Sender" This reverts commit 94b539231c82caeeade982481c0a3bc09b4b0d6d. --- tokio/src/sync/watch.rs | 52 +++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 00344de71dc..5a46670eeeb 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -377,6 +377,13 @@ mod state { #[derive(Debug)] pub(super) struct AtomicState(AtomicUsize); + impl Version { + /// Get the initial version when creating the channel. + pub(super) fn initial() -> Self { + Version(0) + } + } + impl StateSnapshot { /// Extract the version from the state. pub(super) fn version(self) -> Version { @@ -451,8 +458,23 @@ mod state { /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver pub fn channel(init: T) -> (Sender, Receiver) { - let tx = Sender::new(init); - let rx = tx.subscribe(); + let shared = Arc::new(Shared { + value: RwLock::new(init), + state: AtomicState::new(), + ref_count_rx: AtomicUsize::new(1), + notify_rx: big_notify::BigNotify::new(), + notify_tx: Notify::new(), + }); + + let tx = Sender { + shared: shared.clone(), + }; + + let rx = Receiver { + shared, + version: Version::initial(), + }; + (tx, rx) } @@ -832,32 +854,6 @@ impl Drop for Receiver { } impl Sender { - /// Creates the sending-half of the [`watch`] channel. - /// - /// See documentation of [`watch::channel`] for errors when calling this function. - /// Beware that attempting to send a value when no one subscribed to the channel will - /// return an error. - /// - /// [`watch`]: crate::sync::watch - /// [`watch::channel`]: crate::sync::watch - /// - /// # Examples - /// ``` - /// let sender = tokio::sync::watch::Sender::new(0u8); - /// assert!(sender.send(3).is_err()) - /// let _rec = sender.subscribe() - /// assert!(sender.send(4).is_ok()) - /// ``` - pub fn new(init: T) -> Self { - let shared = Arc::new(Shared { - value: RwLock::new(init), - state: AtomicState::new(), - ref_count_rx: AtomicUsize::new(0), - notify_rx: big_notify::BigNotify::new(), - notify_tx: Notify::new(), - }); - Sender { shared } - } /// Sends a new value via the channel, notifying all receivers. /// /// This method fails if the channel is closed, which is the case when From 3fe36115d03fdadf6fae010aed1fb4c8967ae72e Mon Sep 17 00:00:00 2001 From: Nicola Fiorella Date: Sun, 10 Sep 2023 22:33:02 +0200 Subject: [PATCH 3/5] tokio: function new for watch::Sender Added a function that basically calls the watch::channel function and drops the returned receiver --- tokio/src/sync/watch.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 5a46670eeeb..569257540e5 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -854,6 +854,26 @@ impl Drop for Receiver { } impl Sender { + /// Creates the sending-half of the [`watch`] channel. + /// + /// See documentation of [`watch::channel`] for errors when calling this function. + /// Beware that attempting to send a value when no one subscribed to the channel will + /// return an error. + /// + /// [`watch`]: crate::sync::watch + /// [`watch::channel`]: crate::sync::watch + /// + /// # Examples + /// ``` + /// let sender = tokio::sync::watch::Sender::new(0u8); + /// assert!(sender.send(3).is_err()) + /// let _rec = sender.subscribe() + /// assert!(sender.send(4).is_ok()) + pub fn new(init: T) -> Self { + let (tx, _) = channel(init); + tx + } + /// Sends a new value via the channel, notifying all receivers. /// /// This method fails if the channel is closed, which is the case when From fed678994756027189bfcca47663b091000dc7fd Mon Sep 17 00:00:00 2001 From: nicflower <50106721+nicflower@users.noreply.github.com> Date: Mon, 11 Sep 2023 20:42:10 +0200 Subject: [PATCH 4/5] fix comment typo Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 569257540e5..98a88581975 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -857,7 +857,7 @@ impl Sender { /// Creates the sending-half of the [`watch`] channel. /// /// See documentation of [`watch::channel`] for errors when calling this function. - /// Beware that attempting to send a value when no one subscribed to the channel will + /// Beware that attempting to send a value when there are no receivers will /// return an error. /// /// [`watch`]: crate::sync::watch From 7012700e7ee0fcae6e265872945efca1fd7a2f38 Mon Sep 17 00:00:00 2001 From: Nicola Fiorella Date: Mon, 11 Sep 2023 22:53:10 +0200 Subject: [PATCH 5/5] fix docs code example --- tokio/src/sync/watch.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 98a88581975..410a4fb4fcb 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -866,9 +866,10 @@ impl Sender { /// # Examples /// ``` /// let sender = tokio::sync::watch::Sender::new(0u8); - /// assert!(sender.send(3).is_err()) - /// let _rec = sender.subscribe() - /// assert!(sender.send(4).is_ok()) + /// assert!(sender.send(3).is_err()); + /// let _rec = sender.subscribe(); + /// assert!(sender.send(4).is_ok()); + /// ``` pub fn new(init: T) -> Self { let (tx, _) = channel(init); tx