Skip to content

Commit 0a8c7a9

Browse files
committedNov 23, 2021
[api] Add WebSocket#pause() and WebSocket#resume()
Add ability to pause and resume a `WebSocket`.
1 parent ed2b803 commit 0a8c7a9

File tree

4 files changed

+198
-27
lines changed

4 files changed

+198
-27
lines changed
 

‎doc/ws.md

+20
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@
3131
- [websocket.bufferedAmount](#websocketbufferedamount)
3232
- [websocket.close([code[, reason]])](#websocketclosecode-reason)
3333
- [websocket.extensions](#websocketextensions)
34+
- [websocket.isPaused](#websocketispaused)
3435
- [websocket.onclose](#websocketonclose)
3536
- [websocket.onerror](#websocketonerror)
3637
- [websocket.onmessage](#websocketonmessage)
3738
- [websocket.onopen](#websocketonopen)
39+
- [websocket.pause()](#websocketpause)
3840
- [websocket.ping([data[, mask]][, callback])](#websocketpingdata-mask-callback)
3941
- [websocket.pong([data[, mask]][, callback])](#websocketpongdata-mask-callback)
4042
- [websocket.protocol](#websocketprotocol)
4143
- [websocket.readyState](#websocketreadystate)
4244
- [websocket.removeEventListener(type, listener)](#websocketremoveeventlistenertype-listener)
45+
- [websocket.resume()](#websocketresume)
4346
- [websocket.send(data[, options][, callback])](#websocketsenddata-options-callback)
4447
- [websocket.terminate()](#websocketterminate)
4548
- [websocket.url](#websocketurl)
@@ -409,6 +412,12 @@ following ways:
409412

410413
Initiate a closing handshake.
411414

415+
### websocket.isPaused
416+
417+
- {Boolean}
418+
419+
Indicates whether the websocket is paused.
420+
412421
### websocket.extensions
413422

414423
- {Object}
@@ -443,6 +452,12 @@ listener receives a `MessageEvent` named "message".
443452
An event listener to be called when the connection is established. The listener
444453
receives an `OpenEvent` named "open".
445454

455+
### websocket.pause()
456+
457+
Pause the websocket causing it to stop emitting events. Some events can still be
458+
emitted after this is called, until all buffered data is consumed. This method
459+
is a noop if the ready state is `CONNECTING` or `CLOSED`.
460+
446461
### websocket.ping([data[, mask]][, callback])
447462

448463
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
@@ -473,6 +488,11 @@ Send a pong. This method throws an error if the ready state is `CONNECTING`.
473488

474489
The subprotocol selected by the server.
475490

491+
### websocket.resume()
492+
493+
Make a paused socket resume emitting events. This method is a noop if the ready
494+
state is `CONNECTING` or `CLOSED`.
495+
476496
### websocket.readyState
477497

478498
- {Number}

‎lib/stream.js

+2-26
Original file line numberDiff line numberDiff line change
@@ -47,23 +47,8 @@ function duplexOnError(err) {
4747
* @public
4848
*/
4949
function createWebSocketStream(ws, options) {
50-
let resumeOnReceiverDrain = true;
5150
let terminateOnDestroy = true;
5251

53-
function receiverOnDrain() {
54-
if (resumeOnReceiverDrain) ws._socket.resume();
55-
}
56-
57-
if (ws.readyState === ws.CONNECTING) {
58-
ws.once('open', function open() {
59-
ws._receiver.removeAllListeners('drain');
60-
ws._receiver.on('drain', receiverOnDrain);
61-
});
62-
} else {
63-
ws._receiver.removeAllListeners('drain');
64-
ws._receiver.on('drain', receiverOnDrain);
65-
}
66-
6752
const duplex = new Duplex({
6853
...options,
6954
autoDestroy: false,
@@ -76,10 +61,7 @@ function createWebSocketStream(ws, options) {
7661
const data =
7762
!isBinary && duplex._readableState.objectMode ? msg.toString() : msg;
7863

79-
if (!duplex.push(data)) {
80-
resumeOnReceiverDrain = false;
81-
ws._socket.pause();
82-
}
64+
if (!duplex.push(data)) ws.pause();
8365
});
8466

8567
ws.once('error', function error(err) {
@@ -155,13 +137,7 @@ function createWebSocketStream(ws, options) {
155137
};
156138

157139
duplex._read = function () {
158-
if (
159-
(ws.readyState === ws.OPEN || ws.readyState === ws.CLOSING) &&
160-
!resumeOnReceiverDrain
161-
) {
162-
resumeOnReceiverDrain = true;
163-
if (!ws._receiver._writableState.needDrain) ws._socket.resume();
164-
}
140+
if (ws.isPaused) ws.resume();
165141
};
166142

167143
duplex._write = function (chunk, encoding, callback) {

‎lib/websocket.js

+46-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class WebSocket extends EventEmitter {
5858
this._closeMessage = EMPTY_BUFFER;
5959
this._closeTimer = null;
6060
this._extensions = {};
61+
this._paused = false;
6162
this._protocol = '';
6263
this._readyState = WebSocket.CONNECTING;
6364
this._receiver = null;
@@ -124,6 +125,13 @@ class WebSocket extends EventEmitter {
124125
return Object.keys(this._extensions).join();
125126
}
126127

128+
/**
129+
* @type {Boolean}
130+
*/
131+
get isPaused() {
132+
return this._paused;
133+
}
134+
127135
/**
128136
* @type {Function}
129137
*/
@@ -312,6 +320,23 @@ class WebSocket extends EventEmitter {
312320
);
313321
}
314322

323+
/**
324+
* Pause the socket.
325+
*
326+
* @public
327+
*/
328+
pause() {
329+
if (
330+
this.readyState === WebSocket.CONNECTING ||
331+
this.readyState === WebSocket.CLOSED
332+
) {
333+
return;
334+
}
335+
336+
this._paused = true;
337+
this._socket.pause();
338+
}
339+
315340
/**
316341
* Send a ping.
317342
*
@@ -376,6 +401,23 @@ class WebSocket extends EventEmitter {
376401
this._sender.pong(data || EMPTY_BUFFER, mask, cb);
377402
}
378403

404+
/**
405+
* Resume the socket.
406+
*
407+
* @public
408+
*/
409+
resume() {
410+
if (
411+
this.readyState === WebSocket.CONNECTING ||
412+
this.readyState === WebSocket.CLOSED
413+
) {
414+
return;
415+
}
416+
417+
this._paused = false;
418+
if (!this._receiver._writableState.needDrain) this._socket.resume();
419+
}
420+
379421
/**
380422
* Send a data message.
381423
*
@@ -518,6 +560,7 @@ Object.defineProperty(WebSocket.prototype, 'CLOSED', {
518560
'binaryType',
519561
'bufferedAmount',
520562
'extensions',
563+
'isPaused',
521564
'protocol',
522565
'readyState',
523566
'url'
@@ -1001,7 +1044,9 @@ function receiverOnConclude(code, reason) {
10011044
* @private
10021045
*/
10031046
function receiverOnDrain() {
1004-
this[kWebSocket]._socket.resume();
1047+
const websocket = this[kWebSocket];
1048+
1049+
if (!websocket.isPaused) websocket._socket.resume();
10051050
}
10061051

10071052
/**

‎test/websocket.test.js

+130
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,39 @@ describe('WebSocket', () => {
359359
});
360360
});
361361

362+
describe('`isPaused`', () => {
363+
it('is enumerable and configurable', () => {
364+
const descriptor = Object.getOwnPropertyDescriptor(
365+
WebSocket.prototype,
366+
'isPaused'
367+
);
368+
369+
assert.strictEqual(descriptor.configurable, true);
370+
assert.strictEqual(descriptor.enumerable, true);
371+
assert.ok(descriptor.get !== undefined);
372+
assert.ok(descriptor.set === undefined);
373+
});
374+
375+
it('indicates whether the websocket is paused', (done) => {
376+
const wss = new WebSocket.Server({ port: 0 }, () => {
377+
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
378+
379+
ws.on('open', () => {
380+
ws.pause();
381+
assert.ok(ws.isPaused);
382+
383+
ws.resume();
384+
assert.ok(!ws.isPaused);
385+
386+
ws.close();
387+
wss.close(done);
388+
});
389+
390+
assert.ok(!ws.isPaused);
391+
});
392+
});
393+
});
394+
362395
describe('`protocol`', () => {
363396
it('is enumerable and configurable', () => {
364397
const descriptor = Object.getOwnPropertyDescriptor(
@@ -1109,6 +1142,51 @@ describe('WebSocket', () => {
11091142
});
11101143
});
11111144

1145+
describe('#pause', () => {
1146+
it('does nothing if `readyState` is `CONNECTING` or `CLOSED`', (done) => {
1147+
const wss = new WebSocket.Server({ port: 0 }, () => {
1148+
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
1149+
1150+
assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
1151+
assert.ok(!ws.isPaused);
1152+
1153+
ws.pause();
1154+
assert.ok(!ws.isPaused);
1155+
1156+
ws.on('open', () => {
1157+
ws.on('close', () => {
1158+
assert.strictEqual(ws.readyState, WebSocket.CLOSED);
1159+
1160+
ws.pause();
1161+
assert.ok(!ws.isPaused);
1162+
1163+
wss.close(done);
1164+
});
1165+
1166+
ws.close();
1167+
});
1168+
});
1169+
});
1170+
1171+
it('pauses the socket', (done) => {
1172+
const wss = new WebSocket.Server({ port: 0 }, () => {
1173+
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
1174+
});
1175+
1176+
wss.on('connection', (ws) => {
1177+
assert.ok(!ws.isPaused);
1178+
assert.ok(!ws._socket.isPaused());
1179+
1180+
ws.pause();
1181+
assert.ok(ws.isPaused);
1182+
assert.ok(ws._socket.isPaused());
1183+
1184+
ws.terminate();
1185+
wss.close(done);
1186+
});
1187+
});
1188+
});
1189+
11121190
describe('#ping', () => {
11131191
it('throws an error if `readyState` is `CONNECTING`', () => {
11141192
const ws = new WebSocket('ws://localhost', {
@@ -1447,6 +1525,58 @@ describe('WebSocket', () => {
14471525
});
14481526
});
14491527

1528+
describe('#resume', () => {
1529+
it('does nothing if `readyState` is `CONNECTING` or `CLOSED`', (done) => {
1530+
const wss = new WebSocket.Server({ port: 0 }, () => {
1531+
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
1532+
1533+
assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
1534+
assert.ok(!ws.isPaused);
1535+
1536+
// Verify that no exception is thrown.
1537+
ws.resume();
1538+
1539+
ws.on('open', () => {
1540+
ws.pause();
1541+
assert.ok(ws.isPaused);
1542+
1543+
ws.on('close', () => {
1544+
assert.strictEqual(ws.readyState, WebSocket.CLOSED);
1545+
1546+
ws.resume();
1547+
assert.ok(ws.isPaused);
1548+
1549+
wss.close(done);
1550+
});
1551+
1552+
ws.terminate();
1553+
});
1554+
});
1555+
});
1556+
1557+
it('resumes the socket', (done) => {
1558+
const wss = new WebSocket.Server({ port: 0 }, () => {
1559+
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
1560+
});
1561+
1562+
wss.on('connection', (ws) => {
1563+
assert.ok(!ws.isPaused);
1564+
assert.ok(!ws._socket.isPaused());
1565+
1566+
ws.pause();
1567+
assert.ok(ws.isPaused);
1568+
assert.ok(ws._socket.isPaused());
1569+
1570+
ws.resume();
1571+
assert.ok(!ws.isPaused);
1572+
assert.ok(!ws._socket.isPaused());
1573+
1574+
ws.close();
1575+
wss.close(done);
1576+
});
1577+
});
1578+
});
1579+
14501580
describe('#send', () => {
14511581
it('throws an error if `readyState` is `CONNECTING`', () => {
14521582
const ws = new WebSocket('ws://localhost', {

0 commit comments

Comments
 (0)
Please sign in to comment.