Skip to content

Commit 68bbebd

Browse files
puzpuzpuzMylesBorins
authored andcommittedAug 31, 2021
tls: allow reading data into a static buffer
Refs: #25436 PR-URL: #35753 Refs: #25436 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
1 parent 39eba0a commit 68bbebd

File tree

6 files changed

+362
-44
lines changed

6 files changed

+362
-44
lines changed
 
File renamed without changes.

Diff for: ‎benchmark/tls/throughput-s2c.js

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const bench = common.createBenchmark(main, {
4+
dur: [5],
5+
type: ['buf', 'asc', 'utf'],
6+
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
7+
recvbuflen: [0, 64 * 1024, 1024 * 1024],
8+
recvbufgenfn: ['true', 'false']
9+
});
10+
11+
const fixtures = require('../../test/common/fixtures');
12+
let options;
13+
let recvbuf;
14+
let received = 0;
15+
const tls = require('tls');
16+
17+
function main({ dur, type, sendchunklen, recvbuflen, recvbufgenfn }) {
18+
if (isFinite(recvbuflen) && recvbuflen > 0)
19+
recvbuf = Buffer.alloc(recvbuflen);
20+
21+
let encoding;
22+
let chunk;
23+
switch (type) {
24+
case 'buf':
25+
chunk = Buffer.alloc(sendchunklen, 'b');
26+
break;
27+
case 'asc':
28+
chunk = 'a'.repeat(sendchunklen);
29+
encoding = 'ascii';
30+
break;
31+
case 'utf':
32+
chunk = 'ü'.repeat(sendchunklen / 2);
33+
encoding = 'utf8';
34+
break;
35+
default:
36+
throw new Error('invalid type');
37+
}
38+
39+
options = {
40+
key: fixtures.readKey('rsa_private.pem'),
41+
cert: fixtures.readKey('rsa_cert.crt'),
42+
ca: fixtures.readKey('rsa_ca.crt'),
43+
ciphers: 'AES256-GCM-SHA384'
44+
};
45+
46+
let socketOpts;
47+
if (recvbuf === undefined) {
48+
socketOpts = { port: common.PORT, rejectUnauthorized: false };
49+
} else {
50+
let buffer = recvbuf;
51+
if (recvbufgenfn === 'true') {
52+
let bufidx = -1;
53+
const bufpool = [
54+
recvbuf,
55+
Buffer.from(recvbuf),
56+
Buffer.from(recvbuf),
57+
];
58+
buffer = () => {
59+
bufidx = (bufidx + 1) % bufpool.length;
60+
return bufpool[bufidx];
61+
};
62+
}
63+
socketOpts = {
64+
port: common.PORT,
65+
rejectUnauthorized: false,
66+
onread: {
67+
buffer,
68+
callback: function(nread, buf) {
69+
received += nread;
70+
}
71+
}
72+
};
73+
}
74+
75+
const server = tls.createServer(options, (socket) => {
76+
socket.on('data', (buf) => {
77+
socket.on('drain', write);
78+
write();
79+
});
80+
81+
function write() {
82+
while (false !== socket.write(chunk, encoding));
83+
}
84+
});
85+
86+
let conn;
87+
server.listen(common.PORT, () => {
88+
conn = tls.connect(socketOpts, () => {
89+
setTimeout(done, dur * 1000);
90+
bench.start();
91+
conn.write('hello');
92+
});
93+
94+
conn.on('data', (chunk) => {
95+
received += chunk.length;
96+
});
97+
});
98+
99+
function done() {
100+
const mbits = (received * 8) / (1024 * 1024);
101+
bench.end(mbits);
102+
process.exit(0);
103+
}
104+
}

Diff for: ‎doc/api/tls.md

