Skip to content

Commit fece9f7

Browse files
authoredAug 21, 2023
fix(client): avoid double-polling a Select future (#3290)
This reworks http2 client connection task in order to avoid storing a `Select` future. Under some scheduling cases the future was polled multiple times, resulting in runtime panics: ``` thread 'main' panicked at 'cannot poll Select twice', /index.crates.io/futures-util-0.3.28/src/future/select.rs:112:42 stack backtrace: [...] 5: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll 6: <hyper::proto::h2::client::H2ClientFuture<B,T> as core::future::future::Future>::poll [...] ``` Closes #3289
1 parent e68dc96 commit fece9f7

File tree

1 file changed

+49
-28
lines changed

1 file changed

+49
-28
lines changed
 

‎src/proto/h2/client.rs

+49-28
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::rt::{Read, Write};
66
use bytes::Bytes;
77
use futures_channel::mpsc::{Receiver, Sender};
88
use futures_channel::{mpsc, oneshot};
9-
use futures_util::future::{self, Either, FutureExt as _, Select};
9+
use futures_util::future::{Either, FusedFuture, FutureExt as _};
1010
use futures_util::stream::{StreamExt as _, StreamFuture};
1111
use h2::client::{Builder, Connection, SendRequest};
1212
use h2::SendStream;
@@ -143,7 +143,10 @@ where
143143
} else {
144144
(Either::Right(conn), ping::disabled())
145145
};
146-
let conn: ConnMapErr<T, B> = ConnMapErr { conn };
146+
let conn: ConnMapErr<T, B> = ConnMapErr {
147+
conn,
148+
is_terminated: false,
149+
};
147150

148151
exec.execute_h2_future(H2ClientFuture::Task {
149152
task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
@@ -218,6 +221,8 @@ pin_project! {
218221
{
219222
#[pin]
220223
conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
224+
#[pin]
225+
is_terminated: bool,
221226
}
222227
}
223228

@@ -229,10 +234,26 @@ where
229234
type Output = Result<(), ()>;
230235

231236
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
232-
self.project()
233-
.conn
234-
.poll(cx)
235-
.map_err(|e| debug!("connection error: {}", e))
237+
let mut this = self.project();
238+
239+
if *this.is_terminated {
240+
return Poll::Pending;
241+
}
242+
let polled = this.conn.poll(cx);
243+
if polled.is_ready() {
244+
*this.is_terminated = true;
245+
}
246+
polled.map_err(|e| debug!("connection error: {}", e))
247+
}
248+
}
249+
250+
impl<T, B> FusedFuture for ConnMapErr<T, B>
251+
where
252+
B: Body,
253+
T: Read + Write + Unpin,
254+
{
255+
fn is_terminated(&self) -> bool {
256+
self.is_terminated
236257
}
237258
}
238259

@@ -245,10 +266,11 @@ pin_project! {
245266
T: Unpin,
246267
{
247268
#[pin]
248-
select: Select<ConnMapErr<T, B>, StreamFuture<Receiver<Never>>>,
269+
drop_rx: StreamFuture<Receiver<Never>>,
249270
#[pin]
250271
cancel_tx: Option<oneshot::Sender<Never>>,
251-
conn: Option<ConnMapErr<T, B>>,
272+
#[pin]
273+
conn: ConnMapErr<T, B>,
252274
}
253275
}
254276

@@ -263,9 +285,9 @@ where
263285
cancel_tx: oneshot::Sender<Never>,
264286
) -> Self {
265287
Self {
266-
select: future::select(conn, drop_rx),
288+
drop_rx,
267289
cancel_tx: Some(cancel_tx),
268-
conn: None,
290+
conn,
269291
}
270292
}
271293
}
@@ -280,25 +302,24 @@ where
280302
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
281303
let mut this = self.project();
282304

283-
if let Some(conn) = this.conn {
284-
conn.poll_unpin(cx).map(|_| ())
285-
} else {
286-
match ready!(this.select.poll_unpin(cx)) {
287-
Either::Left((_, _)) => {
288-
// ok or err, the `conn` has finished
289-
return Poll::Ready(());
290-
}
291-
Either::Right((_, b)) => {
292-
// mpsc has been dropped, hopefully polling
293-
// the connection some more should start shutdown
294-
// and then close
295-
trace!("send_request dropped, starting conn shutdown");
296-
drop(this.cancel_tx.take().expect("Future polled twice"));
297-
this.conn = &mut Some(b);
298-
return Poll::Pending;
299-
}
300-
}
305+
if !this.conn.is_terminated() {
306+
if let Poll::Ready(_) = this.conn.poll_unpin(cx) {
307+
// ok or err, the `conn` has finished.
308+
return Poll::Ready(());
309+
};
301310
}
311+
312+
if !this.drop_rx.is_terminated() {
313+
if let Poll::Ready(_) = this.drop_rx.poll_unpin(cx) {
314+
// mpsc has been dropped, hopefully polling
315+
// the connection some more should start shutdown
316+
// and then close.
317+
trace!("send_request dropped, starting conn shutdown");
318+
drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
319+
}
320+
};
321+
322+
Poll::Pending
302323
}
303324
}
304325

0 commit comments

Comments
 (0)
Please sign in to comment.