Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid spurious wakeups when stream capacity is not available #661

Merged
merged 2 commits into from Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 6 additions & 14 deletions src/proto/streams/prioritize.rs
Expand Up @@ -323,9 +323,11 @@ impl Prioritize {
/// connection
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
if available > 0 {
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
}

/// Reclaim just reserved capacity, not buffered capacity, and re-assign
Expand Down Expand Up @@ -756,17 +758,7 @@ impl Prioritize {

// Update the flow control
tracing::trace_span!("updating stream flow").in_scope(|| {
stream.send_flow.send_data(len);

// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len as usize);
stream.buffered_send_data -= len as usize;
stream.requested_send_capacity -= len;

// If the capacity was limited because of the
// max_send_buffer_size, then consider waking
// the send task again...
stream.notify_if_can_buffer_more(self.max_buffer_size);
stream.send_data(len, self.max_buffer_size);

// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
Expand Down
7 changes: 1 addition & 6 deletions src/proto/streams/send.rs
Expand Up @@ -333,12 +333,7 @@ impl Send {

/// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
let available = stream.send_flow.available().as_size() as usize;
let buffered = stream.buffered_send_data;

available
.min(self.prioritize.max_buffer_size())
.saturating_sub(buffered) as WindowSize
stream.capacity(self.prioritize.max_buffer_size())
}

pub fn poll_reset(
Expand Down
56 changes: 43 additions & 13 deletions src/proto/streams/stream.rs
Expand Up @@ -264,35 +264,65 @@ impl Stream {
self.ref_count == 0 && !self.state.is_closed()
}

/// Current available stream send capacity
pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
let available = self.send_flow.available().as_size() as usize;
let buffered = self.buffered_send_data;

available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
}

pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);
debug_assert!(capacity > 0);
self.send_flow.assign_capacity(capacity);

tracing::trace!(
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}",
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
self.send_flow.available(),
self.buffered_send_data,
self.id,
max_buffer_size
max_buffer_size,
prev_capacity,
);

self.notify_if_can_buffer_more(max_buffer_size);
if prev_capacity < self.capacity(max_buffer_size) {
self.notify_capacity();
}
}

/// If the capacity was limited because of the max_send_buffer_size,
/// then consider waking the send task again...
pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) {
let available = self.send_flow.available().as_size() as usize;
let buffered = self.buffered_send_data;
pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);

self.send_flow.send_data(len);

// Only notify if the capacity exceeds the amount of buffered data
if available.min(max_buffer_size) > buffered {
self.send_capacity_inc = true;
tracing::trace!(" notifying task");
self.notify_send();
// Decrement the stream's buffered data counter
debug_assert!(self.buffered_send_data >= len as usize);
self.buffered_send_data -= len as usize;
self.requested_send_capacity -= len;

tracing::trace!(
" sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
self.send_flow.available(),
self.buffered_send_data,
self.id,
max_buffer_size,
prev_capacity,
);

if prev_capacity < self.capacity(max_buffer_size) {
self.notify_capacity();
}
}

/// If the capacity was limited because of the max_send_buffer_size,
/// then consider waking the send task again...
pub fn notify_capacity(&mut self) {
self.send_capacity_inc = true;
tracing::trace!(" notifying task");
self.notify_send();
}

/// Returns `Err` when the decrement cannot be completed due to overflow.
pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
match self.content_length {
Expand Down
61 changes: 61 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Expand Up @@ -1797,3 +1797,64 @@ async fn max_send_buffer_size_poll_capacity_wakes_task() {

join(srv, client).await;
}

#[tokio::test]
async fn poll_capacity_wakeup_after_window_update() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().initial_window_size(10))
.await;
assert_default_settings!(settings);
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
srv.send_frame(frames::window_update(1, 5)).await;
srv.send_frame(frames::window_update(1, 5)).await;
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
srv.recv_frame(frames::data(1, &b""[..]).eos()).await;
};

let h2 = async move {
let (mut client, mut h2) = client::Builder::new()
.max_send_buffer_size(5)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();

let response = h2.drive(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

stream.send_data("abcde".into(), false).unwrap();

stream.reserve_capacity(10);
assert_eq!(stream.capacity(), 0);

let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
h2.drive(idle_ms(10)).await;
stream.send_data("abcde".into(), false).unwrap();

stream.reserve_capacity(5);
assert_eq!(stream.capacity(), 0);

// This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity.
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;

stream.send_data("".into(), true).unwrap();

// Wait for the connection to close
h2.await.unwrap();
};

join(srv, h2).await;
}