stream: add StreamExt::timeout_repeating #5577

Merged on Apr 24, 2023 (9 commits)

@jrray (Contributor) commented Mar 23, 2023

Motivation

`StreamExt::timeout` fires only once per value produced by the wrapped stream. This behavior is documented, but it is easy to overlook and to assume that timeouts are generated repeatedly. I used `timeout` in a project on that assumption, and it led to a subtle bug. My use case needs timeouts that fire repeatedly, so this change adds `timeout_repeating` as an alternative to `timeout`. For example, one might want to log a message at a fixed interval while a stream is producing no messages.
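To make the difference concrete, here is a small model of the two behaviors. It is only a sketch and does not use tokio: time is a plain `u64` in milliseconds, the consumer is assumed to poll promptly, and `Event` and `simulate` are hypothetical names made up for this illustration.

```rust
/// One item seen by the consumer, tagged with the time (ms) it appeared.
/// Hypothetical type for this illustration only.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Event {
    Value(u64),
    Timeout(u64),
}

/// Simulates a timeout wrapper over a source whose values arrive at the
/// times in `arrivals` (sorted, in ms, all assumed to be <= `horizon`).
/// With `repeating == false`, at most one timeout fires between two values.
/// With `repeating == true`, a timeout fires every `timeout_ms` until the
/// next value arrives. A value that arrives exactly at a deadline wins.
pub fn simulate(arrivals: &[u64], timeout_ms: u64, horizon: u64, repeating: bool) -> Vec<Event> {
    assert!(timeout_ms > 0, "timeout must be non-zero");
    let mut events = Vec::new();
    // Start of the current quiet window: time 0, or the last value's time.
    let mut window_start = 0;
    // Append `None` as a sentinel so the window after the last value is
    // also simulated, up to `horizon`.
    for next in arrivals.iter().copied().map(Some).chain(std::iter::once(None)) {
        let window_end = next.unwrap_or(horizon);
        let mut deadline = window_start + timeout_ms;
        while deadline < window_end {
            events.push(Event::Timeout(deadline));
            if !repeating {
                break; // non-repeating: stay silent until the next value
            }
            deadline += timeout_ms;
        }
        if let Some(t) = next {
            events.push(Event::Value(t));
            window_start = t;
        }
    }
    events
}
```

With values at 0 ms and 100 ms and a 30 ms timeout, the non-repeating model reports a single timeout at 30 ms, while the repeating model reports timeouts at 30, 60, and 90 ms before the second value.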

Solution

The doc test for `timeout` neither tested nor demonstrated its non-repeating behavior, so I added that first.

A new enum TimeoutMode has been added to describe the two possible behaviors. stream_ext::Timeout was modified with a new timeout_mode: TimeoutMode field to share an implementation between the two behaviors.

Include a test for timeout behavior of `timeout()`, demonstrating and
verifying that it will not repeat timeouts.
Extend `Timeout` with a new `TimeoutMode` that controls the behavior of
how timeouts are produced. A new extension method `timeout_repeating`
makes use of a newly added behavior where the stream continuously produces
timeouts at the specified interval rather than stalling after a single
timeout.
Use a strategy for testing the repeating timeout behavior that is less
sensitive to timing to avoid spurious failures.
@Darksonn added the A-tokio-macros (Area: The tokio-macros crate) and M-time (Module: tokio/time) labels on Mar 24, 2023
Use `start_paused = true` so the interval and timeouts in the stream
timeout doc tests happen in a consistent order.
@Darksonn added the A-tokio-stream (Area: The tokio-stream crate) label and removed the A-tokio-macros (Area: The tokio-macros crate) label on Apr 16, 2023

@Darksonn (Contributor) left a comment

This PR fell between the cracks. Sorry about the delay in review.

Comment on lines 1069 to 1074
    fn timeout_repeating(self, duration: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        Timeout::new(self, duration, TimeoutMode::Repeating)
    }

@Darksonn (Contributor):

Please make two separate structs instead of adding a mode.

@jrray (Contributor, Author):

Thanks for having a look! I removed the mode and timeout.rs is now mostly unchanged. I took the liberty of reusing the Elapsed struct for timeouts.

Revert adding conditional behavior to the `Timeout` struct and implement
the new behavior in a new struct, while reusing the `Elapsed` struct to
represent timeouts.
Comment on lines 54 to 57
    ready!(me.deadline.as_mut().poll(cx));
    let next = Instant::now() + *me.duration;
    me.deadline.reset(next);
    Poll::Ready(Some(Err(Elapsed::new())))

@Darksonn (Contributor):

This will not yield the errors at a regular rate. The duration between two errors will be increased by the time between the timeout triggering and poll_next being called. Perhaps we should use an Interval instead?
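The drift described here can be shown with a small model. This is a sketch only, not the code under review: time is a plain `u64` in milliseconds, each entry of `poll_lag` is how late the consumer polls after a deadline (assumed to be smaller than `period`), and both function names are hypothetical.

```rust
/// Times (ms) at which timeouts are observed when the next deadline is
/// computed from the moment the consumer polled (`now + period`), as in
/// the snippet under review.
pub fn reset_from_now(period: u64, poll_lag: &[u64]) -> Vec<u64> {
    let mut deadline = period;
    let mut observed = Vec::new();
    for &lag in poll_lag {
        let now = deadline + lag; // the consumer polls `lag` ms late
        observed.push(now);
        deadline = now + period; // the lateness leaks into every later deadline
    }
    observed
}

/// Times (ms) at which timeouts are observed when deadlines stay on a
/// fixed schedule (`previous deadline + period`), which is roughly what an
/// interval-style timer provides.
pub fn fixed_schedule(period: u64, poll_lag: &[u64]) -> Vec<u64> {
    let mut deadline = period;
    let mut observed = Vec::new();
    for &lag in poll_lag {
        let now = deadline + lag; // still observed late...
        observed.push(now);
        deadline += period; // ...but the schedule itself does not move
    }
    observed
}
```

With a 100 ms period and a consumer that is always 20 ms late, the first strategy observes timeouts at 120, 240, and 360 ms (a 120 ms spacing), while the fixed schedule observes them at 120, 220, and 320 ms (a 100 ms spacing).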

@jrray (Contributor, Author):

I changed it to use an Interval. The infrequent polling problem has another edge case: if set to duration x, and the underlying stream doesn't produce a value until x + t1, but the stream is not polled until x + t2, then when polled would it be expected to produce a timeout or the value from the underlying stream?

@Darksonn (Contributor):

It probably makes sense to add a MissedTickBehavior argument.

> The infrequent polling problem has another edge case: if set to duration x, and the underlying stream doesn't produce a value until x + t1, but the stream is not polled until x + t2, then when polled would it be expected to produce a timeout or the value from the underlying stream?

I'm not sure. What do you think? Maybe it should depend on the MissedTickBehavior.
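For readers unfamiliar with missed-tick handling, the following sketch models how the next tick could be chosen under the three strategies that tokio's `MissedTickBehavior` documents (burst, delay, skip). It is an assumption-laden illustration, not tokio's code: `MissedTick` and `next_tick` are hypothetical names, and time is a plain `u64` in milliseconds rather than an `Instant`.

```rust
/// Hypothetical stand-in for the three documented strategies.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MissedTick {
    /// Catch up: keep the original schedule, firing late ticks back to back.
    Burst,
    /// Restart the schedule from the moment the late tick was observed.
    Delay,
    /// Drop the missed ticks and rejoin the original schedule.
    Skip,
}

/// Given a tick that was `scheduled` but only observed at `now`
/// (with `now >= scheduled`), returns when the next tick is due.
pub fn next_tick(behavior: MissedTick, scheduled: u64, now: u64, period: u64) -> u64 {
    assert!(period > 0, "period must be non-zero");
    assert!(now >= scheduled, "a tick cannot be observed before it is due");
    match behavior {
        MissedTick::Burst => scheduled + period,
        MissedTick::Delay => now + period,
        // Round up to the next multiple of `period` on the original schedule.
        MissedTick::Skip => now + period - ((now - scheduled) % period),
    }
}
```

For a tick scheduled at 100 ms but observed at 250 ms with a 100 ms period, the burst strategy schedules the next tick at 200 ms (already overdue, so it fires immediately), the delay strategy at 350 ms, and the skip strategy at 300 ms.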

@jrray (Contributor, Author):

My use case isn't sensitive to these kinds of timing or ordering issues, but of course I would like this to be as generally useful as possible.

Returning a timeout when there is an actual value waiting to be returned feels a little pedantic but would arguably be the most correct behavior if not the most useful. I lean towards favoring the actual value, or making this a configurable behavior.
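The "favor the actual value" option amounts to a choice of poll order. The sketch below is a simplified illustration of that option only, and makes no claim about what the merged code does: `Polled` and `poll_once` are hypothetical names, and the wrapped stream and the timer are reduced to an `Option` and a `bool`.

```rust
/// Hypothetical result of a single poll of the wrapper.
#[derive(Debug, PartialEq)]
pub enum Polled<T> {
    /// A value from the wrapped stream.
    Item(T),
    /// A timeout.
    Elapsed,
    /// Nothing to report yet.
    Pending,
}

/// Decides what one poll returns. `ready_value` is the value the wrapped
/// stream has waiting (if any); `tick_due` says whether a timeout is due.
/// The stream is checked first, so a waiting value wins over a due timeout.
pub fn poll_once<T>(ready_value: Option<T>, tick_due: bool) -> Polled<T> {
    match ready_value {
        Some(value) => Polled::Item(value),
        None if tick_due => Polled::Elapsed,
        None => Polled::Pending,
    }
}
```

Swapping the first two match arms' priority (checking `tick_due` before the value) would give the stricter behavior, where a late poll reports the timeout even though a value is already waiting.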

Avoid stretching the time between timeouts if the stream is not polled
regularly.
To give the caller control over the MissedTickBehavior of the Interval
used internally, take an Interval argument instead of a Duration and
MissedTickBehavior argument. This simplifies usage for users that do not
care about which MissedTickBehavior is used.

@Darksonn (Contributor) left a comment

You will need to merge in master to fix the CI errors.

Comment on lines 977 to 986
/// // Once a timeout error is received, no further events will be received
/// // unless the wrapped stream yields a value (timeouts do not repeat).
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
/// tokio::pin!(timeout_stream);
///
/// // Only one timeout will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");

@Darksonn (Contributor):

I would make this a separate example.

@jrray (Contributor, Author):

I put the new code into a new section, and now the original code in timeout is unmodified. I think this is what you were going for, let me know if I misunderstood. Thanks!

Split out the repeating vs. non-repeating example as a separate code unit.

@Darksonn (Contributor) left a comment

Thanks, looks good to me.

@Darksonn merged commit e789b61 into tokio-rs:master on Apr 24, 2023
52 checks passed
Labels
A-tokio-stream Area: The tokio-stream crate M-time Module: tokio/time
2 participants