Skip to content

Commit

Permalink
refactor: h2 refactoring (#3082)
Browse files Browse the repository at this point in the history
* refactor: h2 refactoring

* test: add test for servername changed

* fix: leftover
  • Loading branch information
metcoder95 committed Apr 11, 2024
1 parent 836986d commit 2e128c1
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 85 deletions.
206 changes: 126 additions & 80 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,22 @@ async function connectH2 (client, socket) {
util.addListener(session, 'goaway', onHTTP2GoAway)
util.addListener(session, 'close', function () {
const { [kClient]: client } = this
const { [kSocket]: socket } = client

const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this))
const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kSocket] = null
client[kHTTP2Session] = null

assert(client[kPending] === 0)
if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

session.unref()
Expand All @@ -139,6 +133,24 @@ async function connectH2 (client, socket) {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

util.addListener(socket, 'close', function () {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null

if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
Expand All @@ -155,10 +167,10 @@ async function connectH2 (client, socket) {

},
destroy (err, callback) {
session.destroy(err)
if (closed) {
queueMicrotask(callback)
} else {
// Destroying the socket will trigger the session close
socket.destroy(err).on('close', callback)
}
},
Expand Down Expand Up @@ -257,27 +269,28 @@ function writeH2 (client, request) {
headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
// We are already connected, streams are pending.
// We can call on connect, and wait for abort
request.onConnect((err) => {
if (request.aborted || request.completed) {
return
}
const abort = (err) => {
if (request.aborted || request.completed) {
return
}

err = err || new RequestAbortedError()
err = err || new RequestAbortedError()

if (stream != null) {
util.destroy(stream, err)
util.errorRequest(client, request, err)

session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
if (stream != null) {
util.destroy(stream, err)
}

util.errorRequest(client, request, err)
})
// We do not destroy the socket as we can continue using the session
// the stream get's destroyed and the session remains to create new streams
util.destroy(body, err)
}

try {
// We are already connected, streams are pending.
// We can call on connect, and wait for abort
request.onConnect(abort)
} catch (err) {
util.errorRequest(client, request, err)
}
Expand All @@ -302,7 +315,6 @@ function writeH2 (client, request) {

stream.once('close', () => {
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (session[kOpenStreams] === 0) session.unref()
})

Expand Down Expand Up @@ -382,7 +394,7 @@ function writeH2 (client, request) {
writeBodyH2()
}

// Increment counter as we have new several streams open
// Increment counter as we have new streams open
++session[kOpenStreams]

stream.once('response', headers => {
Expand All @@ -394,7 +406,7 @@ function writeH2 (client, request) {
// the request remains in-flight and headers hasn't been received yet
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted || request.completed) {
if (request.aborted) {
const err = new RequestAbortedError()
util.errorRequest(client, request, err)
util.destroy(stream, err)
Expand Down Expand Up @@ -424,14 +436,11 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

const err = new InformationalError('HTTP/2: stream half-closed (remote)')
util.errorRequest(client, request, err)
util.destroy(stream, err)
abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
})

stream.once('close', () => {
Expand All @@ -442,21 +451,11 @@ function writeH2 (client, request) {
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.errorRequest(client, request, err)
util.destroy(stream, err)
}
abort(err)
})

stream.once('frameError', (type, code) => {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
util.errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
})

// stream.on('aborted', () => {
Expand All @@ -479,37 +478,49 @@ function writeH2 (client, request) {

function writeBodyH2 () {
/* istanbul ignore else: assertion */
if (!body) {
request.onRequestSent()
if (!body || contentLength === 0) {
writeBuffer({
abort,
client,
request,
contentLength,
expectsPayload,
h2stream: stream,
body: null,
socket: client[kSocket]
})
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')
stream.cork()
stream.write(body)
stream.uncork()
stream.end()
request.onBodySent(body)
request.onRequestSent()
writeBuffer({
abort,
client,
request,
contentLength,
body,
expectsPayload,
h2stream: stream,
socket: client[kSocket]
})
} else if (util.isBlobLike(body)) {
if (typeof body.stream === 'function') {
writeIterable({
abort,
client,
request,
contentLength,
h2stream: stream,
expectsPayload,
h2stream: stream,
body: body.stream(),
socket: client[kSocket],
header: ''
socket: client[kSocket]
})
} else {
writeBlob({
abort,
body,
client,
request,
contentLength,
expectsPayload,
h2stream: stream,
header: '',
socket: client[kSocket]
})
}
Expand Down Expand Up @@ -541,7 +552,30 @@ function writeH2 (client, request) {
}
}

function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
function writeBuffer ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
try {
if (body != null && util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')
h2stream.cork()
h2stream.write(body)
h2stream.uncork()
h2stream.end()

request.onBodySent(body)
}

if (!expectsPayload) {
socket[kReset] = true
}

request.onRequestSent()
client[kResume]()
} catch (error) {
abort(error)
}
}

function writeStream ({ abort, socket, expectsPayload, h2stream, body, client, request, contentLength }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

// For HTTP/2, is enough to pipe the stream
Expand All @@ -550,26 +584,29 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
h2stream,
(err) => {
if (err) {
util.destroy(body, err)
util.destroy(h2stream, err)
util.destroy(pipe, err)
abort(err)
} else {
util.removeAllListeners(pipe)
request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
}

client[kResume]()
}
}
)

pipe.on('data', onPipeData)
pipe.once('end', () => {
pipe.removeListener('data', onPipeData)
util.destroy(pipe)
})
util.addListener(pipe, 'data', onPipeData)

function onPipeData (chunk) {
request.onBodySent(chunk)
}
}

async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeBlob ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
assert(contentLength === body.size, 'blob body must have content length')

try {
Expand All @@ -582,6 +619,7 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng
h2stream.cork()
h2stream.write(buffer)
h2stream.uncork()
h2stream.end()

request.onBodySent(buffer)
request.onRequestSent()
Expand All @@ -592,11 +630,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng

client[kResume]()
} catch (err) {
util.destroy(h2stream)
abort(err)
}
}

async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
async function writeIterable ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

let callback = null
Expand Down Expand Up @@ -635,11 +673,19 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
await waitForDrain()
}
}

h2stream.end()

request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
}

client[kResume]()
} catch (err) {
h2stream.destroy(err)
abort(err)
} finally {
request.onRequestSent()
h2stream.end()
h2stream
.off('close', onDrain)
.off('drain', onDrain)
Expand Down

0 comments on commit 2e128c1

Please sign in to comment.