Skip to content

Commit

Permalink
Merge pull request #2319 from murgatroid99/grpc-js_transport_disconne…
Browse files Browse the repository at this point in the history
…ct_fix

grpc-js: Don't end calls when receiving GOAWAY
  • Loading branch information
murgatroid99 committed Jan 11, 2023
2 parents 13337aa + b3b6310 commit 3db8acb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 20 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.2",
"version": "1.8.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
38 changes: 29 additions & 9 deletions packages/grpc-js/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class Http2Transport implements Transport {
session.once('close', () => {
this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect(false);
this.handleDisconnect();
});
session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
let tooManyPings = false;
Expand All @@ -177,7 +177,7 @@ class Http2Transport implements Transport {
'connection closed by GOAWAY with code ' +
errorCode
);
this.handleDisconnect(tooManyPings);
this.reportDisconnectToOwner(tooManyPings);
});
session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
Expand Down Expand Up @@ -263,15 +263,35 @@ class Http2Transport implements Transport {
logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}

private handleDisconnect(tooManyPings: boolean) {
/**
* Indicate to the owner of this object that this transport should no longer
* be used. That happens if the connection drops, or if the server sends a
* GOAWAY.
* @param tooManyPings If true, this was triggered by a GOAWAY with data
* indicating that the session was closed becaues the client sent too many
* pings.
* @returns
*/
private reportDisconnectToOwner(tooManyPings: boolean) {
if (this.disconnectHandled) {
return;
}
this.disconnectHandled = true;
this.disconnectListeners.forEach(listener => listener(tooManyPings));
for (const call of this.activeCalls) {
call.onDisconnect();
}
}

/**
* Handle connection drops, but not GOAWAYs.
*/
private handleDisconnect() {
this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */
setImmediate(() => {
for (const call of this.activeCalls) {
call.onDisconnect();
}
});
}

addDisconnectListener(listener: TransportDisconnectListener): void {
Expand All @@ -294,7 +314,7 @@ class Http2Transport implements Transport {
if (!this.keepaliveTimeoutId) {
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect(false);
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
}
Expand All @@ -308,7 +328,7 @@ class Http2Transport implements Transport {
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.handleDisconnect(false);
this.handleDisconnect();
}
}

Expand Down Expand Up @@ -365,7 +385,7 @@ class Http2Transport implements Transport {
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.handleDisconnect(false);
this.handleDisconnect();
throw e;
}
this.flowControlTrace(
Expand Down
67 changes: 57 additions & 10 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import * as grpc from '../src';
import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call';

import { loadProtoFile } from './common';
import { assert2, loadProtoFile } from './common';
import { TestServiceClient, TestServiceHandlers } from './generated/TestService';
import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service';
import { Request__Output } from './generated/Request';
Expand Down Expand Up @@ -458,18 +458,28 @@ describe('Server', () => {
describe('Echo service', () => {
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
}
};

before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.addService(echoService.service, serviceImplementation);

server.bindAsync(
'localhost:0',
Expand Down Expand Up @@ -501,6 +511,43 @@ describe('Echo service', () => {
}
);
});

/* This test passes on Node 18 but fails on Node 16. The failure appears to
* be caused by https://github.com/nodejs/node/issues/42713 */
it.skip('should continue a stream after server shutdown', done => {
const server2 = new Server();
server2.addService(echoService.service, serviceImplementation);
server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
if (err) {
done(err);
return;
}
const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure());
server2.start();
const stream = client2.echoBidiStream();
const totalMessages = 5;
let messagesSent = 0;
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
stream.on('data', () => {
if (messagesSent === 1) {
server2.tryShutdown(assert2.mustCall(() => {}));
}
if (messagesSent >= totalMessages) {
stream.end();
} else {
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
}
});
stream.on('status', assert2.mustCall((status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(messagesSent, totalMessages);
}));
stream.on('error', () => {});
assert2.afterMustCallsSatisfied(done);
});
});
});

describe('Generic client and server', () => {
Expand Down

0 comments on commit 3db8acb

Please sign in to comment.