Skip to content

Commit ac84af6

Browse files
authoredMay 13, 2024··
feat(http1): add support for receiving trailer fields (#3637)
This allows receiving HTTP/1 chunked trailers, both as a client and as a server. The number of trailer pairs is limited to 1024. The size of the trailer fields is limited. The limit accounts for a single, very large trailer field or many trailer fields that exceed the limit in aggregate. Closes #2703
1 parent e77cefe commit ac84af6

File tree

7 files changed

+661
-80
lines changed

7 files changed

+661
-80
lines changed
 

‎src/body/incoming.rs

+13
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,19 @@ impl Sender {
410410
.map_err(|err| err.into_inner().expect("just sent Ok"))
411411
}
412412

413+
#[cfg(feature = "http1")]
414+
pub(crate) fn try_send_trailers(
415+
&mut self,
416+
trailers: HeaderMap,
417+
) -> Result<(), Option<HeaderMap>> {
418+
let tx = match self.trailers_tx.take() {
419+
Some(tx) => tx,
420+
None => return Err(None),
421+
};
422+
423+
tx.send(trailers).map_err(|err| Some(err))
424+
}
425+
413426
#[cfg(test)]
414427
pub(crate) fn abort(mut self) {
415428
self.send_error(crate::Error::new_body_write_aborted());

‎src/proto/h1/conn.rs

+42-23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use bytes::{Buf, Bytes};
1111
use futures_util::ready;
1212
use http::header::{HeaderValue, CONNECTION, TE};
1313
use http::{HeaderMap, Method, Version};
14+
use http_body::Frame;
1415
use httparse::ParserConfig;
1516

1617
use super::io::Buffered;
@@ -268,10 +269,20 @@ where
268269
self.try_keep_alive(cx);
269270
}
270271
} else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
271-
self.state.reading = Reading::Continue(Decoder::new(msg.decode));
272+
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
273+
self.state.reading = Reading::Continue(Decoder::new(
274+
msg.decode,
275+
self.state.h1_max_headers,
276+
h1_max_header_size,
277+
));
272278
wants = wants.add(Wants::EXPECT);
273279
} else {
274-
self.state.reading = Reading::Body(Decoder::new(msg.decode));
280+
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
281+
self.state.reading = Reading::Body(Decoder::new(
282+
msg.decode,
283+
self.state.h1_max_headers,
284+
h1_max_header_size,
285+
));
275286
}
276287