+7
Original file line numberDiff line numberDiff line change
@@ -1357,6 +1357,9 @@ being issued by trusted CA (`options.ca`).
13571357
<!-- YAML
13581358
added: v0.11.3
13591359
changes:
1360+
- version: REPLACEME
1361+
pr-url: https://github.com/nodejs/node/pull/35753
1362+
description: Added `onread` option.
13601363
- version: v14.1.0
13611364
pr-url: https://github.com/nodejs/node/pull/32786
13621365
description: The `highWaterMark` option is accepted now.
@@ -1468,6 +1471,10 @@ changes:
14681471
[`tls.createSecureContext()`][]. If a `secureContext` is _not_ provided, one
14691472
will be created by passing the entire `options` object to
14701473
`tls.createSecureContext()`.
1474+
* `onread` {Object} If the `socket` option is missing, incoming data is
1475+
stored in a single `buffer` and passed to the supplied `callback` when
1476+
data arrives on the socket, otherwise the option is ignored. See the
1477+
`onread` option of [`net.Socket`][] for details.
14711478
* ...: [`tls.createSecureContext()`][] options that are used if the
14721479
`secureContext` option is missing, otherwise they are ignored.
14731480
* ...: Any [`socket.connect()`][] option not already listed.

Diff for: ‎lib/_tls_wrap.js

+2
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ function TLSSocket(socket, opts) {
502502
pauseOnCreate: tlsOptions.pauseOnConnect,
503503
manualStart: true,
504504
highWaterMark: tlsOptions.highWaterMark,
505+
onread: !socket ? tlsOptions.onread : null,
505506
});
506507

507508
// Proxy for API compatibility
@@ -1617,6 +1618,7 @@ exports.connect = function connect(...args) {
16171618
enableTrace: options.enableTrace,
16181619
pskCallback: options.pskCallback,
16191620
highWaterMark: options.highWaterMark,
1621+
onread: options.onread,
16201622
});
16211623

16221624
// rejectUnauthorized property can be explicitly defined as `undefined`

Diff for: ‎lib/net.js

