@@ -69,23 +69,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
69
69
@ GuardedBy ("lock" )
70
70
private Collection <PendingStream > pendingStreams = new LinkedHashSet <>();
71
71
72
- /**
73
- * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74
- * terminated.
75
- */
76
- @ GuardedBy ("lock" )
77
- private Status shutdownStatus ;
78
-
79
- /**
80
- * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81
- * to idle.
82
- */
83
- @ GuardedBy ("lock" )
84
- @ Nullable
85
- private SubchannelPicker lastPicker ;
86
-
87
- @ GuardedBy ("lock" )
88
- private long lastPickerVersion ;
72
+ /** Immutable state needed for picking. 'lock' must be held for writing. */
73
+ private volatile PickerState pickerState = new PickerState (null , null );
89
74
90
75
/**
91
76
* Creates a new delayed transport.
@@ -139,33 +124,30 @@ public final ClientStream newStream(
139
124
try {
140
125
PickSubchannelArgs args = new PickSubchannelArgsImpl (
141
126
method , headers , callOptions , new PickDetailsConsumerImpl (tracers ));
142
- SubchannelPicker picker = null ;
143
- long pickerVersion = -1 ;
127
+ PickerState state = pickerState ;
144
128
while (true ) {
145
- synchronized (lock ) {
146
- if (shutdownStatus != null ) {
147
- return new FailingClientStream (shutdownStatus , tracers );
148
- }
149
- if (lastPicker == null ) {
150
- return createPendingStream (args , tracers );
151
- }
152
- // Check for second time through the loop, and whether anything changed
153
- if (picker != null && pickerVersion == lastPickerVersion ) {
154
- return createPendingStream (args , tracers );
155
- }
156
- picker = lastPicker ;
157
- pickerVersion = lastPickerVersion ;
129
+ if (state .shutdownStatus != null ) {
130
+ return new FailingClientStream (state .shutdownStatus , tracers );
158
131
}
159
- PickResult pickResult = picker .pickSubchannel (args );
160
- ClientTransport transport = GrpcUtil .getTransportFromPickResult (pickResult ,
161
- callOptions .isWaitForReady ());
162
- if (transport != null ) {
163
- return transport .newStream (
164
- args .getMethodDescriptor (), args .getHeaders (), args .getCallOptions (),
165
- tracers );
132
+ if (state .lastPicker != null ) {
133
+ PickResult pickResult = state .lastPicker .pickSubchannel (args );
134
+ ClientTransport transport = GrpcUtil .getTransportFromPickResult (pickResult ,
135
+ callOptions .isWaitForReady ());
136
+ if (transport != null ) {
137
+ return transport .newStream (
138
+ args .getMethodDescriptor (), args .getHeaders (), args .getCallOptions (),
139
+ tracers );
140
+ }
166
141
}
167
142
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
168
143
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
144
+ synchronized (lock ) {
145
+ PickerState newerState = pickerState ;
146
+ if (state == newerState ) {
147
+ return createPendingStream (args , tracers );
148
+ }
149
+ state = newerState ;
150
+ }
169
151
}
170
152
} finally {
171
153
syncContext .drain ();
@@ -210,10 +192,10 @@ public ListenableFuture<SocketStats> getStats() {
210
192
@ Override
211
193
public final void shutdown (final Status status ) {
212
194
synchronized (lock ) {
213
- if (shutdownStatus != null ) {
195
+ if (pickerState . shutdownStatus != null ) {
214
196
return ;
215
197
}
216
- shutdownStatus = status ;
198
+ pickerState = pickerState . withShutdownStatus ( status ) ;
217
199
syncContext .executeLater (new Runnable () {
218
200
@ Override
219
201
public void run () {
@@ -288,8 +270,7 @@ final int getPendingStreamsCount() {
288
270
final void reprocess (@ Nullable SubchannelPicker picker ) {
289
271
ArrayList <PendingStream > toProcess ;
290
272
synchronized (lock ) {
291
- lastPicker = picker ;
292
- lastPickerVersion ++;
273
+ pickerState = pickerState .withPicker (picker );
293
274
if (picker == null || !hasPendingStreams ()) {
294
275
return ;
295
276
}
@@ -338,7 +319,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
338
319
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
339
320
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
340
321
syncContext .executeLater (reportTransportNotInUse );
341
- if (shutdownStatus != null && reportTransportTerminated != null ) {
322
+ if (pickerState . shutdownStatus != null && reportTransportTerminated != null ) {
342
323
syncContext .executeLater (reportTransportTerminated );
343
324
reportTransportTerminated = null ;
344
325
}
@@ -384,7 +365,7 @@ public void cancel(Status reason) {
384
365
boolean justRemovedAnElement = pendingStreams .remove (this );
385
366
if (!hasPendingStreams () && justRemovedAnElement ) {
386
367
syncContext .executeLater (reportTransportNotInUse );
387
- if (shutdownStatus != null ) {
368
+ if (pickerState . shutdownStatus != null ) {
388
369
syncContext .executeLater (reportTransportTerminated );
389
370
reportTransportTerminated = null ;
390
371
}
@@ -409,4 +390,32 @@ public void appendTimeoutInsight(InsightBuilder insight) {
409
390
super .appendTimeoutInsight (insight );
410
391
}
411
392
}
393
+
394
+ static final class PickerState {
395
+ /**
396
+ * The last picker that {@link #reprocess} has used. May be set to null when the channel has
397
+ * moved to idle.
398
+ */
399
+ @ Nullable
400
+ final SubchannelPicker lastPicker ;
401
+ /**
402
+ * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
403
+ * terminated.
404
+ */
405
+ @ Nullable
406
+ final Status shutdownStatus ;
407
+
408
+ private PickerState (SubchannelPicker lastPicker , Status shutdownStatus ) {
409
+ this .lastPicker = lastPicker ;
410
+ this .shutdownStatus = shutdownStatus ;
411
+ }
412
+
413
+ public PickerState withPicker (SubchannelPicker newPicker ) {
414
+ return new PickerState (newPicker , this .shutdownStatus );
415
+ }
416
+
417
+ public PickerState withShutdownStatus (Status newShutdownStatus ) {
418
+ return new PickerState (this .lastPicker , newShutdownStatus );
419
+ }
420
+ }
412
421
}
0 commit comments