Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Propagate keepalive throttling throughout channel #2363

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.8",
"version": "1.8.9",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
53 changes: 50 additions & 3 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';

/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
Expand Down Expand Up @@ -84,6 +85,33 @@ const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB

class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private stateListeners: ConnectivityStateListener[] = [];
private refCount = 0;
constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
super(childSubchannel);
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
channel.throttleKeepalive(keepaliveTime);
for (const listener of this.stateListeners) {
listener(this, previousState, newState, keepaliveTime);
}
});
}

ref(): void {
this.child.ref();
this.refCount += 1;
}

unref(): void {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
this.channel.removeWrappedSubchannel(this);
}
}
}

export class InternalChannel {

private resolvingLoadBalancer: ResolvingLoadBalancer;
Expand Down Expand Up @@ -116,8 +144,10 @@ export class InternalChannel {
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();

// Channelz info
private readonly channelzEnabled: boolean = true;
Expand Down Expand Up @@ -190,6 +220,7 @@ export class InternalChannel {
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand All @@ -201,10 +232,13 @@ export class InternalChannel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
Expand Down Expand Up @@ -369,6 +403,19 @@ export class InternalChannel {
}
}

throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
for (const wrappedSubchannel of this.wrappedSubchannels) {
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
}
}
}

removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}

doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
}
Expand Down
8 changes: 4 additions & 4 deletions packages/grpc-js/src/load-balancer-outlier-detection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
listener(this, previousState, newState, keepaliveTime);
}
}
});
Expand Down Expand Up @@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1);
}
}

uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/grpc-js/src/subchannel-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
newState: ConnectivityState,
keepaliveTime: number
) => void;

/**
Expand All @@ -40,6 +41,7 @@ export interface SubchannelInterface {
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
throttleKeepalive(newKeepaliveTime: number): void;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
Expand Down Expand Up @@ -67,6 +69,9 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getAddress(): string {
return this.child.getAddress();
}
throttleKeepalive(newKeepaliveTime: number): void {
this.child.throttleKeepalive(newKeepaliveTime);
}
ref(): void {
this.child.ref();
}
Expand Down
22 changes: 15 additions & 7 deletions packages/grpc-js/src/subchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class Subchannel {

private backoffTimeout: BackoffTimeout;

private keepaliveTimeMultiplier = 1;
private keepaliveTime: number;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
Expand Down Expand Up @@ -111,6 +111,8 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;

if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
Expand Down Expand Up @@ -169,7 +171,7 @@ export class Subchannel {
private startConnectingInternal() {
let options = this.options;
if (options['grpc.keepalive_time_ms']) {
const adjustedKeepaliveTime = Math.min(options['grpc.keepalive_time_ms'] * this.keepaliveTimeMultiplier, KEEPALIVE_MAX_TIME_MS);
const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
}
this.connector.connect(this.subchannelAddress, this.credentials, options).then(
Expand All @@ -181,14 +183,14 @@ export class Subchannel {
}
transport.addDisconnectListener((tooManyPings) => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
if (tooManyPings) {
this.keepaliveTimeMultiplier *= 2;
if (tooManyPings && this.keepaliveTime > 0) {
this.keepaliveTime *= 2;
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval multiplier to ${
this.keepaliveTimeMultiplier
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTime
} ms`
);
}
Expand Down Expand Up @@ -262,7 +264,7 @@ export class Subchannel {
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
listener(this, previousState, newState);
listener(this, previousState, newState, this.keepaliveTime);
}
return true;
}
Expand Down Expand Up @@ -403,4 +405,10 @@ export class Subchannel {
getRealSubchannel(): this {
return this;
}

throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
}
}
}
7 changes: 5 additions & 2 deletions packages/grpc-js/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Http2Transport implements Transport {
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
private keepaliveTimeMs: number = -1;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
Expand Down Expand Up @@ -133,7 +133,7 @@ class Http2Transport implements Transport {
]
.filter((e) => e)
.join(' '); // remove falsey values first

if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
Expand Down Expand Up @@ -334,6 +334,9 @@ class Http2Transport implements Transport {
}

private startKeepalivePings() {
if (this.keepaliveTimeMs < 0) {
return;
}
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
Expand Down