+43-44
Original file line numberDiff line numberDiff line change
@@ -307,55 +307,54 @@ function Socket(options) {
307307
if (options.handle) {
308308
this._handle = options.handle; // private
309309
this[async_id_symbol] = getNewAsyncId(this._handle);
310-
} else {
311-
const onread = options.onread;
312-
if (onread !== null && typeof onread === 'object' &&
313-
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
314-
typeof onread.callback === 'function') {
315-
if (typeof onread.buffer === 'function') {
316-
this[kBuffer] = true;
317-
this[kBufferGen] = onread.buffer;
318-
} else {
319-
this[kBuffer] = onread.buffer;
320-
}
321-
this[kBufferCb] = onread.callback;
322-
}
323-
if (options.fd !== undefined) {
324-
const { fd } = options;
325-
let err;
310+
} else if (options.fd !== undefined) {
311+
const { fd } = options;
312+
let err;
326313

327-
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
328-
// a valid `PIPE` or `TCP` descriptor
329-
this._handle = createHandle(fd, false);
314+
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
315+
// a valid `PIPE` or `TCP` descriptor
316+
this._handle = createHandle(fd, false);
317+
318+
err = this._handle.open(fd);
319+
320+
// While difficult to fabricate, in some architectures
321+
// `open` may return an error code for valid file descriptors
322+
// which cannot be opened. This is difficult to test as most
323+
// un-openable fds will throw on `createHandle`
324+
if (err)
325+
throw errnoException(err, 'open');
330326

331-
err = this._handle.open(fd);
327+
this[async_id_symbol] = this._handle.getAsyncId();
332328

333-
// While difficult to fabricate, in some architectures
334-
// `open` may return an error code for valid file descriptors
335-
// which cannot be opened. This is difficult to test as most
336-
// un-openable fds will throw on `createHandle`
329+
if ((fd === 1 || fd === 2) &&
330+
(this._handle instanceof Pipe) && isWindows) {
331+
// Make stdout and stderr blocking on Windows
332+
err = this._handle.setBlocking(true);
337333
if (err)
338-
throw errnoException(err, 'open');
339-
340-
this[async_id_symbol] = this._handle.getAsyncId();
341-
342-
if ((fd === 1 || fd === 2) &&
343-
(this._handle instanceof Pipe) && isWindows) {
344-
// Make stdout and stderr blocking on Windows
345-
err = this._handle.setBlocking(true);
346-
if (err)
347-
throw errnoException(err, 'setBlocking');
348-
349-
this._writev = null;
350-
this._write = makeSyncWrite(fd);
351-
// makeSyncWrite adjusts this value like the original handle would, so
352-
// we need to let it do that by turning it into a writable, own
353-
// property.
354-
ObjectDefineProperty(this._handle, 'bytesWritten', {
355-
value: 0, writable: true
356-
});
357-
}
334+
throw errnoException(err, 'setBlocking');
335+
336+
this._writev = null;
337+
this._write = makeSyncWrite(fd);
338+
// makeSyncWrite adjusts this value like the original handle would, so
339+
// we need to let it do that by turning it into a writable, own
340+
// property.
341+
ObjectDefineProperty(this._handle, 'bytesWritten', {
342+
value: 0, writable: true
343+
});
344+
}
345+
}
346+
347+
const onread = options.onread;
348+
if (onread !== null && typeof onread === 'object' &&
349+
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
350+
typeof onread.callback === 'function') {
351+
if (typeof onread.buffer === 'function') {
352+
this[kBuffer] = true;
353+
this[kBufferGen] = onread.buffer;
354+
} else {
355+
this[kBuffer] = onread.buffer;
358356
}
357+
this[kBufferCb] = onread.callback;
359358
}
360359

361360
// Shut down the socket when we're finished with it.

Diff for: ‎test/parallel/test-tls-onread-static-buffer.js

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto)
4+
common.skip('missing crypto');
5+
6+
const assert = require('assert');
7+
const tls = require('tls');
8+
const fixtures = require('../common/fixtures');
9+
10+
const options = {
11+
key: fixtures.readKey('agent2-key.pem'),
12+
cert: fixtures.readKey('agent2-cert.pem')
13+
};
14+
15+
const smallMessage = Buffer.from('hello world');
16+
// Used to test .pause(), so needs to be larger than the internal buffer
17+
const largeMessage = Buffer.alloc(64 * 1024).fill('hello world');
18+
19+
// Test typical usage
20+
tls.createServer(options, common.mustCall(function(socket) {
21+
this.close();
22+
socket.end(smallMessage);
23+
})).listen(0, function() {
24+
let received = 0;
25+
const buffers = [];
26+
const sockBuf = Buffer.alloc(8);
27+
tls.connect({
28+
port: this.address().port,
29+
rejectUnauthorized: false,
30+
onread: {
31+
buffer: sockBuf,
32+
callback: function(nread, buf) {
33+
assert.strictEqual(buf, sockBuf);
34+
received += nread;
35+
buffers.push(Buffer.from(buf.slice(0, nread)));
36+
}
37+
}
38+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
39+
assert.strictEqual(received, smallMessage.length);
40+
assert.deepStrictEqual(Buffer.concat(buffers), smallMessage);
41+
}));
42+
});
43+
44+
// Test Uint8Array support
45+
tls.createServer(options, common.mustCall(function(socket) {
46+
this.close();
47+
socket.end(smallMessage);
48+
})).listen(0, function() {
49+
let received = 0;
50+
let incoming = new Uint8Array(0);
51+
const sockBuf = new Uint8Array(8);
52+
tls.connect({
53+
port: this.address().port,
54+
rejectUnauthorized: false,
55+
onread: {
56+
buffer: sockBuf,
57+
callback: function(nread, buf) {
58+
assert.strictEqual(buf, sockBuf);
59+
received += nread;
60+
const newIncoming = new Uint8Array(incoming.length + nread);
61+
newIncoming.set(incoming);
62+
newIncoming.set(buf.slice(0, nread), incoming.length);
63+
incoming = newIncoming;
64+
}
65+
}
66+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
67+
assert.strictEqual(received, smallMessage.length);
68+
assert.deepStrictEqual(incoming, new Uint8Array(smallMessage));
69+
}));
70+
});
71+
72+
// Test Buffer callback usage
73+
tls.createServer(options, common.mustCall(function(socket) {
74+
this.close();
75+
socket.end(smallMessage);
76+
})).listen(0, function() {
77+
let received = 0;
78+
const incoming = [];
79+
const bufPool = [ Buffer.alloc(2), Buffer.alloc(2), Buffer.alloc(2) ];
80+
let bufPoolIdx = -1;
81+
let bufPoolUsage = 0;
82+
tls.connect({
83+
port: this.address().port,
84+
rejectUnauthorized: false,
85+
onread: {
86+
buffer: () => {
87+
++bufPoolUsage;
88+
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
89+
return bufPool[bufPoolIdx];
90+
},
91+
callback: function(nread, buf) {
92+
assert.strictEqual(buf, bufPool[bufPoolIdx]);
93+
received += nread;
94+
incoming.push(Buffer.from(buf.slice(0, nread)));
95+
}
96+
}
97+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
98+
assert.strictEqual(received, smallMessage.length);
99+
assert.deepStrictEqual(Buffer.concat(incoming), smallMessage);
100+
assert.strictEqual(bufPoolUsage, 7);
101+
}));
102+
});
103+
104+
// Test Uint8Array callback support
105+
tls.createServer(options, common.mustCall(function(socket) {
106+
this.close();
107+
socket.end(smallMessage);
108+
})).listen(0, function() {
109+
let received = 0;
110+
let incoming = new Uint8Array(0);
111+
const bufPool = [ new Uint8Array(2), new Uint8Array(2), new Uint8Array(2) ];
112+
let bufPoolIdx = -1;
113+
let bufPoolUsage = 0;
114+
tls.connect({
115+
port: this.address().port,
116+
rejectUnauthorized: false,
117+
onread: {
118+
buffer: () => {
119+
++bufPoolUsage;
120+
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
121+
return bufPool[bufPoolIdx];
122+
},
123+
callback: function(nread, buf) {
124+
assert.strictEqual(buf, bufPool[bufPoolIdx]);
125+
received += nread;
126+
const newIncoming = new Uint8Array(incoming.length + nread);
127+
newIncoming.set(incoming);
128+
newIncoming.set(buf.slice(0, nread), incoming.length);
129+
incoming = newIncoming;
130+
}
131+
}
132+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
133+
assert.strictEqual(received, smallMessage.length);
134+
assert.deepStrictEqual(incoming, new Uint8Array(smallMessage));
135+
assert.strictEqual(bufPoolUsage, 7);
136+
}));
137+
});
138+
139+
// Test explicit socket pause
140+
tls.createServer(options, common.mustCall(function(socket) {
141+
this.close();
142+
// Need larger message here to observe the pause
143+
socket.end(largeMessage);
144+
})).listen(0, function() {
145+
let received = 0;
146+
const buffers = [];
147+
const sockBuf = Buffer.alloc(64);
148+
let pauseScheduled = false;
149+
const client = tls.connect({
150+
port: this.address().port,
151+
rejectUnauthorized: false,
152+
onread: {
153+
buffer: sockBuf,
154+
callback: function(nread, buf) {
155+
assert.strictEqual(buf, sockBuf);
156+
received += nread;
157+
buffers.push(Buffer.from(buf.slice(0, nread)));
158+
if (!pauseScheduled) {
159+
pauseScheduled = true;
160+
client.pause();
161+
setTimeout(() => {
162+
client.resume();
163+
}, 100);
164+
}
165+
}
166+
}
167+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
168+
assert.strictEqual(received, largeMessage.length);
169+
assert.deepStrictEqual(Buffer.concat(buffers), largeMessage);
170+
}));
171+
});
172+
173+
// Test implicit socket pause
174+
tls.createServer(options, common.mustCall(function(socket) {
175+
this.close();
176+
// Need larger message here to observe the pause
177+
socket.end(largeMessage);
178+
})).listen(0, function() {
179+
let received = 0;
180+
const buffers = [];
181+
const sockBuf = Buffer.alloc(64);
182+
let pauseScheduled = false;
183+
const client = tls.connect({
184+
port: this.address().port,
185+
rejectUnauthorized: false,
186+
onread: {
187+
buffer: sockBuf,
188+
callback: function(nread, buf) {
189+
assert.strictEqual(buf, sockBuf);
190+
received += nread;
191+
buffers.push(Buffer.from(buf.slice(0, nread)));
192+
if (!pauseScheduled) {
193+
pauseScheduled = true;
194+
setTimeout(() => {
195+
client.resume();
196+
}, 100);
197+
return false;
198+
}
199+
return true;
200+
}
201+
}
202+
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
203+
assert.strictEqual(received, largeMessage.length);
204+
assert.deepStrictEqual(Buffer.concat(buffers), largeMessage);
205+
}));
206+
});

0 commit comments

Comments
 (0)
Please sign in to comment.