Skip to content

Commit

Permalink
fix: streams awaiting capacity lockout (#730)
Browse files Browse the repository at this point in the history
This PR changes the assign-capacity queue to prioritize streams that
are send-ready.

This is necessary to prevent a lockout that occurs when streams waiting for
connection capacity cannot proceed because none is available.

Closes hyperium/hyper#3338
  • Loading branch information
dswij authored and seanmonstar committed Jan 10, 2024
1 parent 0f412d8 commit 00de679
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Error {
#[derive(Debug)]
enum Kind {
/// A RST_STREAM frame was received or sent.
#[allow(dead_code)]
Reset(StreamId, Reason, Initiator),

/// A GO_AWAY frame was received or sent.
Expand Down
11 changes: 10 additions & 1 deletion src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,15 @@ impl Prioritize {
stream.requested_send_capacity =
cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

self.try_assign_capacity(stream);
// `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
// cannot be assigned at the time it is called.
//
// Streams over the max concurrent count will still call `send_data` so we should be
// careful not to put it into `pending_capacity` as it will starve the connection
// capacity for other streams
if !stream.is_pending_open {
self.try_assign_capacity(stream);
}
}

if frame.is_end_stream() {
Expand Down Expand Up @@ -522,6 +530,7 @@ impl Prioritize {
loop {
if let Some(mut stream) = self.pop_pending_open(store, counts) {
self.pending_send.push_front(&mut stream);
self.try_assign_capacity(&mut stream);
}

match self.pop_frame(buffer, store, max_frame_len, counts) {
Expand Down
97 changes: 95 additions & 2 deletions tests/h2-tests/tests/prioritization.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::join;
use futures::{FutureExt, StreamExt};
use futures::future::{join, select};
use futures::{pin_mut, FutureExt, StreamExt};

use h2_support::prelude::*;
use h2_support::DEFAULT_WINDOW_SIZE;
use std::task::Context;
Expand Down Expand Up @@ -408,3 +409,95 @@ async fn send_data_receive_window_update() {

join(mock, h2).await;
}

// Regression test for the lockout fixed by this commit: streams queued beyond
// the server's max-concurrent-streams limit must not capture (and thereby
// starve) connection-level flow-control capacity that send-ready streams need
// in order to make progress.
#[tokio::test]
async fn stream_count_over_max_stream_limit_does_not_starve_capacity() {
    use tokio::sync::oneshot;

    h2_support::trace_init!();

    let (io, mut srv) = mock::new();

    // Lets the mock server signal the client task once the whole frame
    // sequence has been observed.
    let (tx, rx) = oneshot::channel();

    let srv = async move {
        let _ = srv
            .assert_client_handshake_with_settings(
                frames::settings()
                    // super tiny server: only one stream may be open at a
                    // time, so every additional request queues as pending-open
                    .max_concurrent_streams(1),
            )
            .await;
        srv.recv_frame(frames::headers(1).request("POST", "http://example.com/"))
            .await;

        // Stream 1 sends 3 * 16384 + 16383 = 65535 bytes, consuming the
        // entire initial connection window.
        srv.recv_frame(frames::data(1, vec![0; 16384])).await;
        srv.recv_frame(frames::data(1, vec![0; 16384])).await;
        srv.recv_frame(frames::data(1, vec![0; 16384])).await;
        srv.recv_frame(frames::data(1, vec![0; 16383]).eos()).await;
        srv.send_frame(frames::headers(1).response(200).eos()).await;

        // Replenish the connection window (stream id 0 = connection-level).
        // All of these connection capacities should be assigned to stream 3,
        // the next send-ready stream — not to streams still blocked on the
        // concurrency limit.
        srv.send_frame(frames::window_update(0, 16384)).await;
        srv.send_frame(frames::window_update(0, 16384)).await;
        srv.send_frame(frames::window_update(0, 16384)).await;
        srv.send_frame(frames::window_update(0, 16383)).await;

        // StreamId(3) should be able to send all of its request with the conn
        // capacity granted above.
        srv.recv_frame(frames::headers(3).request("POST", "http://example.com/"))
            .await;
        srv.recv_frame(frames::data(3, vec![0; 16384])).await;
        srv.recv_frame(frames::data(3, vec![0; 16384])).await;
        srv.recv_frame(frames::data(3, vec![0; 16384])).await;
        srv.recv_frame(frames::data(3, vec![0; 16383]).eos()).await;
        srv.send_frame(frames::headers(3).response(200).eos()).await;

        // By induction, every later pending stream is then guaranteed to be
        // send-able once it becomes the active one.
        tx.send(()).unwrap();
    };

    // Builds the identical POST request used for every stream in this test.
    fn request() -> Request<()> {
        Request::builder()
            .method(Method::POST)
            .uri("http://example.com/")
            .body(())
            .unwrap()
    }

    let client = async move {
        let (mut client, mut conn) = client::Builder::new()
            .handshake::<_, Bytes>(io)
            .await
            .expect("handshake");

        let (req1, mut send1) = client.send_request(request(), false).unwrap();
        let (req2, mut send2) = client.send_request(request(), false).unwrap();

        // Use up the connection window.
        send1.send_data(vec![0; 65535].into(), true).unwrap();
        // Queue up for more connection window.
        send2.send_data(vec![0; 65535].into(), true).unwrap();

        // Queue up more pending-open streams (over max_concurrent_streams);
        // before the fix these could capture the capacity stream 3 needs.
        for _ in 0..5 {
            let (_, mut send) = client.send_request(request(), false).unwrap();
            send.send_data(vec![0; 65535].into(), true).unwrap();
        }

        let response = conn.drive(req1).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let response = conn.drive(req2).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        // Wait for the mock server to finish asserting the frame sequence.
        let _ = rx.await;
    };

    let task = join(srv, client);
    pin_mut!(task);

    // Fail fast with a panic instead of hanging forever if the capacity
    // lockout regresses.
    let t = tokio::time::sleep(Duration::from_secs(5)).map(|_| panic!("time out"));
    pin_mut!(t);

    select(task, t).await;
}

0 comments on commit 00de679

Please sign in to comment.