Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: h2 refactoring #3082

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
208 changes: 128 additions & 80 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,22 @@ async function connectH2 (client, socket) {
util.addListener(session, 'goaway', onHTTP2GoAway)
util.addListener(session, 'close', function () {
const { [kClient]: client } = this
const { [kSocket]: socket } = client

const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this))
const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kSocket] = null
client[kHTTP2Session] = null

assert(client[kPending] === 0)
if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

session.unref()
Expand All @@ -139,6 +133,24 @@ async function connectH2 (client, socket) {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

util.addListener(socket, 'close', function () {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null

if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
Expand All @@ -155,10 +167,10 @@ async function connectH2 (client, socket) {

},
destroy (err, callback) {
session.destroy(err)
if (closed) {
queueMicrotask(callback)
} else {
// Destroying the socket will trigger the session close
socket.destroy(err).on('close', callback)
}
},
Expand Down Expand Up @@ -257,27 +269,30 @@ function writeH2 (client, request) {
headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
// We are already connected, streams are pending.
// We can call on connect, and wait for abort
request.onConnect((err) => {
if (request.aborted || request.completed) {
return
}
// TODO: propagate abort to the stream writters, this can save duplication
// as we can consolidate the stream count just and only upon stream closed
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
const abort = (err) => {
if (request.aborted || request.completed) {
return
}

err = err || new RequestAbortedError()
err = err || new RequestAbortedError()

if (stream != null) {
util.destroy(stream, err)
util.errorRequest(client, request, err)

session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
if (stream != null) {
util.destroy(stream, err)
}

util.errorRequest(client, request, err)
})
// We do not destroy the socket as we can continue using the session
// the stream get's destroyed and the session remains to create new streams
util.destroy(body, err)
}

try {
// We are already connected, streams are pending.
// We can call on connect, and wait for abort
request.onConnect(abort)
} catch (err) {
util.errorRequest(client, request, err)
}
Expand All @@ -302,7 +317,6 @@ function writeH2 (client, request) {

stream.once('close', () => {
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (session[kOpenStreams] === 0) session.unref()
})

Expand Down Expand Up @@ -382,7 +396,7 @@ function writeH2 (client, request) {
writeBodyH2()
}

// Increment counter as we have new several streams open
// Increment counter as we have new streams open
++session[kOpenStreams]

stream.once('response', headers => {
Expand All @@ -394,7 +408,7 @@ function writeH2 (client, request) {
// the request remains in-flight and headers hasn't been received yet
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted || request.completed) {
if (request.aborted) {
const err = new RequestAbortedError()
util.errorRequest(client, request, err)
util.destroy(stream, err)
Expand Down Expand Up @@ -424,14 +438,11 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

const err = new InformationalError('HTTP/2: stream half-closed (remote)')
util.errorRequest(client, request, err)
util.destroy(stream, err)
abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
})

stream.once('close', () => {
Expand All @@ -442,21 +453,11 @@ function writeH2 (client, request) {
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.errorRequest(client, request, err)
util.destroy(stream, err)
}
abort(err)
})

stream.once('frameError', (type, code) => {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
util.errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
})

// stream.on('aborted', () => {
Expand All @@ -479,37 +480,49 @@ function writeH2 (client, request) {

function writeBodyH2 () {
/* istanbul ignore else: assertion */
if (!body) {
request.onRequestSent()
if (!body || contentLength === 0) {
writeBuffer({
abort,
client,
request,
contentLength,
expectsPayload,
h2stream: stream,
body: null,
socket: client[kSocket]
})
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')
stream.cork()
stream.write(body)
stream.uncork()
stream.end()
request.onBodySent(body)
request.onRequestSent()
writeBuffer({
abort,
client,
request,
contentLength,
body,
expectsPayload,
h2stream: stream,
socket: client[kSocket]
})
} else if (util.isBlobLike(body)) {
if (typeof body.stream === 'function') {
writeIterable({
abort,
client,
request,
contentLength,
h2stream: stream,
expectsPayload,
h2stream: stream,
body: body.stream(),
socket: client[kSocket],
header: ''
socket: client[kSocket]
})
} else {
writeBlob({
abort,
body,
client,
request,
contentLength,
expectsPayload,
h2stream: stream,
header: '',
socket: client[kSocket]
})
}
Expand Down Expand Up @@ -541,7 +554,30 @@ function writeH2 (client, request) {
}
}

function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
function writeBuffer ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
try {
if (body != null && util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')
h2stream.cork()
h2stream.write(body)
h2stream.uncork()
h2stream.end()

request.onBodySent(body)
}

if (!expectsPayload) {
socket[kReset] = true
}

request.onRequestSent()
client[kResume]()
} catch (error) {
abort(error)
}
}

function writeStream ({ abort, socket, expectsPayload, h2stream, body, client, request, contentLength }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

// For HTTP/2, is enough to pipe the stream
Expand All @@ -550,26 +586,29 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
h2stream,
(err) => {
if (err) {
util.destroy(body, err)
util.destroy(h2stream, err)
util.destroy(pipe, err)
abort(err)
} else {
util.removeAllListeners(pipe)
request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
}

client[kResume]()
}
}
)

pipe.on('data', onPipeData)
pipe.once('end', () => {
pipe.removeListener('data', onPipeData)
util.destroy(pipe)
})
util.addListener(pipe, 'data', onPipeData)

function onPipeData (chunk) {
request.onBodySent(chunk)
}
}

async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeBlob ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
assert(contentLength === body.size, 'blob body must have content length')

try {
Expand All @@ -582,6 +621,7 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng
h2stream.cork()
h2stream.write(buffer)
h2stream.uncork()
h2stream.end()

request.onBodySent(buffer)
request.onRequestSent()
Expand All @@ -592,11 +632,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng

client[kResume]()
} catch (err) {
util.destroy(h2stream)
abort(err)
}
}

async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeIterable ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

let callback = null
Expand Down Expand Up @@ -635,11 +675,19 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
await waitForDrain()
}
}

h2stream.end()

request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
}

client[kResume]()
} catch (err) {
h2stream.destroy(err)
abort(err)
} finally {
request.onRequestSent()
h2stream.end()
h2stream
.off('close', onDrain)
.off('drain', onDrain)
Expand Down