From b2d67ef8b465e87311ffe023ca609b37cad186c5 Mon Sep 17 00:00:00 2001 From: "Simon B. Gasse" Date: Fri, 17 Mar 2023 10:16:03 +0100 Subject: [PATCH] time: fix wake-up with interval on `Ready` (#5551) When `tokio::time::Interval::poll_tick()` returns `Poll::Pending`, it schedules itself for being woken up again through the waker of the passed context, which is correct behavior. However when `Poll::Ready(_)` is returned, the interval timer should be reset but not scheduled to be woken up again as this is up to the caller. This commit fixes the bug by introducing a `reset_without_reregister` method on `TimerEntry` which is called by `Intervall::poll_tick(cx)` in case the delay poll returns `Poll::Ready(_)`. --- tokio/src/runtime/time/entry.rs | 11 +++++ tokio/src/time/interval.rs | 5 ++- tokio/src/time/sleep.rs | 20 +++++++++ tokio/tests/time_interval.rs | 73 +++++++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f86d9ed97f0..9016e29f395 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -543,6 +543,17 @@ impl TimerEntry { } } + pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, new_time: Instant) { + unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time; + unsafe { self.as_mut().get_unchecked_mut() }.registered = false; + + let tick = self.driver().time_source().deadline_to_tick(new_time); + + if self.inner().extend_expiration(tick).is_ok() { + return; + } + } + pub(crate) fn poll_elapsed( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index ea8b3939f46..9b98cd77214 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -482,7 +482,10 @@ impl Interval { timeout + self.period }; - self.delay.as_mut().reset(next); + // When we arrive here, the internal delay returned `Poll::Ready`. + // Reset the delay but do not register it. It should be registered with + // the next call to [`poll_tick`]. + self.delay.as_mut().reset_without_reregister(next); // Return the time when we were scheduled to tick Poll::Ready(timeout) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 370a98b9902..74bff62599c 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -353,6 +353,26 @@ impl Sleep { self.reset_inner(deadline) } + /// Resets the `Sleep` instance to a new deadline without reregistering it + /// to be woken up. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state and + /// without having it registered. This is required in e.g. the + /// [crate::time::Interval] where we want to reset the internal [Sleep] + /// without having it wake up the last task that polled it. + /// + /// This function can be called both before and after the future has + /// completed. + /// + /// To call this method, you will usually combine the call with + /// [`Pin::as_mut`], which lets you call the method without consuming the + /// `Sleep` itself. + pub fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset_without_reregister(deadline); + } + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { let mut me = self.project(); me.entry.as_mut().reset(deadline); diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 186582e2e52..c75ce060bd5 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -209,3 +209,76 @@ fn poll_next(interval: &mut task::Spawn) -> Poll { fn ms(n: u64) -> Duration { Duration::from_millis(n) } + +mod tmp_tests { + use std::{ + pin::Pin, + task::{Context, Poll}, + time::Instant, + }; + + use crate::time::Interval; + use futures::{pin_mut, Stream, StreamExt}; + + struct IntervalStreamer { + start: Instant, + counter: u32, + timer: Interval, + } + + impl Stream for IntervalStreamer { + type Item = u32; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + if this.counter > 12 { + return Poll::Ready(None); + } + + match this.timer.poll_tick(cx) { + Poll::Pending => { + println!( + "Timer returned Poll::Pending after {:?}", + this.start.elapsed() + ); + Poll::Pending + } + Poll::Ready(_) => { + println!( + "Timer returned Poll::Ready after {:?}", + this.start.elapsed() + ); + this.counter += 1; + if this.counter % 4 == 0 { + Poll::Ready(Some(this.counter)) + } else { + // Schedule this task for wake-up + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + } + } + + #[tokio::test] + async fn reset_without_reregister() { + let stream = IntervalStreamer { + start: Instant::now(), + counter: 0, + timer: crate::time::interval(std::time::Duration::from_millis(10)), + }; + + pin_mut!(stream); + + let mut results = Vec::with_capacity(4); + while let Some(item) = stream.next().await { + println!("Stream yielded an item: {}", item); + results.push(item); + } + + dbg!(&results); + assert_eq!(results, vec![4, 8, 12]); + } +}