Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::timeout_repeating #5577

Merged
merged 9 commits into from Apr 24, 2023
100 changes: 96 additions & 4 deletions tokio-stream/src/stream_ext.rs
Expand Up @@ -57,7 +57,7 @@ use try_next::TryNext;

cfg_time! {
pub(crate) mod timeout;
use timeout::Timeout;
use timeout::{Timeout, TimeoutMode};
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
Expand Down Expand Up @@ -924,7 +924,9 @@ pub trait StreamExt: Stream {
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
/// stream value once it becomes available. See
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
/// where the timeouts will repeat.
///
/// # Notes
///
Expand All @@ -941,7 +943,7 @@ pub trait StreamExt: Stream {
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
Expand Down Expand Up @@ -969,6 +971,17 @@ pub trait StreamExt: Stream {
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // Once a timeout error is received, no further events will be received
/// // unless the wrapped stream yields a value (timeouts do not repeat).
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(9));
/// tokio::pin!(timeout_stream);
///
/// // Only one timeout will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err());
/// assert!(timeout_stream.try_next().await.is_ok());
/// # }
/// ```
#[cfg(all(feature = "time"))]
Expand All @@ -977,7 +990,86 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
Timeout::new(self, duration)
Timeout::new(self, duration, TimeoutMode::Once)
}

/// Applies a per-item timeout to the passed stream.
///
/// `timeout_repeating()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. Unlike `timeout()`, if no value
/// becomes available before the deadline is reached, additional errors are
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
/// for an alternative where the timeouts do not repeat.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout_repeating(Duration::from_secs(1));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // Timeout errors will be continuously produced at the specified
/// // interval until the wrapped stream yields a value.
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9));
/// tokio::pin!(timeout_stream);
///
/// // Multiple timeouts will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err());
/// assert!(timeout_stream.try_next().await.is_err());
/// assert!(timeout_stream.try_next().await.is_ok());
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout_repeating(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration, TimeoutMode::Repeating)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make two separate structs instead of adding a mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for having a look! I removed the mode and timeout.rs is now mostly unchanged. I took the liberty of reusing the Elapsed struct for timeouts.


/// Slows down a stream by enforcing a delay between items.
Expand Down
61 changes: 45 additions & 16 deletions tokio-stream/src/stream_ext/timeout.rs
Expand Up @@ -9,6 +9,15 @@ use pin_project_lite::pin_project;
use std::fmt;
use std::time::Duration;

/// Behavior of timeouts.
#[derive(Debug)]
pub(super) enum TimeoutMode {
/// A timeout will only fire once.
Once,
/// A timeout will fire repeatedly.
Repeating,
}

pin_project! {
/// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
#[must_use = "streams do nothing unless polled"]
Expand All @@ -20,6 +29,7 @@ pin_project! {
deadline: Sleep,
duration: Duration,
poll_deadline: bool,
timeout_mode: TimeoutMode,
}
}

Expand All @@ -28,7 +38,7 @@ pin_project! {
pub struct Elapsed(());

impl<S: Stream> Timeout<S> {
pub(super) fn new(stream: S, duration: Duration) -> Self {
pub(super) fn new(stream: S, duration: Duration, timeout_mode: TimeoutMode) -> Self {
let next = Instant::now() + duration;
let deadline = tokio::time::sleep_until(next);

Expand All @@ -37,6 +47,7 @@ impl<S: Stream> Timeout<S> {
deadline,
duration,
poll_deadline: true,
timeout_mode,
}
}
}
Expand All @@ -45,7 +56,7 @@ impl<S: Stream> Stream for Timeout<S> {
type Item = Result<S::Item, Elapsed>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.project();
let mut me = self.project();

match me.stream.poll_next(cx) {
Poll::Ready(v) => {
Expand All @@ -59,28 +70,46 @@ impl<S: Stream> Stream for Timeout<S> {
Poll::Pending => {}
};

if *me.poll_deadline {
ready!(me.deadline.poll(cx));
*me.poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
}
match me.timeout_mode {
TimeoutMode::Once => {
if *me.poll_deadline {
ready!(me.deadline.poll(cx));
*me.poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
}

Poll::Pending
Poll::Pending
}
TimeoutMode::Repeating => {
ready!(me.deadline.as_mut().poll(cx));
let next = Instant::now() + *me.duration;
me.deadline.reset(next);
Poll::Ready(Some(Err(Elapsed::new())))
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, upper) = self.stream.size_hint();

// The timeout stream may insert an error before and after each message
// from the underlying stream, but no more than one error between each
// message. Hence the upper bound is computed as 2x+1.
match self.timeout_mode {
TimeoutMode::Once => {
// The timeout stream may insert an error before and after each message
// from the underlying stream, but no more than one error between each
// message. Hence the upper bound is computed as 2x+1.

// Using a helper function to enable use of question mark operator.
fn twice_plus_one(value: Option<usize>) -> Option<usize> {
value?.checked_mul(2)?.checked_add(1)
}
// Using a helper function to enable use of question mark operator.
fn twice_plus_one(value: Option<usize>) -> Option<usize> {
value?.checked_mul(2)?.checked_add(1)
}

(lower, twice_plus_one(upper))
(lower, twice_plus_one(upper))
}
TimeoutMode::Repeating => {
// The timeout stream may insert an error an infinite number of times.
(lower, None)
}
}
}
}

Expand Down