Skip to content

Commit a6d4fcb

Browse files
authoredDec 29, 2020
feat(server): expose Accept without httpX features (#2382)
1 parent 510b998 commit a6d4fcb

File tree

6 files changed

+458
-449
lines changed

6 files changed

+458
-449
lines changed
 

‎src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ cfg_feature! {
102102
}
103103

104104
cfg_feature! {
105-
#![all(feature = "server", any(feature = "http1", feature = "http2"))]
105+
#![all(feature = "server")]
106106

107107
pub mod server;
108+
#[cfg(any(feature = "http1", feature = "http2"))]
109+
#[doc(no_inline)]
108110
pub use crate::server::Server;
109111
}

‎src/server/conn.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use bytes::Bytes;
5757
use pin_project::pin_project;
5858
use tokio::io::{AsyncRead, AsyncWrite};
5959

60-
use super::Accept;
60+
use super::accept::Accept;
6161
use crate::body::{Body, HttpBody};
6262
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
6363
#[cfg(feature = "http2")]

‎src/server/mod.rs

+8-445
Original file line numberDiff line numberDiff line change
@@ -52,452 +52,15 @@
5252
//! ```
5353
5454
pub mod accept;
55-
pub mod conn;
56-
mod shutdown;
57-
#[cfg(feature = "tcp")]
58-
mod tcp;
5955

60-
use std::error::Error as StdError;
61-
use std::fmt;
62-
#[cfg(feature = "tcp")]
63-
use std::net::{SocketAddr, TcpListener as StdTcpListener};
56+
cfg_feature! {
57+
#![any(feature = "http1", feature = "http2")]
6458

65-
#[cfg(feature = "tcp")]
66-
use std::time::Duration;
59+
pub use self::server::{Builder, Server};
6760

68-
use pin_project::pin_project;
69-
use tokio::io::{AsyncRead, AsyncWrite};
70-
71-
use self::accept::Accept;
72-
use crate::body::{Body, HttpBody};
73-
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
74-
use crate::common::{task, Future, Pin, Poll, Unpin};
75-
use crate::service::{HttpService, MakeServiceRef};
76-
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
77-
// error that `hyper::server::Http` is private...
78-
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
79-
use self::shutdown::{Graceful, GracefulWatcher};
80-
#[cfg(feature = "tcp")]
81-
use self::tcp::AddrIncoming;
82-
83-
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
84-
///
85-
/// `Server` is a `Future` mapping a bound listener with a set of service
86-
/// handlers. It is built using the [`Builder`](Builder), and the future
87-
/// completes when the server has been shutdown. It should be run by an
88-
/// `Executor`.
89-
#[pin_project]
90-
pub struct Server<I, S, E = Exec> {
91-
#[pin]
92-
spawn_all: SpawnAll<I, S, E>,
93-
}
94-
95-
/// A builder for a [`Server`](Server).
96-
#[derive(Debug)]
97-
pub struct Builder<I, E = Exec> {
98-
incoming: I,
99-
protocol: Http_<E>,
100-
}
101-
102-
// ===== impl Server =====
103-
104-
impl<I> Server<I, ()> {
105-
/// Starts a [`Builder`](Builder) with the provided incoming stream.
106-
pub fn builder(incoming: I) -> Builder<I> {
107-
Builder {
108-
incoming,
109-
protocol: Http_::new(),
110-
}
111-
}
112-
}
113-
114-
#[cfg(feature = "tcp")]
115-
impl Server<AddrIncoming, ()> {
116-
/// Binds to the provided address, and returns a [`Builder`](Builder).
117-
///
118-
/// # Panics
119-
///
120-
/// This method will panic if binding to the address fails. For a method
121-
/// to bind to an address and return a `Result`, see `Server::try_bind`.
122-
pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
123-
let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
124-
panic!("error binding to {}: {}", addr, e);
125-
});
126-
Server::builder(incoming)
127-
}
128-
129-
/// Tries to bind to the provided address, and returns a [`Builder`](Builder).
130-
pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
131-
AddrIncoming::new(addr).map(Server::builder)
132-
}
133-
134-
/// Create a new instance from a `std::net::TcpListener` instance.
135-
pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
136-
AddrIncoming::from_std(listener).map(Server::builder)
137-
}
138-
}
139-
140-
#[cfg(feature = "tcp")]
141-
impl<S, E> Server<AddrIncoming, S, E> {
142-
/// Returns the local address that this server is bound to.
143-
pub fn local_addr(&self) -> SocketAddr {
144-
self.spawn_all.local_addr()
145-
}
146-
}
147-
148-
impl<I, IO, IE, S, E, B> Server<I, S, E>
149-
where
150-
I: Accept<Conn = IO, Error = IE>,
151-
IE: Into<Box<dyn StdError + Send + Sync>>,
152-
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
153-
S: MakeServiceRef<IO, Body, ResBody = B>,
154-
S::Error: Into<Box<dyn StdError + Send + Sync>>,
155-
B: HttpBody + Send + Sync + 'static,
156-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
157-
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
158-
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
159-
{
160-
/// Prepares a server to handle graceful shutdown when the provided future
161-
/// completes.
162-
///
163-
/// # Example
164-
///
165-
/// ```
166-
/// # fn main() {}
167-
/// # #[cfg(feature = "tcp")]
168-
/// # async fn run() {
169-
/// # use hyper::{Body, Response, Server, Error};
170-
/// # use hyper::service::{make_service_fn, service_fn};
171-
/// # let make_service = make_service_fn(|_| async {
172-
/// # Ok::<_, Error>(service_fn(|_req| async {
173-
/// # Ok::<_, Error>(Response::new(Body::from("Hello World")))
174-
/// # }))
175-
/// # });
176-
/// // Make a server from the previous examples...
177-
/// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
178-
/// .serve(make_service);
179-
///
180-
/// // Prepare some signal for when the server should start shutting down...
181-
/// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
182-
/// let graceful = server
183-
/// .with_graceful_shutdown(async {
184-
/// rx.await.ok();
185-
/// });
186-
///
187-
/// // Await the `server` receiving the signal...
188-
/// if let Err(e) = graceful.await {
189-
/// eprintln!("server error: {}", e);
190-
/// }
191-
///
192-
/// // And later, trigger the signal by calling `tx.send(())`.
193-
/// let _ = tx.send(());
194-
/// # }
195-
/// ```
196-
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
197-
where
198-
F: Future<Output = ()>,
199-
{
200-
Graceful::new(self.spawn_all, signal)
201-
}
202-
}
203-
204-
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
205-
where
206-
I: Accept<Conn = IO, Error = IE>,
207-
IE: Into<Box<dyn StdError + Send + Sync>>,
208-
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
209-
S: MakeServiceRef<IO, Body, ResBody = B>,
210-
S::Error: Into<Box<dyn StdError + Send + Sync>>,
211-
B: HttpBody + 'static,
212-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
213-
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
214-
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
215-
{
216-
type Output = crate::Result<()>;
217-
218-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
219-
self.project().spawn_all.poll_watch(cx, &NoopWatcher)
220-
}
221-
}
222-
223-
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
224-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225-
f.debug_struct("Server")
226-
.field("listener", &self.spawn_all.incoming_ref())
227-
.finish()
228-
}
229-
}
230-
231-
// ===== impl Builder =====
232-
233-
impl<I, E> Builder<I, E> {
234-
/// Start a new builder, wrapping an incoming stream and low-level options.
235-
///
236-
/// For a more convenient constructor, see [`Server::bind`](Server::bind).
237-
pub fn new(incoming: I, protocol: Http_<E>) -> Self {
238-
Builder { incoming, protocol }
239-
}
240-
241-
/// Sets whether to use keep-alive for HTTP/1 connections.
242-
///
243-
/// Default is `true`.
244-
#[cfg(feature = "http1")]
245-
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
246-
pub fn http1_keepalive(mut self, val: bool) -> Self {
247-
self.protocol.http1_keep_alive(val);
248-
self
249-
}
250-
251-
/// Set whether HTTP/1 connections should support half-closures.
252-
///
253-
/// Clients can chose to shutdown their write-side while waiting
254-
/// for the server to respond. Setting this to `true` will
255-
/// prevent closing the connection immediately if `read`
256-
/// detects an EOF in the middle of a request.
257-
///
258-
/// Default is `false`.
259-
#[cfg(feature = "http1")]
260-
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
261-
pub fn http1_half_close(mut self, val: bool) -> Self {
262-
self.protocol.http1_half_close(val);
263-
self
264-
}
265-
266-
/// Set the maximum buffer size.
267-
///
268-
/// Default is ~ 400kb.
269-
#[cfg(feature = "http1")]
270-
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
271-
pub fn http1_max_buf_size(mut self, val: usize) -> Self {
272-
self.protocol.max_buf_size(val);
273-
self
274-
}
275-
276-
// Sets whether to bunch up HTTP/1 writes until the read buffer is empty.
277-
//
278-
// This isn't really desirable in most cases, only really being useful in
279-
// silly pipeline benchmarks.
280-
#[doc(hidden)]
281-
#[cfg(feature = "http1")]
282-
pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
283-
self.protocol.pipeline_flush(val);
284-
self
285-
}
286-
287-
/// Sets whether HTTP/1 is required.
288-
///
289-
/// Default is `false`.
290-
#[cfg(feature = "http1")]
291-
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
292-
pub fn http1_only(mut self, val: bool) -> Self {
293-
self.protocol.http1_only(val);
294-
self
295-
}
296-
297-
/// Sets whether HTTP/2 is required.
298-
///
299-
/// Default is `false`.
300-
#[cfg(feature = "http2")]
301-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
302-
pub fn http2_only(mut self, val: bool) -> Self {
303-
self.protocol.http2_only(val);
304-
self
305-
}
306-
307-
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
308-
/// stream-level flow control.
309-
///
310-
/// Passing `None` will do nothing.
311-
///
312-
/// If not set, hyper will use a default.
313-
///
314-
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
315-
#[cfg(feature = "http2")]
316-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
317-
pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
318-
self.protocol.http2_initial_stream_window_size(sz.into());
319-
self
320-
}
321-
322-
/// Sets the max connection-level flow control for HTTP2
323-
///
324-
/// Passing `None` will do nothing.
325-
///
326-
/// If not set, hyper will use a default.
327-
#[cfg(feature = "http2")]
328-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
329-
pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
330-
self.protocol
331-
.http2_initial_connection_window_size(sz.into());
332-
self
333-
}
334-
335-
/// Sets whether to use an adaptive flow control.
336-
///
337-
/// Enabling this will override the limits set in
338-
/// `http2_initial_stream_window_size` and
339-
/// `http2_initial_connection_window_size`.
340-
#[cfg(feature = "http2")]
341-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
342-
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
343-
self.protocol.http2_adaptive_window(enabled);
344-
self
345-
}
346-
347-
/// Sets the maximum frame size to use for HTTP2.
348-
///
349-
/// Passing `None` will do nothing.
350-
///
351-
/// If not set, hyper will use a default.
352-
#[cfg(feature = "http2")]
353-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
354-
pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
355-
self.protocol.http2_max_frame_size(sz);
356-
self
357-
}
358-
359-
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
360-
/// connections.
361-
///
362-
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
363-
///
364-
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
365-
#[cfg(feature = "http2")]
366-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
367-
pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
368-
self.protocol.http2_max_concurrent_streams(max.into());
369-
self
370-
}
371-
372-
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
373-
/// connection alive.
374-
///
375-
/// Pass `None` to disable HTTP2 keep-alive.
376-
///
377-
/// Default is currently disabled.
378-
///
379-
/// # Cargo Feature
380-
///
381-
/// Requires the `runtime` cargo feature to be enabled.
382-
#[cfg(feature = "runtime")]
383-
#[cfg(feature = "http2")]
384-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
385-
pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
386-
self.protocol.http2_keep_alive_interval(interval);
387-
self
388-
}
389-
390-
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
391-
///
392-
/// If the ping is not acknowledged within the timeout, the connection will
393-
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
394-
///
395-
/// Default is 20 seconds.
396-
///
397-
/// # Cargo Feature
398-
///
399-
/// Requires the `runtime` cargo feature to be enabled.
400-
#[cfg(feature = "runtime")]
401-
#[cfg(feature = "http2")]
402-
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
403-
pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
404-
self.protocol.http2_keep_alive_timeout(timeout);
405-
self
406-
}
407-
408-
/// Sets the `Executor` to deal with connection tasks.
409-
///
410-
/// Default is `tokio::spawn`.
411-
pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
412-
Builder {
413-
incoming: self.incoming,
414-
protocol: self.protocol.with_executor(executor),
415-
}
416-
}
417-
418-
/// Consume this `Builder`, creating a [`Server`](Server).
419-
///
420-
/// # Example
421-
///
422-
/// ```
423-
/// # #[cfg(feature = "tcp")]
424-
/// # async fn run() {
425-
/// use hyper::{Body, Error, Response, Server};
426-
/// use hyper::service::{make_service_fn, service_fn};
427-
///
428-
/// // Construct our SocketAddr to listen on...
429-
/// let addr = ([127, 0, 0, 1], 3000).into();
430-
///
431-
/// // And a MakeService to handle each connection...
432-
/// let make_svc = make_service_fn(|_| async {
433-
/// Ok::<_, Error>(service_fn(|_req| async {
434-
/// Ok::<_, Error>(Response::new(Body::from("Hello World")))
435-
/// }))
436-
/// });
437-
///
438-
/// // Then bind and serve...
439-
/// let server = Server::bind(&addr)
440-
/// .serve(make_svc);
441-
///
442-
/// // Run forever-ish...
443-
/// if let Err(err) = server.await {
444-
/// eprintln!("server error: {}", err);
445-
/// }
446-
/// # }
447-
/// ```
448-
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
449-
where
450-
I: Accept,
451-
I::Error: Into<Box<dyn StdError + Send + Sync>>,
452-
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
453-
S: MakeServiceRef<I::Conn, Body, ResBody = B>,
454-
S::Error: Into<Box<dyn StdError + Send + Sync>>,
455-
B: HttpBody + 'static,
456-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
457-
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
458-
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
459-
{
460-
let serve = self.protocol.serve(self.incoming, new_service);
461-
let spawn_all = serve.spawn_all();
462-
Server { spawn_all }
463-
}
464-
}
465-
466-
#[cfg(feature = "tcp")]
467-
impl<E> Builder<AddrIncoming, E> {
468-
/// Set whether TCP keepalive messages are enabled on accepted connections.
469-
///
470-
/// If `None` is specified, keepalive is disabled, otherwise the duration
471-
/// specified will be the time to remain idle before sending TCP keepalive
472-
/// probes.
473-
pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
474-
self.incoming.set_keepalive(keepalive);
475-
self
476-
}
477-
478-
/// Set the value of `TCP_NODELAY` option for accepted connections.
479-
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
480-
self.incoming.set_nodelay(enabled);
481-
self
482-
}
483-
484-
/// Set whether to sleep on accept errors.
485-
///
486-
/// A possible scenario is that the process has hit the max open files
487-
/// allowed, and so trying to accept a new connection will fail with
488-
/// EMFILE. In some cases, it's preferable to just wait for some time, if
489-
/// the application will likely close some files (or connections), and try
490-
/// to accept the connection again. If this option is true, the error will
491-
/// be logged at the error level, since it is still a big deal, and then
492-
/// the listener will sleep for 1 second.
493-
///
494-
/// In other cases, hitting the max open files should be treat similarly
495-
/// to being out-of-memory, and simply error (and shutdown). Setting this
496-
/// option to false will allow that.
497-
///
498-
/// For more details see [`AddrIncoming::set_sleep_on_errors`]
499-
pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
500-
self.incoming.set_sleep_on_errors(val);
501-
self
502-
}
61+
pub mod conn;
62+
mod server;
63+
mod shutdown;
64+
#[cfg(feature = "tcp")]
65+
mod tcp;
50366
}

