Skip to content

Commit e9e86e1

Browse files
clshortfuserichardlau
authored andcommittedOct 7, 2020
http2: support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. Fixes: #31309 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback PR-URL: #33875 Backport-PR-URL: #34857 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 751820b commit e9e86e1

File tree

3 files changed

+111
-23
lines changed

3 files changed

+111
-23
lines changed
 

‎src/node_http2.cc

+7-6
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
882882
// quite expensive. This is a potential performance optimization target later.
883883
ssize_t Http2Session::ConsumeHTTP2Data() {
884884
CHECK_NOT_NULL(stream_buf_.base);
885-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
885+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
886886
size_t read_len = stream_buf_.len - stream_buf_offset_;
887887

888888
// multiple side effects.
@@ -903,11 +903,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
903903
CHECK_GT(ret, 0);
904904
CHECK_LE(static_cast<size_t>(ret), read_len);
905905

906-
if (static_cast<size_t>(ret) < read_len) {
907-
// Mark the remainder of the data as available for later consumption.
908-
stream_buf_offset_ += ret;
909-
return ret;
910-
}
906+
// Mark the remainder of the data as available for later consumption.
907+
// Even if all bytes were received, a paused stream may delay the
908+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
909+
stream_buf_offset_ += ret;
910+
return ret;
911911
}
912912

913913
// We are done processing the current input chunk.
@@ -1241,6 +1241,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
12411241
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
12421242
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
12431243
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
1244+
Debug(session, "receive paused");
12441245
return NGHTTP2_ERR_PAUSE;
12451246
}
12461247

‎test/parallel/test-http2-misbehaving-multiplex.js

+39-17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
const common = require('../common');
5+
const assert = require('assert');
56

67
if (!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
let client;
1415

1516
const server = h2.createServer();
17+
let gotFirstStreamId1;
1618
server.on('stream', common.mustCall((stream) => {
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// the error will be emitted asynchronously
21-
stream.on('error', common.expectsError({
22-
type: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error', () => {});
27+
stream.on('close', (err) => {
28+
if (stream.id === 1) {
29+
if (gotFirstStreamId1) {
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1 = true;
40+
}
41+
assert.strictEqual(err, undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id, 5);
46+
2647
}, 2));
2748

2849
server.on('session', common.mustCall((session) => {
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {
3556

3657
const settings = new h2test.SettingsFrame();
3758
const settingsAck = new h2test.SettingsFrame(true);
38-
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
39-
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
40-
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
41-
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
61+
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
62+
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
4263

4364
server.listen(0, () => {
4465
client = net.connect(server.address().port, () => {
4566
client.write(h2test.kClientMagic, () => {
4667
client.write(settings.data, () => {
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data, () => {
50-
// This will make it ok.
51-
client.write(head2.data, () => {
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data, () => {
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data, () => {
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data, () => {
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data, () => {
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
9+
const { PerformanceObserver } = require('perf_hooks');
10+
11+
const server = http2.createServer();
12+
13+
server.on('stream', (stream, headers) => {
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch (headers[':path']) {
19+
case '/singleEnd':
20+
stream.end('OK');
21+
// Backport v10.x: Manually pack END_STREAM flag
22+
stream._final(() => {});
23+
break;
24+
case '/sequentialEnd':
25+
stream.write('OK');
26+
stream.end();
27+
// Backport v10.x: Manually pack END_STREAM flag
28+
stream._final(() => {});
29+
break;
30+
case '/delayedEnd':
31+
stream.write('OK', () => stream.end());
32+
break;
33+
}
34+
});
35+
36+
function testRequest(path, targetFrameCount, callback) {
37+
const obs = new PerformanceObserver((list, observer) => {
38+
const entry = list.getEntries()[0];
39+
if (entry.name !== 'Http2Session') return;
40+
if (entry.type !== 'client') return;
41+
assert.strictEqual(entry.framesReceived, targetFrameCount);
42+
observer.disconnect();
43+
callback();
44+
});
45+
obs.observe({ entryTypes: ['http2'] });
46+
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
47+
const req = client.request({ ':path': path });
48+
req.resume();
49+
req.end();
50+
req.on('end', () => client.close());
51+
});
52+
}
53+
54+
// SETTINGS => SETTINGS => HEADERS => DATA
55+
const MIN_FRAME_COUNT = 4;
56+
57+
server.listen(0, () => {
58+
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
59+
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
60+
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
61+
server.close();
62+
});
63+
});
64+
});
65+
});

0 commit comments

Comments
 (0)
Please sign in to comment.