Skip to content

Commit 4ffaad5

Browse files
authoredJul 1, 2024··
feat(client): add SendRequest::try_send_request() method (#3691)
This method returns a `TrySendError` type, which allows for returning the request back to the caller if an error occured between queuing and trying to write the request. This method is added for both `http1` and `http2`.
1 parent 56c3cd5 commit 4ffaad5

File tree

7 files changed

+214
-74
lines changed

7 files changed

+214
-74
lines changed
 

‎src/client/conn/http1.rs

+28-23
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use futures_util::ready;
1212
use http::{Request, Response};
1313
use httparse::ParserConfig;
1414

15-
use super::super::dispatch;
15+
use super::super::dispatch::{self, TrySendError};
1616
use crate::body::{Body, Incoming as IncomingBody};
1717
use crate::proto;
1818

@@ -208,33 +208,38 @@ where
208208
}
209209
}
210210

211-
/*
212-
pub(super) fn send_request_retryable(
211+
/// Sends a `Request` on the associated connection.
212+
///
213+
/// Returns a future that if successful, yields the `Response`.
214+
///
215+
/// # Error
216+
///
217+
/// If there was an error before trying to serialize the request to the
218+
/// connection, the message will be returned as part of this error.
219+
pub fn try_send_request(
213220
&mut self,
214221
req: Request<B>,
215-
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
216-
where
217-
B: Send,
218-
{
219-
match self.dispatch.try_send(req) {
220-
Ok(rx) => {
221-
Either::Left(rx.then(move |res| {
222-
match res {
223-
Ok(Ok(res)) => future::ok(res),
224-
Ok(Err(err)) => future::err(err),
225-
// this is definite bug if it happens, but it shouldn't happen!
226-
Err(_) => panic!("dispatch dropped without returning error"),
227-
}
228-
}))
229-
}
230-
Err(req) => {
231-
debug!("connection was not ready");
232-
let err = crate::Error::new_canceled().with("connection was not ready");
233-
Either::Right(future::err((err, Some(req))))
222+
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
223+
let sent = self.dispatch.try_send(req);
224+
async move {
225+
match sent {
226+
Ok(rx) => match rx.await {
227+
Ok(Ok(res)) => Ok(res),
228+
Ok(Err(err)) => Err(err),
229+
// this is definite bug if it happens, but it shouldn't happen!
230+
Err(_) => panic!("dispatch dropped without returning error"),
231+
},
232+
Err(req) => {
233+
debug!("connection was not ready");
234+
let error = crate::Error::new_canceled().with("connection was not ready");
235+
Err(TrySendError {
236+
error,
237+
message: Some(req),
238+
})
239+
}
234240
}
235241
}
236242
}
237-
*/
238243
}
239244

240245
impl<B> fmt::Debug for SendRequest<B> {

‎src/client/conn/http2.rs

+28-23
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::rt::{Read, Write};
1313
use futures_util::ready;
1414
use http::{Request, Response};
1515

16-
use super::super::dispatch;
16+
use super::super::dispatch::{self, TrySendError};
1717
use crate::body::{Body, Incoming as IncomingBody};
1818
use crate::common::time::Time;
1919
use crate::proto;
@@ -152,33 +152,38 @@ where
152152
}
153153
}
154154

155-
/*
156-
pub(super) fn send_request_retryable(
155+
/// Sends a `Request` on the associated connection.
156+
///
157+
/// Returns a future that if successful, yields the `Response`.
158+
///
159+
/// # Error
160+
///
161+
/// If there was an error before trying to serialize the request to the
162+
/// connection, the message will be returned as part of this error.
163+
pub fn try_send_request(
157164
&mut self,
158165
req: Request<B>,
159-
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
160-
where
161-
B: Send,
162-
{
163-
match self.dispatch.try_send(req) {
164-
Ok(rx) => {
165-
Either::Left(rx.then(move |res| {
166-
match res {
167-
Ok(Ok(res)) => future::ok(res),
168-
Ok(Err(err)) => future::err(err),
169-
// this is definite bug if it happens, but it shouldn't happen!
170-
Err(_) => panic!("dispatch dropped without returning error"),
171-
}
172-
}))
173-
}
174-
Err(req) => {
175-
debug!("connection was not ready");
176-
let err = crate::Error::new_canceled().with("connection was not ready");
177-
Either::Right(future::err((err, Some(req))))
166+
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
167+
let sent = self.dispatch.try_send(req);
168+
async move {
169+
match sent {
170+
Ok(rx) => match rx.await {
171+
Ok(Ok(res)) => Ok(res),
172+
Ok(Err(err)) => Err(err),
173+
// this is definite bug if it happens, but it shouldn't happen!
174+
Err(_) => panic!("dispatch dropped without returning error"),
175+
},
176+
Err(req) => {
177+
debug!("connection was not ready");
178+
let error = crate::Error::new_canceled().with("connection was not ready");
179+
Err(TrySendError {
180+
error,
181+
message: Some(req),
182+
})
183+
}
178184
}
179185
}
180186
}
181-
*/
182187
}
183188

