Skip to content

Commit fc4d335

Browse files
seanmonstarMichael-J-Ward
andauthoredOct 13, 2022
feat(server): server::conn::http1 and server::conn::http2 modules (#3011)
This creates submodules of `hyper::server::conn` for HTTP/1 and HTTP/2. Each module contains a `Builder` and `Connection`, with options and methods specific to their version. Closes #2851 Co-authored-by: Michael J Ward <ward.michael.j@gmail.com>
1 parent 89048f4 commit fc4d335

File tree

3 files changed

+850
-0
lines changed

3 files changed

+850
-0
lines changed
 

‎src/server/conn/http1.rs

+514
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,514 @@
1+
//! HTTP/1 Server Connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::marker::PhantomData;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
use bytes::Bytes;
10+
use tokio::io::{AsyncRead, AsyncWrite};
11+
12+
use crate::body::{Body, Recv};
13+
use crate::common::exec::{ConnStreamExec, Exec};
14+
use crate::common::{task, Future, Pin, Poll, Unpin};
15+
use crate::{common::time::Time, rt::Timer};
16+
use crate::proto;
17+
use crate::service::HttpService;
18+
19+
type Http1Dispatcher<T, B, S> =
20+
proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Recv>, B, T, proto::ServerTransaction>;
21+
22+
23+
pin_project_lite::pin_project! {
24+
/// A future binding an http1 connection with a Service.
25+
///
26+
/// Polling this future will drive HTTP forward.
27+
#[must_use = "futures do nothing unless polled"]
28+
pub struct Connection<T, S, E>
29+
where
30+
S: HttpService<Recv>,
31+
{
32+
conn: Option<Http1Dispatcher<T, S::ResBody, S>>,
33+
// can we remove this?
34+
_exec: PhantomData<E>,
35+
}
36+
}
37+
38+
39+
/// A configuration builder for HTTP/1 server connections.
40+
#[derive(Clone, Debug)]
41+
pub struct Builder<E = Exec> {
42+
pub(crate) _exec: E,
43+
pub(crate) timer: Time,
44+
h1_half_close: bool,
45+
h1_keep_alive: bool,
46+
h1_title_case_headers: bool,
47+
h1_preserve_header_case: bool,
48+
h1_header_read_timeout: Option<Duration>,
49+
h1_writev: Option<bool>,
50+
max_buf_size: Option<usize>,
51+
pipeline_flush: bool,
52+
}
53+
54+
/// Deconstructed parts of a `Connection`.
55+
///
56+
/// This allows taking apart a `Connection` at a later time, in order to
57+
/// reclaim the IO object, and additional related pieces.
58+
#[derive(Debug)]
59+
pub struct Parts<T, S> {
60+
/// The original IO object used in the handshake.
61+
pub io: T,
62+
/// A buffer of bytes that have been read but not processed as HTTP.
63+
///
64+
/// If the client sent additional bytes after its last request, and
65+
/// this connection "ended" with an upgrade, the read buffer will contain
66+
/// those bytes.
67+
///
68+
/// You will want to check for any existing bytes if you plan to continue
69+
/// communicating on the IO object.
70+
pub read_buf: Bytes,
71+
/// The `Service` used to serve this connection.
72+
pub service: S,
73+
_inner: (),
74+
}
75+
76+
// ===== impl Connection =====
77+
78+
impl<I, S, E> fmt::Debug for Connection<I, S, E>
79+
where
80+
S: HttpService<Recv>,
81+
{
82+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83+
f.debug_struct("Connection").finish()
84+
}
85+
}
86+
87+
impl<I, B, S, E> Connection<I, S, E>
88+
where
89+
S: HttpService<Recv, ResBody = B>,
90+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
91+
I: AsyncRead + AsyncWrite + Unpin,
92+
B: Body + 'static,
93+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
94+
E: ConnStreamExec<S::Future, B>,
95+
{
96+
/// Start a graceful shutdown process for this connection.
97+
///
98+
/// This `Connection` should continue to be polled until shutdown
99+
/// can finish.
100+
///
101+
/// # Note
102+
///
103+
/// This should only be called while the `Connection` future is still
104+
/// pending. If called after `Connection::poll` has resolved, this does
105+
/// nothing.
106+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
107+
match self.conn {
108+
Some(ref mut h1) => {
109+
h1.disable_keep_alive();
110+
}
111+
None => (),
112+
}
113+
}
114+
115+
/// Return the inner IO object, and additional information.
116+
///
117+
/// If the IO object has been "rewound" the io will not contain those bytes rewound.
118+
/// This should only be called after `poll_without_shutdown` signals
119+
/// that the connection is "done". Otherwise, it may not have finished
120+
/// flushing all necessary HTTP bytes.
121+
///
122+
/// # Panics
123+
/// This method will panic if this connection is using an h2 protocol.
124+
pub fn into_parts(self) -> Parts<I, S> {
125+
self.try_into_parts()
126+
.unwrap_or_else(|| panic!("h2 cannot into_inner"))
127+
}
128+
129+
/// Return the inner IO object, and additional information, if available.
130+
///
131+
///
132+
/// TODO:(mike) does this need to return none for h1 or is it expected to always be present? previously used an "unwrap"
133+
/// This method will return a `None` if this connection is using an h2 protocol.
134+
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
135+
self.conn.map(|h1| {
136+
let (io, read_buf, dispatch) = h1.into_inner();
137+
Parts {
138+
io,
139+
read_buf,
140+
service: dispatch.into_service(),
141+
_inner: (),
142+
}
143+
})
144+
}
145+
146+
/// Poll the connection for completion, but without calling `shutdown`
147+
/// on the underlying IO.
148+
///
149+
/// This is useful to allow running a connection while doing an HTTP
150+
/// upgrade. Once the upgrade is completed, the connection would be "done",
151+
/// but it is not desired to actually shutdown the IO object. Instead you
152+
/// would take it back using `into_parts`.
153+
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
154+
where
155+
S: Unpin,
156+
S::Future: Unpin,
157+
B: Unpin,
158+
{
159+
self.conn.as_mut().unwrap().poll_without_shutdown(cx)
160+
}
161+
162+
/// Prevent shutdown of the underlying IO object at the end of service the request,
163+
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
164+
///
165+
/// # Error
166+
///
167+
/// This errors if the underlying connection protocol is not HTTP/1.
168+
pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
169+
where
170+
S: Unpin,
171+
S::Future: Unpin,
172+
B: Unpin,
173+
{
174+
// TODO(mike): "new_without_shutdown_not_h1" is not possible here
175+
let mut conn = Some(self);
176+
futures_util::future::poll_fn(move |cx| {
177+
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
178+
Poll::Ready(
179+
conn.take()
180+
.unwrap()
181+
.try_into_parts()
182+
.ok_or_else(crate::Error::new_without_shutdown_not_h1),
183+
)
184+
})
185+
}
186+
187+
/// Enable this connection to support higher-level HTTP upgrades.
188+
///
189+
/// See [the `upgrade` module](crate::upgrade) for more.
190+
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S, E>
191+
where
192+
I: Send,
193+
{
194+
upgrades::UpgradeableConnection { inner: self }
195+
}
196+
}
197+
198+
199+
impl<I, B, S, E> Future for Connection<I, S, E>
200+
where
201+
S: HttpService<Recv, ResBody = B>,
202+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
203+
I: AsyncRead + AsyncWrite + Unpin + 'static,
204+
B: Body + 'static,
205+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
206+
E: ConnStreamExec<S::Future, B>,
207+
{
208+
type Output = crate::Result<()>;
209+
210+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
211+
match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
212+
Ok(done) => {
213+
match done {
214+
proto::Dispatched::Shutdown => {}
215+
proto::Dispatched::Upgrade(pending) => {
216+
// With no `Send` bound on `I`, we can't try to do
217+
// upgrades here. In case a user was trying to use
218+
// `Body::on_upgrade` with this API, send a special
219+
// error letting them know about that.
220+
pending.manual();
221+
}
222+
};
223+
return Poll::Ready(Ok(()));
224+
}
225+
Err(e) => Poll::Ready(Err(e)),
226+
}
227+
}
228+
}
229+
230+
// ===== impl Builder =====
231+
232+
impl<E> Builder<E> {
233+
/// Create a new connection builder.
234+
///
235+
/// This starts with the default options, and an executor.
236+
pub fn new(exec: E) -> Self {
237+
Self {
238+
_exec: exec,
239+
timer: Time::Empty,
240+
h1_half_close: false,
241+
h1_keep_alive: true,
242+
h1_title_case_headers: false,
243+
h1_preserve_header_case: false,
244+
h1_header_read_timeout: None,
245+
h1_writev: None,
246+
max_buf_size: None,
247+
pipeline_flush: false,
248+
}
249+
}
250+
/// Set whether HTTP/1 connections should support half-closures.
251+
///
252+
/// Clients can chose to shutdown their write-side while waiting
253+
/// for the server to respond. Setting this to `true` will
254+
/// prevent closing the connection immediately if `read`
255+
/// detects an EOF in the middle of a request.
256+
///
257+
/// Default is `false`.
258+
pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
259+
self.h1_half_close = val;
260+
self
261+
}
262+
263+
/// Enables or disables HTTP/1 keep-alive.
264+
///
265+
/// Default is true.
266+
pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
267+
self.h1_keep_alive = val;
268+
self
269+
}
270+
271+
/// Set whether HTTP/1 connections will write header names as title case at
272+
/// the socket level.
273+
///
274+
/// Note that this setting does not affect HTTP/2.
275+
///
276+
/// Default is false.
277+
pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
278+
self.h1_title_case_headers = enabled;
279+
self
280+
}
281+
282+
/// Set whether to support preserving original header cases.
283+
///
284+
/// Currently, this will record the original cases received, and store them
285+
/// in a private extension on the `Request`. It will also look for and use
286+
/// such an extension in any provided `Response`.
287+
///
288+
/// Since the relevant extension is still private, there is no way to
289+
/// interact with the original cases. The only effect this can have now is
290+
/// to forward the cases in a proxy-like fashion.
291+
///
292+
/// Note that this setting does not affect HTTP/2.
293+
///
294+
/// Default is false.
295+
pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
296+
self.h1_preserve_header_case = enabled;
297+
self
298+
}
299+
300+
/// Set a timeout for reading client request headers. If a client does not
301+
/// transmit the entire header within this time, the connection is closed.
302+
///
303+
/// Default is None.
304+
pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
305+
self.h1_header_read_timeout = Some(read_timeout);
306+
self
307+
}
308+
309+
/// Set whether HTTP/1 connections should try to use vectored writes,
310+
/// or always flatten into a single buffer.
311+
///
312+
/// Note that setting this to false may mean more copies of body data,
313+
/// but may also improve performance when an IO transport doesn't
314+
/// support vectored writes well, such as most TLS implementations.
315+
///
316+
/// Setting this to true will force hyper to use queued strategy
317+
/// which may eliminate unnecessary cloning on some TLS backends
318+
///
319+
/// Default is `auto`. In this mode hyper will try to guess which
320+
/// mode to use
321+
pub fn http1_writev(&mut self, val: bool) -> &mut Self {
322+
self.h1_writev = Some(val);
323+
self
324+
}
325+
326+
/// Set the maximum buffer size for the connection.
327+
///
328+
/// Default is ~400kb.
329+
///
330+
/// # Panics
331+
///
332+
/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
333+
#[cfg(feature = "http1")]
334+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
335+
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
336+
assert!(
337+
max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
338+
"the max_buf_size cannot be smaller than the minimum that h1 specifies."
339+
);
340+
self.max_buf_size = Some(max);
341+
self
342+
}
343+
344+
/// Aggregates flushes to better support pipelined responses.
345+
///
346+
/// Experimental, may have bugs.
347+
///
348+
/// Default is false.
349+
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
350+
self.pipeline_flush = enabled;
351+
self
352+
}
353+
354+
/// Set the executor used to spawn background tasks.
355+
///
356+
/// Default uses implicit default (like `tokio::spawn`).
357+
pub fn with_executor<E2>(self, exec: E2) -> Builder<E2> {
358+
Builder {
359+
_exec: exec,
360+
timer: self.timer,
361+
h1_half_close: self.h1_half_close,
362+
h1_keep_alive: self.h1_keep_alive,
363+
h1_title_case_headers: self.h1_title_case_headers,
364+
h1_preserve_header_case: self.h1_preserve_header_case,
365+
h1_header_read_timeout: self.h1_header_read_timeout,
366+
h1_writev: self.h1_writev,
367+
max_buf_size: self.max_buf_size,
368+
pipeline_flush: self.pipeline_flush,
369+
}
370+
}
371+
372+
/// Set the timer used in background tasks.
373+
pub fn timer<M>(&mut self, timer: M) -> &mut Self
374+
where
375+
M: Timer + Send + Sync + 'static,
376+
{
377+
self.timer = Time::Timer(Arc::new(timer));
378+
self
379+
}
380+
381+
/// Bind a connection together with a [`Service`](crate::service::Service).
382+
///
383+
/// This returns a Future that must be polled in order for HTTP to be
384+
/// driven on the connection.
385+
///
386+
/// # Example
387+
///
388+
/// ```
389+
/// # use hyper::{Recv, Request, Response};
390+
/// # use hyper::service::Service;
391+
/// # use hyper::server::conn::Http;
392+
/// # use tokio::io::{AsyncRead, AsyncWrite};
393+
/// # async fn run<I, S>(some_io: I, some_service: S)
394+
/// # where
395+
/// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
396+
/// # S: Service<hyper::Request<Recv>, Response=hyper::Response<Recv>> + Send + 'static,
397+
/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
398+
/// # S::Future: Send,
399+
/// # {
400+
/// let http = Http::new();
401+
/// let conn = http.serve_connection(some_io, some_service);
402+
///
403+
/// if let Err(e) = conn.await {
404+
/// eprintln!("server connection error: {}", e);
405+
/// }
406+
/// # }
407+
/// # fn main() {}
408+
/// ```
409+
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
410+
where
411+
S: HttpService<Recv, ResBody = Bd>,
412+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
413+
Bd: Body + 'static,
414+
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
415+
I: AsyncRead + AsyncWrite + Unpin,
416+
E: ConnStreamExec<S::Future, Bd>,
417+
{
418+
let mut conn = proto::Conn::new(io);
419+
conn.set_timer(self.timer.clone());
420+
if !self.h1_keep_alive {
421+
conn.disable_keep_alive();
422+
}
423+
if self.h1_half_close {
424+
conn.set_allow_half_close();
425+
}
426+
if self.h1_title_case_headers {
427+
conn.set_title_case_headers();
428+
}
429+
if self.h1_preserve_header_case {
430+
conn.set_preserve_header_case();
431+
}
432+
if let Some(header_read_timeout) = self.h1_header_read_timeout {
433+
conn.set_http1_header_read_timeout(header_read_timeout);
434+
}
435+
if let Some(writev) = self.h1_writev {
436+
if writev {
437+
conn.set_write_strategy_queue();
438+
} else {
439+
conn.set_write_strategy_flatten();
440+
}
441+
}
442+
conn.set_flush_pipeline(self.pipeline_flush);
443+
if let Some(max) = self.max_buf_size {
444+
conn.set_max_buf_size(max);
445+
}
446+
let sd = proto::h1::dispatch::Server::new(service);
447+
let proto = proto::h1::Dispatcher::new(sd, conn);
448+
Connection {
449+
conn: Some(proto),
450+
_exec: PhantomData,
451+
}
452+
}
453+
}
454+
455+
mod upgrades {
456+
use crate::upgrade::Upgraded;
457+
458+
use super::*;
459+
460+
// A future binding a connection with a Service with Upgrade support.
461+
//
462+
// This type is unnameable outside the crate, and so basically just an
463+
// `impl Future`, without requiring Rust 1.26.
464+
#[must_use = "futures do nothing unless polled"]
465+
#[allow(missing_debug_implementations)]
466+
pub struct UpgradeableConnection<T, S, E>
467+
where
468+
S: HttpService<Recv>,
469+
{
470+
pub(super) inner: Connection<T, S, E>,
471+
}
472+
473+
impl<I, B, S, E> UpgradeableConnection<I, S, E>
474+
where
475+
S: HttpService<Recv, ResBody = B>,
476+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
477+
I: AsyncRead + AsyncWrite + Unpin,
478+
B: Body + 'static,
479+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
480+
E: ConnStreamExec<S::Future, B>,
481+
{
482+
/// Start a graceful shutdown process for this connection.
483+
///
484+
/// This `Connection` should continue to be polled until shutdown
485+
/// can finish.
486+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
487+
Pin::new(&mut self.inner).graceful_shutdown()
488+
}
489+
}
490+
491+
impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
492+
where
493+
S: HttpService<Recv, ResBody = B>,
494+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
495+
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
496+
B: Body + 'static,
497+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
498+
E: ConnStreamExec<S::Future, B>,
499+
{
500+
type Output = crate::Result<()>;
501+
502+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
503+
match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
504+
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
505+
Ok(proto::Dispatched::Upgrade(pending)) => {
506+
let (io, buf, _) = self.inner.conn.take().unwrap().into_inner();
507+
pending.fulfill(Upgraded::new(io, buf));
508+
Poll::Ready(Ok(()))
509+
}
510+
Err(e) => Poll::Ready(Err(e)),
511+
}
512+
}
513+
}
514+
}