277288
self.state.allow_trailer_fields = msg
@@ -312,33 +323,41 @@ where
312323
pub(crate) fn poll_read_body(
313324
&mut self,
314325
cx: &mut Context<'_>,
315-
) -> Poll<Option<io::Result<Bytes>>> {
326+
) -> Poll<Option<io::Result<Frame<Bytes>>>> {
316327
debug_assert!(self.can_read_body());
317328

318329
let (reading, ret) = match self.state.reading {
319330
Reading::Body(ref mut decoder) => {
320331
match ready!(decoder.decode(cx, &mut self.io)) {
321-
Ok(slice) => {
322-
let (reading, chunk) = if decoder.is_eof() {
323-
debug!("incoming body completed");
324-
(
325-
Reading::KeepAlive,
326-
if !slice.is_empty() {
327-
Some(Ok(slice))
328-
} else {
329-
None
330-
},
331-
)
332-
} else if slice.is_empty() {
333-
error!("incoming body unexpectedly ended");
334-
// This should be unreachable, since all 3 decoders
335-
// either set eof=true or return an Err when reading
336-
// an empty slice...
337-
(Reading::Closed, None)
332+
Ok(frame) => {
333+
if frame.is_data() {
334+
let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
335+
let (reading, maybe_frame) = if decoder.is_eof() {
336+
debug!("incoming body completed");
337+
(
338+
Reading::KeepAlive,
339+
if !slice.is_empty() {
340+
Some(Ok(frame))
341+
} else {
342+
None
343+
},
344+
)
345+
} else if slice.is_empty() {
346+
error!("incoming body unexpectedly ended");
347+
// This should be unreachable, since all 3 decoders
348+
// either set eof=true or return an Err when reading
349+
// an empty slice...
350+
(Reading::Closed, None)
351+
} else {
352+
return Poll::Ready(Some(Ok(frame)));
353+
};
354+
(reading, Poll::Ready(maybe_frame))
355+
} else if frame.is_trailers() {
356+
(Reading::Closed, Poll::Ready(Some(Ok(frame))))
338357
} else {
339-
return Poll::Ready(Some(Ok(slice)));
340-
};
341-
(reading, Poll::Ready(chunk))
358+
trace!("discarding unknown frame");
359+
(Reading::Closed, Poll::Ready(None))
360+
}
342361
}
343362
Err(e) => {
344363
debug!("incoming body decode error: {}", e);

‎src/proto/h1/decode.rs

+416-45
Large diffs are not rendered by default.

‎src/proto/h1/dispatch.rs

+31-9
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,39 @@ where
213213
}
214214
}
215215
match self.conn.poll_read_body(cx) {
216-
Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
217-
Ok(()) => {
218-
self.body_tx = Some(body);
219-
}
220-
Err(_canceled) => {
221-
if self.conn.can_read_body() {
222-
trace!("body receiver dropped before eof, closing");
223-
self.conn.close_read();
216+
Poll::Ready(Some(Ok(frame))) => {
217+
if frame.is_data() {
218+
let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
219+
match body.try_send_data(chunk) {
220+
Ok(()) => {
221+
self.body_tx = Some(body);
222+
}
223+
Err(_canceled) => {
224+
if self.conn.can_read_body() {
225+
trace!("body receiver dropped before eof, closing");
226+
self.conn.close_read();
227+
}
228+
}
229+
}
230+
} else if frame.is_trailers() {
231+
let trailers =
232+
frame.into_trailers().unwrap_or_else(|_| unreachable!());
233+
match body.try_send_trailers(trailers) {
234+
Ok(()) => {
235+
self.body_tx = Some(body);
236+
}
237+
Err(_canceled) => {
238+
if self.conn.can_read_body() {
239+
trace!("body receiver dropped before eof, closing");
240+
self.conn.close_read();
241+
}
242+
}
224243
}
244+
} else {
245+
// we should have dropped all unknown frames in poll_read_body
246+
error!("unexpected frame");
225247
}
226-
},
248+
}
227249
Poll::Ready(None) => {
228250
// just drop, the body will close automatically
229251
}

‎src/proto/h1/role.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::proto::h1::{
3030
use crate::proto::RequestHead;
3131
use crate::proto::{BodyLength, MessageHead, RequestLine};
3232

33-
const DEFAULT_MAX_HEADERS: usize = 100;
33+
pub(crate) const DEFAULT_MAX_HEADERS: usize = 100;
3434
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
3535
#[cfg(feature = "server")]
3636
const MAX_URI_LEN: usize = (u16::MAX - 1) as usize;

‎tests/client.rs

+114-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@ where
3434
b.collect().await.map(|c| c.to_bytes())
3535
}
3636

37+
async fn concat_with_trailers<B>(b: B) -> Result<(Bytes, Option<HeaderMap>), B::Error>
38+
where
39+
B: hyper::body::Body,
40+
{
41+
let collect = b.collect().await?;
42+
let trailers = collect.trailers().cloned();
43+
let bytes = collect.to_bytes();
44+
45+
Ok((bytes, trailers))
46+
}
47+
3748
async fn tcp_connect(addr: &SocketAddr) -> std::io::Result<TokioIo<TcpStream>> {
3849
TcpStream::connect(*addr).await.map(TokioIo::new)
3950
}
@@ -122,6 +133,9 @@ macro_rules! test {
122133
status: $client_status:ident,
123134
headers: { $($response_header_name:expr => $response_header_val:expr,)* },
124135
body: $response_body:expr,
136+
$(trailers: {$(
137+
$response_trailer_name:expr => $response_trailer_val:expr,
138+
)*},)?
125139
) => (
126140
#[test]
127141
fn $name() {
@@ -158,12 +172,23 @@ macro_rules! test {
158172
);
159173
)*
160174

161-
let body = rt.block_on(concat(res))
175+
let (body, _trailers) = rt.block_on(concat_with_trailers(res))
162176
.expect("body concat wait");
163177

164178
let expected_res_body = Option::<&[u8]>::from($response_body)
165179
.unwrap_or_default();
166180
assert_eq!(body.as_ref(), expected_res_body);
181+
182+
$($(
183+
assert_eq!(
184+
_trailers.as_ref().expect("trailers is None")
185+
.get($response_trailer_name)
186+
.expect(concat!("trailer header '", stringify!($response_trailer_name), "'")),
187+
$response_trailer_val,
188+
"trailer '{}'",
189+
stringify!($response_trailer_name),
190+
);
191+
)*)?
167192
}
168193
);
169194
(
@@ -679,6 +704,94 @@ test! {
679704
body: None,
680705
}
681706

707+
test! {
708+
name: client_res_body_chunked_with_trailer,
709+
710+
server:
711+
expected: "GET / HTTP/1.1\r\nte: trailers\r\nhost: {addr}\r\n\r\n",
712+
reply: "\
713+
HTTP/1.1 200 OK\r\n\
714+
transfer-encoding: chunked\r\n\
715+
trailer: chunky-trailer\r\n\
716+
\r\n\
717+
5\r\n\
718+
hello\r\n\
719+
0\r\n\
720+
chunky-trailer: header data\r\n\
721+
\r\n\
722+
",
723+
724+
client:
725+
request: {
726+
method: GET,
727+
url: "http://{addr}/",
728+
headers: {
729+
"te" => "trailers",
730+
},
731+
},
732+
response:
733+
status: OK,
734+
headers: {
735+
"Transfer-Encoding" => "chunked",
736+
},
737+
body: &b"hello"[..],
738+
trailers: {
739+
"chunky-trailer" => "header data",
740+
},
741+
}
742+
743+
test! {
744+
name: client_res_body_chunked_with_pathological_trailers,
745+
746+
server:
747+
expected: "GET / HTTP/1.1\r\nte: trailers\r\nhost: {addr}\r\n\r\n",
748+
reply: "\
749+
HTTP/1.1 200 OK\r\n\
750+
transfer-encoding: chunked\r\n\
751+
trailer: chunky-trailer1, chunky-trailer2, chunky-trailer3, chunky-trailer4, chunky-trailer5\r\n\
752+
\r\n\
753+
5\r\n\
754+
hello\r\n\
755+
0\r\n\
756+
chunky-trailer1: header data1\r\n\
757+
chunky-trailer2: header data2\r\n\
758+
chunky-trailer3: header data3\r\n\
759+
chunky-trailer4: header data4\r\n\
760+
chunky-trailer5: header data5\r\n\
761+
sneaky-trailer: not in trailer header\r\n\
762+
transfer-encoding: chunked\r\n\
763+
content-length: 5\r\n\
764+
trailer: foo\r\n\
765+
\r\n\
766+
",
767+
768+
client:
769+
request: {
770+
method: GET,
771+
url: "http://{addr}/",
772+
headers: {
773+
"te" => "trailers",
774+
},
775+
},
776+
response:
777+
status: OK,
778+
headers: {
779+
"Transfer-Encoding" => "chunked",
780+
},
781+
body: &b"hello"[..],
782+
trailers: {
783+
"chunky-trailer1" => "header data1",
784+
"chunky-trailer2" => "header data2",
785+
"chunky-trailer3" => "header data3",
786+
"chunky-trailer4" => "header data4",
787+
"chunky-trailer5" => "header data5",
788+
"sneaky-trailer" => "not in trailer header",
789+
"transfer-encoding" => "chunked",
790+
"content-length" => "5",
791+
"trailer" => "foo",
792+
},
793+
}
794+
682795
test! {
683796
name: client_get_req_body_sized,
684797

‎tests/server.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -2693,7 +2693,7 @@ async fn http2_keep_alive_count_server_pings() {
26932693
}
26942694

26952695
#[test]
2696-
fn http1_trailer_fields() {
2696+
fn http1_trailer_send_fields() {
26972697
let body = futures_util::stream::once(async move { Ok("hello".into()) });
26982698
let mut headers = HeaderMap::new();
26992699
headers.insert("chunky-trailer", "header data".parse().unwrap());
@@ -2780,13 +2780,43 @@ fn http1_trailer_fields_not_allowed() {
27802780
assert_eq!(body, expected_body);
27812781
}
27822782

2783+
#[test]
2784+
fn http1_trailer_recv_fields() {
2785+
let server = serve();
2786+
let mut req = connect(server.addr());
2787+
req.write_all(
2788+
b"\
2789+
POST / HTTP/1.1\r\n\
2790+
trailer: chunky-trailer\r\n\
2791+
host: example.domain\r\n\
2792+
transfer-encoding: chunked\r\n\
2793+
\r\n\
2794+
5\r\n\
2795+
hello\r\n\
2796+
0\r\n\
2797+
chunky-trailer: header data\r\n\
2798+
\r\n\
2799+
",
2800+
)
2801+
.expect("writing");
2802+
2803+
assert_eq!(server.body(), b"hello");
2804+
2805+
let trailers = server.trailers();
2806+
assert_eq!(
2807+
trailers.get("chunky-trailer"),
2808+
Some(&"header data".parse().unwrap())
2809+
);
2810+
}
2811+
27832812
// -------------------------------------------------
27842813
// the Server that is used to run all the tests with
27852814
// -------------------------------------------------
27862815

27872816
struct Serve {
27882817
addr: SocketAddr,
27892818
msg_rx: mpsc::Receiver<Msg>,
2819+
trailers_rx: mpsc::Receiver<HeaderMap>,
27902820
reply_tx: Mutex<spmc::Sender<Reply>>,
27912821
shutdown_signal: Option<oneshot::Sender<()>>,
27922822
thread: Option<thread::JoinHandle<()>>,
@@ -2820,6 +2850,10 @@ impl Serve {
28202850
Ok(buf)
28212851
}
28222852

2853+
fn trailers(&self) -> HeaderMap {
2854+
self.trailers_rx.recv().expect("trailers")
2855+
}
2856+
28232857
fn reply(&self) -> ReplyBuilder<'_> {
28242858
ReplyBuilder { tx: &self.reply_tx }
28252859
}
@@ -2933,6 +2967,7 @@ impl Drop for Serve {
29332967
#[derive(Clone)]
29342968
struct TestService {
29352969
tx: mpsc::Sender<Msg>,
2970+
trailers_tx: mpsc::Sender<HeaderMap>,
29362971
reply: spmc::Receiver<Reply>,
29372972
}
29382973

@@ -2963,6 +2998,7 @@ impl Service<Request<IncomingBody>> for TestService {
29632998

29642999
fn call(&self, mut req: Request<IncomingBody>) -> Self::Future {
29653000
let tx = self.tx.clone();
3001+
let trailers_tx = self.trailers_tx.clone();
29663002
let replies = self.reply.clone();
29673003

29683004
Box::pin(async move {
@@ -2972,6 +3008,9 @@ impl Service<Request<IncomingBody>> for TestService {
29723008
if frame.is_data() {
29733009
tx.send(Msg::Chunk(frame.into_data().unwrap().to_vec()))
29743010
.unwrap();
3011+
} else if frame.is_trailers() {
3012+
let trailers = frame.into_trailers().unwrap();
3013+
trailers_tx.send(trailers).unwrap();
29753014
}
29763015
}
29773016
Err(err) => {
@@ -3100,6 +3139,7 @@ impl ServeOptions {
31003139

31013140
let (addr_tx, addr_rx) = mpsc::channel();
31023141
let (msg_tx, msg_rx) = mpsc::channel();
3142+
let (trailers_tx, trailers_rx) = mpsc::channel();
31033143
let (reply_tx, reply_rx) = spmc::channel();
31043144
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
31053145

@@ -3123,6 +3163,7 @@ impl ServeOptions {
31233163

31243164
loop {
31253165
let msg_tx = msg_tx.clone();
3166+
let trailers_tx = trailers_tx.clone();
31263167
let reply_rx = reply_rx.clone();
31273168

31283169
tokio::select! {
@@ -3135,6 +3176,7 @@ impl ServeOptions {
31353176
let reply_rx = reply_rx.clone();
31363177
let service = TestService {
31373178
tx: msg_tx,
3179+
trailers_tx,
31383180
reply: reply_rx,
31393181
};
31403182

@@ -3162,6 +3204,7 @@ impl ServeOptions {
31623204

31633205
Serve {
31643206
msg_rx,
3207+
trailers_rx,
31653208
reply_tx: Mutex::new(reply_tx),
31663209
addr,
31673210
shutdown_signal: Some(shutdown_tx),

0 commit comments

Comments
 (0)
Please sign in to comment.