Skip to content

Commit 509672a

Browse files
authoredJul 20, 2022
feat(client): introduce version-specific client modules (#2906)
This creates `client::conn::http1` and `client::conn::http2` modules, both with specific `SendRequest`, `Connection`, and `Builder` types.
1 parent 09e3566 commit 509672a

File tree

4 files changed

+942
-1
lines changed

4 files changed

+942
-1
lines changed
 

‎src/client/conn/http1.rs

+504
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,504 @@
1+
//! HTTP/1 client connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::sync::Arc;
6+
7+
use http::{Request, Response};
8+
use httparse::ParserConfig;
9+
use tokio::io::{AsyncRead, AsyncWrite};
10+
11+
use crate::Body;
12+
use crate::body::HttpBody;
13+
use crate::common::{
14+
exec::{BoxSendFuture, Exec},
15+
task, Future, Pin, Poll,
16+
};
17+
use crate::upgrade::Upgraded;
18+
use crate::proto;
19+
use crate::rt::Executor;
20+
use super::super::dispatch;
21+
22+
type Dispatcher<T, B> =
23+
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
24+
25+
/// The sender side of an established connection.
26+
pub struct SendRequest<B> {
27+
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
28+
}
29+
30+
/// A future that processes all HTTP state for the IO object.
31+
///
32+
/// In most cases, this should just be spawned into an executor, so that it
33+
/// can process incoming and outgoing messages, notice hangups, and the like.
34+
#[must_use = "futures do nothing unless polled"]
35+
pub struct Connection<T, B>
36+
where
37+
T: AsyncRead + AsyncWrite + Send + 'static,
38+
B: HttpBody + 'static,
39+
{
40+
inner: Option<Dispatcher<T, B>>,
41+
}
42+
43+
/// A builder to configure an HTTP connection.
44+
///
45+
/// After setting options, the builder is used to create a handshake future.
46+
#[derive(Clone, Debug)]
47+
pub struct Builder {
48+
pub(super) exec: Exec,
49+
h09_responses: bool,
50+
h1_parser_config: ParserConfig,
51+
h1_writev: Option<bool>,
52+
h1_title_case_headers: bool,
53+
h1_preserve_header_case: bool,
54+
#[cfg(feature = "ffi")]
55+
h1_preserve_header_order: bool,
56+
h1_read_buf_exact_size: Option<usize>,
57+
h1_max_buf_size: Option<usize>,
58+
}
59+
60+
/// Returns a handshake future over some IO.
61+
///
62+
/// This is a shortcut for `Builder::new().handshake(io)`.
63+
/// See [`client::conn`](crate::client::conn) for more.
64+
pub async fn handshake<T>(
65+
io: T,
66+
) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
67+
where
68+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
69+
{
70+
Builder::new().handshake(io).await
71+
}
72+
73+
// ===== impl SendRequest
74+
75+
impl<B> SendRequest<B> {
76+
/// Polls to determine whether this sender can be used yet for a request.
77+
///
78+
/// If the associated connection is closed, this returns an Error.
79+
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
80+
self.dispatch.poll_ready(cx)
81+
}
82+
83+
/*
84+
pub(super) async fn when_ready(self) -> crate::Result<Self> {
85+
let mut me = Some(self);
86+
future::poll_fn(move |cx| {
87+
ready!(me.as_mut().unwrap().poll_ready(cx))?;
88+
Poll::Ready(Ok(me.take().unwrap()))
89+
})
90+
.await
91+
}
92+
93+
pub(super) fn is_ready(&self) -> bool {
94+
self.dispatch.is_ready()
95+
}
96+
97+
pub(super) fn is_closed(&self) -> bool {
98+
self.dispatch.is_closed()
99+
}
100+
*/
101+
}
102+
103+
impl<B> SendRequest<B>
104+
where
105+
B: HttpBody + 'static,
106+
{
107+
/// Sends a `Request` on the associated connection.
108+
///
109+
/// Returns a future that if successful, yields the `Response`.
110+
///
111+
/// # Note
112+
///
113+
/// There are some key differences in what automatic things the `Client`
114+
/// does for you that will not be done here:
115+
///
116+
/// - `Client` requires absolute-form `Uri`s, since the scheme and
117+
/// authority are needed to connect. They aren't required here.
118+
/// - Since the `Client` requires absolute-form `Uri`s, it can add
119+
/// the `Host` header based on it. You must add a `Host` header yourself
120+
/// before calling this method.
121+
/// - Since absolute-form `Uri`s are not required, if received, they will
122+
/// be serialized as-is.
123+
///
124+
/// # Example
125+
///
126+
/// ```
127+
/// # use http::header::HOST;
128+
/// # use hyper::client::conn::SendRequest;
129+
/// # use hyper::Body;
130+
/// use hyper::Request;
131+
///
132+
/// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
133+
/// // build a Request
134+
/// let req = Request::builder()
135+
/// .uri("/foo/bar")
136+
/// .header(HOST, "hyper.rs")
137+
/// .body(Body::empty())
138+
/// .unwrap();
139+
///
140+
/// // send it and await a Response
141+
/// let res = tx.send_request(req).await?;
142+
/// // assert the Response
143+
/// assert!(res.status().is_success());
144+
/// # Ok(())
145+
/// # }
146+
/// # fn main() {}
147+
/// ```
148+
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Body>>> {
149+
let sent = self.dispatch.send(req);
150+
151+
async move {
152+
match sent {
153+
Ok(rx) => match rx.await {
154+
Ok(Ok(resp)) => Ok(resp),
155+
Ok(Err(err)) => Err(err),
156+
// this is definite bug if it happens, but it shouldn't happen!
157+
Err(_canceled) => panic!("dispatch dropped without returning error"),
158+
}
159+
Err(_req) => {
160+
tracing::debug!("connection was not ready");
161+
162+
Err(crate::Error::new_canceled().with("connection was not ready"))
163+
}
164+
}
165+
}
166+
}
167+
168+
/*
169+
pub(super) fn send_request_retryable(
170+
&mut self,
171+
req: Request<B>,
172+
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
173+
where
174+
B: Send,
175+
{
176+
match self.dispatch.try_send(req) {
177+
Ok(rx) => {
178+
Either::Left(rx.then(move |res| {
179+
match res {
180+
Ok(Ok(res)) => future::ok(res),
181+
Ok(Err(err)) => future::err(err),
182+
// this is definite bug if it happens, but it shouldn't happen!
183+
Err(_) => panic!("dispatch dropped without returning error"),
184+
}
185+
}))
186+
}
187+
Err(req) => {
188+
tracing::debug!("connection was not ready");
189+
let err = crate::Error::new_canceled().with("connection was not ready");
190+
Either::Right(future::err((err, Some(req))))
191+
}
192+
}
193+
}
194+
*/
195+
}
196+
197+
impl<B> fmt::Debug for SendRequest<B> {
198+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199+
f.debug_struct("SendRequest").finish()
200+
}
201+
}
202+
203+
// ===== impl Connection
204+
205+
impl<T, B> fmt::Debug for Connection<T, B>
206+
where
207+
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
208+
B: HttpBody + 'static,
209+
{
210+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211+
f.debug_struct("Connection").finish()
212+
}
213+
}
214+
215+
impl<T, B> Future for Connection<T, B>
216+
where
217+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
218+
B: HttpBody + Send + 'static,
219+
B::Data: Send,
220+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
221+
{
222+
type Output = crate::Result<()>;
223+
224+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
225+
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
226+
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
227+
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
228+
Some(h1) => {
229+
let (io, buf, _) = h1.into_inner();
230+
pending.fulfill(Upgraded::new(io, buf));
231+
Poll::Ready(Ok(()))
232+
}
233+
_ => {
234+
drop(pending);
235+
unreachable!("Upgraded twice");
236+
}
237+
},
238+
}
239+
}
240+
}
241+
242+
// ===== impl Builder
243+
244+
impl Builder {
245+
/// Creates a new connection builder.
246+
#[inline]
247+
pub fn new() -> Builder {
248+
Builder {
249+
exec: Exec::Default,
250+
h09_responses: false,
251+
h1_writev: None,
252+
h1_read_buf_exact_size: None,
253+
h1_parser_config: Default::default(),
254+
h1_title_case_headers: false,
255+
h1_preserve_header_case: false,
256+
#[cfg(feature = "ffi")]
257+
h1_preserve_header_order: false,
258+
h1_max_buf_size: None,
259+
}
260+
}
261+
262+
/// Provide an executor to execute background HTTP2 tasks.
263+
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
264+
where
265+
E: Executor<BoxSendFuture> + Send + Sync + 'static,
266+
{
267+
self.exec = Exec::Executor(Arc::new(exec));
268+
self
269+
}
270+
271+
/// Set whether HTTP/0.9 responses should be tolerated.
272+
///
273+
/// Default is false.
274+
pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
275+
self.h09_responses = enabled;
276+
self
277+
}
278+
279+
/// Set whether HTTP/1 connections will accept spaces between header names
280+
/// and the colon that follow them in responses.
281+
///
282+
/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
283+
/// to say about it:
284+
///
285+
/// > No whitespace is allowed between the header field-name and colon. In
286+
/// > the past, differences in the handling of such whitespace have led to
287+
/// > security vulnerabilities in request routing and response handling. A
288+
/// > server MUST reject any received request message that contains
289+
/// > whitespace between a header field-name and colon with a response code
290+
/// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
291+
/// > response message before forwarding the message downstream.
292+
///
293+
/// Note that this setting does not affect HTTP/2.
294+
///
295+
/// Default is false.
296+
///
297+
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
298+
pub fn http1_allow_spaces_after_header_name_in_responses(
299+
&mut self,
300+
enabled: bool,
301+
) -> &mut Builder {
302+
self.h1_parser_config
303+
.allow_spaces_after_header_name_in_responses(enabled);
304+
self
305+
}
306+
307+
/// Set whether HTTP/1 connections will accept obsolete line folding for
308+
/// header values.
309+
///
310+
/// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
311+
/// parsing.
312+
///
313+
/// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
314+
/// to say about it:
315+
///
316+
/// > A server that receives an obs-fold in a request message that is not
317+
/// > within a message/http container MUST either reject the message by
318+
/// > sending a 400 (Bad Request), preferably with a representation
319+
/// > explaining that obsolete line folding is unacceptable, or replace
320+
/// > each received obs-fold with one or more SP octets prior to
321+
/// > interpreting the field value or forwarding the message downstream.
322+
///
323+
/// > A proxy or gateway that receives an obs-fold in a response message
324+
/// > that is not within a message/http container MUST either discard the
325+
/// > message and replace it with a 502 (Bad Gateway) response, preferably
326+
/// > with a representation explaining that unacceptable line folding was
327+
/// > received, or replace each received obs-fold with one or more SP
328+
/// > octets prior to interpreting the field value or forwarding the
329+
/// > message downstream.
330+
///
331+
/// > A user agent that receives an obs-fold in a response message that is
332+
/// > not within a message/http container MUST replace each received
333+
/// > obs-fold with one or more SP octets prior to interpreting the field
334+
/// > value.
335+
///
336+
/// Note that this setting does not affect HTTP/2.
337+
///
338+
/// Default is false.
339+
///
340+
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
341+
pub fn http1_allow_obsolete_multiline_headers_in_responses(
342+
&mut self,
343+
enabled: bool,
344+
) -> &mut Builder {
345+
self.h1_parser_config
346+
.allow_obsolete_multiline_headers_in_responses(enabled);
347+
self
348+
}
349+
350+
/// Set whether HTTP/1 connections should try to use vectored writes,
351+
/// or always flatten into a single buffer.
352+
///
353+
/// Note that setting this to false may mean more copies of body data,
354+
/// but may also improve performance when an IO transport doesn't
355+
/// support vectored writes well, such as most TLS implementations.
356+
///
357+
/// Setting this to true will force hyper to use queued strategy
358+
/// which may eliminate unnecessary cloning on some TLS backends
359+
///
360+
/// Default is `auto`. In this mode hyper will try to guess which
361+
/// mode to use
362+
pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
363+
self.h1_writev = Some(enabled);
364+
self
365+
}
366+
367+
/// Set whether HTTP/1 connections will write header names as title case at
368+
/// the socket level.
369+
///
370+
/// Note that this setting does not affect HTTP/2.
371+
///
372+
/// Default is false.
373+
pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
374+
self.h1_title_case_headers = enabled;
375+
self
376+
}
377+
378+
/// Set whether to support preserving original header cases.
379+
///
380+
/// Currently, this will record the original cases received, and store them
381+
/// in a private extension on the `Response`. It will also look for and use
382+
/// such an extension in any provided `Request`.
383+
///
384+
/// Since the relevant extension is still private, there is no way to
385+
/// interact with the original cases. The only effect this can have now is
386+
/// to forward the cases in a proxy-like fashion.
387+
///
388+
/// Note that this setting does not affect HTTP/2.
389+
///
390+
/// Default is false.
391+
pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
392+
self.h1_preserve_header_case = enabled;
393+
self
394+
}
395+
396+
/// Set whether to support preserving original header order.
397+
///
398+
/// Currently, this will record the order in which headers are received, and store this
399+
/// ordering in a private extension on the `Response`. It will also look for and use
400+
/// such an extension in any provided `Request`.
401+
///
402+
/// Note that this setting does not affect HTTP/2.
403+
///
404+
/// Default is false.
405+
#[cfg(feature = "ffi")]
406+
pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
407+
self.h1_preserve_header_order = enabled;
408+
self
409+
}
410+
411+
/// Sets the exact size of the read buffer to *always* use.
412+
///
413+
/// Note that setting this option unsets the `http1_max_buf_size` option.
414+
///
415+
/// Default is an adaptive read buffer.
416+
pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
417+
self.h1_read_buf_exact_size = sz;
418+
self.h1_max_buf_size = None;
419+
self
420+
}
421+
422+
/// Set the maximum buffer size for the connection.
423+
///
424+
/// Default is ~400kb.
425+
///
426+
/// Note that setting this option unsets the `http1_read_exact_buf_size` option.
427+
///
428+
/// # Panics
429+
///
430+
/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
431+
#[cfg(feature = "http1")]
432+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
433+
pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
434+
assert!(
435+
max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
436+
"the max_buf_size cannot be smaller than the minimum that h1 specifies."
437+
);
438+
439+
self.h1_max_buf_size = Some(max);
440+
self.h1_read_buf_exact_size = None;
441+
self
442+
}
443+
444+
/// Constructs a connection with the configured options and IO.
445+
/// See [`client::conn`](crate::client::conn) for more.
446+
///
447+
/// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
448+
/// do nothing.
449+
pub fn handshake<T, B>(
450+
&self,
451+
io: T,
452+
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
453+
where
454+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
455+
B: HttpBody + 'static,
456+
B::Data: Send,
457+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
458+
{
459+
let opts = self.clone();
460+
461+
async move {
462+
tracing::trace!("client handshake HTTP/1");
463+
464+
let (tx, rx) = dispatch::channel();
465+
let mut conn = proto::Conn::new(io);
466+
conn.set_h1_parser_config(opts.h1_parser_config);
467+
if let Some(writev) = opts.h1_writev {
468+
if writev {
469+
conn.set_write_strategy_queue();
470+
} else {
471+
conn.set_write_strategy_flatten();
472+
}
473+
}
474+
if opts.h1_title_case_headers {
475+
conn.set_title_case_headers();
476+
}
477+
if opts.h1_preserve_header_case {
478+
conn.set_preserve_header_case();
479+
}
480+
#[cfg(feature = "ffi")]
481+
if opts.h1_preserve_header_order {
482+
conn.set_preserve_header_order();
483+
}
484+
if opts.h09_responses {
485+
conn.set_h09_responses();
486+
}
487+
488+
if let Some(sz) = opts.h1_read_buf_exact_size {
489+
conn.set_read_buf_exact_size(sz);
490+
}
491+
if let Some(max) = opts.h1_max_buf_size {
492+
conn.set_max_buf_size(max);
493+
}
494+
let cd = proto::h1::dispatch::Client::new(rx);
495+
let proto = proto::h1::Dispatcher::new(cd, conn);
496+
497+
Ok((
498+
SendRequest { dispatch: tx },
499+
Connection { inner: Some(proto) },
500+
))
501+
}
502+
}
503+
}
504+

