Skip to content

Commit 06df25b

Browse files
authoredApr 26, 2024
core,xds: Metrics recording in WRR LB (#11129)
Adds the recording of the four metrics documented in: https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#weighted-round-robin-lb-policy
1 parent 795ee0f commit 06df25b

File tree

2 files changed

+218
-7
lines changed

2 files changed

+218
-7
lines changed
 

‎xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java

+81-7
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424
import com.google.common.base.MoreObjects;
2525
import com.google.common.base.Preconditions;
2626
import com.google.common.collect.ImmutableList;
27+
import com.google.common.collect.Lists;
2728
import io.grpc.ConnectivityState;
2829
import io.grpc.ConnectivityStateInfo;
2930
import io.grpc.Deadline.Ticker;
31+
import io.grpc.DoubleHistogramMetricInstrument;
3032
import io.grpc.EquivalentAddressGroup;
3133
import io.grpc.ExperimentalApi;
3234
import io.grpc.LoadBalancer;
3335
import io.grpc.LoadBalancerProvider;
36+
import io.grpc.LongCounterMetricInstrument;
37+
import io.grpc.MetricInstrumentRegistry;
3438
import io.grpc.NameResolver;
3539
import io.grpc.Status;
3640
import io.grpc.SynchronizationContext;
@@ -57,12 +61,17 @@
5761
import java.util.logging.Logger;
5862

5963
/**
60-
* A {@link LoadBalancer} that provides weighted-round-robin load-balancing over
61-
* the {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
64+
* A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
65+
* {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
6266
* determined by backend metrics using ORCA.
6367
*/
6468
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
6569
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
70+
71+
private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
72+
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
73+
private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
74+
private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
6675
private static final Logger log = Logger.getLogger(
6776
WeightedRoundRobinLoadBalancer.class.getName());
6877
private WeightedRoundRobinLoadBalancerConfig config;
@@ -74,6 +83,31 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
7483
private final long infTime;
7584
private final Ticker ticker;
7685

86+
// The metric instruments are only registered once and shared by all instances of this LB.
87+
static {
88+
MetricInstrumentRegistry metricInstrumentRegistry
89+
= MetricInstrumentRegistry.getDefaultRegistry();
90+
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
91+
"Number of scheduler updates in which there were not enough endpoints with valid "
92+
+ "weight, which caused the WRR policy to fall back to RR behavior", "update",
93+
Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true);
94+
ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
95+
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
96+
"Number of endpoints from each scheduler update that don't yet have usable weight "
97+
+ "information", "endpoint", Lists.newArrayList("grpc.target"),
98+
Lists.newArrayList("grpc.lb.locality"), true);
99+
ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
100+
"grpc.lb.wrr.endpoint_weight_stale",
101+
"Number of endpoints from each scheduler update whose latest weight is older than the "
102+
+ "expiration period", "endpoint", Lists.newArrayList("grpc.target"),
103+
Lists.newArrayList("grpc.lb.locality"), true);
104+
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
105+
"grpc.lb.wrr.endpoint_weights", "The histogram buckets will be endpoint weight ranges.",
106+
"weight", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
107+
Lists.newArrayList("grpc.lb.locality"),
108+
true);
109+
}
110+
77111
public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
78112
this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random());
79113
}
@@ -145,7 +179,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
145179
@Override
146180
public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
147181
return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
148-
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
182+
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());
149183
}
150184

151185
@VisibleForTesting
@@ -163,16 +197,18 @@ public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Obj
163197
super(key, policyProvider, childConfig, initialPicker);
164198
}
165199

