// timeout.rs — forked from tokio-rs/tokio.
use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::{Instant, Sleep};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::fmt;
use std::time::Duration;
/// Behavior of timeouts.
///
/// Selects what a `Timeout` stream does once its deadline has elapsed while
/// the inner stream is still pending.
#[derive(Debug)]
pub(super) enum TimeoutMode {
    /// A timeout will only fire once.
    ///
    /// After an `Elapsed` error has been yielded, no further error is produced
    /// until the inner stream yields another item, which re-arms the deadline.
    Once,
    /// A timeout will fire repeatedly.
    ///
    /// Each time the deadline elapses it is reset to `duration` from that
    /// moment, so errors keep being yielded while the inner stream stays
    /// pending.
    Repeating,
}
pin_project! {
    /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
    ///
    /// Items from the wrapped stream are passed through as `Ok(item)`; when the
    /// deadline elapses before the next item arrives, `Err(Elapsed)` is yielded
    /// instead.
    #[must_use = "streams do nothing unless polled"]
    #[derive(Debug)]
    pub struct Timeout<S> {
        // The wrapped stream, held behind `Fuse`.
        #[pin]
        stream: Fuse<S>,
        // Timer for the current deadline; reset to `now + duration` whenever the
        // inner stream yields an item (and after each expiry in `Repeating` mode).
        #[pin]
        deadline: Sleep,
        // Length of the window granted for each item.
        duration: Duration,
        // Only consulted in `Once` mode: true while the deadline is armed, cleared
        // after an `Elapsed` error has been yielded, set again on the next item.
        poll_deadline: bool,
        // Whether the timeout fires once per gap or repeatedly.
        timeout_mode: TimeoutMode,
    }
}
/// Error returned by `Timeout`.
///
/// Yielded as the `Err` item when the deadline elapses before the inner stream
/// produces its next item. The single private `()` field means values can only
/// be built through the crate-internal constructor.
#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
impl<S: Stream> Timeout<S> {
    /// Wraps `stream` so that each item must arrive within `duration`.
    ///
    /// The first deadline is armed immediately, `duration` from the moment of
    /// construction. `timeout_mode` selects whether an elapsed deadline fires
    /// once or keeps firing until the next item.
    pub(super) fn new(stream: S, duration: Duration, timeout_mode: TimeoutMode) -> Self {
        // Arm the initial deadline before assembling the struct.
        let deadline = tokio::time::sleep_until(Instant::now() + duration);

        Self {
            stream: Fuse::new(stream),
            deadline,
            duration,
            poll_deadline: true,
            timeout_mode,
        }
    }
}
impl<S: Stream> Stream for Timeout<S> {
    type Item = Result<S::Item, Elapsed>;

    /// Polls the inner stream first and only falls back to the deadline when
    /// the inner stream is pending.
    ///
    /// * Inner stream ready with an item: the deadline is pushed out to
    ///   `now + duration`, re-armed, and the item is yielded as `Ok`.
    /// * Inner stream ready with `None`: `None` is forwarded unchanged.
    /// * Inner stream pending: the deadline is consulted according to the
    ///   configured `TimeoutMode`, yielding `Err(Elapsed)` when it has expired.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The inner stream always takes priority over the timer.
        if let Poll::Ready(item) = this.stream.poll_next(cx) {
            if item.is_some() {
                // A fresh item restarts the timeout window.
                this.deadline.reset(Instant::now() + *this.duration);
                *this.poll_deadline = true;
            }
            return Poll::Ready(item.map(Ok));
        }

        match this.timeout_mode {
            TimeoutMode::Repeating => {
                ready!(this.deadline.as_mut().poll(cx));
                // Start the next window right away so the timeout can fire again.
                this.deadline.reset(Instant::now() + *this.duration);
                Poll::Ready(Some(Err(Elapsed::new())))
            }
            TimeoutMode::Once => {
                // Already fired for this gap: stay quiet until the next item.
                if !*this.poll_deadline {
                    return Poll::Pending;
                }
                ready!(this.deadline.poll(cx));
                *this.poll_deadline = false;
                Poll::Ready(Some(Err(Elapsed::new())))
            }
        }
    }

    /// Derives the size hint from the inner stream's hint.
    ///
    /// The lower bound is passed through. The upper bound depends on the mode.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (lower, inner_upper) = self.stream.size_hint();

        let upper = match self.timeout_mode {
            // An error may be inserted an unbounded number of times.
            TimeoutMode::Repeating => None,
            // An error may appear before and after each message from the inner
            // stream, but at most one between consecutive messages, giving an
            // upper bound of 2x+1 (or `None` if that overflows).
            TimeoutMode::Once => inner_upper
                .and_then(|n| n.checked_mul(2))
                .and_then(|n| n.checked_add(1)),
        };

        (lower, upper)
    }
}
// ===== impl Elapsed =====
impl Elapsed {
pub(crate) fn new() -> Self {
Elapsed(())
}
}
impl fmt::Display for Elapsed {
    /// Writes the fixed message `deadline has elapsed`, honoring any width,
    /// fill, alignment and precision flags on the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("deadline has elapsed")
    }
}
// No methods are overridden here: `Elapsed` relies on the trait's defaults.
impl std::error::Error for Elapsed {}
/// Converts an `Elapsed` into an I/O error of kind `TimedOut`.
impl From<Elapsed> for std::io::Error {
    fn from(_: Elapsed) -> Self {
        Self::from(std::io::ErrorKind::TimedOut)
    }
}