Skip to content

Commit c88011d

Browse files
committedSep 15, 2021
fix(client): don't reuse a connection while still flushing
A client connection that read a full response while the request body was still flushing would see incorrect behavior, since the pool would let it be checked out again for a new request. In debug builds, it would then panic, but in release builds it would intermix the 2nd request bytes with the body of the previous request. In practice, this only ever happens if a server replies with a full response before reading the full request, while also choosing to not close that connection. Most servers either wait for the full request, or close the connection after the new response is written, so as to stop reading.
1 parent e3ab409 commit c88011d

File tree

3 files changed

+49
-3
lines changed

3 files changed

+49
-3
lines changed
 

‎src/proto/h1/conn.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ where
7171
self.io.set_flush_pipeline(enabled);
7272
}
7373

74+
#[cfg(test)]
75+
pub(crate) fn set_write_strategy_queue(&mut self) {
76+
self.io.set_write_strategy_queue();
77+
}
78+
7479
pub(crate) fn set_max_buf_size(&mut self, max: usize) {
7580
self.io.set_max_buf_size(max);
7681
}
@@ -461,7 +466,7 @@ where
461466
}
462467
}
463468
match self.state.writing {
464-
Writing::Init => true,
469+
Writing::Init => self.io.can_headers_buf(),
465470
_ => false,
466471
}
467472
}

‎src/proto/h1/dispatch.rs

+28-1
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,6 @@ mod tests {
665665

666666
// Block at 0 for now, but we will release this response before
667667
// the request is ready to write later...
668-
//let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
669668
let (mut tx, rx) = crate::client::dispatch::channel();
670669
let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
671670
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
@@ -692,6 +691,34 @@ mod tests {
692691
});
693692
}
694693

694+
#[tokio::test]
695+
async fn client_flushing_is_not_ready_for_next_request() {
696+
let _ = pretty_env_logger::try_init();
697+
698+
let (io, _handle) = tokio_test::io::Builder::new()
699+
.write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
700+
.read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
701+
.wait(std::time::Duration::from_secs(2))
702+
.build_with_handle();
703+
704+
let (mut tx, rx) = crate::client::dispatch::channel();
705+
let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
706+
conn.set_write_strategy_queue();
707+
708+
let dispatcher = Dispatcher::new(Client::new(rx), conn);
709+
let _dispatcher = tokio::spawn(async move { dispatcher.await });
710+
711+
let req = crate::Request::builder()
712+
.method("POST")
713+
.body(crate::Body::from("reee"))
714+
.unwrap();
715+
716+
let res = tx.try_send(req).unwrap().await.expect("response");
717+
drop(res);
718+
719+
assert!(!tx.is_ready());
720+
}
721+
695722
#[tokio::test]
696723
async fn body_empty_chunks_ignored() {
697724
let _ = pretty_env_logger::try_init();

‎src/proto/h1/io.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,18 @@ where
9898
}
9999

100100
#[cfg(feature = "server")]
101-
pub(crate) fn set_write_strategy_flatten(&mut self) {
101+
fn set_write_strategy_flatten(&mut self) {
102102
// this should always be called only at construction time,
103103
// so this assert is here to catch myself
104104
debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
105105
self.write_buf.set_strategy(WriteStrategy::Flatten);
106106
}
107107

108+
#[cfg(test)]
109+
pub(crate) fn set_write_strategy_queue(&mut self) {
110+
self.write_buf.set_strategy(WriteStrategy::Queue);
111+
}
112+
108113
pub(crate) fn read_buf(&self) -> &[u8] {
109114
self.read_buf.as_ref()
110115
}
@@ -121,6 +126,15 @@ where
121126
self.read_buf.capacity() - self.read_buf.len()
122127
}
123128

129+
/// Return whether we can append to the headers buffer.
130+
///
131+
/// Reasons we can't:
132+
/// - The write buf is in queue mode, and some of the past body is still
133+
/// needing to be flushed.
134+
pub(crate) fn can_headers_buf(&self) -> bool {
135+
!self.write_buf.queue.has_remaining()
136+
}
137+
124138
pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
125139
let buf = self.write_buf.headers_mut();
126140
&mut buf.bytes

0 commit comments

Comments
 (0)
Please sign in to comment.