Skip to content

Commit

Permalink
fix: stream body handling
Browse files Browse the repository at this point in the history
Make sure stream body always has an error handler.

Make sure not to leave dangling event listeners.
  • Loading branch information
ronag committed Nov 2, 2023
1 parent 41c253d commit 505cbcc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
24 changes: 5 additions & 19 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1462,23 +1462,7 @@ function _resume (client, sync) {
return
}

if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
errorRequest(client, request, err)
})
.on('end', function () {
util.destroy(this)
})

request.body = null
}

if (client[kRunning] > 0 &&
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
Expand Down Expand Up @@ -1527,7 +1511,9 @@ function write (client, request) {
body.read(0)
}

let contentLength = util.bodyLength(body)
const bodyLength = util.bodyLength(body)

let contentLength = bodyLength

if (contentLength === null) {
contentLength = request.contentLength
Expand Down Expand Up @@ -1623,7 +1609,7 @@ function write (client, request) {
}

/* istanbul ignore else: assertion */
if (!body) {
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
Expand Down
41 changes: 40 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,28 @@ class Request {

this.method = method

this.abort = null

if (body == null) {
this.body = null
} else if (util.isStream(body)) {
this.body = body

if (!this.body._readableState?.autoDestroy) {
this.endHandler = function () {
util.destroy(this)
}
this.body.on('end', this.endHandler)
}

this.errorHandler = err => {
if (this.abort) {
this.abort(err)
} else {
this.error = err
}
}
this.body.on('error', this.errorHandler)
} else if (util.isBuffer(body)) {
this.body = body.byteLength ? body : null
} else if (ArrayBuffer.isView(body)) {
Expand Down Expand Up @@ -236,7 +254,12 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

return this[kHandler].onConnect(abort)
if (this.error) {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
}
}

onHeaders (statusCode, headers, resume, statusText) {
Expand Down Expand Up @@ -265,6 +288,8 @@ class Request {
}

onComplete (trailers) {
this.onFinally()

assert(!this.aborted)

this.completed = true
Expand All @@ -275,6 +300,8 @@ class Request {
}

onError (error) {
this.onFinally()

if (channels.error.hasSubscribers) {
channels.error.publish({ request: this, error })
}
Expand All @@ -286,6 +313,18 @@ class Request {
return this[kHandler].onError(error)
}

onFinally () {
if (this.errorHandler) {
this.body.off('error', this.errorHandler)
this.errorHandler = null
}

if (this.endHandler) {
this.body.off('end', this.endHandler)
this.endHandler = null
}
}

// TODO: adjust to support H2
addHeader (key, value) {
processHeader(this, key, value)
Expand Down
2 changes: 1 addition & 1 deletion lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ function isReadableAborted (stream) {
}

function destroy (stream, err) {
if (!isStream(stream) || isDestroyed(stream)) {
if (stream == null || !isStream(stream) || isDestroyed(stream)) {
return
}

Expand Down

0 comments on commit 505cbcc

Please sign in to comment.