Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Don't end calls when receiving GOAWAY #2319

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.2",
"version": "1.8.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
38 changes: 29 additions & 9 deletions packages/grpc-js/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class Http2Transport implements Transport {
session.once('close', () => {
this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect(false);
this.handleDisconnect();
});
session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
let tooManyPings = false;
Expand All @@ -177,7 +177,7 @@ class Http2Transport implements Transport {
'connection closed by GOAWAY with code ' +
errorCode
);
this.handleDisconnect(tooManyPings);
this.reportDisconnectToOwner(tooManyPings);
});
session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
Expand Down Expand Up @@ -263,15 +263,35 @@ class Http2Transport implements Transport {
logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}

private handleDisconnect(tooManyPings: boolean) {
/**
* Indicate to the owner of this object that this transport should no longer
* be used. That happens if the connection drops, or if the server sends a
* GOAWAY.
* @param tooManyPings If true, this was triggered by a GOAWAY with data
* indicating that the session was closed becaues the client sent too many
* pings.
* @returns
*/
private reportDisconnectToOwner(tooManyPings: boolean) {
if (this.disconnectHandled) {
return;
}
this.disconnectHandled = true;
this.disconnectListeners.forEach(listener => listener(tooManyPings));
for (const call of this.activeCalls) {
call.onDisconnect();
}
}

/**
* Handle connection drops, but not GOAWAYs.
*/
private handleDisconnect() {
this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */
setImmediate(() => {
for (const call of this.activeCalls) {
call.onDisconnect();
}
});
}

addDisconnectListener(listener: TransportDisconnectListener): void {
Expand All @@ -294,7 +314,7 @@ class Http2Transport implements Transport {
if (!this.keepaliveTimeoutId) {
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect(false);
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
}
Expand All @@ -308,7 +328,7 @@ class Http2Transport implements Transport {
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.handleDisconnect(false);
this.handleDisconnect();
}
}

Expand Down Expand Up @@ -365,7 +385,7 @@ class Http2Transport implements Transport {
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.handleDisconnect(false);
this.handleDisconnect();
throw e;
}
this.flowControlTrace(
Expand Down
67 changes: 57 additions & 10 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import * as grpc from '../src';
import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call';

import { loadProtoFile } from './common';
import { assert2, loadProtoFile } from './common';
import { TestServiceClient, TestServiceHandlers } from './generated/TestService';
import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service';
import { Request__Output } from './generated/Request';
Expand Down Expand Up @@ -458,18 +458,28 @@ describe('Server', () => {
describe('Echo service', () => {
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
}
};

before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.addService(echoService.service, serviceImplementation);

server.bindAsync(
'localhost:0',
Expand Down Expand Up @@ -501,6 +511,43 @@ describe('Echo service', () => {
}
);
});

/* This test passes on Node 18 but fails on Node 16. The failure appears to
* be caused by https://github.com/nodejs/node/issues/42713 */
it.skip('should continue a stream after server shutdown', done => {
const server2 = new Server();
server2.addService(echoService.service, serviceImplementation);
server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
if (err) {
done(err);
return;
}
const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure());
server2.start();
const stream = client2.echoBidiStream();
const totalMessages = 5;
let messagesSent = 0;
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
stream.on('data', () => {
if (messagesSent === 1) {
server2.tryShutdown(assert2.mustCall(() => {}));
}
if (messagesSent >= totalMessages) {
stream.end();
} else {
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
}
});
stream.on('status', assert2.mustCall((status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(messagesSent, totalMessages);
}));
stream.on('error', () => {});
assert2.afterMustCallsSatisfied(done);
});
});
});

describe('Generic client and server', () => {
Expand Down