Skip to content

Commit

Permalink
grpc-js: add await/async on method that return promise
Browse files Browse the repository at this point in the history
add await/async on method that return promise to ensure that the order of message (and of the end of stream) are preserved
  • Loading branch information
phoenix741 committed Feb 26, 2023
1 parent 1054432 commit 081270f
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,10 +808,10 @@ export class Http2ServerCallStream<

let pushedEnd = false;

const maybePushEnd = () => {
const maybePushEnd = async () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
this.pushOrBufferMessage(readable, null);
await this.pushOrBufferMessage(readable, null);
}
};

Expand Down Expand Up @@ -844,16 +844,16 @@ export class Http2ServerCallStream<
// Just return early
if (!decompressedMessage) return;

this.pushOrBufferMessage(readable, decompressedMessage);
await this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
this.stream.resume();
maybePushEnd();
await maybePushEnd();
});

this.stream.once('end', () => {
this.stream.once('end', async () => {
readsDone = true;
maybePushEnd();
await maybePushEnd();
});
}

Expand All @@ -877,16 +877,16 @@ export class Http2ServerCallStream<
return this.canPush;
}

private pushOrBufferMessage(
private async pushOrBufferMessage(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
messageBytes: Buffer | null
): void {
): Promise<void> {
if (this.isPushPending) {
this.bufferedMessages.push(messageBytes);
} else {
this.pushMessage(readable, messageBytes);
await this.pushMessage(readable, messageBytes);
}
}

Expand Down Expand Up @@ -939,7 +939,7 @@ export class Http2ServerCallStream<
this.isPushPending = false;

if (this.bufferedMessages.length > 0) {
this.pushMessage(
await this.pushMessage(
readable,
this.bufferedMessages.shift() as Buffer | null
);
Expand Down

0 comments on commit 081270f

Please sign in to comment.