Skip to content

Commit 8133318

Browse files
temawiejona86
authored andcommittedMay 9, 2024·
rls: Add gauge metric recording (#11175)
Adds these gauges: - grpc.lb.rls.cache_entries - grpc.lb.rls.cache_size
1 parent f737cbc commit 8133318

File tree

3 files changed

+127
-12
lines changed

3 files changed

+127
-12
lines changed
 

Diff for: ‎rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

+38
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@
3737
import io.grpc.LoadBalancer.ResolvedAddresses;
3838
import io.grpc.LoadBalancer.SubchannelPicker;
3939
import io.grpc.LongCounterMetricInstrument;
40+
import io.grpc.LongGaugeMetricInstrument;
4041
import io.grpc.ManagedChannel;
4142
import io.grpc.ManagedChannelBuilder;
4243
import io.grpc.Metadata;
4344
import io.grpc.MetricInstrumentRegistry;
45+
import io.grpc.MetricRecorder.BatchCallback;
46+
import io.grpc.MetricRecorder.BatchRecorder;
47+
import io.grpc.MetricRecorder.Registration;
4448
import io.grpc.Status;
4549
import io.grpc.internal.BackoffPolicy;
4650
import io.grpc.internal.ExponentialBackoffPolicy;
@@ -65,6 +69,7 @@
6569
import java.util.HashMap;
6670
import java.util.List;
6771
import java.util.Map;
72+
import java.util.UUID;
6873
import java.util.concurrent.Future;
6974
import java.util.concurrent.ScheduledExecutorService;
7075
import java.util.concurrent.TimeUnit;
@@ -94,6 +99,10 @@ final class CachingRlsLbClient {
9499
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
95100
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
96101
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102+
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103+
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104+
private final Registration gaugeRegistration;
105+
private final String metricsInstanceUuid = UUID.randomUUID().toString();
97106

98107
// All cache status changes (pending, backoff, success) must be under this lock
99108
private final Object lock = new Object();
@@ -138,6 +147,14 @@ final class CachingRlsLbClient {
138147
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
139148
+ "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
140149
Collections.emptyList(), true);
150+
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
151+
"Number of entries in the RLS cache", "entry",
152+
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
153+
Collections.emptyList(), true);
154+
CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
155+
"The current size of the RLS cache", "byte",
156+
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
157+
Collections.emptyList(), true);
141158
}
142159

143160
private CachingRlsLbClient(Builder builder) {
@@ -202,6 +219,26 @@ private CachingRlsLbClient(Builder builder) {
202219
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
203220
childLbHelperProvider,
204221
new BackoffRefreshListener());
222+
223+
gaugeRegistration = helper.getMetricRecorder()
224+
.registerBatchCallback(new BatchCallback() {
225+
@Override
226+
public void accept(BatchRecorder recorder) {
227+
int estimatedSize;
228+
long estimatedSizeBytes;
229+
synchronized (lock) {
230+
estimatedSize = linkedHashLruCache.estimatedSize();
231+
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
232+
}
233+
recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
234+
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
235+
metricsInstanceUuid), Collections.emptyList());
236+
recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
237+
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
238+
metricsInstanceUuid), Collections.emptyList());
239+
}
240+
}, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
241+
205242
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
206243
}
207244

@@ -306,6 +343,7 @@ void close() {
306343
pendingCallCache.clear();
307344
rlsChannel.shutdownNow();
308345
rlsPicker.close();
346+
gaugeRegistration.close();
309347
}
310348
}
311349

Diff for: ‎rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

+83-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
2424
import static org.junit.Assert.assertNotNull;
2525
import static org.junit.Assert.assertSame;
26+
import static org.mockito.AdditionalAnswers.delegatesTo;
2627
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.ArgumentMatchers.argThat;
2729
import static org.mockito.ArgumentMatchers.eq;
2830
import static org.mockito.Mockito.inOrder;
2931
import static org.mockito.Mockito.mock;
3032
import static org.mockito.Mockito.times;
3133
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
3235

