diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 3af1af3a..21c575a1 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -600,6 +600,16 @@ impl Recv { } } + // Received a frame, but no one cared about it. fix issue#648 + if !stream.is_recv { + tracing::trace!( + "recv_data; frame ignored on stream release {:?} for some time", + stream.id, + ); + self.release_connection_capacity(sz, &mut None); + return Ok(()); + } + // Update stream level flow control stream.recv_flow.send_data(sz); diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 36d515ba..de7f4f64 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -99,6 +99,9 @@ pub(super) struct Stream { /// Frames pending for this stream to read pub pending_recv: buffer::Deque, + /// When the RecvStream drop occurs, no data should be received. + pub is_recv: bool, + /// Task tracking receiving frames pub recv_task: Option, @@ -180,6 +183,7 @@ impl Stream { reset_at: None, next_reset_expire: None, pending_recv: buffer::Deque::new(), + is_recv: true, recv_task: None, pending_push_promises: store::Queue::new(), content_length: ContentLength::Omitted, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index aee64ca6..4bd671b0 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1345,12 +1345,13 @@ impl OpaqueStreamRef { .release_capacity(capacity, &mut stream, &mut me.actions.task) } + /// Clear the receive queue and set the status to no longer receive data frames. pub(crate) fn clear_recv_buffer(&mut self) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.key); - + stream.is_recv = false; me.actions.recv.clear_recv_buffer(&mut stream); }