Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Stop leaking freed message buffer placeholder objects #2372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.10",
"version": "1.8.11",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
61 changes: 33 additions & 28 deletions packages/grpc-js/src/retrying-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ export class RetryingCall implements Call {
private initialMetadata: Metadata | null = null;
private underlyingCalls: UnderlyingCall[] = [];
private writeBuffer: WriteBufferEntry[] = [];
/**
* The offset of message indices in the writeBuffer. For example, if
* writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
* is in writeBuffer[5].
*/
private writeBufferOffset = 0;
/**
* Tracks whether a read has been started, so that we know whether to start
* reads on new child calls. This only matters for the first read, because
Expand Down Expand Up @@ -203,14 +209,8 @@ export class RetryingCall implements Call {
private reportStatus(statusObject: StatusObject) {
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
this.bufferTracker.freeAll(this.callNumber);
for (let i = 0; i < this.writeBuffer.length; i++) {
if (this.writeBuffer[i].entryType === 'MESSAGE') {
this.writeBuffer[i] = {
entryType: 'FREED',
allocated: false
};
}
}
this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
this.writeBuffer = [];
process.nextTick(() => {
// Explicitly construct status object to remove progress field
this.listener?.onReceiveStatus({
Expand All @@ -236,20 +236,27 @@ export class RetryingCall implements Call {
}
}

private maybefreeMessageBufferEntry(messageIndex: number) {
private getBufferEntry(messageIndex: number): WriteBufferEntry {
return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
}

private getNextBufferIndex() {
return this.writeBufferOffset + this.writeBuffer.length;
}

private clearSentMessages() {
if (this.state !== 'COMMITTED') {
return;
}
const bufferEntry = this.writeBuffer[messageIndex];
if (bufferEntry.entryType === 'MESSAGE') {
const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
const bufferEntry = this.getBufferEntry(messageIndex);
if (bufferEntry.allocated) {
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
}
this.writeBuffer[messageIndex] = {
entryType: 'FREED',
allocated: false
};
}
this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
this.writeBufferOffset = earliestNeededMessageIndex;
}

private commitCall(index: number) {
Expand All @@ -272,9 +279,7 @@ export class RetryingCall implements Call {
this.underlyingCalls[i].state = 'COMPLETED';
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
}
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
this.maybefreeMessageBufferEntry(messageIndex);
}
this.clearSentMessages();
}

private commitCallWithMostMessages() {
Expand Down Expand Up @@ -555,8 +560,8 @@ export class RetryingCall implements Call {
private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
this.writeBuffer[messageIndex].callback?.();
this.maybefreeMessageBufferEntry(messageIndex);
this.getBufferEntry(messageIndex).callback?.();
this.clearSentMessages();
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
Expand All @@ -566,10 +571,10 @@ export class RetryingCall implements Call {
if (childCall.state === 'COMPLETED') {
return;
}
if (this.writeBuffer[childCall.nextMessageToSend]) {
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
switch (bufferEntry.entryType) {
case 'MESSAGE':
case 'MESSAGE':
childCall.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
Expand All @@ -594,13 +599,13 @@ export class RetryingCall implements Call {
message,
flags: context.flags,
};
const messageIndex = this.writeBuffer.length;
const messageIndex = this.getNextBufferIndex();
const bufferEntry: WriteBufferEntry = {
entryType: 'MESSAGE',
message: writeObj,
allocated: this.bufferTracker.allocate(message.length, this.callNumber)
};
this.writeBuffer[messageIndex] = bufferEntry;
this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) {
context.callback?.();
for (const [callIndex, call] of this.underlyingCalls.entries()) {
Expand Down Expand Up @@ -642,11 +647,11 @@ export class RetryingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
const halfCloseIndex = this.writeBuffer.length;
this.writeBuffer[halfCloseIndex] = {
const halfCloseIndex = this.getNextBufferIndex();
this.writeBuffer.push({
entryType: 'HALF_CLOSE',
allocated: false
};
});
for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
call.nextMessageToSend += 1;
Expand Down