Skip to content

Commit f9b6e5f

Browse files
committedMay 13, 2024·
rls: Guarantee backoff will update RLS picker
Previously, picker was likely null if entering backoff soon after start-up. This prevented the picker from being updated and directing queued RPCs to the fallback. It would work for new RPCs if RLS returned extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do a pick before enqueuing so the ManagedChannelImpl pick could request from RLS and DelayedClientTransport could use the response. So the test uses a delay to purposefully avoid that unlikely-in-real-life case. Creating a resolving OOB channel for InProcess doesn't actually change the destination from the parent, because InProcess uses directaddress. Thus the fakeRlsServiceImpl is now being added to the fake backend server, because the same server is used for RLS within the test. b/333185213
1 parent 77a1e77 commit f9b6e5f

File tree

4 files changed

+76
-15
lines changed

4 files changed

+76
-15
lines changed
 

‎rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,12 @@ public void accept(BatchRecorder recorder) {
248248
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
249249
}
250250

251+
void init() {
252+
synchronized (lock) {
253+
refCountedChildPolicyWrapperFactory.init();
254+
}
255+
}
256+
251257
/**
252258
* Convert the status to UNAVAILABLE and enhance the error message.
253259
* @param status status as provided by server
@@ -385,7 +391,7 @@ private void pendingRpcComplete(PendingCacheEntry entry) {
385391
} catch (Exception e) {
386392
createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
387393
// Cache updated. updateBalancingState() to reattempt picks
388-
helper.propagateRlsError();
394+
helper.triggerPendingRpcProcessing();
389395
}
390396
}
391397
}
@@ -457,19 +463,8 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
457463
super.updateBalancingState(newState, newPicker);
458464
}
459465

460-
void propagateRlsError() {
461-
getSynchronizationContext().execute(new Runnable() {
462-
@Override
463-
public void run() {
464-
if (picker != null) {
465-
// Refresh the channel state and let pending RPCs reprocess the picker.
466-
updateBalancingState(state, picker);
467-
}
468-
}
469-
});
470-
}
471-
472466
void triggerPendingRpcProcessing() {
467+
checkState(state != null, "updateBalancingState hasn't yet been called");
473468
helper.getSynchronizationContext().execute(
474469
() -> super.updateBalancingState(state, picker));
475470
}
@@ -842,7 +837,7 @@ Builder setBackoffProvider(BackoffPolicy.Provider provider) {
842837

843838
CachingRlsLbClient build() {
844839
CachingRlsLbClient client = new CachingRlsLbClient(this);
845-
helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
840+
client.init();
846841
return client;
847842
}
848843
}

‎rls/src/main/java/io/grpc/rls/ChildLoadBalancerHelper.java

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ static final class ChildLoadBalancerHelperProvider {
7777
this.picker = checkNotNull(picker, "picker");
7878
}
7979

80+
void init() {
81+
helper.updateBalancingState(ConnectivityState.CONNECTING, picker);
82+
}
83+
8084
ChildLoadBalancerHelper forTarget(String target) {
8185
return new ChildLoadBalancerHelper(target, helper, subchannelStateManager, picker);
8286
}

‎rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ public RefCountedChildPolicyWrapperFactory(
225225
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
226226
}
227227

228+
void init() {
229+
childLbHelperProvider.init();
230+
}
231+
228232
ChildPolicyWrapper createOrGet(String target) {
229233
// TODO(creamsoup) check if the target is valid or not
230234
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);

‎rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java

+59-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public void uncaughtException(Thread t, Throwable e) {
134134
private final FakeHelper helperDelegate = new FakeHelper();
135135
private final Helper helper =
136136
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
137-
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
137+
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(
138+
fakeClock.getScheduledExecutorService());
138139
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
139140
private final FakeThrottler fakeThrottler = new FakeThrottler();
140141
private final String channelTarget = "channelTarget";
@@ -296,6 +297,38 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
296297
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
297298
}
298299

300+
@Test
301+
public void fallbackWithDelay_succeeds() throws Exception {
302+
fakeRlsServerImpl.setResponseDelay(100, TimeUnit.MILLISECONDS);
303+
grpcCleanupRule.register(
304+
InProcessServerBuilder.forName("fake-bigtable.googleapis.com")
305+
.addService(ServerServiceDefinition.builder("com.google")
306+
.addMethod(fakeSearchMethod, (call, headers) -> {
307+
call.sendHeaders(new Metadata());
308+
call.sendMessage(null);
309+
call.close(Status.OK, new Metadata());
310+
return new ServerCall.Listener<Void>() {};
311+
})
312+
.build())
313+
.addService(fakeRlsServerImpl)
314+
.directExecutor()
315+
.build()
316+
.start());
317+
ManagedChannel channel = grpcCleanupRule.register(
318+
InProcessChannelBuilder.forName("fake-bigtable.googleapis.com")
319+
.defaultServiceConfig(parseJson(getServiceConfigJsonStr()))
320+
.directExecutor()
321+
.build());
322+
323+
StreamRecorder<Void> recorder = StreamRecorder.create();
324+
StreamObserver<Void> requestObserver = ClientCalls.asyncClientStreamingCall(
325+
channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder);
326+
requestObserver.onCompleted();
327+
fakeClock.forwardTime(100, TimeUnit.MILLISECONDS);
328+
assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
329+
assertThat(recorder.getError()).isNull();
330+
}
331+
299332
@Test
300333
public void metricsWithRealChannel() throws Exception {
301334
grpcCleanupRule.register(
@@ -308,6 +341,7 @@ public void metricsWithRealChannel() throws Exception {
308341
return new ServerCall.Listener<Void>() {};
309342
})
310343
.build())
344+
.addService(fakeRlsServerImpl)
311345
.directExecutor()
312346
.build()
313347
.start());
@@ -761,17 +795,41 @@ private static final class FakeRlsServerImpl
761795
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
762796
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
763797

798+
private final ScheduledExecutorService scheduler;
799+
private long delay;
800+
private TimeUnit delayUnit;
801+
802+
public FakeRlsServerImpl(ScheduledExecutorService scheduler) {
803+
this.scheduler = scheduler;
804+
}
805+
764806
private Map<RouteLookupRequest, RouteLookupResponse> lookupTable = ImmutableMap.of();
765807

766808
private void setLookupTable(Map<RouteLookupRequest, RouteLookupResponse> lookupTable) {
767809
this.lookupTable = checkNotNull(lookupTable, "lookupTable");
768810
}
769811

812+
void setResponseDelay(long delay, TimeUnit unit) {
813+
this.delay = delay;
814+
this.delayUnit = unit;
815+
}
816+
770817
@Override
818+
@SuppressWarnings("FutureReturnValueIgnored")
771819
public void routeLookup(io.grpc.lookup.v1.RouteLookupRequest request,
772820
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
773821
RouteLookupResponse response =
774822
lookupTable.get(REQUEST_CONVERTER.convert(request));
823+
Runnable sendResponse = () -> sendResponse(response, responseObserver);
824+
if (delay != 0) {
825+
scheduler.schedule(sendResponse, delay, delayUnit);
826+
} else {
827+
sendResponse.run();
828+
}
829+
}
830+
831+
private void sendResponse(RouteLookupResponse response,
832+
StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
775833
if (response == null) {
776834
responseObserver.onError(new RuntimeException("not found"));
777835
} else {

0 commit comments

Comments
 (0)
Please sign in to comment.