184189
impl<B> fmt::Debug for SendRequest<B> {

‎src/client/conn/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@
1818
pub mod http1;
1919
#[cfg(feature = "http2")]
2020
pub mod http2;
21+
22+
pub use super::dispatch::TrySendError;

‎src/client/dispatch.rs

+45-16
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,21 @@ use tokio::sync::{mpsc, oneshot};
1313
#[cfg(feature = "http2")]
1414
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
1515

16-
#[cfg(test)]
17-
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
16+
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
1817
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
1918

19+
/// An error when calling `try_send_request`.
20+
///
21+
/// There is a possibility of an error occuring on a connection in-between the
22+
/// time that a request is queued and when it is actually written to the IO
23+
/// transport. If that happens, it is safe to return the request back to the
24+
/// caller, as it was never fully sent.
25+
#[derive(Debug)]
26+
pub struct TrySendError<T> {
27+
pub(crate) error: crate::Error,
28+
pub(crate) message: Option<T>,
29+
}
30+
2031
pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
2132
let (tx, rx) = mpsc::unbounded_channel();
2233
let (giver, taker) = want::new();
@@ -92,7 +103,7 @@ impl<T, U> Sender<T, U> {
92103
}
93104
}
94105

95-
#[cfg(test)]
106+
#[cfg(feature = "http1")]
96107
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
97108
if !self.can_send() {
98109
return Err(val);
@@ -135,7 +146,6 @@ impl<T, U> UnboundedSender<T, U> {
135146
self.giver.is_canceled()
136147
}
137148

138-
#[cfg(test)]
139149
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
140150
let (tx, rx) = oneshot::channel();
141151
self.inner
@@ -210,17 +220,17 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
210220
impl<T, U> Drop for Envelope<T, U> {
211221
fn drop(&mut self) {
212222
if let Some((val, cb)) = self.0.take() {
213-
cb.send(Err((
214-
crate::Error::new_canceled().with("connection closed"),
215-
Some(val),
216-
)));
223+
cb.send(Err(TrySendError {
224+
error: crate::Error::new_canceled().with("connection closed"),
225+
message: Some(val),
226+
}));
217227
}
218228
}
219229
}
220230

221231
pub(crate) enum Callback<T, U> {
222232
#[allow(unused)]
223-
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
233+
Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
224234
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
225235
}
226236

@@ -229,7 +239,10 @@ impl<T, U> Drop for Callback<T, U> {
229239
match self {
230240
Callback::Retry(tx) => {
231241
if let Some(tx) = tx.take() {
232-
let _ = tx.send(Err((dispatch_gone(), None)));
242+
let _ = tx.send(Err(TrySendError {
243+
error: dispatch_gone(),
244+
message: None,
245+
}));
233246
}
234247
}
235248
Callback::NoRetry(tx) => {
@@ -269,18 +282,34 @@ impl<T, U> Callback<T, U> {
269282
}
270283
}
271284

272-
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
285+
pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
273286
match self {
274287
Callback::Retry(ref mut tx) => {
275288
let _ = tx.take().unwrap().send(val);
276289
}
277290
Callback::NoRetry(ref mut tx) => {
278-
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
291+
let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
279292
}
280293
}
281294
}
282295
}
283296

297+
impl<T> TrySendError<T> {
298+
/// Take the message from this error.
299+
///
300+
/// The message will not always have been recovered. If an error occurs
301+
/// after the message has been serialized onto the connection, it will not
302+
/// be available here.
303+
pub fn take_message(&mut self) -> Option<T> {
304+
self.message.take()
305+
}
306+
307+
/// Consumes this to return the inner error.
308+
pub fn into_error(self) -> crate::Error {
309+
self.error
310+
}
311+
}
312+
284313
#[cfg(feature = "http2")]
285314
pin_project! {
286315
pub struct SendWhen<B>
@@ -325,8 +354,8 @@ where
325354
trace!("send_when canceled");
326355
Poll::Ready(())
327356
}
328-
Poll::Ready(Err(err)) => {
329-
call_back.send(Err(err));
357+
Poll::Ready(Err((error, message))) => {
358+
call_back.send(Err(TrySendError { error, message }));
330359
Poll::Ready(())
331360
}
332361
}
@@ -389,8 +418,8 @@ mod tests {
389418
let err = fulfilled
390419
.expect("fulfilled")
391420
.expect_err("promise should error");
392-
match (err.0.kind(), err.1) {
393-
(&crate::error::Kind::Canceled, Some(_)) => (),
421+
match (err.error.is_canceled(), err.message) {
422+
(true, Some(_)) => (),
394423
e => panic!("expected Error::Cancel(_), found {:?}", e),
395424
}
396425
}

‎src/proto/h1/dispatch.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use http::Request;
1313

1414
use super::{Http1Transaction, Wants};
1515
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
16+
#[cfg(feature = "client")]
17+
use crate::client::dispatch::TrySendError;
1618
use crate::common::task;
1719
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
1820
use crate::upgrade::OnUpgrade;
@@ -655,15 +657,21 @@ cfg_client! {
655657
}
656658
Err(err) => {
657659
if let Some(cb) = self.callback.take() {
658-
cb.send(Err((err, None)));
660+
cb.send(Err(TrySendError {
661+
error: err,
662+
message: None,
663+
}));
659664
Ok(())
660665
} else if !self.rx_closed {
661666
self.rx.close();
662667
if let Some((req, cb)) = self.rx.try_recv() {
663668
trace!("canceling queued request with connection error: {}", err);
664669
// in this case, the message was never even started, so it's safe to tell
665670
// the user that the request was completely canceled
666-
cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
671+
cb.send(Err(TrySendError {
672+
error: crate::Error::new_canceled().with(err),
673+
message: Some(req),
674+
}));
667675
Ok(())
668676
} else {
669677
Err(err)
@@ -729,9 +737,9 @@ mod tests {
729737
let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
730738
.expect_err("callback should send error");
731739

732-
match (err.0.kind(), err.1) {
733-
(&crate::error::Kind::Canceled, Some(_)) => (),
734-
other => panic!("expected Canceled, got {:?}", other),
740+
match (err.error.is_canceled(), err.message.as_ref()) {
741+
(true, Some(_)) => (),
742+
_ => panic!("expected Canceled, got {:?}", err),
735743
}
736744
});
737745
}

‎src/proto/h2/client.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use pin_project_lite::pin_project;
2222
use super::ping::{Ponger, Recorder};
2323
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
2424
use crate::body::{Body, Incoming as IncomingBody};
25-
use crate::client::dispatch::{Callback, SendWhen};
25+
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
2626
use crate::common::io::Compat;
2727
use crate::common::time::Time;
2828
use crate::ext::Protocol;
@@ -662,10 +662,10 @@ where
662662
.map_or(false, |len| len != 0)
663663
{
664664
warn!("h2 connect request with non-zero body not supported");
665-
cb.send(Err((
666-
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667-
None,
668-
)));
665+
cb.send(Err(TrySendError {
666+
error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667+
message: None,
668+
}));
669669
continue;
670670
}
671671

@@ -677,7 +677,10 @@ where
677677
Ok(ok) => ok,
678678
Err(err) => {
679679
debug!("client send request error: {}", err);
680-
cb.send(Err((crate::Error::new_h2(err), None)));
680+
cb.send(Err(TrySendError {
681+
error: crate::Error::new_h2(err),
682+
message: None,
683+
}));
681684
continue;
682685
}
683686
};
@@ -702,7 +705,10 @@ where
702705
}
703706
Poll::Ready(Ok(())) => (),
704707
Poll::Ready(Err(err)) => {
705-
f.cb.send(Err((crate::Error::new_h2(err), None)));
708+
f.cb.send(Err(TrySendError {
709+
error: crate::Error::new_h2(err),
710+
message: None,
711+
}));
706712
continue;
707713
}
708714
}

‎tests/client.rs

+85
Original file line numberDiff line numberDiff line change
@@ -2041,6 +2041,91 @@ mod conn {
20412041
assert_eq!(vec, b"bar=foo");
20422042
}
20432043

2044+
#[tokio::test]
2045+
async fn test_try_send_request() {
2046+
use std::future::Future;
2047+
let (listener, addr) = setup_tk_test_server().await;
2048+
let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
2049+
2050+
tokio::spawn(async move {
2051+
let mut sock = listener.accept().await.unwrap().0;
2052+
let mut buf = [0u8; 8192];
2053+
sock.read(&mut buf).await.expect("read 1");
2054+
sock.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
2055+
.await
2056+
.expect("write 1");
2057+
let _ = done_rx.await;
2058+
});
2059+
2060+
// make polling fair by putting both in spawns
2061+
tokio::spawn(async move {
2062+
let io = tcp_connect(&addr).await.expect("tcp connect");
2063+
let (mut client, mut conn) = conn::http1::Builder::new()
2064+
.handshake::<_, Empty<Bytes>>(io)
2065+
.await
2066+
.expect("http handshake");
2067+
2068+
// get the conn ready
2069+
assert!(
2070+
future::poll_fn(|cx| Poll::Ready(Pin::new(&mut conn).poll(cx)))
2071+
.await
2072+
.is_pending()
2073+
);
2074+
assert!(client.is_ready());
2075+
2076+
// use the connection once
2077+
let mut fut1 = std::pin::pin!(client.send_request(http::Request::new(Empty::new())));
2078+
let _res1 = future::poll_fn(|cx| loop {
2079+
if let Poll::Ready(res) = fut1.as_mut().poll(cx) {
2080+
return Poll::Ready(res);
2081+
}
2082+
return match Pin::new(&mut conn).poll(cx) {
2083+
Poll::Ready(_) => panic!("ruh roh"),
2084+
Poll::Pending => Poll::Pending,
2085+
};
2086+
})
2087+
.await
2088+
.expect("resp 1");
2089+
2090+
assert!(client.is_ready());
2091+
2092+
// simulate the server dropping the conn
2093+
let _ = done_tx.send(());
2094+
// let the server task die
2095+
tokio::task::yield_now().await;
2096+
2097+
let mut fut2 =
2098+
std::pin::pin!(client.try_send_request(http::Request::new(Empty::new())));
2099+
let poll1 = future::poll_fn(|cx| Poll::Ready(fut2.as_mut().poll(cx))).await;
2100+
assert!(poll1.is_pending(), "not already known to error");
2101+
2102+
let mut conn_opt = Some(conn);
2103+
// wasn't a known error, req is in queue, and now the next poll, the
2104+
// conn will be noticed as errored
2105+
let mut err = future::poll_fn(|cx| {
2106+
loop {
2107+
if let Poll::Ready(res) = fut2.as_mut().poll(cx) {
2108+
return Poll::Ready(res);
2109+
}
2110+
if let Some(ref mut conn) = conn_opt {
2111+
match Pin::new(conn).poll(cx) {
2112+
Poll::Ready(_) => {
2113+
conn_opt = None;
2114+
} // ok
2115+
Poll::Pending => return Poll::Pending,
2116+
};
2117+
}
2118+
}
2119+
})
2120+
.await
2121+
.expect_err("resp 2");
2122+
2123+
assert!(err.take_message().is_some(), "request was returned");
2124+
})
2125+
.await
2126+
.unwrap();
2127+
}
2128+
20442129
#[tokio::test]
20452130
async fn http2_detect_conn_eof() {
20462131
use futures_util::future;

0 commit comments

Comments
 (0)
Please sign in to comment.