Skip to content

Commit

Permalink
Merge pull request #2363 from murgatroid99/grpc-js_channel_keepalive_…
Browse files Browse the repository at this point in the history
…throttling

grpc-js: Propagate keepalive throttling throughout channel
  • Loading branch information
murgatroid99 committed Feb 15, 2023
2 parents ba08267 + 2ed8e71 commit 72b99a1
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.8",
"version": "1.8.9",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
53 changes: 50 additions & 3 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';

/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
Expand Down Expand Up @@ -84,6 +85,33 @@ const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB

class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private stateListeners: ConnectivityStateListener[] = [];
private refCount = 0;
constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
super(childSubchannel);
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
channel.throttleKeepalive(keepaliveTime);
for (const listener of this.stateListeners) {
listener(this, previousState, newState, keepaliveTime);
}
});
}

ref(): void {
this.child.ref();
this.refCount += 1;
}

unref(): void {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
this.channel.removeWrappedSubchannel(this);
}
}
}

export class InternalChannel {

private resolvingLoadBalancer: ResolvingLoadBalancer;
Expand Down Expand Up @@ -116,8 +144,10 @@ export class InternalChannel {
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();

// Channelz info
private readonly channelzEnabled: boolean = true;
Expand Down Expand Up @@ -190,6 +220,7 @@ export class InternalChannel {
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand All @@ -201,10 +232,13 @@ export class InternalChannel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
Expand Down Expand Up @@ -369,6 +403,19 @@ export class InternalChannel {
}
}

throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
for (const wrappedSubchannel of this.wrappedSubchannels) {
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
}
}
}

removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}

doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
}
Expand Down
8 changes: 4 additions & 4 deletions packages/grpc-js/src/load-balancer-outlier-detection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
listener(this, previousState, newState, keepaliveTime);
}
}
});
Expand Down Expand Up @@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1);
}
}

uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/grpc-js/src/subchannel-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
newState: ConnectivityState,
keepaliveTime: number
) => void;

/**
Expand All @@ -40,6 +41,7 @@ export interface SubchannelInterface {
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
throttleKeepalive(newKeepaliveTime: number): void;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
Expand Down Expand Up @@ -67,6 +69,9 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getAddress(): string {
return this.child.getAddress();
}
throttleKeepalive(newKeepaliveTime: number): void {
this.child.throttleKeepalive(newKeepaliveTime);
}
ref(): void {
this.child.ref();
}
Expand Down
22 changes: 15 additions & 7 deletions packages/grpc-js/src/subchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class Subchannel {

private backoffTimeout: BackoffTimeout;

private keepaliveTimeMultiplier = 1;
private keepaliveTime: number;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
Expand Down Expand Up @@ -111,6 +111,8 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;

if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
Expand Down Expand Up @@ -169,7 +171,7 @@ export class Subchannel {
private startConnectingInternal() {
let options = this.options;
if (options['grpc.keepalive_time_ms']) {
const adjustedKeepaliveTime = Math.min(options['grpc.keepalive_time_ms'] * this.keepaliveTimeMultiplier, KEEPALIVE_MAX_TIME_MS);
const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
}
this.connector.connect(this.subchannelAddress, this.credentials, options).then(
Expand All @@ -181,14 +183,14 @@ export class Subchannel {
}
transport.addDisconnectListener((tooManyPings) => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
if (tooManyPings) {
this.keepaliveTimeMultiplier *= 2;
if (tooManyPings && this.keepaliveTime > 0) {
this.keepaliveTime *= 2;
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval multiplier to ${
this.keepaliveTimeMultiplier
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTime
} ms`
);
}
Expand Down Expand Up @@ -262,7 +264,7 @@ export class Subchannel {
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
listener(this, previousState, newState);
listener(this, previousState, newState, this.keepaliveTime);
}
return true;
}
Expand Down Expand Up @@ -403,4 +405,10 @@ export class Subchannel {
getRealSubchannel(): this {
return this;
}

throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
}
}
}
7 changes: 5 additions & 2 deletions packages/grpc-js/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Http2Transport implements Transport {
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
private keepaliveTimeMs: number = -1;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
Expand Down Expand Up @@ -133,7 +133,7 @@ class Http2Transport implements Transport {
]
.filter((e) => e)
.join(' '); // remove falsey values first

if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
Expand Down Expand Up @@ -334,6 +334,9 @@ class Http2Transport implements Transport {
}

private startKeepalivePings() {
if (this.keepaliveTimeMs < 0) {
return;
}
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
Expand Down

0 comments on commit 72b99a1

Please sign in to comment.