‎src/server/server.rs

+444
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,444 @@
1+
use std::error::Error as StdError;
2+
use std::fmt;
3+
#[cfg(feature = "tcp")]
4+
use std::net::{SocketAddr, TcpListener as StdTcpListener};
5+
6+
#[cfg(feature = "tcp")]
7+
use std::time::Duration;
8+
9+
use pin_project::pin_project;
10+
use tokio::io::{AsyncRead, AsyncWrite};
11+
12+
use super::accept::Accept;
13+
use crate::body::{Body, HttpBody};
14+
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
15+
use crate::common::{task, Future, Pin, Poll, Unpin};
16+
use crate::service::{HttpService, MakeServiceRef};
17+
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
18+
// error that `hyper::server::Http` is private...
19+
use super::conn::{Http as Http_, NoopWatcher, SpawnAll};
20+
use super::shutdown::{Graceful, GracefulWatcher};
21+
#[cfg(feature = "tcp")]
22+
use super::tcp::AddrIncoming;
23+
24+
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
25+
///
26+
/// `Server` is a `Future` mapping a bound listener with a set of service
27+
/// handlers. It is built using the [`Builder`](Builder), and the future
28+
/// completes when the server has been shutdown. It should be run by an
29+
/// `Executor`.
30+
#[pin_project]
31+
pub struct Server<I, S, E = Exec> {
32+
#[pin]
33+
spawn_all: SpawnAll<I, S, E>,
34+
}
35+
36+
/// A builder for a [`Server`](Server).
37+
#[derive(Debug)]
38+
pub struct Builder<I, E = Exec> {
39+
incoming: I,
40+
protocol: Http_<E>,
41+
}
42+
43+
// ===== impl Server =====
44+
45+
impl<I> Server<I, ()> {
46+
/// Starts a [`Builder`](Builder) with the provided incoming stream.
47+
pub fn builder(incoming: I) -> Builder<I> {
48+
Builder {
49+
incoming,
50+
protocol: Http_::new(),
51+
}
52+
}
53+
}
54+
55+
#[cfg(feature = "tcp")]
56+
impl Server<AddrIncoming, ()> {
57+
/// Binds to the provided address, and returns a [`Builder`](Builder).
58+
///
59+
/// # Panics
60+
///
61+
/// This method will panic if binding to the address fails. For a method
62+
/// to bind to an address and return a `Result`, see `Server::try_bind`.
63+
pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
64+
let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
65+
panic!("error binding to {}: {}", addr, e);
66+
});
67+
Server::builder(incoming)
68+
}
69+
70+
/// Tries to bind to the provided address, and returns a [`Builder`](Builder).
71+
pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
72+
AddrIncoming::new(addr).map(Server::builder)
73+
}
74+
75+
/// Create a new instance from a `std::net::TcpListener` instance.
76+
pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
77+
AddrIncoming::from_std(listener).map(Server::builder)
78+
}
79+
}
80+
81+
#[cfg(feature = "tcp")]
82+
impl<S, E> Server<AddrIncoming, S, E> {
83+
/// Returns the local address that this server is bound to.
84+
pub fn local_addr(&self) -> SocketAddr {
85+
self.spawn_all.local_addr()
86+
}
87+
}
88+
89+
impl<I, IO, IE, S, E, B> Server<I, S, E>
90+
where
91+
I: Accept<Conn = IO, Error = IE>,
92+
IE: Into<Box<dyn StdError + Send + Sync>>,
93+
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
94+
S: MakeServiceRef<IO, Body, ResBody = B>,
95+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
96+
B: HttpBody + Send + Sync + 'static,
97+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
98+
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
99+
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
100+
{
101+
/// Prepares a server to handle graceful shutdown when the provided future
102+
/// completes.
103+
///
104+
/// # Example
105+
///
106+
/// ```
107+
/// # fn main() {}
108+
/// # #[cfg(feature = "tcp")]
109+
/// # async fn run() {
110+
/// # use hyper::{Body, Response, Server, Error};
111+
/// # use hyper::service::{make_service_fn, service_fn};
112+
/// # let make_service = make_service_fn(|_| async {
113+
/// # Ok::<_, Error>(service_fn(|_req| async {
114+
/// # Ok::<_, Error>(Response::new(Body::from("Hello World")))
115+
/// # }))
116+
/// # });
117+
/// // Make a server from the previous examples...
118+
/// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
119+
/// .serve(make_service);
120+
///
121+
/// // Prepare some signal for when the server should start shutting down...
122+
/// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
123+
/// let graceful = server
124+
/// .with_graceful_shutdown(async {
125+
/// rx.await.ok();
126+
/// });
127+
///
128+
/// // Await the `server` receiving the signal...
129+
/// if let Err(e) = graceful.await {
130+
/// eprintln!("server error: {}", e);
131+
/// }
132+
///
133+
/// // And later, trigger the signal by calling `tx.send(())`.
134+
/// let _ = tx.send(());
135+
/// # }
136+
/// ```
137+
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
138+
where
139+
F: Future<Output = ()>,
140+
{
141+
Graceful::new(self.spawn_all, signal)
142+
}
143+
}
144+
145+
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
146+
where
147+
I: Accept<Conn = IO, Error = IE>,
148+
IE: Into<Box<dyn StdError + Send + Sync>>,
149+
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
150+
S: MakeServiceRef<IO, Body, ResBody = B>,
151+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
152+
B: HttpBody + 'static,
153+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
154+
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
155+
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
156+
{
157+
type Output = crate::Result<()>;
158+
159+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
160+
self.project().spawn_all.poll_watch(cx, &NoopWatcher)
161+
}
162+
}
163+
164+
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
165+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166+
f.debug_struct("Server")
167+
.field("listener", &self.spawn_all.incoming_ref())
168+
.finish()
169+
}
170+
}
171+
172+
// ===== impl Builder =====
173+
174+
impl<I, E> Builder<I, E> {
175+
/// Start a new builder, wrapping an incoming stream and low-level options.
176+
///
177+
/// For a more convenient constructor, see [`Server::bind`](Server::bind).
178+
pub fn new(incoming: I, protocol: Http_<E>) -> Self {
179+
Builder { incoming, protocol }
180+
}
181+
182+
/// Sets whether to use keep-alive for HTTP/1 connections.
183+
///
184+
/// Default is `true`.
185+
#[cfg(feature = "http1")]
186+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
187+
pub fn http1_keepalive(mut self, val: bool) -> Self {
188+
self.protocol.http1_keep_alive(val);
189+
self
190+
}
191+
192+
/// Set whether HTTP/1 connections should support half-closures.
193+
///
194+
/// Clients can chose to shutdown their write-side while waiting
195+
/// for the server to respond. Setting this to `true` will
196+
/// prevent closing the connection immediately if `read`
197+
/// detects an EOF in the middle of a request.
198+
///
199+
/// Default is `false`.
200+
#[cfg(feature = "http1")]
201+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
202+
pub fn http1_half_close(mut self, val: bool) -> Self {
203+
self.protocol.http1_half_close(val);
204+
self
205+
}
206+
207+
/// Set the maximum buffer size.
208+
///
209+
/// Default is ~ 400kb.
210+
#[cfg(feature = "http1")]
211+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
212+
pub fn http1_max_buf_size(mut self, val: usize) -> Self {
213+
self.protocol.max_buf_size(val);
214+
self
215+
}
216+
217+
// Sets whether to bunch up HTTP/1 writes until the read buffer is empty.
218+
//
219+
// This isn't really desirable in most cases, only really being useful in
220+
// silly pipeline benchmarks.
221+
#[doc(hidden)]
222+
#[cfg(feature = "http1")]
223+
pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
224+
self.protocol.pipeline_flush(val);
225+
self
226+
}
227+
228+
/// Sets whether HTTP/1 is required.
229+
///
230+
/// Default is `false`.
231+
#[cfg(feature = "http1")]
232+
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
233+
pub fn http1_only(mut self, val: bool) -> Self {
234+
self.protocol.http1_only(val);
235+
self
236+
}
237+
238+
/// Sets whether HTTP/2 is required.
239+
///
240+
/// Default is `false`.
241+
#[cfg(feature = "http2")]
242+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
243+
pub fn http2_only(mut self, val: bool) -> Self {
244+
self.protocol.http2_only(val);
245+
self
246+
}
247+
248+
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
249+
/// stream-level flow control.
250+
///
251+
/// Passing `None` will do nothing.
252+
///
253+
/// If not set, hyper will use a default.
254+
///
255+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
256+
#[cfg(feature = "http2")]
257+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
258+
pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
259+
self.protocol.http2_initial_stream_window_size(sz.into());
260+
self
261+
}
262+
263+
/// Sets the max connection-level flow control for HTTP2
264+
///
265+
/// Passing `None` will do nothing.
266+
///
267+
/// If not set, hyper will use a default.
268+
#[cfg(feature = "http2")]
269+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
270+
pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
271+
self.protocol
272+
.http2_initial_connection_window_size(sz.into());
273+
self
274+
}
275+
276+
/// Sets whether to use an adaptive flow control.
277+
///
278+
/// Enabling this will override the limits set in
279+
/// `http2_initial_stream_window_size` and
280+
/// `http2_initial_connection_window_size`.
281+
#[cfg(feature = "http2")]
282+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
283+
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
284+
self.protocol.http2_adaptive_window(enabled);
285+
self
286+
}
287+
288+
/// Sets the maximum frame size to use for HTTP2.
289+
///
290+
/// Passing `None` will do nothing.
291+
///
292+
/// If not set, hyper will use a default.
293+
#[cfg(feature = "http2")]
294+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
295+
pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
296+
self.protocol.http2_max_frame_size(sz);
297+
self
298+
}
299+
300+
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
301+
/// connections.
302+
///
303+
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
304+
///
305+
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
306+
#[cfg(feature = "http2")]
307+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
308+
pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
309+
self.protocol.http2_max_concurrent_streams(max.into());
310+
self
311+
}
312+
313+
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
314+
/// connection alive.
315+
///
316+
/// Pass `None` to disable HTTP2 keep-alive.
317+
///
318+
/// Default is currently disabled.
319+
///
320+
/// # Cargo Feature
321+
///
322+
/// Requires the `runtime` cargo feature to be enabled.
323+
#[cfg(feature = "runtime")]
324+
#[cfg(feature = "http2")]
325+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
326+
pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
327+
self.protocol.http2_keep_alive_interval(interval);
328+
self
329+
}
330+
331+
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
332+
///
333+
/// If the ping is not acknowledged within the timeout, the connection will
334+
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
335+
///
336+
/// Default is 20 seconds.
337+
///
338+
/// # Cargo Feature
339+
///
340+
/// Requires the `runtime` cargo feature to be enabled.
341+
#[cfg(feature = "runtime")]
342+
#[cfg(feature = "http2")]
343+
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
344+
pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
345+
self.protocol.http2_keep_alive_timeout(timeout);
346+
self
347+
}
348+
349+
/// Sets the `Executor` to deal with connection tasks.
350+
///
351+
/// Default is `tokio::spawn`.
352+
pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
353+
Builder {
354+
incoming: self.incoming,
355+
protocol: self.protocol.with_executor(executor),
356+
}
357+
}
358+
359+
/// Consume this `Builder`, creating a [`Server`](Server).
360+
///
361+
/// # Example
362+
///
363+
/// ```
364+
/// # #[cfg(feature = "tcp")]
365+
/// # async fn run() {
366+
/// use hyper::{Body, Error, Response, Server};
367+
/// use hyper::service::{make_service_fn, service_fn};
368+
///
369+
/// // Construct our SocketAddr to listen on...
370+
/// let addr = ([127, 0, 0, 1], 3000).into();
371+
///
372+
/// // And a MakeService to handle each connection...
373+
/// let make_svc = make_service_fn(|_| async {
374+
/// Ok::<_, Error>(service_fn(|_req| async {
375+
/// Ok::<_, Error>(Response::new(Body::from("Hello World")))
376+
/// }))
377+
/// });
378+
///
379+
/// // Then bind and serve...
380+
/// let server = Server::bind(&addr)
381+
/// .serve(make_svc);
382+
///
383+
/// // Run forever-ish...
384+
/// if let Err(err) = server.await {
385+
/// eprintln!("server error: {}", err);
386+
/// }
387+
/// # }
388+
/// ```
389+
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
390+
where
391+
I: Accept,
392+
I::Error: Into<Box<dyn StdError + Send + Sync>>,
393+
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
394+
S: MakeServiceRef<I::Conn, Body, ResBody = B>,
395+
S::Error: Into<Box<dyn StdError + Send + Sync>>,
396+
B: HttpBody + 'static,
397+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
398+
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
399+
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
400+
{
401+
let serve = self.protocol.serve(self.incoming, new_service);
402+
let spawn_all = serve.spawn_all();
403+
Server { spawn_all }
404+
}
405+
}
406+
407+
#[cfg(feature = "tcp")]
408+
impl<E> Builder<AddrIncoming, E> {
409+
/// Set whether TCP keepalive messages are enabled on accepted connections.
410+
///
411+
/// If `None` is specified, keepalive is disabled, otherwise the duration
412+
/// specified will be the time to remain idle before sending TCP keepalive
413+
/// probes.
414+
pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
415+
self.incoming.set_keepalive(keepalive);
416+
self
417+
}
418+
419+
/// Set the value of `TCP_NODELAY` option for accepted connections.
420+
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
421+
self.incoming.set_nodelay(enabled);
422+
self
423+
}
424+
425+
/// Set whether to sleep on accept errors.
426+
///
427+
/// A possible scenario is that the process has hit the max open files
428+
/// allowed, and so trying to accept a new connection will fail with
429+
/// EMFILE. In some cases, it's preferable to just wait for some time, if
430+
/// the application will likely close some files (or connections), and try
431+
/// to accept the connection again. If this option is true, the error will
432+
/// be logged at the error level, since it is still a big deal, and then
433+
/// the listener will sleep for 1 second.
434+
///
435+
/// In other cases, hitting the max open files should be treat similarly
436+
/// to being out-of-memory, and simply error (and shutdown). Setting this
437+
/// option to false will allow that.
438+
///
439+
/// For more details see [`AddrIncoming::set_sleep_on_errors`]
440+
pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
441+
self.incoming.set_sleep_on_errors(val);
442+
self
443+
}
444+
}

‎src/server/shutdown.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use pin_project::pin_project;
44
use tokio::io::{AsyncRead, AsyncWrite};
55

66
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
7-
use super::Accept;
7+
use super::accept::Accept;
88
use crate::body::{Body, HttpBody};
99
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
1010
use crate::common::exec::{ConnStreamExec, NewSvcExec};

‎src/server/tcp.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio::time::Sleep;
99
use crate::common::{task, Future, Pin, Poll};
1010

1111
pub use self::addr_stream::AddrStream;
12-
use super::Accept;
12+
use super::accept::Accept;
1313

1414
/// A stream of connections from binding to an address.
1515
#[must_use = "streams do nothing unless polled"]

0 commit comments

Comments
 (0)
Please sign in to comment.