From 865ca8617463426a9f6bb68982604207b072d911 Mon Sep 17 00:00:00 2001 From: Vadim Egorov Date: Thu, 16 Feb 2023 14:41:57 -0800 Subject: [PATCH 1/2] test erroneous poll_capacity makeup on window update --- tests/h2-tests/tests/flow_control.rs | 61 ++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 92e7a532..5caa2ec3 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1797,3 +1797,64 @@ async fn max_send_buffer_size_poll_capacity_wakes_task() { join(srv, client).await; } + +#[tokio::test] +async fn poll_capacity_wakeup_after_window_update() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().initial_window_size(10)) + .await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.send_frame(frames::window_update(1, 5)).await; + srv.send_frame(frames::window_update(1, 5)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = h2.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(10); + assert_eq!(stream.capacity(), 0); + + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + h2.drive(idle_ms(10)).await; + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(5); + assert_eq!(stream.capacity(), 0); + + // This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity. + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + + stream.send_data("".into(), true).unwrap(); + + // Wait for the connection to close + h2.await.unwrap(); + }; + + join(srv, h2).await; +} From 1f6464eea574bf956e606581e70b8e46223b0389 Mon Sep 17 00:00:00 2001 From: Vadim Egorov Date: Thu, 28 Jul 2022 12:08:40 -0700 Subject: [PATCH 2/2] Avoid spurious wakeups when stream capacity is not available Fixes #628 Sometimes `poll_capacity` returns `Ready(Some(0))` - in which case caller will have no way to wait for the stream capacity to become available. The previous attempt on the fix has addressed only a part of the problem. The root cause - in a nutshell - is the race condition between the application tasks that performs stream I/O and the task that serves the underlying HTTP/2 connection. The application thread that is about to send data calls `reserve_capacity/poll_capacity`, is provided with some send capacity and proceeds to `send_data`. Meanwhile the service thread may send some buffered data and/or receive some window updates - either way the stream's effective allocated send capacity may not change, but, since the capacity still available, `send_capacity_inc` flag may be set. The sending task calls `send_data` and uses the entire allocated capacity, leaving the flag set. Next time `poll_capacity` returns `Ready(Some(0))`. This change sets the flag and dispatches the wakeup event only in cases when the effective capacity reported by `poll_capacity` actually increases. --- src/proto/streams/prioritize.rs | 20 ++++-------- src/proto/streams/send.rs | 7 +---- src/proto/streams/stream.rs | 56 +++++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index f89a772f..88204ddc 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -323,9 +323,11 @@ impl Prioritize { /// connection pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { let available = stream.send_flow.available().as_size(); - stream.send_flow.claim_capacity(available); - // Re-assign all capacity to the connection - self.assign_connection_capacity(available, stream, counts); + if available > 0 { + stream.send_flow.claim_capacity(available); + // Re-assign all capacity to the connection + self.assign_connection_capacity(available, stream, counts); + } } /// Reclaim just reserved capacity, not buffered capacity, and re-assign @@ -756,17 +758,7 @@ impl Prioritize { // Update the flow control tracing::trace_span!("updating stream flow").in_scope(|| { - stream.send_flow.send_data(len); - - // Decrement the stream's buffered data counter - debug_assert!(stream.buffered_send_data >= len as usize); - stream.buffered_send_data -= len as usize; - stream.requested_send_capacity -= len; - - // If the capacity was limited because of the - // max_send_buffer_size, then consider waking - // the send task again... - stream.notify_if_can_buffer_more(self.max_buffer_size); + stream.send_data(len, self.max_buffer_size); // Assign the capacity back to the connection that // was just consumed from the stream in the previous diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 38896a30..20aba38d 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -333,12 +333,7 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size() as usize; - let buffered = stream.buffered_send_data; - - available - .min(self.prioritize.max_buffer_size()) - .saturating_sub(buffered) as WindowSize + stream.capacity(self.prioritize.max_buffer_size()) } pub fn poll_reset( diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 68a29828..2888d744 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -264,35 +264,65 @@ impl Stream { self.ref_count == 0 && !self.state.is_closed() } + /// Current available stream send capacity + pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + + available.min(max_buffer_size).saturating_sub(buffered) as WindowSize + } + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { + let prev_capacity = self.capacity(max_buffer_size); debug_assert!(capacity > 0); self.send_flow.assign_capacity(capacity); tracing::trace!( - " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", self.send_flow.available(), self.buffered_send_data, self.id, - max_buffer_size + max_buffer_size, + prev_capacity, ); - self.notify_if_can_buffer_more(max_buffer_size); + if prev_capacity < self.capacity(max_buffer_size) { + self.notify_capacity(); + } } - /// If the capacity was limited because of the max_send_buffer_size, - /// then consider waking the send task again... - pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { - let available = self.send_flow.available().as_size() as usize; - let buffered = self.buffered_send_data; + pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { + let prev_capacity = self.capacity(max_buffer_size); + + self.send_flow.send_data(len); - // Only notify if the capacity exceeds the amount of buffered data - if available.min(max_buffer_size) > buffered { - self.send_capacity_inc = true; - tracing::trace!(" notifying task"); - self.notify_send(); + // Decrement the stream's buffered data counter + debug_assert!(self.buffered_send_data >= len as usize); + self.buffered_send_data -= len as usize; + self.requested_send_capacity -= len; + + tracing::trace!( + " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", + self.send_flow.available(), + self.buffered_send_data, + self.id, + max_buffer_size, + prev_capacity, + ); + + if prev_capacity < self.capacity(max_buffer_size) { + self.notify_capacity(); } } + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_capacity(&mut self) { + self.send_capacity_inc = true; + tracing::trace!(" notifying task"); + self.notify_send(); + } + /// Returns `Err` when the decrement cannot be completed due to overflow. pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { match self.content_length {