Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Fix connectivity state change event sequencing #2421

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
);
}
this.currentPick = subchannel;
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
subchannel.ref();
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
this.resetSubchannelList();
clearTimeout(this.connectionDelayTimeout);
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
}

private updateState(newState: ConnectivityState, picker: Picker) {
Expand Down
65 changes: 34 additions & 31 deletions packages/grpc-js/src/subchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class Subchannel {
* state changes. Will be modified by `addConnectivityStateListener` and
* `removeConnectivityStateListener`
*/
private stateListeners: ConnectivityStateListener[] = [];
private stateListeners: Set<ConnectivityStateListener> = new Set();

private backoffTimeout: BackoffTimeout;

Expand Down Expand Up @@ -227,6 +227,8 @@ export class Subchannel {
}
const previousState = this.connectivityState;
this.connectivityState = newState;
process.nextTick(() => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@murgatroid99
What's the purpose of this call?
It looks like a debugging leftover. It doesn't make anything asynchronous; it just adds an empty function to be called by the event loop after the synchronous part of the code has executed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. That was part of a change that I was trying out before I went in a different direction.

});
switch (newState) {
case ConnectivityState.READY:
this.stopBackoff();
Expand Down Expand Up @@ -261,9 +263,7 @@ export class Subchannel {
default:
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
}
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState, this.keepaliveTime);
}
return true;
Expand Down Expand Up @@ -291,13 +291,15 @@ export class Subchannel {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
process.nextTick(() => {
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
});
}
}

Expand Down Expand Up @@ -339,20 +341,22 @@ export class Subchannel {
* Otherwise, do nothing.
*/
startConnecting() {
/* First, try to transition from IDLE to connecting. If that doesn't happen
* because the state is not currently IDLE, check if it is
* TRANSIENT_FAILURE, and if so indicate that it should go back to
* connecting after the backoff timer ends. Otherwise do nothing */
if (
!this.transitionToState(
[ConnectivityState.IDLE],
ConnectivityState.CONNECTING
)
) {
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
this.continueConnecting = true;
process.nextTick(() => {
/* First, try to transition from IDLE to connecting. If that doesn't happen
* because the state is not currently IDLE, check if it is
* TRANSIENT_FAILURE, and if so indicate that it should go back to
* connecting after the backoff timer ends. Otherwise do nothing */
if (
!this.transitionToState(
[ConnectivityState.IDLE],
ConnectivityState.CONNECTING
)
) {
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
this.continueConnecting = true;
}
}
}
});
}

/**
Expand All @@ -368,7 +372,7 @@ export class Subchannel {
* @param listener
*/
addConnectivityStateListener(listener: ConnectivityStateListener) {
this.stateListeners.push(listener);
this.stateListeners.add(listener);
}

/**
Expand All @@ -377,21 +381,20 @@ export class Subchannel {
* `addConnectivityStateListener`
*/
removeConnectivityStateListener(listener: ConnectivityStateListener) {
const listenerIndex = this.stateListeners.indexOf(listener);
if (listenerIndex > -1) {
this.stateListeners.splice(listenerIndex, 1);
}
this.stateListeners.delete(listener);
}

/**
* Reset the backoff timeout, and immediately start connecting if in backoff.
*/
resetBackoff() {
this.backoffTimeout.reset();
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING
);
process.nextTick(() => {
this.backoffTimeout.reset();
this.transitionToState(
[ConnectivityState.TRANSIENT_FAILURE],
ConnectivityState.CONNECTING
);
});
}

getAddress(): string {
Expand Down
130 changes: 130 additions & 0 deletions packages/grpc-js/test/test-global-subchannel-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import * as assert from 'assert';
import * as path from 'path';

import * as grpc from '../src';
import {sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError} from '../src';

import {loadProtoFile} from './common';

const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService =
loadProtoFile(protoFile).EchoService as ServiceClientConstructor;

/**
 * Regression tests for clients that share a subchannel through the global
 * subchannel pool: closing one client must not disturb another client that
 * is still using the same subchannel.
 *
 * The `it` cases are order-dependent: the first one creates `client1` and
 * `client2`, and the later ones reuse them.
 *
 * NOTE: this suite intentionally uses `describe`, not `describe.only`; an
 * exclusive suite would silently disable every other test in the package.
 */
describe('Global subchannel pool', () => {
  let server: Server;
  let serverPort: number;

  let client1: InstanceType<grpc.ServiceClientConstructor>;
  let client2: InstanceType<grpc.ServiceClientConstructor>;

  let promises: Promise<any>[];

  before(done => {
    server = new Server();
    server.addService(echoService.service, {
      echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
        callback(null, call.request);
      },
    });

    server.bindAsync(
      'localhost:0',
      ServerCredentials.createInsecure(),
      (err, port) => {
        /* Report a bind failure through `done` instead of throwing inside
         * the callback, where mocha could only see an uncaught exception. */
        if (err) {
          done(err);
          return;
        }
        serverPort = port;
        server.start();
        done();
      }
    );
  });

  beforeEach(() => {
    promises = [];
  });

  after(done => {
    server.tryShutdown(done);
  });

  /**
   * Makes one unary `echo` call on `client`.
   *
   * @returns A promise that resolves once the response has been verified to
   * equal the request, and rejects if the call fails or the response differs.
   */
  function callService(client: InstanceType<grpc.ServiceClientConstructor>) {
    return new Promise<void>((resolve, reject) => {
      const request = {value: 'test value', value2: 3};

      client.echo(request, (error: ServiceError, response: any) => {
        /* Assertions run inside an async callback, so convert a failure
         * into a rejection rather than letting it escape as an uncaught
         * exception. */
        try {
          assert.ifError(error);
          assert.deepStrictEqual(response, request);
          resolve();
        } catch (e) {
          reject(e);
        }
      });
    });
  }

  /**
   * Creates `client1` and `client2` against the test server. Both disable
   * the local subchannel pool, so they share the global one.
   */
  function connect() {
    const grpcOptions = {
      'grpc.use_local_subchannel_pool': 0,
    };

    client1 = new echoService(
      `127.0.0.1:${serverPort}`,
      grpc.credentials.createInsecure(),
      grpcOptions
    );

    client2 = new echoService(
      `127.0.0.1:${serverPort}`,
      grpc.credentials.createInsecure(),
      grpcOptions
    );
  }

  /* This is a regression test for a bug where client1.close in the
   * waitForReady callback would cause the subchannel to transition to IDLE
   * even though client2 is also using it. */
  it('Should handle client.close calls in waitForReady', done => {
    connect();

    promises.push(
      new Promise<void>((resolve, reject) => {
        client1.waitForReady(Date.now() + 50, error => {
          if (error) {
            reject(error);
            return;
          }
          client1.close();
          resolve();
        });
      })
    );

    promises.push(
      new Promise<void>((resolve, reject) => {
        client2.waitForReady(Date.now() + 50, error => {
          if (error) {
            reject(error);
            return;
          }
          resolve();
        });
      })
    );

    /* Pass `done` as the rejection handler so a failed wait fails the test
     * immediately instead of timing out. */
    Promise.all(promises).then(() => done(), done);
  });

  it('Call the service', done => {
    promises.push(callService(client2));

    Promise.all(promises).then(() => done(), done);
  });

  it('Should complete the client lifecycle without error', done => {
    /* Close the clients after a delay; client1 was already closed in the
     * first test, so this is a repeated close for it. */
    setTimeout(() => {
      client1.close();
      client2.close();
      done();
    }, 500);
  });
});