Skip to content

Commit 34ee600

Browse files
authoredJun 6, 2024··
Wait for all BinderServerTransports to terminate before ... (#11240)
... returning security policy executor to the object pool. Fixes #10897.
1 parent 1670e97 commit 34ee600

File tree

5 files changed

+232
-17
lines changed

5 files changed

+232
-17
lines changed
 

‎binder/src/main/java/io/grpc/binder/BinderServerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public Server build() {
166166
ObjectPool<? extends Executor> executorPool = serverImplBuilder.getExecutorPool();
167167
Executor executor = executorPool.getObject();
168168
BinderTransportSecurity.installAuthInterceptor(this, executor);
169-
internalBuilder.setShutdownListener(() -> executorPool.returnObject(executor));
169+
internalBuilder.setTerminationListener(() -> executorPool.returnObject(executor));
170170
return super.build();
171171
}
172172
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.grpc.binder.internal;
2+
3+
import static com.google.common.base.Preconditions.checkState;
4+
5+
import io.grpc.Attributes;
6+
import io.grpc.Metadata;
7+
import io.grpc.internal.ServerListener;
8+
import io.grpc.internal.ServerStream;
9+
import io.grpc.internal.ServerTransport;
10+
import io.grpc.internal.ServerTransportListener;
11+
import javax.annotation.concurrent.GuardedBy;
12+
13+
/**
14+
* Tracks which {@link BinderTransport.BinderServerTransport} are currently active and allows
15+
* invoking a {@link Runnable} only once all transports are terminated.
16+
*/
17+
final class ActiveTransportTracker implements ServerListener {
18+
private final ServerListener delegate;
19+
private final Runnable terminationListener;
20+
21+
@GuardedBy("this")
22+
private boolean shutdown = false;
23+
24+
@GuardedBy("this")
25+
private int activeTransportCount = 0;
26+
27+
/**
28+
* @param delegate the original server listener that this object decorates. Usually passed to
29+
* {@link BinderServer#start(ServerListener)}.
30+
* @param terminationListener invoked only once the server has started shutdown ({@link
31+
* #serverShutdown()} AND the last active transport is terminated.
32+
*/
33+
ActiveTransportTracker(ServerListener delegate, Runnable terminationListener) {
34+
this.delegate = delegate;
35+
this.terminationListener = terminationListener;
36+
}
37+
38+
@Override
39+
public ServerTransportListener transportCreated(ServerTransport transport) {
40+
synchronized (this) {
41+
checkState(!shutdown, "Illegal transportCreated() after serverShutdown()");
42+
activeTransportCount++;
43+
}
44+
ServerTransportListener originalListener = delegate.transportCreated(transport);
45+
return new TrackedTransportListener(originalListener);
46+
}
47+
48+
private void untrack() {
49+
Runnable maybeTerminationListener;
50+
synchronized (this) {
51+
activeTransportCount--;
52+
maybeTerminationListener = getListenerIfTerminated();
53+
}
54+
// Prefer running the listener outside of the synchronization lock to release it sooner, since
55+
// we don't know how the callback is implemented nor how long it will take. This should
56+
// minimize the possibility of deadlocks.
57+
if (maybeTerminationListener != null) {
58+
maybeTerminationListener.run();
59+
}
60+
}
61+
62+
@Override
63+
public void serverShutdown() {
64+
delegate.serverShutdown();
65+
Runnable maybeTerminationListener;
66+
synchronized (this) {
67+
shutdown = true;
68+
maybeTerminationListener = getListenerIfTerminated();
69+
}
70+
// We may be able to shutdown immediately if there are no active transports.
71+
//
72+
// Executed outside of the lock. See "untrack()" above.
73+
if (maybeTerminationListener != null) {
74+
maybeTerminationListener.run();
75+
}
76+
}
77+
78+
@GuardedBy("this")
79+
private Runnable getListenerIfTerminated() {
80+
return (shutdown && activeTransportCount == 0) ? terminationListener : null;
81+
}
82+
83+
/**
84+
* Wraps a {@link ServerTransportListener}, unregistering it from the parent tracker once the
85+
* transport terminates.
86+
*/
87+
private final class TrackedTransportListener implements ServerTransportListener {
88+
private final ServerTransportListener delegate;
89+
90+
TrackedTransportListener(ServerTransportListener delegate) {
91+
this.delegate = delegate;
92+
}
93+
94+
@Override
95+
public void streamCreated(ServerStream stream, String method, Metadata headers) {
96+
delegate.streamCreated(stream, method, headers);
97+
}
98+
99+
@Override
100+
public Attributes transportReady(Attributes attributes) {
101+
return delegate.transportReady(attributes);
102+
}
103+
104+
@Override
105+
public void transportTerminated() {
106+
delegate.transportTerminated();
107+
untrack();
108+
}
109+
}
110+
}

‎binder/src/main/java/io/grpc/binder/internal/BinderServer.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
6868
private final LeakSafeOneWayBinder hostServiceBinder;
6969
private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker;
7070
private final InboundParcelablePolicy inboundParcelablePolicy;
71-
private final BinderTransportSecurity.ShutdownListener transportSecurityShutdownListener;
71+
private final Runnable terminationListener;
7272

7373
@GuardedBy("this")
7474
private ServerListener listener;
@@ -86,7 +86,7 @@ private BinderServer(Builder builder) {
8686
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
8787
this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy);
8888
this.inboundParcelablePolicy = builder.inboundParcelablePolicy;
89-
this.transportSecurityShutdownListener = builder.shutdownListener;
89+
this.terminationListener = builder.terminationListener;
9090
hostServiceBinder = new LeakSafeOneWayBinder(this);
9191
}
9292