‎src/client/conn/http2.rs

+432
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,432 @@
1+
//! HTTP/2 client connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::marker::PhantomData;
6+
use std::sync::Arc;
7+
#[cfg(feature = "runtime")]
8+
use std::time::Duration;
9+
10+
use http::{Request, Response};
11+
use tokio::io::{AsyncRead, AsyncWrite};
12+
13+
use crate::Body;
14+
use crate::body::HttpBody;
15+
use crate::common::{
16+
exec::{BoxSendFuture, Exec},
17+
task, Future, Pin, Poll,
18+
};
19+
use crate::proto;
20+
use crate::rt::Executor;
21+
use super::super::dispatch;
22+
23+
/// The sender side of an established connection.
24+
pub struct SendRequest<B> {
25+
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
26+
}
27+
28+
/// A future that processes all HTTP state for the IO object.
29+
///
30+
/// In most cases, this should just be spawned into an executor, so that it
31+
/// can process incoming and outgoing messages, notice hangups, and the like.
32+
#[must_use = "futures do nothing unless polled"]
33+
pub struct Connection<T, B>
34+
where
35+
T: AsyncRead + AsyncWrite + Send + 'static,
36+
B: HttpBody + 'static,
37+
{
38+
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
39+
}
40+
41+
/// A builder to configure an HTTP connection.
42+
///
43+
/// After setting options, the builder is used to create a handshake future.
44+
#[derive(Clone, Debug)]
45+
pub struct Builder {
46+
pub(super) exec: Exec,
47+
h2_builder: proto::h2::client::Config,
48+
}
49+
50+
/// Returns a handshake future over some IO.
51+
///
52+
/// This is a shortcut for `Builder::new().handshake(io)`.
53+
/// See [`client::conn`](crate::client::conn) for more.
54+
pub async fn handshake<T>(
55+
io: T,
56+
) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
57+
where
58+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
59+
{
60+
Builder::new().handshake(io).await
61+
}
62+
63+
// ===== impl SendRequest
64+
65+
impl<B> SendRequest<B> {
66+
/// Polls to determine whether this sender can be used yet for a request.
67+
///
68+
/// If the associated connection is closed, this returns an Error.
69+
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
70+
self.dispatch.poll_ready(cx)
71+
}
72+
73+
/*
74+
pub(super) async fn when_ready(self) -> crate::Result<Self> {
75+
let mut me = Some(self);
76+
future::poll_fn(move |cx| {
77+
ready!(me.as_mut().unwrap().poll_ready(cx))?;
78+
Poll::Ready(Ok(me.take().unwrap()))
79+
})
80+
.await
81+
}
82+
83+
pub(super) fn is_ready(&self) -> bool {
84+
self.dispatch.is_ready()
85+
}
86+
87+
pub(super) fn is_closed(&self) -> bool {
88+
self.dispatch.is_closed()
89+
}
90+
*/
91+
}
92+
93+
impl<B> SendRequest<B>
94+
where
95+
B: HttpBody + 'static,
96+
{
97+
/// Sends a `Request` on the associated connection.
98+
///
99+
/// Returns a future that if successful, yields the `Response`.
100+
///
101+
/// # Note
102+
///
103+
/// There are some key differences in what automatic things the `Client`
104+
/// does for you that will not be done here:
105+
///
106+
/// - `Client` requires absolute-form `Uri`s, since the scheme and
107+
/// authority are needed to connect. They aren't required here.
108+
/// - Since the `Client` requires absolute-form `Uri`s, it can add
109+
/// the `Host` header based on it. You must add a `Host` header yourself
110+
/// before calling this method.
111+
/// - Since absolute-form `Uri`s are not required, if received, they will
112+
/// be serialized as-is.
113+
///
114+
/// # Example
115+
///
116+
/// ```
117+
/// # use http::header::HOST;
118+
/// # use hyper::client::conn::SendRequest;
119+
/// # use hyper::Body;
120+
/// use hyper::Request;
121+
///
122+
/// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
123+
/// // build a Request
124+
/// let req = Request::builder()
125+
/// .uri("/foo/bar")
126+
/// .header(HOST, "hyper.rs")
127+
/// .body(Body::empty())
128+
/// .unwrap();
129+
///
130+
/// // send it and await a Response
131+
/// let res = tx.send_request(req).await?;
132+
/// // assert the Response
133+
/// assert!(res.status().is_success());
134+
/// # Ok(())
135+
/// # }
136+
/// # fn main() {}
137+
/// ```
138+
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Body>>> {
139+
let sent = self.dispatch.send(req);
140+
141+
async move {
142+
match sent {
143+
Ok(rx) => match rx.await {
144+
Ok(Ok(resp)) => Ok(resp),
145+
Ok(Err(err)) => Err(err),
146+
// this is definite bug if it happens, but it shouldn't happen!
147+
Err(_canceled) => panic!("dispatch dropped without returning error"),
148+
}
149+
Err(_req) => {
150+
tracing::debug!("connection was not ready");
151+
152+
Err(crate::Error::new_canceled().with("connection was not ready"))
153+
}
154+
}
155+
}
156+
}
157+
158+
/*
159+
pub(super) fn send_request_retryable(
160+
&mut self,
161+
req: Request<B>,
162+
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
163+
where
164+
B: Send,
165+
{
166+
match self.dispatch.try_send(req) {
167+
Ok(rx) => {
168+
Either::Left(rx.then(move |res| {
169+
match res {
170+
Ok(Ok(res)) => future::ok(res),
171+
Ok(Err(err)) => future::err(err),
172+
// this is definite bug if it happens, but it shouldn't happen!
173+
Err(_) => panic!("dispatch dropped without returning error"),
174+
}
175+
}))
176+
}
177+
Err(req) => {
178+
tracing::debug!("connection was not ready");
179+
let err = crate::Error::new_canceled().with("connection was not ready");
180+
Either::Right(future::err((err, Some(req))))
181+
}
182+
}
183+
}
184+
*/
185+
}
186+
187+
impl<B> fmt::Debug for SendRequest<B> {
188+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189+
f.debug_struct("SendRequest").finish()
190+
}
191+
}
192+
193+
// ===== impl Connection
194+
195+
impl<T, B> fmt::Debug for Connection<T, B>
196+
where
197+
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
198+
B: HttpBody + 'static,
199+
{
200+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201+
f.debug_struct("Connection").finish()
202+
}
203+
}
204+
205+
impl<T, B> Future for Connection<T, B>
206+
where
207+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
208+
B: HttpBody + Send + 'static,
209+
B::Data: Send,
210+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
211+
{
212+
type Output = crate::Result<()>;
213+
214+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
215+
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
216+
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
217+
#[cfg(feature = "http1")]
218+
proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
219+
}
220+
}
221+
}
222+
223+
// ===== impl Builder
224+
225+
impl Builder {
226+
/// Creates a new connection builder.
227+
#[inline]
228+
pub fn new() -> Builder {
229+
Builder {
230+
exec: Exec::Default,
231+
h2_builder: Default::default(),
232+
}
233+
}
234+
235+
/// Provide an executor to execute background HTTP2 tasks.
236+
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
237+
where
238+
E: Executor<BoxSendFuture> + Send + Sync + 'static,
239+
{
240+
self.exec = Exec::Executor(Arc::new(exec));
241+
self
242+
}
243+
244+
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
245+
/// stream-level flow control.
246+
///
247+
/// Passing `None` will do nothing.
248+
///
249+
/// If not set, hyper will use a default.
250+
///
251+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
252+
#[cfg(feature = "http2")]
253+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
254+
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
255+
if let Some(sz) = sz.into() {
256+
self.h2_builder.adaptive_window = false;
257+
self.h2_builder.initial_stream_window_size = sz;
258+
}
259+
self
260+
}
261+
262+
/// Sets the max connection-level flow control for HTTP2
263+
///
264+
/// Passing `None` will do nothing.
265+
///
266+
/// If not set, hyper will use a default.
267+
#[cfg(feature = "http2")]
268+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
269+
pub fn http2_initial_connection_window_size(
270+
&mut self,
271+
sz: impl Into<Option<u32>>,
272+
) -> &mut Self {
273+
if let Some(sz) = sz.into() {
274+
self.h2_builder.adaptive_window = false;
275+
self.h2_builder.initial_conn_window_size = sz;
276+
}
277+
self
278+
}
279+
280+
/// Sets whether to use an adaptive flow control.
281+
///
282+
/// Enabling this will override the limits set in
283+
/// `http2_initial_stream_window_size` and
284+
/// `http2_initial_connection_window_size`.
285+
#[cfg(feature = "http2")]
286+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
287+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
288+
use proto::h2::SPEC_WINDOW_SIZE;
289+
290+
self.h2_builder.adaptive_window = enabled;
291+
if enabled {
292+
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
293+
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
294+
}
295+
self
296+
}
297+
298+
/// Sets the maximum frame size to use for HTTP2.
299+
///
300+
/// Passing `None` will do nothing.
301+
///
302+
/// If not set, hyper will use a default.
303+
#[cfg(feature = "http2")]
304+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
305+
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
306+
if let Some(sz) = sz.into() {
307+
self.h2_builder.max_frame_size = sz;
308+
}
309+
self
310+
}
311+
312+
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
313+
/// connection alive.
314+
///
315+
/// Pass `None` to disable HTTP2 keep-alive.
316+
///
317+
/// Default is currently disabled.
318+
///
319+
/// # Cargo Feature
320+
///
321+
/// Requires the `runtime` cargo feature to be enabled.
322+
#[cfg(feature = "runtime")]
323+
#[cfg(feature = "http2")]
324+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
325+
pub fn http2_keep_alive_interval(
326+
&mut self,
327+
interval: impl Into<Option<Duration>>,
328+
) -> &mut Self {
329+
self.h2_builder.keep_alive_interval = interval.into();
330+
self
331+
}
332+
333+
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
334+
///
335+
/// If the ping is not acknowledged within the timeout, the connection will
336+
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
337+
///
338+
/// Default is 20 seconds.
339+
///
340+
/// # Cargo Feature
341+
///
342+
/// Requires the `runtime` cargo feature to be enabled.
343+
#[cfg(feature = "runtime")]
344+
#[cfg(feature = "http2")]
345+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
346+
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
347+
self.h2_builder.keep_alive_timeout = timeout;
348+
self
349+
}
350+
351+
/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
352+
///
353+
/// If disabled, keep-alive pings are only sent while there are open
354+
/// request/responses streams. If enabled, pings are also sent when no
355+
/// streams are active. Does nothing if `http2_keep_alive_interval` is
356+
/// disabled.
357+
///
358+
/// Default is `false`.
359+
///
360+
/// # Cargo Feature
361+
///
362+
/// Requires the `runtime` cargo feature to be enabled.
363+
#[cfg(feature = "runtime")]
364+
#[cfg(feature = "http2")]
365+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
366+
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
367+
self.h2_builder.keep_alive_while_idle = enabled;
368+
self
369+
}
370+
371+
/// Sets the maximum number of HTTP2 concurrent locally reset streams.
372+
///
373+
/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
374+
/// details.
375+
///
376+
/// The default value is determined by the `h2` crate.
377+
///
378+
/// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
379+
#[cfg(feature = "http2")]
380+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
381+
pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
382+
self.h2_builder.max_concurrent_reset_streams = Some(max);
383+
self
384+
}
385+
386+
/// Set the maximum write buffer size for each HTTP/2 stream.
387+
///
388+
/// Default is currently 1MB, but may change.
389+
///
390+
/// # Panics
391+
///
392+
/// The value must be no larger than `u32::MAX`.
393+
#[cfg(feature = "http2")]
394+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
395+
pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
396+
assert!(max <= std::u32::MAX as usize);
397+
self.h2_builder.max_send_buffer_size = max;
398+
self
399+
}
400+
401+
/// Constructs a connection with the configured options and IO.
402+
/// See [`client::conn`](crate::client::conn) for more.
403+
///
404+
/// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
405+
/// do nothing.
406+
pub fn handshake<T, B>(
407+
&self,
408+
io: T,
409+
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
410+
where
411+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
412+
B: HttpBody + 'static,
413+
B::Data: Send,
414+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
415+
{
416+
let opts = self.clone();
417+
418+
async move {
419+
tracing::trace!("client handshake HTTP/1");
420+
421+
let (tx, rx) = dispatch::channel();
422+
let h2 =
423+
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
424+
.await?;
425+
Ok((
426+
SendRequest { dispatch: tx },
427+
Connection { inner: (PhantomData, h2) },
428+
))
429+
}
430+
}
431+
}
432+

‎src/client/conn.rs ‎src/client/conn/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ use crate::rt::Executor;
8484
use crate::upgrade::Upgraded;
8585
use crate::{Body, Request, Response};
8686

87+
#[cfg(feature = "http1")]
88+
pub mod http1;
89+
#[cfg(feature = "http2")]
90+
pub mod http2;
91+
8792
#[cfg(feature = "http1")]
8893
type Http1Dispatcher<T, B> =
8994
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;

‎src/client/dispatch.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#[cfg(feature = "http2")]
22
use std::future::Future;
33

4-
use futures_util::FutureExt;
54
use tokio::sync::{mpsc, oneshot};
65

76
#[cfg(feature = "http2")]
@@ -169,6 +168,7 @@ impl<T, U> Receiver<T, U> {
169168

170169
#[cfg(feature = "http1")]
171170
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
171+
use futures_util::FutureExt;
172172
match self.inner.recv().now_or_never() {
173173
Some(Some(mut env)) => env.0.take(),
174174
_ => None,

0 commit comments

Comments
 (0)
Please sign in to comment.