Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: fix wake-up with interval on Ready #5553

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -527,19 +527,21 @@ impl TimerEntry {
unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
unsafe { self.as_mut().get_unchecked_mut() }.registered = true;
unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);

if self.inner().extend_expiration(tick).is_ok() {
return;
}

unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, self.inner().into());
if reregister {
unsafe {
self.driver()
.reregister(&self.driver.driver().io, tick, self.inner().into());
}
}
}

Expand All @@ -553,7 +555,7 @@ impl TimerEntry {

if !self.registered {
let deadline = self.deadline;
self.as_mut().reset(deadline);
self.as_mut().reset(deadline, true);
}

let this = unsafe { self.get_unchecked_mut() };
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/tests/mod.rs
Expand Up @@ -164,7 +164,7 @@ fn reset_future() {
.as_mut()
.poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));

entry.as_mut().reset(start + Duration::from_secs(2));
entry.as_mut().reset(start + Duration::from_secs(2), true);

// shouldn't complete before 2s
block_on(futures::future::poll_fn(|cx| {
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/time/interval.rs
Expand Up @@ -482,7 +482,10 @@ impl Interval {
timeout + self.period
};

self.delay.as_mut().reset(next);
// When we arrive here, the internal delay returned `Poll::Ready`.
// Reset the delay but do not register it. It should be registered with
// the next call to [`poll_tick`].
self.delay.as_mut().reset_without_reregister(next);

// Return the time when we were scheduled to tick
Poll::Ready(timeout)
Expand Down
15 changes: 14 additions & 1 deletion tokio/src/time/sleep.rs
Expand Up @@ -353,9 +353,22 @@ impl Sleep {
self.reset_inner(deadline)
}

/// Resets the `Sleep` instance to a new deadline without reregistering it
/// to be woken up.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state and
/// without having it registered. This is required in e.g. the
/// [crate::time::Interval] where we want to reset the internal [Sleep]
/// without having it wake up the last task that polled it.
pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline, false);
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline);
me.entry.as_mut().reset(deadline, true);

#[cfg(all(tokio_unstable, feature = "tracing"))]
{
Expand Down
118 changes: 115 additions & 3 deletions tokio/tests/time_interval.rs
@@ -1,10 +1,12 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::time::{self, Duration, Instant, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};
use std::pin::Pin;
use std::task::{Context, Poll};

use std::task::Poll;
use futures::{Stream, StreamExt};
use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
use tokio_test::{assert_pending, assert_ready_eq, task};

// Takes the `Interval` task, `start` variable, and optional time deltas
// For each time delta, it polls the `Interval` and asserts that the result is
Expand Down Expand Up @@ -209,3 +211,113 @@ fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

/// Helper struct to test the [tokio::time::Interval::poll_tick()] method.
///
/// `poll_tick()` should register the waker in the context only if it returns
/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
/// interval timer and counts up on every tick when used as stream. When the
/// counter is a multiple of four, it yields the current counter value.
/// Depending on the value for `wake_on_pending`, it will reschedule itself when
/// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`,
/// we expect that the stream stalls because the timer will **not** reschedule
/// the next wake-up itself once it returned `Poll::Ready`.
struct IntervalStreamer {
counter: u32,
timer: Interval,
wake_on_pending: bool,
}

impl Stream for IntervalStreamer {
type Item = u32;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);

if this.counter > 12 {
return Poll::Ready(None);
}

match this.timer.poll_tick(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
this.counter += 1;
if this.counter % 4 == 0 {
Poll::Ready(Some(this.counter))
} else {
if this.wake_on_pending {
// Schedule this task for wake-up
cx.waker().wake_by_ref();
}
Poll::Pending
}
}
}
}
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_self_waking() {
let stream = IntervalStreamer {
counter: 0,
timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
wake_on_pending: true,
};

let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

// Wrap task in timeout so that it will finish eventually even if the stream
// stalls.
tokio::spawn(tokio::time::timeout(
tokio::time::Duration::from_millis(150),
async move {
tokio::pin!(stream);

while let Some(item) = stream.next().await {
res_tx.send(item).await.ok();
}
},
));

let mut items = Vec::with_capacity(3);
while let Some(result) = res_rx.recv().await {
items.push(result);
}

// We expect the stream to yield normally and thus three items.
assert_eq!(items, vec![4, 8, 12]);
}

#[tokio::test(start_paused = true)]
async fn stream_with_interval_poll_tick_no_waking() {
let stream = IntervalStreamer {
counter: 0,
timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
wake_on_pending: false,
};

let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);

// Wrap task in timeout so that it will finish eventually even if the stream
// stalls.
tokio::spawn(tokio::time::timeout(
tokio::time::Duration::from_millis(150),
async move {
tokio::pin!(stream);

while let Some(item) = stream.next().await {
res_tx.send(item).await.ok();
}
},
));

let mut items = Vec::with_capacity(0);
while let Some(result) = res_rx.recv().await {
items.push(result);
}

// We expect the stream to stall because it does not reschedule itself on
// `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
// task when returning `Poll::Ready`.
assert_eq!(items, vec![]);
}