3336
import com.google.common.base.Converter;
3437
import com.google.common.collect.ImmutableList;
@@ -47,10 +50,14 @@
4750
import io.grpc.LoadBalancer.PickResult;
4851
import io.grpc.LoadBalancer.SubchannelPicker;
4952
import io.grpc.LoadBalancerProvider;
53+
import io.grpc.LongGaugeMetricInstrument;
5054
import io.grpc.ManagedChannel;
5155
import io.grpc.ManagedChannelBuilder;
5256
import io.grpc.Metadata;
5357
import io.grpc.MetricRecorder;
58+
import io.grpc.MetricRecorder.BatchCallback;
59+
import io.grpc.MetricRecorder.BatchRecorder;
60+
import io.grpc.MetricRecorder.Registration;
5461
import io.grpc.NameResolver.ConfigOrError;
5562
import io.grpc.Status;
5663
import io.grpc.Status.Code;
@@ -95,12 +102,14 @@
95102
import java.util.concurrent.TimeoutException;
96103
import javax.annotation.Nonnull;
97104
import org.junit.After;
105+
import org.junit.Before;
98106
import org.junit.Rule;
99107
import org.junit.Test;
100108
import org.junit.runner.RunWith;
101109
import org.junit.runners.JUnit4;
102-
import org.mockito.AdditionalAnswers;
103110
import org.mockito.ArgumentCaptor;
111+
import org.mockito.ArgumentMatcher;
112+
import org.mockito.Captor;
104113
import org.mockito.InOrder;
105114
import org.mockito.Mock;
106115
import org.mockito.junit.MockitoJUnit;
@@ -124,6 +133,13 @@ public class CachingRlsLbClientTest {
124133
private SocketAddress socketAddress;
125134
@Mock
126135
private MetricRecorder mockMetricRecorder;
136+
@Mock
137+
private BatchRecorder mockBatchRecorder;
138+
@Mock
139+
private Registration mockGaugeRegistration;
140+
@Captor
141+
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;
142+
127143

128144
private final SynchronizationContext syncContext =
129145
new SynchronizationContext(new UncaughtExceptionHandler() {
@@ -145,7 +161,7 @@ public void uncaughtException(Thread t, Throwable e) {
145161
private final ChildLoadBalancingPolicy childLbPolicy =
146162
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
147163
private final Helper helper =
148-
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
164+
mock(Helper.class, delegatesTo(new FakeHelper()));
149165
private final FakeThrottler fakeThrottler = new FakeThrottler();
150166
private final LbPolicyConfiguration lbPolicyConfiguration =
151167
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
@@ -168,6 +184,11 @@ private void setUpRlsLbClient() {
168184
.build();
169185
}
170186

187+
@Before
188+
public void setUpMockMetricRecorder() {
189+
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
190+
}
191+
171192
@After
172193
public void tearDown() throws Exception {
173194
rlsLbClient.close();
@@ -636,6 +657,51 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta
636657
policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker());
637658
}
638659

660+
@Test
661+
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
662+
setUpRlsLbClient();
663+
664+
verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
665+
any());
666+
667+
BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue();
668+
669+
// Verify the correct cache gauge values when requested at this point.
670+
InOrder inOrder = inOrder(mockBatchRecorder);
671+
gaugeBatchCallback.accept(mockBatchRecorder);
672+
inOrder.verify(mockBatchRecorder).recordLongGauge(
673+
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L),
674+
any(), any());
675+
inOrder.verify(mockBatchRecorder)
676+
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
677+
eq(0L), any(), any());
678+
679+
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(
680+
ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key",
681+
"bar"));
682+
rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest,
683+
RouteLookupResponse.create(ImmutableList.of("target"), "header")));
684+
685+
// Make a request that will populate the cache with an entry
686+
getInSyncContext(routeLookupRequest);
687+
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
688+
689+
// Gauge values should reflect the new cache entry.
690+
gaugeBatchCallback.accept(mockBatchRecorder);
691+
inOrder.verify(mockBatchRecorder).recordLongGauge(
692+
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L),
693+
any(), any());
694+
inOrder.verify(mockBatchRecorder)
695+
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
696+
eq(260L), any(), any());
697+
698+
inOrder.verifyNoMoreInteractions();
699+
700+
// Shutdown
701+
rlsLbClient.close();
702+
verify(mockGaugeRegistration).close();
703+
}
704+
639705
private static RouteLookupConfig getRouteLookupConfig() {
640706
return RouteLookupConfig.builder()
641707
.grpcKeybuilders(ImmutableList.of(
@@ -667,6 +733,21 @@ public long nextBackoffNanos() {
667733
};
668734
}
669735

736+
private static class LongGaugeInstrumentArgumentMatcher implements
737+
ArgumentMatcher<LongGaugeMetricInstrument> {
738+
739+
private final String instrumentName;
740+
741+
public LongGaugeInstrumentArgumentMatcher(String instrumentName) {
742+
this.instrumentName = instrumentName;
743+
}
744+
745+
@Override
746+
public boolean matches(LongGaugeMetricInstrument instrument) {
747+
return instrument.getName().equals(instrumentName);
748+
}
749+
}
750+
670751
private static final class FakeBackoffProvider implements BackoffPolicy.Provider {
671752

672753
private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);

Diff for: ‎rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import static org.mockito.Mockito.mock;
2929
import static org.mockito.Mockito.times;
3030
import static org.mockito.Mockito.verify;
31-
import static org.mockito.Mockito.verifyNoMoreInteractions;
31+
import static org.mockito.Mockito.when;
3232

3333
import com.google.common.base.Converter;
3434
import com.google.common.collect.ImmutableList;
@@ -59,6 +59,7 @@
5959
import io.grpc.MethodDescriptor.MethodType;
6060
import io.grpc.MetricInstrument;
6161
import io.grpc.MetricRecorder;
62+
import io.grpc.MetricRecorder.Registration;
6263
import io.grpc.MetricSink;
6364
import io.grpc.NameResolver.ConfigOrError;
6465
import io.grpc.NoopMetricSink;
@@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) {
128129
});
129130
@Mock
130131
private MetricRecorder mockMetricRecorder;
132+
@Mock
133+
private Registration mockGaugeRegistration;
131134
private final FakeHelper helperDelegate = new FakeHelper();
132135
private final Helper helper =
133136
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
@@ -186,6 +189,8 @@ public CachingRlsLbClient.Builder get() {
186189

187190
searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
188191
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);
192+
193+
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
189194
}
190195