‎src/server/conn/http2.rs

+331
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
//! HTTP/2 Server Connections
2+
3+
use std::error::Error as StdError;
4+
use std::fmt;
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
8+
use pin_project_lite::pin_project;
9+
use tokio::io::{AsyncRead, AsyncWrite};
10+
11+
use crate::body::{Body, Recv};
12+
use crate::common::exec::{ConnStreamExec};
13+
use crate::common::{task, Future, Pin, Poll, Unpin};
14+
use crate::{common::time::Time, rt::Timer};
15+
use crate::proto;
16+
use crate::service::HttpService;
17+
18+
pin_project! {
19+
/// A future binding an HTTP/2 connection with a Service.
20+
///
21+
/// Polling this future will drive HTTP forward.
22+
#[must_use = "futures do nothing unless polled"]
23+
pub struct Connection<T, S, E>
24+
where
25+
S: HttpService<Recv>,
26+
{
27+
conn: proto::h2::Server<T, S, S::ResBody, E>,
28+
}
29+
}
30+
31+
/// A configuration builder for HTTP/2 server connections.
32+
#[derive(Clone, Debug)]
33+
pub struct Builder<E> {
34+
exec: E,
35+
timer: Time,
36+
h2_builder: proto::h2::server::Config,
37+
}
38+
39+
// ===== impl Connection =====
40+
41+
impl<I, S, E> fmt::Debug for Connection<I, S, E>
42+
where
43+
S: HttpService<Recv>,
44+
{
45+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46+
f.debug_struct("Connection").finish()
47+
}
48+
}
49+
50+
impl<I, B, S, E> Connection<I, S, E>
51+
where
52+
S: HttpService<Recv, ResBody = B>,
53+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
54+
I: AsyncRead + AsyncWrite + Unpin,
55+
B: Body + 'static,
56+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
57+
E: ConnStreamExec<S::Future, B>,
58+
{
59+
/// Start a graceful shutdown process for this connection.
60+
///
61+
/// This `Connection` should continue to be polled until shutdown
62+
/// can finish.
63+
///
64+
/// # Note
65+
///
66+
/// This should only be called while the `Connection` future is still
67+
/// pending. If called after `Connection::poll` has resolved, this does
68+
/// nothing.
69+
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
70+
self.conn.graceful_shutdown();
71+
}
72+
}
73+
74+
impl<I, B, S, E> Future for Connection<I, S, E>
75+
where
76+
S: HttpService<Recv, ResBody = B>,
77+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
78+
I: AsyncRead + AsyncWrite + Unpin + 'static,
79+
B: Body + 'static,
80+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
81+
E: ConnStreamExec<S::Future, B>,
82+
{
83+
type Output = crate::Result<()>;
84+
85+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
86+
match ready!(Pin::new(&mut self.conn).poll(cx)) {
87+
Ok(_done) => {
88+
//TODO: the proto::h2::Server no longer needs to return
89+
//the Dispatched enum
90+
Poll::Ready(Ok(()))
91+
}
92+
Err(e) => {
93+
Poll::Ready(Err(e))
94+
}
95+
}
96+
}
97+
}
98+
99+
// ===== impl Builder =====
100+
101+
impl<E> Builder<E> {
102+
/// Create a new connection builder.
103+
///
104+
/// This starts with the default options, and an executor.
105+
pub fn new(exec: E) -> Self {
106+
Self {
107+
exec: exec,
108+
timer: Time::Empty,
109+
h2_builder: Default::default(),
110+
}
111+
}
112+
113+
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
114+
/// stream-level flow control.
115+
///
116+
/// Passing `None` will do nothing.
117+
///
118+
/// If not set, hyper will use a default.
119+
///
120+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
121+
#[cfg(feature = "http2")]
122+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
123+
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
124+
if let Some(sz) = sz.into() {
125+
self.h2_builder.adaptive_window = false;
126+
self.h2_builder.initial_stream_window_size = sz;
127+
}
128+
self
129+
}
130+
131+
/// Sets the max connection-level flow control for HTTP2.
132+
///
133+
/// Passing `None` will do nothing.
134+
///
135+
/// If not set, hyper will use a default.
136+
#[cfg(feature = "http2")]
137+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
138+
pub fn http2_initial_connection_window_size(
139+
&mut self,
140+
sz: impl Into<Option<u32>>,
141+
) -> &mut Self {
142+
if let Some(sz) = sz.into() {
143+
self.h2_builder.adaptive_window = false;
144+
self.h2_builder.initial_conn_window_size = sz;
145+
}
146+
self
147+
}
148+
149+
/// Sets whether to use an adaptive flow control.
150+
///
151+
/// Enabling this will override the limits set in
152+
/// `http2_initial_stream_window_size` and
153+
/// `http2_initial_connection_window_size`.
154+
#[cfg(feature = "http2")]
155+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
156+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
157+
use proto::h2::SPEC_WINDOW_SIZE;
158+
159+
self.h2_builder.adaptive_window = enabled;
160+
if enabled {
161+
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
162+
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
163+
}
164+
self
165+
}
166+
167+
/// Sets the maximum frame size to use for HTTP2.
168+
///
169+
/// Passing `None` will do nothing.
170+
///
171+
/// If not set, hyper will use a default.
172+
#[cfg(feature = "http2")]
173+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
174+
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
175+
if let Some(sz) = sz.into() {
176+
self.h2_builder.max_frame_size = sz;
177+
}
178+
self
179+
}
180+
181+
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
182+
/// connections.
183+
///
184+
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
185+
///
186+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
187+
#[cfg(feature = "http2")]
188+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
189+
pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
190+
self.h2_builder.max_concurrent_streams = max.into();
191+
self
192+
}
193+
194+
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
195+
/// connection alive.
196+
///
197+
/// Pass `None` to disable HTTP2 keep-alive.
198+
///
199+
/// Default is currently disabled.
200+
///
201+
/// # Cargo Feature
202+
///
203+
#[cfg(feature = "http2")]
204+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
205+
pub fn http2_keep_alive_interval(
206+
&mut self,
207+
interval: impl Into<Option<Duration>>,
208+
) -> &mut Self {
209+
self.h2_builder.keep_alive_interval = interval.into();
210+
self
211+
}
212+
213+
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
214+
///
215+
/// If the ping is not acknowledged within the timeout, the connection will
216+
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
217+
///
218+
/// Default is 20 seconds.
219+
///
220+
/// # Cargo Feature
221+
///
222+
#[cfg(feature = "http2")]
223+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
224+
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
225+
self.h2_builder.keep_alive_timeout = timeout;
226+
self
227+
}
228+
229+
/// Set the maximum write buffer size for each HTTP/2 stream.
230+
///
231+
/// Default is currently ~400KB, but may change.
232+
///
233+
/// # Panics
234+
///
235+
/// The value must be no larger than `u32::MAX`.
236+
#[cfg(feature = "http2")]
237+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
238+
pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
239+
assert!(max <= std::u32::MAX as usize);
240+
self.h2_builder.max_send_buffer_size = max;
241+
self
242+
}
243+
244+
/// Enables the [extended CONNECT protocol].
245+
///
246+
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
247+
#[cfg(feature = "http2")]
248+
pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
249+
self.h2_builder.enable_connect_protocol = true;
250+
self
251+
}
252+
253+
/// Sets the max size of received header frames.
254+
///
255+
/// Default is currently ~16MB, but may change.
256+
#[cfg(feature = "http2")]
257+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
258+
pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
259+
self.h2_builder.max_header_list_size = max;
260+
self
261+
}
262+
263+
/// Set the executor used to spawn background tasks.
264+
///
265+
/// Default uses implicit default (like `tokio::spawn`).
266+
pub fn with_executor<E2>(self, exec: E2) -> Builder<E2> {
267+
Builder {
268+
exec,
269+
timer: self.timer,
270+
h2_builder: self.h2_builder,
271+
}
272+
}
273+
274+
/// Set the timer used in background tasks.
275+
pub fn timer<M>(&mut self, timer: M) -> &mut Self
276+
where
277+
M: Timer + Send + Sync + 'static,
278+
{
279+
self.timer = Time::Timer(Arc::new(timer));
280+
self
281+
}
282+
283+
/// Bind a connection together with a [`Service`](crate::service::Service).
284+
///
285+
/// This returns a Future that must be polled in order for HTTP to be
286+
/// driven on the connection.
287+
///
288+
/// # Example
289+
///
290+
/// ```
291+
/// # use hyper::{Recv, Request, Response};
292+
/// # use hyper::service::Service;
293+
/// # use hyper::server::conn::Http;
294+
/// # use tokio::io::{AsyncRead, AsyncWrite};
295+
/// # async fn run<I, S>(some_io: I, some_service: S)
296+
/// # where
297+
/// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
298+
/// # S: Service<hyper::Request<Recv>, Response=hyper::Response<Recv>> + Send + 'static,
299+
/// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
300+
/// # S::Future: Send,
301+
/// # {
302+
/// let http = Http::new();
303+
/// let conn = http.serve_connection(some_io, some_service);
304+
///
305+
/// if let Err(e) = conn.await {
306+
/// eprintln!("server connection error: {}", e);
307+
/// }
308+
/// # }
309+
/// # fn main() {}
310+
/// ```
311+
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
312+
where
313+
S: HttpService<Recv, ResBody = Bd>,
314+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
315+
Bd: Body + 'static,
316+
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
317+
I: AsyncRead + AsyncWrite + Unpin,
318+
E: ConnStreamExec<S::Future, Bd>,
319+
{
320+
let proto = proto::h2::Server::new(
321+
io,
322+
service,
323+
&self.h2_builder,
324+
self.exec.clone(),
325+
self.timer.clone(),
326+
);
327+
Connection {
328+
conn: proto,
329+
}
330+
}
331+
}

‎src/server/conn.rs ‎src/server/conn/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ use crate::error::{Kind, Parse};
5858
use crate::upgrade::Upgraded;
5959
use crate::{common::time::Time, rt::Timer};
6060

61+
#[cfg(feature = "http1")]
62+
pub mod http1;
63+
#[cfg(feature = "http2")]
64+
pub mod http2;
65+
6166
cfg_feature! {
6267
#![any(feature = "http1", feature = "http2")]
6368

0 commit comments

Comments
 (0)
Please sign in to comment.