From 1f6464eea574bf956e606581e70b8e46223b0389 Mon Sep 17 00:00:00 2001 From: Vadim Egorov Date: Thu, 28 Jul 2022 12:08:40 -0700 Subject: [PATCH] Avoid spurious wakeups when stream capacity is not available Fixes #628 Sometimes `poll_capacity` returns `Ready(Some(0))` - in which case caller will have no way to wait for the stream capacity to become available. The previous attempt on the fix has addressed only a part of the problem. The root cause - in a nutshell - is the race condition between the application tasks that performs stream I/O and the task that serves the underlying HTTP/2 connection. The application thread that is about to send data calls `reserve_capacity/poll_capacity`, is provided with some send capacity and proceeds to `send_data`. Meanwhile the service thread may send some buffered data and/or receive some window updates - either way the stream's effective allocated send capacity may not change, but, since the capacity still available, `send_capacity_inc` flag may be set. The sending task calls `send_data` and uses the entire allocated capacity, leaving the flag set. Next time `poll_capacity` returns `Ready(Some(0))`. This change sets the flag and dispatches the wakeup event only in cases when the effective capacity reported by `poll_capacity` actually increases. --- src/proto/streams/prioritize.rs | 20 ++++-------- src/proto/streams/send.rs | 7 +---- src/proto/streams/stream.rs | 56 +++++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index f89a772f..88204ddc 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -323,9 +323,11 @@ impl Prioritize { /// connection pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { let available = stream.send_flow.available().as_size(); - stream.send_flow.claim_capacity(available); - // Re-assign all capacity to the connection - self.assign_connection_capacity(available, stream, counts); + if available > 0 { + stream.send_flow.claim_capacity(available); + // Re-assign all capacity to the connection + self.assign_connection_capacity(available, stream, counts); + } } /// Reclaim just reserved capacity, not buffered capacity, and re-assign @@ -756,17 +758,7 @@ impl Prioritize { // Update the flow control tracing::trace_span!("updating stream flow").in_scope(|| { - stream.send_flow.send_data(len); - - // Decrement the stream's buffered data counter - debug_assert!(stream.buffered_send_data >= len as usize); - stream.buffered_send_data -= len as usize; - stream.requested_send_capacity -= len; - - // If the capacity was limited because of the - // max_send_buffer_size, then consider waking - // the send task again... - stream.notify_if_can_buffer_more(self.max_buffer_size); + stream.send_data(len, self.max_buffer_size); // Assign the capacity back to the connection that // was just consumed from the stream in the previous diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 38896a30..20aba38d 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -333,12 +333,7 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size() as usize; - let buffered = stream.buffered_send_data; - - available - .min(self.prioritize.max_buffer_size()) - .saturating_sub(buffered) as WindowSize + stream.capacity(self.prioritize.max_buffer_size()) } pub fn poll_reset( diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 68a29828..2888d744 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -264,35 +264,65 @@ impl Stream { self.ref_count == 0 && !self.state.is_closed() } + /// Current available stream send capacity + pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + + available.min(max_buffer_size).saturating_sub(buffered) as WindowSize + } + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { + let prev_capacity = self.capacity(max_buffer_size); debug_assert!(capacity > 0); self.send_flow.assign_capacity(capacity); tracing::trace!( - " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", self.send_flow.available(), self.buffered_send_data, self.id, - max_buffer_size + max_buffer_size, + prev_capacity, ); - self.notify_if_can_buffer_more(max_buffer_size); + if prev_capacity < self.capacity(max_buffer_size) { + self.notify_capacity(); + } } - /// If the capacity was limited because of the max_send_buffer_size, - /// then consider waking the send task again... - pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { - let available = self.send_flow.available().as_size() as usize; - let buffered = self.buffered_send_data; + pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { + let prev_capacity = self.capacity(max_buffer_size); + + self.send_flow.send_data(len); - // Only notify if the capacity exceeds the amount of buffered data - if available.min(max_buffer_size) > buffered { - self.send_capacity_inc = true; - tracing::trace!(" notifying task"); - self.notify_send(); + // Decrement the stream's buffered data counter + debug_assert!(self.buffered_send_data >= len as usize); + self.buffered_send_data -= len as usize; + self.requested_send_capacity -= len; + + tracing::trace!( + " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", + self.send_flow.available(), + self.buffered_send_data, + self.id, + max_buffer_size, + prev_capacity, + ); + + if prev_capacity < self.capacity(max_buffer_size) { + self.notify_capacity(); } } + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_capacity(&mut self) { + self.send_capacity_inc = true; + tracing::trace!(" notifying task"); + self.notify_send(); + } + /// Returns `Err` when the decrement cannot be completed due to overflow. pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { match self.content_length {