Skip to content

Commit bf8d74a

Browse files
authoredJan 15, 2021
feat(body): add send_trailers to Body channel's Sender (#2387)
Closes #2260
1 parent 257d6a9 commit bf8d74a

File tree

1 file changed

+43
-20
lines changed

1 file changed

+43
-20
lines changed
 

‎src/body/body.rs

+43-20
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use std::fmt;
55

66
use bytes::Bytes;
77
use futures_channel::mpsc;
8-
#[cfg(any(feature = "http1", feature = "http2"))]
9-
#[cfg(feature = "client")]
108
use futures_channel::oneshot;
119
use futures_core::Stream; // for mpsc::Receiver
1210
#[cfg(feature = "stream")]
@@ -17,14 +15,16 @@ use http_body::{Body as HttpBody, SizeHint};
1715
use super::DecodedLength;
1816
#[cfg(feature = "stream")]
1917
use crate::common::sync_wrapper::SyncWrapper;
20-
use crate::common::{task, watch, Pin, Poll};
18+
use crate::common::Future;
2119
#[cfg(any(feature = "http1", feature = "http2"))]
2220
#[cfg(feature = "client")]
23-
use crate::common::{Future, Never};
21+
use crate::common::Never;
22+
use crate::common::{task, watch, Pin, Poll};
2423
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
2524
use crate::proto::h2::ping;
2625

2726
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
27+
type TrailersSender = oneshot::Sender<HeaderMap>;
2828

2929
/// A stream of `Bytes`, used when receiving bodies.
3030
///
@@ -43,7 +43,8 @@ enum Kind {
4343
Chan {
4444
content_length: DecodedLength,
4545
want_tx: watch::Sender,
46-
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
46+
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
47+
trailers_rx: oneshot::Receiver<HeaderMap>,
4748
},
4849
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
4950
H2 {
@@ -106,7 +107,8 @@ enum DelayEof {
106107
#[must_use = "Sender does nothing unless sent on"]
107108
pub struct Sender {
108109
want_rx: watch::Receiver,
109-
tx: BodySender,
110+
data_tx: BodySender,
111+
trailers_tx: Option<TrailersSender>,
110112
}
111113

112114
const WANT_PENDING: usize = 1;
@@ -137,19 +139,25 @@ impl Body {
137139
}
138140

139141
pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
140-
let (tx, rx) = mpsc::channel(0);
142+
let (data_tx, data_rx) = mpsc::channel(0);
143+
let (trailers_tx, trailers_rx) = oneshot::channel();
141144

142145
// If wanter is true, `Sender::poll_ready()` won't becoming ready
143146
// until the `Body` has been polled for data once.
144147
let want = if wanter { WANT_PENDING } else { WANT_READY };
145148

146149
let (want_tx, want_rx) = watch::channel(want);
147150

148-
let tx = Sender { want_rx, tx };
151+
let tx = Sender {
152+
want_rx,
153+
data_tx,
154+
trailers_tx: Some(trailers_tx),
155+
};
149156
let rx = Body::new(Kind::Chan {
150157
content_length,
151158
want_tx,
152-
rx,
159+
data_rx,
160+
trailers_rx,
153161
});
154162

155163
(tx, rx)
@@ -282,12 +290,13 @@ impl Body {
282290
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
283291
Kind::Chan {
284292
content_length: ref mut len,
285-
ref mut rx,
293+
ref mut data_rx,
286294
ref mut want_tx,
295+
..
287296
} => {
288297
want_tx.send(WANT_READY);
289298

290-
match ready!(Pin::new(rx).poll_next(cx)?) {
299+
match ready!(Pin::new(data_rx).poll_next(cx)?) {
291300
Some(chunk) => {
292301
len.sub_if(chunk.len() as u64);
293302
Poll::Ready(Some(Ok(chunk)))
@@ -368,10 +377,15 @@ impl HttpBody for Body {
368377
}
369378
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
370379
},
371-
380+
Kind::Chan {
381+
ref mut trailers_rx,
382+
..
383+
} => match ready!(Pin::new(trailers_rx).poll(cx)) {
384+
Ok(t) => Poll::Ready(Ok(Some(t))),
385+
Err(_) => Poll::Ready(Ok(None)),
386+
},
372387
#[cfg(feature = "ffi")]
373388
Kind::Ffi(ref mut body) => body.poll_trailers(cx),
374-
375389
_ => Poll::Ready(Ok(None)),
376390
}
377391
}
@@ -527,7 +541,7 @@ impl Sender {
527541
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
528542
// Check if the receiver end has tried polling for the body yet
529543
ready!(self.poll_want(cx)?);
530-
self.tx
544+
self.data_tx
531545
.poll_ready(cx)
532546
.map_err(|_| crate::Error::new_closed())
533547
}
@@ -545,14 +559,23 @@ impl Sender {
545559
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
546560
}
547561

548-
/// Send data on this channel when it is ready.
562+
/// Send data on data channel when it is ready.
549563
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
550564
self.ready().await?;
551-
self.tx
565+
self.data_tx
552566
.try_send(Ok(chunk))
553567
.map_err(|_| crate::Error::new_closed())
554568
}
555569

570+
/// Send trailers on trailers channel.
571+
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
572+
let tx = match self.trailers_tx.take() {
573+
Some(tx) => tx,
574+
None => return Err(crate::Error::new_closed()),
575+
};
576+
tx.send(trailers).map_err(|_| crate::Error::new_closed())
577+
}
578+
556579
/// Try to send data on this channel.
557580
///
558581
/// # Errors
@@ -566,23 +589,23 @@ impl Sender {
566589
/// that doesn't have an async context. If in an async context, prefer
567590
/// `send_data()` instead.
568591
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
569-
self.tx
592+
self.data_tx
570593
.try_send(Ok(chunk))
571594
.map_err(|err| err.into_inner().expect("just sent Ok"))
572595
}
573596

574597
/// Aborts the body in an abnormal fashion.
575598
pub fn abort(self) {
576599
let _ = self
577-
.tx
600+
.data_tx
578601
// clone so the send works even if buffer is full
579602
.clone()
580603
.try_send(Err(crate::Error::new_body_write_aborted()));
581604
}
582605

583606
#[cfg(feature = "http1")]
584607
pub(crate) fn send_error(&mut self, err: crate::Error) {
585-
let _ = self.tx.try_send(Err(err));
608+
let _ = self.data_tx.try_send(Err(err));
586609
}
587610
}
588611

@@ -628,7 +651,7 @@ mod tests {
628651

629652
assert_eq!(
630653
mem::size_of::<Sender>(),
631-
mem::size_of::<usize>() * 4,
654+
mem::size_of::<usize>() * 5,
632655
"Sender"
633656
);
634657

0 commit comments

Comments
 (0)
Please sign in to comment.