@@ -97,7 +97,7 @@ public IBinder getHostBinder() {
9797

9898
@Override
9999
public synchronized void start(ServerListener serverListener) throws IOException {
100-
this.listener = serverListener;
100+
listener = new ActiveTransportTracker(serverListener, terminationListener);
101101
executorService = executorServicePool.getObject();
102102
}
103103

@@ -130,7 +130,6 @@ public synchronized void shutdown() {
130130
hostServiceBinder.setHandler(GoAwayHandler.INSTANCE);
131131
listener.serverShutdown();
132132
executorService = executorServicePool.returnObject(executorService);
133-
transportSecurityShutdownListener.onServerShutdown();
134133
}
135134
}
136135

@@ -208,7 +207,7 @@ public static class Builder {
208207
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
209208
ServerSecurityPolicy serverSecurityPolicy = SecurityPolicies.serverInternalOnly();
210209
InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
211-
BinderTransportSecurity.ShutdownListener shutdownListener = () -> {};
210+
Runnable terminationListener = () -> {};
212211

213212
public BinderServer build() {
214213
return new BinderServer(this);
@@ -269,12 +268,13 @@ public Builder setInboundParcelablePolicy(InboundParcelablePolicy inboundParcela
269268
}
270269

271270
/**
272-
* Installs a callback that will be invoked when this server is {@link #shutdown()}
271+
* Installs a callback that will be invoked when this server is {@link #shutdown()} and all of
272+
* its transports are terminated.
273273
*
274274
* <p>Optional.
275275
*/
276-
public Builder setShutdownListener(BinderTransportSecurity.ShutdownListener shutdownListener) {
277-
this.shutdownListener = checkNotNull(shutdownListener, "shutdownListener");
276+
public Builder setTerminationListener(Runnable terminationListener) {
277+
this.terminationListener = checkNotNull(terminationListener, "terminationListener");
278278
return this;
279279
}
280280
}

‎binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java

-8
Original file line numberDiff line numberDiff line change
@@ -238,12 +238,4 @@ public interface ServerPolicyChecker {
238238
*/
239239
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName);
240240
}
241-
242-
/**
243-
* A listener invoked when the {@link io.grpc.binder.internal.BinderServer} shuts down, allowing
244-
* resources to be potentially cleaned up.
245-
*/
246-
public interface ShutdownListener {
247-
void onServerShutdown();
248-
}
249241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.binder.internal;
18+
19+
import static org.junit.Assert.assertThrows;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.verifyNoInteractions;
23+
import static org.mockito.Mockito.when;
24+
25+
import io.grpc.internal.ServerListener;
26+
import io.grpc.internal.ServerTransport;
27+
import io.grpc.internal.ServerTransportListener;
28+
import org.junit.Before;
29+
import org.junit.Rule;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.mockito.Mock;
33+
import org.mockito.junit.MockitoJUnit;
34+
import org.mockito.junit.MockitoRule;
35+
import org.robolectric.RobolectricTestRunner;
36+
37+
@RunWith(RobolectricTestRunner.class)
38+
public final class ActiveTransportTrackerTest {
39+
@Rule public final MockitoRule mocks = MockitoJUnit.rule();
40+
41+
private ActiveTransportTracker tracker;
42+
43+
@Mock Runnable mockShutdownListener;
44+
@Mock ServerListener mockServerListener;
45+
@Mock ServerTransportListener mockServerTransportListener;
46+
@Mock ServerTransport mockServerTransport;
47+
48+
@Before
49+
public void setUp() {
50+
when(mockServerListener.transportCreated(any())).thenReturn(mockServerTransportListener);
51+
tracker = new ActiveTransportTracker(mockServerListener, mockShutdownListener);
52+
}
53+
54+
@Test
55+
public void testServerShutdown_onlyNotifiesAfterAllTransportAreTerminated() {
56+
ServerTransportListener wrapperListener1 = registerNewTransport();
57+
ServerTransportListener wrapperListener2 = registerNewTransport();
58+
59+
tracker.serverShutdown();
60+
// 2 active transports, notification scheduled
61+
verifyNoInteractions(mockShutdownListener);
62+
63+
wrapperListener1.transportTerminated();
64+
// 1 active transport remaining, notification still pending
65+
verifyNoInteractions(mockShutdownListener);
66+
67+
wrapperListener2.transportTerminated();
68+
// No more active transports, shutdown notified
69+
verify(mockShutdownListener).run();
70+
}
71+
72+
@Test
73+
public void testServerShutdown_noActiveTransport_notifiesTerminationImmediately() {
74+
verifyNoInteractions(mockShutdownListener);
75+
76+
tracker.serverShutdown();
77+
78+
verify(mockShutdownListener).run();
79+
}
80+
81+
@Test
82+
public void testLastTransportTerminated_serverNotShutdownYet_doesNotNotify() {
83+
ServerTransportListener wrapperListener = registerNewTransport();
84+
verifyNoInteractions(mockShutdownListener);
85+
86+
wrapperListener.transportTerminated();
87+
88+
verifyNoInteractions(mockShutdownListener);
89+
}
90+
91+
@Test
92+
public void testTransportCreation_afterServerShutdown_throws() {
93+
tracker.serverShutdown();
94+
95+
assertThrows(IllegalStateException.class, this::registerNewTransport);
96+
}
97+
98+
@Test
99+
public void testServerListenerCallbacks_invokesDelegates() {
100+
ServerTransportListener listener = tracker.transportCreated(mockServerTransport);
101+
verify(mockServerListener).transportCreated(mockServerTransport);
102+
103+
listener.transportTerminated();
104+
verify(mockServerTransportListener).transportTerminated();
105+
106+
tracker.serverShutdown();
107+
verify(mockServerListener).serverShutdown();
108+
}
109+
110+
private ServerTransportListener registerNewTransport() {
111+
return tracker.transportCreated(mockServerTransport);
112+
}
113+
}

0 commit comments

Comments
 (0)
Please sign in to comment.