Skip to content

Commit 75aac9f

Browse files
noxseanmonstar
authored andcommittedOct 31, 2022
fix(client): send an error back to client when dispatch misbehaves (fixes #2649)
1 parent b96eb16 commit 75aac9f

File tree

3 files changed

+94
-15
lines changed

3 files changed

+94
-15
lines changed
 

‎src/client/dispatch.rs

+41-15
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl<T, U> Sender<T, U> {
9090
}
9191
let (tx, rx) = oneshot::channel();
9292
self.inner
93-
.send(Envelope(Some((val, Callback::Retry(tx)))))
93+
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
9494
.map(move |_| rx)
9595
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
9696
}
@@ -101,7 +101,7 @@ impl<T, U> Sender<T, U> {
101101
}
102102
let (tx, rx) = oneshot::channel();
103103
self.inner
104-
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
104+
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
105105
.map(move |_| rx)
106106
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
107107
}
@@ -131,15 +131,15 @@ impl<T, U> UnboundedSender<T, U> {
131131
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
132132
let (tx, rx) = oneshot::channel();
133133
self.inner
134-
.send(Envelope(Some((val, Callback::Retry(tx)))))
134+
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
135135
.map(move |_| rx)
136136
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
137137
}
138138

139139
pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
140140
let (tx, rx) = oneshot::channel();
141141
self.inner
142-
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
142+
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
143143
.map(move |_| rx)
144144
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
145145
}
@@ -215,33 +215,59 @@ impl<T, U> Drop for Envelope<T, U> {
215215

216216
pub(crate) enum Callback<T, U> {
217217
#[allow(unused)]
218-
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
219-
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
218+
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
219+
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
220+
}
221+
222+
impl<T, U> Drop for Callback<T, U> {
223+
fn drop(&mut self) {
224+
// FIXME(nox): What errors do we want here?
225+
let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
226+
"user code panicked"
227+
} else {
228+
"runtime dropped the dispatch task"
229+
});
230+
231+
match self {
232+
Callback::Retry(tx) => {
233+
if let Some(tx) = tx.take() {
234+
let _ = tx.send(Err((error, None)));
235+
}
236+
}
237+
Callback::NoRetry(tx) => {
238+
if let Some(tx) = tx.take() {
239+
let _ = tx.send(Err(error));
240+
}
241+
}
242+
}
243+
}
220244
}
221245

222246
impl<T, U> Callback<T, U> {
223247
#[cfg(feature = "http2")]
224248
pub(crate) fn is_canceled(&self) -> bool {
225249
match *self {
226-
Callback::Retry(ref tx) => tx.is_closed(),
227-
Callback::NoRetry(ref tx) => tx.is_closed(),
250+
Callback::Retry(Some(ref tx)) => tx.is_closed(),
251+
Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
252+
_ => unreachable!(),
228253
}
229254
}
230255

231256
pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
232257
match *self {
233-
Callback::Retry(ref mut tx) => tx.poll_closed(cx),
234-
Callback::NoRetry(ref mut tx) => tx.poll_closed(cx),
258+
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
259+
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
260+
_ => unreachable!(),
235261
}
236262
}
237263

238-
pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
264+
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
239265
match self {
240-
Callback::Retry(tx) => {
241-
let _ = tx.send(val);
266+
Callback::Retry(ref mut tx) => {
267+
let _ = tx.take().unwrap().send(val);
242268
}
243-
Callback::NoRetry(tx) => {
244-
let _ = tx.send(val.map_err(|e| e.0));
269+
Callback::NoRetry(ref mut tx) => {
270+
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
245271
}
246272
}
247273
}

‎src/error.rs

+11
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ pub(super) enum User {
110110
#[cfg(feature = "http1")]
111111
ManualUpgrade,
112112

113+
/// The dispatch task is gone.
114+
#[cfg(feature = "client")]
115+
DispatchGone,
116+
113117
/// User aborted in an FFI callback.
114118
#[cfg(feature = "ffi")]
115119
AbortedByCallback,
@@ -314,6 +318,11 @@ impl Error {
314318
Error::new_user(User::AbortedByCallback)
315319
}
316320

321+
#[cfg(feature = "client")]
322+
pub(super) fn new_user_dispatch_gone() -> Error {
323+
Error::new(Kind::User(User::DispatchGone))
324+
}
325+
317326
#[cfg(feature = "http2")]
318327
pub(super) fn new_h2(cause: ::h2::Error) -> Error {
319328
if cause.is_io() {
@@ -390,6 +399,8 @@ impl Error {
390399
Kind::User(User::NoUpgrade) => "no upgrade available",
391400
#[cfg(feature = "http1")]
392401
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
402+
#[cfg(feature = "client")]
403+
Kind::User(User::DispatchGone) => "dispatch task is gone",
393404
#[cfg(feature = "ffi")]
394405
Kind::User(User::AbortedByCallback) => "operation aborted by an application callback",
395406
}

‎tests/client.rs

+42
Original file line numberDiff line numberDiff line change
@@ -2267,6 +2267,48 @@ mod conn {
22672267
done_tx.send(()).unwrap();
22682268
}
22692269

2270+
#[tokio::test]
2271+
async fn test_body_panics() {
2272+
let _ = pretty_env_logger::try_init();
2273+
2274+
let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2275+
.await
2276+
.unwrap();
2277+
let addr = listener.local_addr().unwrap();
2278+
2279+
// spawn a server that reads but doesn't write
2280+
tokio::spawn(async move {
2281+
let sock = listener.accept().await.unwrap().0;
2282+
drain_til_eof(sock).await.expect("server read");
2283+
});
2284+
2285+
let io = tcp_connect(&addr).await.expect("tcp connect");
2286+
2287+
let (mut client, conn) = conn::http1::Builder::new()
2288+
.handshake(io)
2289+
.await
2290+
.expect("handshake");
2291+
2292+
tokio::spawn(async move {
2293+
conn.await.expect("client conn shouldn't error");
2294+
});
2295+
2296+
let req = Request::post("/a")
2297+
.body(http_body_util::BodyExt::map_frame::<_, bytes::Bytes>(
2298+
http_body_util::Full::<bytes::Bytes>::from("baguette"),
2299+
|_| panic!("oopsie"),
2300+
))
2301+
.unwrap();
2302+
2303+
let error = client.send_request(req).await.unwrap_err();
2304+
2305+
assert!(error.is_user());
2306+
assert_eq!(
2307+
error.to_string(),
2308+
"dispatch task is gone: user code panicked"
2309+
);
2310+
}
2311+
22702312
async fn drain_til_eof<T: AsyncRead + Unpin>(mut sock: T) -> io::Result<()> {
22712313
let mut buf = [0u8; 1024];
22722314
loop {

0 commit comments

Comments
 (0)
Please sign in to comment.