166-
private double getWeight() {
200+
private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
167201
if (config == null) {
168202
return 0;
169203
}
170204
long now = ticker.nanoTime();
171205
if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
172206
nonEmptySince = infTime;
207+
staleEndpoints.incrementAndGet();
173208
return 0;
174209
} else if (now - nonEmptySince < config.blackoutPeriodNanos
175210
&& config.blackoutPeriodNanos > 0) {
211+
notYetUsableEndpoints.incrementAndGet();
176212
return 0;
177213
} else {
178214
return weight;
@@ -336,10 +372,11 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
336372
private final float errorUtilizationPenalty;
337373
private final AtomicInteger sequence;
338374
private final int hashCode;
375+
private final LoadBalancer.Helper helper;
339376
private volatile StaticStrideScheduler scheduler;
340377

341378
WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
342-
float errorUtilizationPenalty, AtomicInteger sequence) {
379+
float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {
343380
checkNotNull(children, "children");
344381
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
345382
this.children = children;
@@ -353,6 +390,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
353390
this.enableOobLoadReport = enableOobLoadReport;
354391
this.errorUtilizationPenalty = errorUtilizationPenalty;
355392
this.sequence = checkNotNull(sequence, "sequence");
393+
this.helper = helper;
356394

357395
// For equality we treat children as a set; use hash code as defined by Set
358396
int sum = 0;
@@ -387,11 +425,37 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
387425

388426
private void updateWeight() {
389427
float[] newWeights = new float[children.size()];
428+
AtomicInteger staleEndpoints = new AtomicInteger();
429+
AtomicInteger notYetUsableEndpoints = new AtomicInteger();
390430
for (int i = 0; i < children.size(); i++) {
391-
double newWeight = ((WeightedChildLbState)children.get(i)).getWeight();
431+
double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,
432+
notYetUsableEndpoints);
433+
// TODO: add target and locality labels once available
434+
helper.getMetricRecorder()
435+
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight, ImmutableList.of(""),
436+
ImmutableList.of(""));
392437
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
393438
}
439+
if (staleEndpoints.get() > 0) {
440+
// TODO: add target and locality labels once available
441+
helper.getMetricRecorder()
442+
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
443+
ImmutableList.of(""),
444+
ImmutableList.of(""));
445+
}
446+
if (notYetUsableEndpoints.get() > 0) {
447+
// TODO: add target and locality labels once available
448+
helper.getMetricRecorder()
449+
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
450+
ImmutableList.of(""), ImmutableList.of(""));
451+
}
452+
394453
this.scheduler = new StaticStrideScheduler(newWeights, sequence);
454+
if (this.scheduler.usesRoundRobin()) {
455+
// TODO: add target and locality labels once available
456+
helper.getMetricRecorder()
457+
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(""), ImmutableList.of(""));
458+
}
395459
}
396460

