Skip to content

Commit

Permalink
Fix for stalled streams (square#7801)
Browse files Browse the repository at this point in the history
(cherry picked from commit b11a6a8)
  • Loading branch information
yschimke committed May 13, 2023
1 parent 74724b2 commit a316383
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class Http2Stream internal constructor(

val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
if (errorExceptionToDeliver == null &&
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2
) {
// Flow control: notify the peer that we're ready for more data! Only send a
// WINDOW_UPDATE if the stream isn't in error.
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
Expand All @@ -387,8 +388,6 @@ class Http2Stream internal constructor(
}

if (readBytesDelivered != -1L) {
// Update connection.unacknowledgedBytesRead outside the synchronized block.
updateConnectionFlowControl(readBytesDelivered)
return readBytesDelivered
}

Expand Down Expand Up @@ -418,41 +417,39 @@ class Http2Stream internal constructor(
internal fun receive(source: BufferedSource, byteCount: Long) {
this@Http2Stream.assertThreadDoesntHoldLock()

var byteCount = byteCount
var remainingByteCount = byteCount

while (byteCount > 0L) {
while (remainingByteCount > 0L) {
val finished: Boolean
val flowControlError: Boolean
synchronized(this@Http2Stream) {
finished = this.finished
flowControlError = byteCount + readBuffer.size > maxByteCount
flowControlError = remainingByteCount + readBuffer.size > maxByteCount
}

// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
source.skip(byteCount)
source.skip(remainingByteCount)
closeLater(ErrorCode.FLOW_CONTROL_ERROR)
return
}

// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
source.skip(byteCount)
source.skip(remainingByteCount)
return
}

// Fill the receive buffer without holding any locks.
val read = source.read(receiveBuffer, byteCount)
val read = source.read(receiveBuffer, remainingByteCount)
if (read == -1L) throw EOFException()
byteCount -= read
remainingByteCount -= read

// Move the received data to the read buffer to the reader can read it. If this source has
// been closed since this read began we must discard the incoming data and tell the
// connection we've done so.
var bytesDiscarded = 0L
synchronized(this@Http2Stream) {
if (closed) {
bytesDiscarded = receiveBuffer.size
receiveBuffer.clear()
} else {
val wasEmpty = readBuffer.size == 0L
Expand All @@ -462,10 +459,13 @@ class Http2Stream internal constructor(
}
}
}
if (bytesDiscarded > 0L) {
updateConnectionFlowControl(bytesDiscarded)
}
}

// Update the connection flow control, as this is a shared resource.
// Even if our stream doesn't need more data, others might.
// But delay updating the stream flow control until that stream has been
// consumed
updateConnectionFlowControl(byteCount)
}

override fun timeout(): Timeout = readTimeout
Expand Down

0 comments on commit a316383

Please sign in to comment.