feat(browser): websockets improvements and bundle optimizations #1732
@mcollina We actually use
If all my suppositions are correct, I would like to know how, in your opinion, I could use readable-stream
Codecov Report
Attention:
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1732      +/-   ##
==========================================
- Coverage   81.08%   79.52%   -1.56%
==========================================
  Files          22       23       +1
  Lines        1369     1397      +28
  Branches      323      326       +3
==========================================
+ Hits         1110     1111       +1
- Misses        182      209      +27
  Partials       77       77

☔ View full report in Codecov by Sentry.
const buffers = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
    if (typeof chunks[i].chunk === 'string') {
        buffers[i] = Buffer.from(chunks[i], 'utf8')
I also noticed this should have been chunks[i].chunk. Wondering how this could ever have worked before. Corrected here.
cc @vishnureddy17 if you would also like to take a look at this, your feedback would be welcome :)
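For reference, a minimal sketch of the corrected conversion, assuming each entry passed to `_writev` has the `{ chunk, encoding }` shape described in the Node.js stream docs. The `WritevEntry` type and the `concatChunks` helper are hypothetical names used only for illustration, not the PR's code:

```typescript
import { Buffer } from 'node:buffer'

// Hypothetical type: the shape of one entry in the array handed to `_writev`.
interface WritevEntry {
  chunk: Buffer | string
  encoding: string
}

// Convert every entry to a Buffer and join them into a single payload.
function concatChunks(chunks: WritevEntry[]): Buffer {
  const buffers: Buffer[] = new Array(chunks.length)
  for (let i = 0; i < chunks.length; i++) {
    // Read the payload from `.chunk`, not from the wrapper object itself.
    const { chunk } = chunks[i]
    buffers[i] = typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk
  }
  return Buffer.concat(buffers)
}
```

Passing the wrapper object (`chunks[i]`) to `Buffer.from` with a string encoding is what the comment above flags as the bug; only the `.chunk` property holds the data.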
duplexify with nodejs Duplex
  const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)

  if (!opts.objectMode) {
-     proxy._writev = writev
+     proxy._writev = writev.bind(proxy)
Wondering how this could have worked before without the bind; maybe Transform automatically handles the binding when _writev is set?
I like the changes. Unfortunately, my knowledge on Streams in JS is poor, so I don't think my review will be of much use. I share your concerns regarding:
@@ -263,13 +257,15 @@ const browserStreamBuilder: StreamBuilder = (client, opts) => {
      if (socket.bufferedAmount > bufferSize) {
          // throttle data until buffered amount is reduced.
          setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
+         return
Wondering if there should be a return after setTimeout here?
UPDATE
It seems there should: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/bufferedAmount
Without it, send would be called anyway and, even worse, this would cause duplicated messages to be sent to the socket 😨
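A minimal sketch of the throttling pattern, to show why the early return matters. `SocketLike`, `makeThrottledWrite` and the injectable `schedule` function are hypothetical names for illustration; in the PR the retry is scheduled with `setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)`:

```typescript
// Hypothetical minimal view of a WebSocket: only what the throttle needs.
interface SocketLike {
  bufferedAmount: number
  send(data: string): void
}

type Schedule = (retry: () => void) => void

function makeThrottledWrite(socket: SocketLike, bufferSize: number, schedule: Schedule) {
  return function write(chunk: string, next: () => void): void {
    if (socket.bufferedAmount > bufferSize) {
      // Too much data queued in the socket: try again later.
      schedule(() => write(chunk, next))
      // Without this return, send() below would run now AND on the retry,
      // so the same chunk would be sent twice.
      return
    }
    socket.send(chunk)
    next()
  }
}

// Demo with a fake socket and a manual scheduler instead of real timers.
const sent: string[] = []
const fakeSocket: SocketLike = { bufferedAmount: 10, send: (d) => { sent.push(d) } }
const retries: Array<() => void> = []
let nextCalls = 0

const throttledWrite = makeThrottledWrite(fakeSocket, 5, (retry) => { retries.push(retry) })
throttledWrite('hello', () => { nextCalls++ })
const sentWhileFull = sent.length // nothing sent while the buffer is full

fakeSocket.bufferedAmount = 0 // the socket has drained
retries.shift()?.()
```

With a real socket, `schedule` would be something like `(retry) => setTimeout(retry, bufferTimeout)`.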
It seems that socket errors in the browser do not trigger a client error because the emitted error doesn't contain a code that is recognized as a socket error code; in fact, it is a generic window Event. We could get a code from the close event, but that will always be 1006 (see reference). The fix could be to emit the error with a static code, maybe ECONNREFUSED, so that it triggers the error on the client and closes the connection; this would remove the need to call destroy on the stream.
UPDATE
I decided to give up on this for now and keep it for another PR, as it creates too much confusion here. I only pass the error to the stream, but that error will not cause the client to emit it, as it will not be recognized as a socket error. We could also decide this is enough.
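For that follow-up PR, the "static code" idea could look roughly like the sketch below. It is only an illustration of the proposal in this comment, not code from this PR: `EventLike`, `CodedError` and `toCodedError` are hypothetical names, and whether the client treats the chosen code as a socket error is an assumption to verify:

```typescript
// Hypothetical minimal view of the generic browser Event the socket emits.
interface EventLike {
  type: string
}

// An Error carrying a Node-style `code` property.
interface CodedError extends Error {
  code?: string
}

// Wrap the browser event in an Error with a static code (assumed value).
function toCodedError(event: EventLike, code = 'ECONNREFUSED'): CodedError {
  const err: CodedError = new Error(`WebSocket ${event.type} event`)
  err.code = code
  return err
}

const wrapped = toCodedError({ type: 'error' })
```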
As I have no knowledge of the MQTTjs codebase, I only reviewed BufferedDuplex and its usage in ws.ts.
I'll keep on reading, but here is my first thought.
src/lib/BufferedDuplex.ts
Outdated
        this.proxy.read(size)
    }

    async _write(chunk: any, encoding: string, cb: (err?: Error) => void) {
I am a bit concerned by this asynchronous _write method.
Stream classes originally implement synchronous _write methods; this kind of breaks the expected signature and requires waiting for BufferedDuplex._write to resolve (or reject).
Maybe buffering the chunks to be written until the socket is ready would be more appropriate? It could also handle buffering in a similar way to what you wrote in ws.ts inside socketWriteBrowser. Something like:
class BufferedDuplex {
    // ....
    private writeQueue: Array<{chunk: any, encoding: string, cb: (err?: Error) => void}>;

    constructor(opts: IClientOptions, proxy: Transform, socket: WebSocket) {
        // ...
        this.writeQueue = [];
    }

    _write(chunk: any, encoding: string, cb: (err?: Error) => void) {
        if (!this.isSocketOpen) {
            // Buffer the data in a queue
            this.writeQueue.push({chunk, encoding, cb});
        } else {
            this.writeToProxy(chunk, encoding, cb);
        }
    }

    socketReady() {
        // ...
        this.processWriteQueue();
    }

    private writeToProxy(chunk: any, encoding: string, cb: (err?: Error) => void) {
        if (this.proxy.write(chunk, encoding) === false) {
            this.proxy.once('drain', cb);
        } else {
            cb();
        }
    }

    private processWriteQueue() {
        while (this.writeQueue.length > 0) {
            const {chunk, encoding, cb} = this.writeQueue.shift()!;
            this.writeToProxy(chunk, encoding, cb);
        }
    }
    // ...
}
That's the first way I implemented it (I mean using a queue), but I preferred this solution as it doesn't require another variable. I'm not sure making the method async makes any difference: the underlying code handling the stream expects it to be sync, so it will never await it anyway (also, unhandled rejections are impossible).
This method must not be async
@mcollina Ok, fixed now 🙏🏼
@mcollina kindly ping
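To illustrate the point of this thread, here is a minimal sketch of a synchronous `_write` that defers its callback until the socket is open. It is not the code merged in this PR: `DeferredWritable`, `socketReady` and `delivered` are hypothetical names, and only the writable side is shown. It relies on the documented behavior that a Node `Writable` does not call `_write` again until the previous callback has run, so a single pending slot is enough here:

```typescript
import { Writable } from 'node:stream'

type WriteCallback = (error?: Error | null) => void

// Hypothetical stand-in for the write side of a buffered duplex.
class DeferredWritable extends Writable {
  readonly delivered: string[] = []
  private isSocketOpen = false
  private pending: (() => void) | null = null

  // Synchronous signature: returns void, never a Promise.
  _write(chunk: Buffer | string, _encoding: BufferEncoding, cb: WriteCallback): void {
    if (!this.isSocketOpen) {
      // Hold this write; later writes stay in the Writable's own buffer
      // because cb has not been called yet.
      this.pending = () => this.deliver(chunk, cb)
      return
    }
    this.deliver(chunk, cb)
  }

  // Call once the underlying socket has opened.
  socketReady(): void {
    this.isSocketOpen = true
    const flush = this.pending
    this.pending = null
    if (flush) flush()
  }

  private deliver(chunk: Buffer | string, cb: WriteCallback): void {
    this.delivered.push(chunk.toString())
    cb()
  }
}

const deferred = new DeferredWritable()
deferred.write('a')
deferred.write('b')
const deliveredBeforeOpen = [...deferred.delivered]
deferred.socketReady()
const deliveredAfterOpen = [...deferred.delivered]
```

Backpressure is signalled by withholding the callback rather than by awaiting a promise, which keeps `_write` within the contract the stream machinery expects.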
@mcollina ping
lgtm
BufferedDuplex to replace duplexify on ws, ali and wx
ws _writev function
browserBufferSize is reached (defaults to 512kB): this not only wasn't blocking the writes to the socket but was also causing duplicated messages. For sure there were some open issues caused by this
ws. Use window.WebSocket for Browser and ws module types for Node
Fixes #1647