397461
@Override
@@ -454,6 +518,7 @@ public boolean equals(Object o) {
454518
static final class StaticStrideScheduler {
455519
private final short[] scaledWeights;
456520
private final AtomicInteger sequence;
521+
private final boolean usesRoundRobin;
457522
private static final int K_MAX_WEIGHT = 0xFFFF;
458523

459524
// Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
@@ -494,8 +559,10 @@ static final class StaticStrideScheduler {
494559
if (numWeightedChannels > 0) {
495560
unscaledMeanWeight = sumWeight / numWeightedChannels;
496561
unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
562+
usesRoundRobin = false;
497563
} else {
498564
// Fall back to round robin if all values are non-positives
565+
usesRoundRobin = true;
499566
unscaledMeanWeight = 1;
500567
unscaledMaxWeight = 1;
501568
}
@@ -521,7 +588,14 @@ static final class StaticStrideScheduler {
521588
this.sequence = sequence;
522589
}
523590

524-
/** Returns the next sequence number and atomically increases sequence with wraparound. */
591+
// Without properly weighted channels, we do plain vanilla round_robin.
592+
boolean usesRoundRobin() {
593+
return usesRoundRobin;
594+
}
595+
596+
/**
597+
* Returns the next sequence number and atomically increases sequence with wraparound.
598+
*/
525599
private long nextSequence() {
526600
return Integer.toUnsignedLong(sequence.getAndIncrement());
527601
}

‎xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java

+137
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.truth.Truth.assertThat;
2020
import static io.grpc.ConnectivityState.CONNECTING;
2121
import static org.mockito.AdditionalAnswers.delegatesTo;
22+
import static org.mockito.ArgumentMatchers.argThat;
2223
import static org.mockito.Mockito.any;
2324
import static org.mockito.Mockito.eq;
2425
import static org.mockito.Mockito.mock;
@@ -40,6 +41,7 @@
4041
import io.grpc.ClientCall;
4142
import io.grpc.ConnectivityState;
4243
import io.grpc.ConnectivityStateInfo;
44+
import io.grpc.DoubleHistogramMetricInstrument;
4345
import io.grpc.EquivalentAddressGroup;
4446
import io.grpc.LoadBalancer;
4547
import io.grpc.LoadBalancer.CreateSubchannelArgs;
@@ -49,6 +51,8 @@
4951
import io.grpc.LoadBalancer.Subchannel;
5052
import io.grpc.LoadBalancer.SubchannelPicker;
5153
import io.grpc.LoadBalancer.SubchannelStateListener;
54+
import io.grpc.LongCounterMetricInstrument;
55+
import io.grpc.MetricRecorder;
5256
import io.grpc.Status;
5357
import io.grpc.SynchronizationContext;
5458
import io.grpc.internal.FakeClock;
@@ -82,6 +86,7 @@
8286
import org.junit.runner.RunWith;
8387
import org.junit.runners.JUnit4;
8488
import org.mockito.ArgumentCaptor;
89+
import org.mockito.ArgumentMatcher;
8590
import org.mockito.Captor;
8691
import org.mockito.InOrder;
8792
import org.mockito.Mock;
@@ -120,6 +125,9 @@ public class WeightedRoundRobinLoadBalancerTest {
120125

121126
private final FakeClock fakeClock = new FakeClock();
122127

128+
@Mock
129+
private MetricRecorder mockMetricRecorder;
130+
123131
private WeightedRoundRobinLoadBalancerConfig weightedConfig =
124132
WeightedRoundRobinLoadBalancerConfig.newBuilder().build();
125133

@@ -1121,6 +1129,130 @@ public void removingAddressShutsdownSubchannel() {
11211129
inOrder.verify(subchannel2).shutdown();
11221130
}
11231131

1132+
1133+
@Test
1134+
public void metrics() {
1135+
// Give WRR some valid addresses to work with.
1136+
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
1137+
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
1138+
.setAttributes(affinity).build()));
1139+
1140+
// Flip the three subchannels to READY state to initiate the WRR logic
1141+
Iterator<Subchannel> it = subchannels.values().iterator();
1142+
Subchannel readySubchannel1 = it.next();
1143+
getSubchannelStateListener(readySubchannel1).onSubchannelState(ConnectivityStateInfo
1144+
.forNonError(ConnectivityState.READY));
1145+
Subchannel readySubchannel2 = it.next();
1146+
getSubchannelStateListener(readySubchannel2).onSubchannelState(ConnectivityStateInfo
1147+
.forNonError(ConnectivityState.READY));
1148+
Subchannel readySubchannel3 = it.next();
1149+
getSubchannelStateListener(readySubchannel3).onSubchannelState(ConnectivityStateInfo
1150+
.forNonError(ConnectivityState.READY));
1151+
1152+
// WRR creates a picker that updates the weights for each of the child subchannels. This should
1153+
// give us three "rr_fallback" metric events as we don't yet have any weights to do weighted
1154+
// round-robin.
1155+
verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 3, 1);
1156+
1157+
// We should also see six records of endpoint weights. They should all be for 0 as we don't yet
1158+
// have valid weights.
1159+
verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 6, 0);
1160+
1161+
// We should not yet be seeing any "endpoint_weight_stale" events since we don't even have
1162+
// valid weights yet.
1163+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 0, 1);
1164+
1165+
// Each time weights are updated, WRR will see if each subchannel weight is useable. As we have
1166+
// no weights yet, we should see three "endpoint_weight_not_yet_usable" metric events with the
1167+
// value increasing by one each time as all the endpoints come online.
1168+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 1);
1169+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 2);
1170+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1, 3);
1171+
1172+
// Send each child LB state an ORCA update with some valid utilization/qps data so that weights
1173+
// can be calculated.
1174+
Iterator<ChildLbState> childLbStates = wrr.getChildLbStates().iterator();
1175+
((WeightedChildLbState)childLbStates.next()).new OrcaReportListener(
1176+
weightedConfig.errorUtilizationPenalty).onLoadReport(
1177+
InternalCallMetricRecorder.createMetricReport(0.1, 0, 0.1, 1, 0, new HashMap<>(),
1178+
new HashMap<>(), new HashMap<>()));
1179+
((WeightedChildLbState)childLbStates.next()).new OrcaReportListener(
1180+
weightedConfig.errorUtilizationPenalty).onLoadReport(
1181+
InternalCallMetricRecorder.createMetricReport(0.1, 0, 0.1, 1, 0, new HashMap<>(),
1182+
new HashMap<>(), new HashMap<>()));
1183+
((WeightedChildLbState)childLbStates.next()).new OrcaReportListener(
1184+
weightedConfig.errorUtilizationPenalty).onLoadReport(
1185+
InternalCallMetricRecorder.createMetricReport(0.1, 0, 0.1, 1, 0, new HashMap<>(),
1186+
new HashMap<>(), new HashMap<>()));
1187+
1188+
// Let's reset the mock MetricsRecorder so that it's easier to verify what happened after the
1189+
// weights were updated
1190+
reset(mockMetricRecorder);
1191+
1192+
// We go forward in time past the default 10s blackout period before weights can be considered
1193+
// for wrr. The eights would get updated as the default update interval is 1s.
1194+
fakeClock.forwardTime(11, TimeUnit.SECONDS);
1195+
1196+
// Since we have weights on all the child LB states, the weight update should not result in
1197+
// further rr_fallback metric entries.
1198+
verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 0, 1);
1199+
1200+
// We should not see an increase to the earlier count of "endpoint_weight_not_yet_usable".
1201+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0, 1);
1202+
1203+
// No endpoints should have gotten stale yet either.
1204+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 0, 1);
1205+
1206+
// Now with valid weights we should have seen the value in the endpoint weights histogram.
1207+
verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 3, 10);
1208+
1209+
reset(mockMetricRecorder);
1210+
1211+
// Weights become stale in three minutes. Let's move ahead in time by 3 minutes and make sure
1212+
// we get metrics events for each endpoint.
1213+
fakeClock.forwardTime(3, TimeUnit.MINUTES);
1214+
1215+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_stale", 1, 3);
1216+
1217+
// With the weights stale each three endpoints should report 0 weights.
1218+
verifyDoubleHistogramRecord("grpc.lb.wrr.endpoint_weights", 3, 0);
1219+
1220+
// Since the weights are now stale the update should have triggered an additional rr_fallback
1221+
// event.
1222+
verifyLongCounterRecord("grpc.lb.wrr.rr_fallback", 1, 1);
1223+
1224+
// No further weights-not-useable events should occur, since we have received weights and
1225+
// are out of the blackout.
1226+
verifyLongCounterRecord("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0, 1);
1227+
1228+
// All metric events should be accounted for.
1229+
verifyNoMoreInteractions(mockMetricRecorder);
1230+
}
1231+
1232+
// Verifies that the MetricRecorder has been called to record a long counter value of 1 for the
1233+
// given metric name, the given number of times
1234+
private void verifyLongCounterRecord(String name, int times, long value) {
1235+
verify(mockMetricRecorder, times(times)).addLongCounter(
1236+
argThat(new ArgumentMatcher<LongCounterMetricInstrument>() {
1237+
@Override
1238+
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
1239+
return longCounterInstrument.getName().equals(name);
1240+
}
1241+
}), eq(value), eq(Lists.newArrayList("")), eq(Lists.newArrayList("")));
1242+
}
1243+
1244+
// Verifies that the MetricRecorder has been called to record a given double histogram value the
1245+
// given amount of times.
1246+
private void verifyDoubleHistogramRecord(String name, int times, double value) {
1247+
verify(mockMetricRecorder, times(times)).recordDoubleHistogram(
1248+
argThat(new ArgumentMatcher<DoubleHistogramMetricInstrument>() {
1249+
@Override
1250+
public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
1251+
return doubleHistogramInstrument.getName().equals(name);
1252+
}
1253+
}), eq(value), eq(Lists.newArrayList("")), eq(Lists.newArrayList("")));
1254+
}
1255+
11241256
private int getNumFilteredPendingTasks() {
11251257
return AbstractTestHelper.getNumFilteredPendingTasks(fakeClock);
11261258
}
@@ -1189,5 +1321,10 @@ public Map<Subchannel, Subchannel> getMockToRealSubChannelMap() {
11891321
public Map<Subchannel, SubchannelStateListener> getSubchannelStateListeners() {
11901322
return subchannelStateListeners;
11911323
}
1324+
1325+
@Override
1326+
public MetricRecorder getMetricRecorder() {
1327+
return mockMetricRecorder;
1328+
}
11921329
}
11931330
}

0 commit comments

Comments
 (0)