Skip to content

Commit fea577c

Browse files
authoredMay 23, 2024··
core: Exit idle mode when delayed transport is in use
8844cf7 triggered a regression where a new RPC wouldn't cause the channel to exit idle mode, if an RPC was still progressing on an old transport. This was already possible previously, but was racy. 8844cf7 made it less racy and more obvious. The two added `exitIdleMode()` calls in this commit are companions to those in `enterIdleMode()`, which detect whether the channel should immediately exit idle mode. Noticed in cl/635819804.
1 parent 0b5f38d commit fea577c

File tree

4 files changed

+95
-7
lines changed

4 files changed

+95
-7
lines changed
 

‎core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -889,12 +889,6 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
889889
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
890890
return newClientCall(method, callOptions);
891891
}
892-
syncContext.execute(new Runnable() {
893-
@Override
894-
public void run() {
895-
exitIdleMode();
896-
}
897-
});
898892
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
899893
// This is an optimization for the case (typically with InProcessTransport) when name
900894
// resolution result is immediately available at this point. Otherwise, some users'
@@ -927,6 +921,10 @@ public void run() {
927921
if (pendingCalls == null) {
928922
pendingCalls = new LinkedHashSet<>();
929923
inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
924+
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of
925+
// the subchannels is in use. But we should never be in idle mode when pendingCalls is
926+
// in use.
927+
exitIdleMode();
930928
}
931929
pendingCalls.add(pendingCall);
932930
} else {
@@ -2081,6 +2079,12 @@ public Attributes filterTransport(Attributes attributes) {
20812079
@Override
20822080
public void transportInUse(final boolean inUse) {
20832081
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
2082+
if (inUse) {
2083+
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2084+
// subchannels is in use. But we should never be in idle mode when delayed transport is in
2085+
// use.
2086+
exitIdleMode();
2087+
}
20842088
}
20852089

20862090
@Override

‎core/src/testFixtures/java/io/grpc/internal/FakeClock.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ private void schedule(ScheduledTask task, long delay, TimeUnit unit) {
188188
}
189189

190190
@Override public boolean isShutdown() {
191-
throw new UnsupportedOperationException();
191+
// If shutdown is not implemented, then it is never shutdown.
192+
return false;
192193
}
193194

194195
@Override public boolean isTerminated() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.testing.integration;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import io.grpc.ManagedChannel;
22+
import io.grpc.ServerInterceptors;
23+
import io.grpc.inprocess.InProcessChannelBuilder;
24+
import io.grpc.inprocess.InProcessServerBuilder;
25+
import io.grpc.internal.FakeClock;
26+
import io.grpc.internal.testing.StreamRecorder;
27+
import io.grpc.stub.StreamObserver;
28+
import io.grpc.testing.GrpcCleanupRule;
29+
import io.grpc.testing.integration.EmptyProtos.Empty;
30+
import io.grpc.testing.integration.Messages.ResponseParameters;
31+
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
32+
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
33+
import java.util.concurrent.TimeUnit;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.JUnit4;
38+
39+
/** Tests for ManagedChannelImpl that use a real transport. */
40+
@RunWith(JUnit4.class)
41+
public final class ManagedChannelImplIntegrationTest {
42+
private static final String SERVER_NAME = ManagedChannelImplIntegrationTest.class.getName();
43+
@Rule
44+
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
45+
46+
@Test
47+
public void idleWhileRpcInTransport_exitsIdleForNewRpc() throws Exception {
48+
FakeClock fakeClock = new FakeClock();
49+
grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
50+
.directExecutor()
51+
.addService(
52+
ServerInterceptors.intercept(
53+
new TestServiceImpl(fakeClock.getScheduledExecutorService()),
54+
TestServiceImpl.interceptors()))
55+
.build()
56+
.start());
57+
ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME)
58+
.directExecutor()
59+
.build());
60+
61+
TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel);
62+
TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
63+
StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
64+
StreamObserver<StreamingOutputCallRequest> requestObserver =
65+
asyncStub.fullDuplexCall(responseObserver);
66+
requestObserver.onNext(StreamingOutputCallRequest.newBuilder()
67+
.addResponseParameters(ResponseParameters.newBuilder()
68+
.setIntervalUs(Integer.MAX_VALUE))
69+
.build());
70+
try {
71+
channel.enterIdle();
72+
assertThat(blockingStub
73+
.withDeadlineAfter(10, TimeUnit.SECONDS)
74+
.emptyCall(Empty.getDefaultInstance()))
75+
.isEqualTo(Empty.getDefaultInstance());
76+
} finally {
77+
requestObserver.onError(new RuntimeException("cleanup"));
78+
}
79+
}
80+
}

‎xds/src/test/java/io/grpc/xds/CsdsServiceTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ public void fetchClientConfig_interruptedException() {
169169
grpcServerRule.getServiceRegistry()
170170
.addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient)));
171171

172+
// Hack to prevent the interrupted exception from propagating through to the client stub.
173+
grpcServerRule.getChannel().getState(true);
174+
172175
try {
173176
ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST);
174177
fail("Should've failed, got response: " + response);

0 commit comments

Comments
 (0)
Please sign in to comment.