From e7f83664649eeb555772d51ccac8629e7aa0d96f Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 9 Apr 2023 16:01:02 +0530 Subject: [PATCH 01/28] sync: implement watch::Receiver::wait_for method Fixes: https://github.com/tokio-rs/tokio/issues/5606 --- tokio/src/sync/watch.rs | 43 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 9ca75979b99..3a14035dd12 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -610,6 +610,49 @@ impl Receiver { } } + /// This function is similar to [`changed`], but it takes a closure that is + /// called with a reference to the new value. If the closure returns `true`, + /// then the function returns immediately. Otherwise, it waits for a new + /// value and calls the closure again. + /// + /// # Examples + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// + /// async fn main() { + /// let (tx, _rx) = watch::channel("hello"); + /// + /// tx.send("goodbye").unwrap(); + /// + /// // here we subscribe to a second receiver + /// // now in case of using `changed` we would have + /// // to first check the current value and then wait + /// // for changes or else `changed` would hang. + /// let mut rx2 = tx.subscribe(); + /// + /// // in place of changed we have use `wait_for` + /// // which would automatically check the current value + /// // and wait for changes until the closure returns true. + /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok()); + /// assert_eq!(*rx2.borrow(), "goodbye"); + /// } + /// ``` + + pub async fn wait_for(&mut self, mut f: impl FnMut(&T) -> bool) -> Result { + loop { + if f(&self.borrow_and_update()) { + return Ok(true); + } + + let changed = self.changed().await; + if changed.is_err() { + return Ok(false); + } + } + } + /// Returns `true` if receivers belong to the same channel. /// /// # Examples From 32de79457597345eaf2b13ee1a04ca4737b18f55 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 9 Apr 2023 16:10:47 +0530 Subject: [PATCH 02/28] fixup! lint --- tokio/src/sync/watch.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 3a14035dd12..4bc2c24bb35 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -614,16 +614,16 @@ impl Receiver { /// called with a reference to the new value. If the closure returns `true`, /// then the function returns immediately. Otherwise, it waits for a new /// value and calls the closure again. - /// + /// /// # Examples /// ``` /// use tokio::sync::watch; - /// + /// /// #[tokio::main] - /// + /// /// async fn main() { /// let (tx, _rx) = watch::channel("hello"); - /// + /// /// tx.send("goodbye").unwrap(); /// /// // here we subscribe to a second receiver @@ -632,7 +632,7 @@ impl Receiver { /// // for changes or else `changed` would hang. /// let mut rx2 = tx.subscribe(); /// - /// // in place of changed we have use `wait_for` + /// // in place of changed we have use `wait_for` /// // which would automatically check the current value /// // and wait for changes until the closure returns true. /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok()); @@ -640,7 +640,10 @@ impl Receiver { /// } /// ``` - pub async fn wait_for(&mut self, mut f: impl FnMut(&T) -> bool) -> Result { + pub async fn wait_for( + &mut self, + mut f: impl FnMut(&T) -> bool, + ) -> Result { loop { if f(&self.borrow_and_update()) { return Ok(true); From 79a7f1a23f8df44ba22bd364d7237c1453adb349 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 10 Apr 2023 15:46:43 +0530 Subject: [PATCH 03/28] fixup! doc --- tokio/src/sync/watch.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 4bc2c24bb35..fab0ee02193 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -614,8 +614,11 @@ impl Receiver { /// called with a reference to the new value. If the closure returns `true`, /// then the function returns immediately. Otherwise, it waits for a new /// value and calls the closure again. + /// + /// [`changed`]: Receiver::changed /// /// # Examples + /// /// ``` /// use tokio::sync::watch; /// From 899c11a59ebcbef58a8a145eaae17698a5109ddb Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 10 Apr 2023 15:50:46 +0530 Subject: [PATCH 04/28] fixup! lint again --- tokio/src/sync/watch.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index fab0ee02193..b939928a6f6 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -610,15 +610,15 @@ impl Receiver { } } - /// This function is similar to [`changed`], but it takes a closure that is + /// This function is similar to [`Receiver::changed()`], but it takes a closure that is /// called with a reference to the new value. If the closure returns `true`, /// then the function returns immediately. Otherwise, it waits for a new /// value and calls the closure again. - /// - /// [`changed`]: Receiver::changed + /// + /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed /// /// # Examples - /// + /// /// ``` /// use tokio::sync::watch; /// From e0e5157d9d204bd8877045f638945577a3b26eed Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 10 Apr 2023 18:15:26 +0530 Subject: [PATCH 05/28] fixup! add a test --- tokio/src/sync/tests/loom_watch.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index c575b5b66c5..1355c24d937 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -34,3 +34,25 @@ fn smoke() { th.join().unwrap(); }) } + +#[test] +fn wait_for_test() { + loom::model(move || { + let (tx, mut rx) = watch::channel(0); + + // here we repeatedly send values to the channel + // to trigger its loop + let th = thread::spawn(move || { + for i in 0..10 { + tx.send(i).unwrap(); + } + }); + + // here we block the main thread until the + // value is 9 + let result = block_on(rx.wait_for(|x| *x == 9)); + assert_eq!(result.unwrap(), true); + + th.join().unwrap(); + }); +} \ No newline at end of file From de91fc4d52a85dd93c34f422091a3b728c1dc5cc Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 14:45:22 +0530 Subject: [PATCH 06/28] fixup! update the test for multiple threads --- tokio/src/sync/tests/loom_watch.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 1355c24d937..a9f8ae82496 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -2,6 +2,8 @@ use crate::sync::watch; use loom::future::block_on; use loom::thread; +use std::sync::Arc; +use std::time::Duration; #[test] fn smoke() { @@ -38,21 +40,28 @@ fn smoke() { #[test] fn wait_for_test() { loom::model(move || { - let (tx, mut rx) = watch::channel(0); + let (tx, mut rx) = watch::channel(false); + + let tx_arc = Arc::new(tx); + let tx1 = tx_arc.clone(); + let tx2 = tx_arc.clone(); - // here we repeatedly send values to the channel - // to trigger its loop - let th = thread::spawn(move || { - for i in 0..10 { - tx.send(i).unwrap(); + let th1 = thread::spawn(move || { + for _ in 0..10 { + tx1.send(false).unwrap(); + std::thread::sleep(Duration::from_millis(10)); } }); - // here we block the main thread until the - // value is 9 - let result = block_on(rx.wait_for(|x| *x == 9)); + let th2 = thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + tx2.send(true).unwrap(); + }); + + let result = block_on(rx.wait_for(|x| *x)); assert_eq!(result.unwrap(), true); - th.join().unwrap(); + th1.join().unwrap(); + th2.join().unwrap(); }); } \ No newline at end of file From f8e0810c8947c36231011d2045b8338cff5a277e Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 14:46:42 +0530 Subject: [PATCH 07/28] fixup! lint --- tokio/src/sync/tests/loom_watch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index a9f8ae82496..13d9a4cb68d 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -45,7 +45,7 @@ fn wait_for_test() { let tx_arc = Arc::new(tx); let tx1 = tx_arc.clone(); let tx2 = tx_arc.clone(); - + let th1 = thread::spawn(move || { for _ in 0..10 { tx1.send(false).unwrap(); @@ -55,7 +55,7 @@ fn wait_for_test() { let th2 = thread::spawn(move || { std::thread::sleep(Duration::from_millis(10)); - tx2.send(true).unwrap(); + tx2.send(true).unwrap(); }); let result = block_on(rx.wait_for(|x| *x)); @@ -64,4 +64,4 @@ fn wait_for_test() { th1.join().unwrap(); th2.join().unwrap(); }); -} \ No newline at end of file +} From 15b0ee53cc055d4e0ad0fe387566794cd029c71b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 15:35:08 +0530 Subject: [PATCH 08/28] fixup! use send_modify and fix race condition --- tokio/src/sync/tests/loom_watch.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 13d9a4cb68d..8e8ee9d31e4 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -3,7 +3,6 @@ use crate::sync::watch; use loom::future::block_on; use loom::thread; use std::sync::Arc; -use std::time::Duration; #[test] fn smoke() { @@ -47,14 +46,19 @@ fn wait_for_test() { let tx2 = tx_arc.clone(); let th1 = thread::spawn(move || { - for _ in 0..10 { - tx1.send(false).unwrap(); - std::thread::sleep(Duration::from_millis(10)); + for _ in 0..3 { + tx1.send_modify(|x| { + // here we avoid unsetting the value in case + // the value is already set to true. + if !*x { + // force the loop to run. + *x = false; + } + }); } }); let th2 = thread::spawn(move || { - std::thread::sleep(Duration::from_millis(10)); tx2.send(true).unwrap(); }); From 539a17ba48a7afc22e6a7932060e15b450292fb4 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:06:06 +0530 Subject: [PATCH 09/28] fixup! simplify the closure Co-authored-by: Alice Ryhl --- tokio/src/sync/tests/loom_watch.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 8e8ee9d31e4..f9a7eb0b1d1 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -47,14 +47,7 @@ fn wait_for_test() { let th1 = thread::spawn(move || { for _ in 0..3 { - tx1.send_modify(|x| { - // here we avoid unsetting the value in case - // the value is already set to true. - if !*x { - // force the loop to run. - *x = false; - } - }); + tx1.send_modify(|_x| {}); } }); From 3e8f26813be3e002c574792223b2cb6e0658d3ca Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:06:26 +0530 Subject: [PATCH 10/28] fixup! lint Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b939928a6f6..a74fd366284 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -642,7 +642,6 @@ impl Receiver { /// assert_eq!(*rx2.borrow(), "goodbye"); /// } /// ``` - pub async fn wait_for( &mut self, mut f: impl FnMut(&T) -> bool, From 6424dd164210df5cc49a98158146dc28835e5e3a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:09:27 +0530 Subject: [PATCH 11/28] fixup! add description of the function --- tokio/src/sync/watch.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index a74fd366284..a3369b9771b 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -610,6 +610,9 @@ impl Receiver { } } + /// Takes a simple closure and waits for change notifications and returns once the + /// closure returns `true` for one of the changed values. + /// /// This function is similar to [`Receiver::changed()`], but it takes a closure that is /// called with a reference to the new value. If the closure returns `true`, /// then the function returns immediately. Otherwise, it waits for a new From add1152ac6ed12b45a90d63572b1ac07a1d37841 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:23:33 +0530 Subject: [PATCH 12/28] fixup! update the description --- tokio/src/sync/watch.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index a3369b9771b..df2674dd7a2 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -610,8 +610,7 @@ impl Receiver { } } - /// Takes a simple closure and waits for change notifications and returns once the - /// closure returns `true` for one of the changed values. + /// Takes a closure and waits until the provided closure returns true. /// /// This function is similar to [`Receiver::changed()`], but it takes a closure that is /// called with a reference to the new value. If the closure returns `true`, From 46196fb73ea4c5f78fd75e8472d2ef1b3cc89ef5 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:49:05 +0530 Subject: [PATCH 13/28] fixup! return type same as changed() --- tokio/src/sync/tests/loom_watch.rs | 3 +-- tokio/src/sync/watch.rs | 9 +++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index f9a7eb0b1d1..30aa420daf9 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -55,8 +55,7 @@ fn wait_for_test() { tx2.send(true).unwrap(); }); - let result = block_on(rx.wait_for(|x| *x)); - assert_eq!(result.unwrap(), true); + block_on(rx.wait_for(|x| *x)).unwrap(); th1.join().unwrap(); th2.join().unwrap(); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index df2674dd7a2..f8005cf7e75 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -647,16 +647,13 @@ impl Receiver { pub async fn wait_for( &mut self, mut f: impl FnMut(&T) -> bool, - ) -> Result { + ) -> Result<(), error::RecvError> { loop { if f(&self.borrow_and_update()) { - return Ok(true); + return Ok(()); } - let changed = self.changed().await; - if changed.is_err() { - return Ok(false); - } + self.changed().await?; } } From 1287301414379b55d1ab27a28d63c2fdbcb979db Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 11 Apr 2023 16:50:57 +0530 Subject: [PATCH 14/28] fixup! lint --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f8005cf7e75..a90c2958327 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -611,7 +611,7 @@ impl Receiver { } /// Takes a closure and waits until the provided closure returns true. - /// + /// /// This function is similar to [`Receiver::changed()`], but it takes a closure that is /// called with a reference to the new value. If the closure returns `true`, /// then the function returns immediately. Otherwise, it waits for a new From ba8db72990360f102db9bbba4b6db0a3bd3aaf3a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 17 Apr 2023 12:34:55 +0530 Subject: [PATCH 15/28] fixup! update docs Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index a90c2958327..aff06cdfd1e 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -610,12 +610,38 @@ impl Receiver { } } - /// Takes a closure and waits until the provided closure returns true. - /// - /// This function is similar to [`Receiver::changed()`], but it takes a closure that is - /// called with a reference to the new value. If the closure returns `true`, - /// then the function returns immediately. Otherwise, it waits for a new - /// value and calls the closure again. + /// Waits for a value that satisifes the provided condition. + /// + /// This method will call the provided closure whenever something is sent on + /// the channel. Once the closure returns `true`, this method will return a + /// reference to the value that was passed to the closure. + /// + /// Before `wait_for` starts waiting for changes, it will call the closure + /// on the current value. If the closure returns `true` when given the + /// current value, then `wait_for` will immediately return a reference to + /// the current value. This is the case even if the current value is already + /// considered seen. + /// + /// The watch channel only keeps track of the most recent value, so if + /// several messages are sent faster than `wait_for` is able to call the + /// closure, then it may skip some updates. Whenever the closure is called, + /// it will be called with the most recent value. + /// + /// When this function returns, the value that was passed to the closure + /// when it returned `true` will be considered seen. + /// + /// If the channel is closed, then `wait_for` will return a `RecvError`. + /// Once this happens, no more messages can ever be sent on the channel. + /// When an error is returned, it is guaranteed that the closure has been + /// called on the last value, and that it returned `false` for that value. + /// (If the closure returned `true`, then the last value would have been + /// returned instead of the error.) + /// + /// Like the `borrow` method, the returned borrow holds a read lock on the + /// inner value. This means that long-lived borrows could cause the producer + /// half to block. It is recommended to keep the borrow as short-lived as + /// possible. See the documentation of `borrow` for more information on + /// this. /// /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed /// From d7d7adc08139964d52017097e61f7d4596a64241 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 17 Apr 2023 13:33:36 +0530 Subject: [PATCH 16/28] fixup! add a guarantee to see the last value --- tokio/src/sync/watch.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index aff06cdfd1e..bca2ba99374 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -679,7 +679,19 @@ impl Receiver { return Ok(()); } - self.changed().await?; + match self.changed().await { + Ok(()) => {} + Err(e) => { + // some error occurred but we still need to call the closure + // to guarantee that it has been called on the last value. + // and we error only if its false. + if f(&self.borrow_and_update()) { + return Ok(()); + } else { + return Err(e); + } + } + } } } From 2bece237f0cb4bfba4c6507c746b72b5c6d73d22 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 21 Apr 2023 23:52:17 +0530 Subject: [PATCH 17/28] fixup! use Ref Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index bca2ba99374..3eca7ed9548 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -670,10 +670,10 @@ impl Receiver { /// assert_eq!(*rx2.borrow(), "goodbye"); /// } /// ``` - pub async fn wait_for( - &mut self, + pub async fn wait_for<'a>( + &'a mut self, mut f: impl FnMut(&T) -> bool, - ) -> Result<(), error::RecvError> { + ) -> Result, error::RecvError> { loop { if f(&self.borrow_and_update()) { return Ok(()); From 3f5f3e0708a95836b9f242a32fd7b640c05db79b Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 22 Apr 2023 00:39:06 +0530 Subject: [PATCH 18/28] fixup! finish update to Ref --- tokio/src/sync/tests/loom_watch.rs | 2 +- tokio/src/sync/watch.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 30aa420daf9..0b4e7c16d85 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -55,7 +55,7 @@ fn wait_for_test() { tx2.send(true).unwrap(); }); - block_on(rx.wait_for(|x| *x)).unwrap(); + assert_eq!(*block_on(rx.wait_for(|x| *x)).unwrap(), true); th1.join().unwrap(); th2.join().unwrap(); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 3eca7ed9548..67ab00227e5 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -676,7 +676,7 @@ impl Receiver { ) -> Result, error::RecvError> { loop { if f(&self.borrow_and_update()) { - return Ok(()); + return Ok(self.borrow()); } match self.changed().await { @@ -685,8 +685,9 @@ impl Receiver { // some error occurred but we still need to call the closure // to guarantee that it has been called on the last value. // and we error only if its false. + // we should not call the closure twice on the same value. if f(&self.borrow_and_update()) { - return Ok(()); + return Ok(self.borrow()); } else { return Err(e); } From b76576e59f6886c6b0d45de9f00089edeb54bb9a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 22 Apr 2023 00:53:15 +0530 Subject: [PATCH 19/28] fixup! clippy fix --- tokio/src/sync/watch.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 67ab00227e5..07a02bbc1a9 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -673,7 +673,7 @@ impl Receiver { pub async fn wait_for<'a>( &'a mut self, mut f: impl FnMut(&T) -> bool, - ) -> Result, error::RecvError> { + ) -> Result, error::RecvError> { loop { if f(&self.borrow_and_update()) { return Ok(self.borrow()); @@ -685,7 +685,6 @@ impl Receiver { // some error occurred but we still need to call the closure // to guarantee that it has been called on the last value. // and we error only if its false. - // we should not call the closure twice on the same value. if f(&self.borrow_and_update()) { return Ok(self.borrow()); } else { From 59e7074973293a8e6b4577e6f30972247af92f78 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 22 Apr 2023 01:06:57 +0530 Subject: [PATCH 20/28] fixup! clippy again --- tokio/src/sync/watch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 07a02bbc1a9..dcc4c62471d 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -670,8 +670,8 @@ impl Receiver { /// assert_eq!(*rx2.borrow(), "goodbye"); /// } /// ``` - pub async fn wait_for<'a>( - &'a mut self, + pub async fn wait_for( + &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result, error::RecvError> { loop { From ca2d2bfd70336843dd577d02a41308d3e3c15187 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 22 Apr 2023 13:05:26 +0530 Subject: [PATCH 21/28] fixup! include version matching --- tokio/src/sync/watch.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index dcc4c62471d..631e31f6cbe 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -680,16 +680,18 @@ impl Receiver { } match self.changed().await { - Ok(()) => {} + Ok(_) => {} Err(e) => { - // some error occurred but we still need to call the closure - // to guarantee that it has been called on the last value. - // and we error only if its false. + // check if the version has changed since the last time we + // called the closure. + if self.shared.state.load().version() == self.version { + return Err(e); + } + // the version has changed, so we need to call the closure if f(&self.borrow_and_update()) { return Ok(self.borrow()); - } else { - return Err(e); } + return Err(e); } } } From 09652f8eeca6171feb897aad66b26b44445c6b3e Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sat, 22 Apr 2023 18:43:38 +0530 Subject: [PATCH 22/28] fixup! use has_changed instead --- tokio/src/sync/watch.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 631e31f6cbe..dbd3fd2e0c0 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -682,15 +682,14 @@ impl Receiver { match self.changed().await { Ok(_) => {} Err(e) => { + let updated_val = self.borrow_and_update(); // check if the version has changed since the last time we // called the closure. - if self.shared.state.load().version() == self.version { - return Err(e); - } - // the version has changed, so we need to call the closure - if f(&self.borrow_and_update()) { - return Ok(self.borrow()); + if updated_val.has_changed && f(&updated_val) { + return Ok(updated_val); } + // if the version hasn't changed, then the channel has been + // closed. We return the error. return Err(e); } } From e4472d556871d28f30fb1742c2c31a26090ff8df Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 23 Apr 2023 17:13:32 +0200 Subject: [PATCH 23/28] inline borrow_and_update into wait_for --- tokio/src/sync/watch.rs | 65 +++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index dbd3fd2e0c0..3dc192314dd 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -595,19 +595,7 @@ impl Receiver { /// } /// ``` pub async fn changed(&mut self) -> Result<(), error::RecvError> { - loop { - // In order to avoid a race condition, we first request a notification, - // **then** check the current value's version. If a new version exists, - // the notification request is dropped. - let notified = self.shared.notify_rx.notified(); - - if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { - return ret; - } - - notified.await; - // loop around again in case the wake-up was spurious - } + changed_impl(&self.shared, &mut self.version).await } /// Waits for a value that satisifes the provided condition. @@ -656,13 +644,13 @@ impl Receiver { /// let (tx, _rx) = watch::channel("hello"); /// /// tx.send("goodbye").unwrap(); - /// + /// /// // here we subscribe to a second receiver /// // now in case of using `changed` we would have /// // to first check the current value and then wait /// // for changes or else `changed` would hang. /// let mut rx2 = tx.subscribe(); - /// + /// /// // in place of changed we have use `wait_for` /// // which would automatically check the current value /// // and wait for changes until the closure returns true. @@ -674,25 +662,28 @@ impl Receiver { &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result, error::RecvError> { + let mut closed = false; loop { - if f(&self.borrow_and_update()) { - return Ok(self.borrow()); - } + { + let inner = self.shared.value.read().unwrap(); + + let new_version = self.shared.state.load().version(); + let has_changed = self.version != new_version; + self.version = new_version; - match self.changed().await { - Ok(_) => {} - Err(e) => { - let updated_val = self.borrow_and_update(); - // check if the version has changed since the last time we - // called the closure. - if updated_val.has_changed && f(&updated_val) { - return Ok(updated_val); + if !closed || has_changed { + if f(&inner) { + return Ok(Ref { inner, has_changed }); } - // if the version hasn't changed, then the channel has been - // closed. We return the error. - return Err(e); } } + + if closed { + return Err(error::RecvError(())); + } + + // Wait for the value to change. + closed = changed_impl(&self.shared, &mut self.version).await.is_err(); } } @@ -741,6 +732,22 @@ fn maybe_changed( None } +async fn changed_impl(shared: &Shared, version: &mut Version) -> Result<(), error::RecvError> { + loop { + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. + let notified = shared.notify_rx.notified(); + + if let Some(ret) = maybe_changed(shared, version) { + return ret; + } + + notified.await; + // loop around again in case the wake-up was spurious + } +} + impl Clone for Receiver { fn clone(&self) -> Self { let version = self.version; From fd15eeaa8ce3142febfaaeffb9945177f45bd440 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 23 Apr 2023 21:18:09 +0530 Subject: [PATCH 24/28] fixup! lint and clippy --- tokio/src/sync/watch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 3dc192314dd..993204cd2a5 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -671,10 +671,8 @@ impl Receiver { let has_changed = self.version != new_version; self.version = new_version; - if !closed || has_changed { - if f(&inner) { - return Ok(Ref { inner, has_changed }); - } + if (!closed || has_changed) && f(&inner) { + return Ok(Ref { inner, has_changed }); } } From 39f1cbebe5c440abe5ed44026925118f7ae982e7 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 23 Apr 2023 21:20:32 +0530 Subject: [PATCH 25/28] fixup! lint again --- tokio/src/sync/watch.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 993204cd2a5..449711ad75c 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -730,7 +730,10 @@ fn maybe_changed( None } -async fn changed_impl(shared: &Shared, version: &mut Version) -> Result<(), error::RecvError> { +async fn changed_impl( + shared: &Shared, + version: &mut Version, +) -> Result<(), error::RecvError> { loop { // In order to avoid a race condition, we first request a notification, // **then** check the current value's version. If a new version exists, From 1f38f3f062b4ddfb313c8da44a0fef66e3f86bd0 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 24 Apr 2023 12:05:36 +0530 Subject: [PATCH 26/28] Update tokio/src/sync/tests/loom_watch.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/tests/loom_watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 0b4e7c16d85..d5a37c790dc 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -46,7 +46,7 @@ fn wait_for_test() { let tx2 = tx_arc.clone(); let th1 = thread::spawn(move || { - for _ in 0..3 { + for _ in 0..2 { tx1.send_modify(|_x| {}); } }); From 3d1077f69228ec6d85a816aee7058015e6b58f8d Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 24 Apr 2023 12:09:10 +0530 Subject: [PATCH 27/28] fixup! add another test --- tokio/src/sync/tests/loom_watch.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index d5a37c790dc..0ea44e4db3d 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -61,3 +61,29 @@ fn wait_for_test() { th2.join().unwrap(); }); } + +#[test] +fn wait_for_returns_correct_value() { + loom::model(move || { + let (tx, mut rx) = watch::channel(0); + + let jh = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + }); + + // Stop at the first value we are called at. + let mut stopped_at = usize::MAX; + let returned = *block_on(rx.wait_for(|x| { + stopped_at = *x; + true + })).unwrap(); + + // Check that it returned the same value as the one we returned + // `true` for. + assert_eq!(stopped_at, returned); + + jh.join().unwrap(); + }); +} \ No newline at end of file From 9157bbca552615b39b3b64d9e144f83fba812984 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 24 Apr 2023 12:39:31 +0530 Subject: [PATCH 28/28] fixup! lint --- tokio/src/sync/tests/loom_watch.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 0ea44e4db3d..51589cd8042 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -78,7 +78,8 @@ fn wait_for_returns_correct_value() { let returned = *block_on(rx.wait_for(|x| { stopped_at = *x; true - })).unwrap(); + })) + .unwrap(); // Check that it returned the same value as the one we returned // `true` for. @@ -86,4 +87,4 @@ fn wait_for_returns_correct_value() { jh.join().unwrap(); }); -} \ No newline at end of file +}