191196
@After
@@ -226,7 +231,6 @@ public void lb_serverStatusCodeConversion() throws Exception {
226231
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
227232
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
228233
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
229-
verifyNoMoreInteractions(mockMetricRecorder);
230234
}
231235

232236
@Test
@@ -290,8 +294,6 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
290294
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
291295
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
292296
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
293-
294-
verifyNoMoreInteractions(mockMetricRecorder);
295297
}
296298

297299
@Test
@@ -351,7 +353,6 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
351353
inOrder.verify(helper).getChannelTarget();
352354
inOrder.verifyNoMoreInteractions();
353355
verifyFailedPicksCounterAdd(1, 1);
354-
verifyNoMoreInteractions(mockMetricRecorder);
355356
}
356357

357358
@Test
@@ -377,7 +378,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
377378
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
378379
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
379380
"defaultTarget", "complete");
380-
verifyNoMoreInteractions(mockMetricRecorder);
381381

382382
Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
383383
assertThat(subchannelIsReady(subchannel)).isTrue();
@@ -422,8 +422,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
422422
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
423423
assertThat(res.getSubchannel()).isNull();
424424
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
425-
426-
verifyNoMoreInteractions(mockMetricRecorder);
427425
}
428426

429427
@Test
@@ -499,7 +497,6 @@ public void lb_working_withoutDefaultTarget() throws Exception {
499497
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
500498
inOrder.verifyNoMoreInteractions();
501499
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
502-
verifyNoMoreInteractions(mockMetricRecorder);
503500
}
504501

505502
@Test
@@ -542,7 +539,6 @@ public void lb_nameResolutionFailed() throws Exception {
542539
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
543540
assertThat(res.getStatus().isOk()).isFalse();
544541
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
545-
verifyNoMoreInteractions(mockMetricRecorder);
546542
}
547543

548544
private PickResult markReadyAndGetPickResult(InOrder inOrder,

0 commit comments

Comments
 (0)
Please sign in to comment.