@@ -278,37 +278,38 @@ private void periodicClean() {
278
278
@ GuardedBy ("lock" )
279
279
private CachedRouteLookupResponse asyncRlsCall (
280
280
RouteLookupRequest request , @ Nullable BackoffPolicy backoffPolicy ) {
281
- logger .log (ChannelLogLevel .DEBUG , "Making an async call to RLS" );
282
281
if (throttler .shouldThrottle ()) {
283
- logger .log (ChannelLogLevel .DEBUG , "Request is throttled" );
282
+ logger .log (ChannelLogLevel .DEBUG , "[RLS Entry {0}] Throttled RouteLookup" , request );
284
283
// Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
285
284
// on this result
286
285
return CachedRouteLookupResponse .backoffEntry (createBackOffEntry (
287
286
request , Status .RESOURCE_EXHAUSTED .withDescription ("RLS throttled" ), backoffPolicy ));
288
287
}
289
288
final SettableFuture <RouteLookupResponse > response = SettableFuture .create ();
290
289
io .grpc .lookup .v1 .RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER .convert (request );
291
- logger .log (ChannelLogLevel .DEBUG , "Sending RouteLookupRequest: {0}" , routeLookupRequest );
290
+ logger .log (ChannelLogLevel .DEBUG ,
291
+ "[RLS Entry {0}] Starting RouteLookup: {1}" , request , routeLookupRequest );
292
292
rlsStub .withDeadlineAfter (callTimeoutNanos , TimeUnit .NANOSECONDS )
293
293
.routeLookup (
294
294
routeLookupRequest ,
295
295
new StreamObserver <io .grpc .lookup .v1 .RouteLookupResponse >() {
296
296
@ Override
297
297
public void onNext (io .grpc .lookup .v1 .RouteLookupResponse value ) {
298
- logger .log (ChannelLogLevel .DEBUG , "Received RouteLookupResponse: {0}" , value );
298
+ logger .log (ChannelLogLevel .DEBUG ,
299
+ "[RLS Entry {0}] RouteLookup succeeded: {1}" , request , value );
299
300
response .set (RESPONSE_CONVERTER .reverse ().convert (value ));
300
301
}
301
302
302
303
@ Override
303
304
public void onError (Throwable t ) {
304
- logger .log (ChannelLogLevel .DEBUG , "Error looking up route:" , t );
305
+ logger .log (ChannelLogLevel .DEBUG ,
306
+ "[RLS Entry {0}] RouteLookup failed: {1}" , request , t );
305
307
response .setException (t );
306
308
throttler .registerBackendResponse (true );
307
309
}
308
310
309
311
@ Override
310
312
public void onCompleted () {
311
- logger .log (ChannelLogLevel .DEBUG , "routeLookup call completed" );
312
313
throttler .registerBackendResponse (false );
313
314
}
314
315
});
@@ -323,13 +324,10 @@ public void onCompleted() {
323
324
*/
324
325
@ CheckReturnValue
325
326
final CachedRouteLookupResponse get (final RouteLookupRequest request ) {
326
- logger .log (ChannelLogLevel .DEBUG , "Acquiring lock to get cached entry" );
327
327
synchronized (lock ) {
328
- logger .log (ChannelLogLevel .DEBUG , "Acquired lock to get cached entry" );
329
328
final CacheEntry cacheEntry ;
330
329
cacheEntry = linkedHashLruCache .read (request );
331
330
if (cacheEntry == null ) {
332
- logger .log (ChannelLogLevel .DEBUG , "No cache entry found, making a new RLS request" );
333
331
PendingCacheEntry pendingEntry = pendingCallCache .get (request );
334
332
if (pendingEntry != null ) {
335
333
return CachedRouteLookupResponse .pendingResponse (pendingEntry );
@@ -339,15 +337,12 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {
339
337
340
338
if (cacheEntry instanceof DataCacheEntry ) {
341
339
// cache hit, initiate async-refresh if entry is staled
342
- logger .log (ChannelLogLevel .DEBUG , "Cache hit for the request" );
343
340
DataCacheEntry dataEntry = ((DataCacheEntry ) cacheEntry );
344
341
if (dataEntry .isStaled (ticker .read ())) {
345
- logger .log (ChannelLogLevel .DEBUG , "Cache entry is stale" );
346
342
dataEntry .maybeRefresh ();
347
343
}
348
344
return CachedRouteLookupResponse .dataEntry ((DataCacheEntry ) cacheEntry );
349
345
}
350
- logger .log (ChannelLogLevel .DEBUG , "Cache hit for a backup entry" );
351
346
return CachedRouteLookupResponse .backoffEntry ((BackoffCacheEntry ) cacheEntry );
352
347
}
353
348
}
@@ -409,8 +404,8 @@ private DataCacheEntry createDataEntry(
409
404
RouteLookupRequest request , RouteLookupResponse routeLookupResponse ) {
410
405
logger .log (
411
406
ChannelLogLevel .DEBUG ,
412
- "Transition to data cache: routeLookupResponse={0 }" ,
413
- routeLookupResponse );
407
+ "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1 }" ,
408
+ request , routeLookupResponse );
414
409
DataCacheEntry entry = new DataCacheEntry (request , routeLookupResponse );
415
410
// Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
416
411
// this cache update because the lock is held
@@ -421,18 +416,19 @@ private DataCacheEntry createDataEntry(
421
416
@ GuardedBy ("lock" )
422
417
private BackoffCacheEntry createBackOffEntry (
423
418
RouteLookupRequest request , Status status , @ Nullable BackoffPolicy backoffPolicy ) {
424
- logger .log (ChannelLogLevel .DEBUG , "Transition to back off: status={0}" , status );
425
419
if (backoffPolicy == null ) {
426
420
backoffPolicy = backoffProvider .get ();
427
421
}
428
422
long delayNanos = backoffPolicy .nextBackoffNanos ();
423
+ logger .log (
424
+ ChannelLogLevel .DEBUG ,
425
+ "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}" ,
426
+ request , status , delayNanos );
429
427
BackoffCacheEntry entry = new BackoffCacheEntry (request , status , backoffPolicy );
430
428
// Lock is held, so the task can't execute before the assignment
431
429
entry .scheduledFuture = scheduledExecutorService .schedule (
432
430
() -> refreshBackoffEntry (entry ), delayNanos , TimeUnit .NANOSECONDS );
433
431
linkedHashLruCache .cacheAndClean (request , entry );
434
- logger .log (ChannelLogLevel .DEBUG , "BackoffCacheEntry created with a delay of {0} nanos" ,
435
- delayNanos );
436
432
return entry ;
437
433
}
438
434
@@ -443,7 +439,8 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
443
439
// Future was previously cancelled
444
440
return ;
445
441
}
446
- logger .log (ChannelLogLevel .DEBUG , "Calling RLS for transition to pending" );
442
+ logger .log (ChannelLogLevel .DEBUG ,
443
+ "[RLS Entry {0}] Calling RLS for transition to pending" , entry .request );
447
444
linkedHashLruCache .invalidate (entry .request );
448
445
asyncRlsCall (entry .request , entry .backoffPolicy );
449
446
}
@@ -659,10 +656,10 @@ void maybeRefresh() {
659
656
synchronized (lock ) { // Lock is already held, but ErrorProne can't tell
660
657
if (pendingCallCache .containsKey (request )) {
661
658
// pending already requested
662
- logger .log (ChannelLogLevel .DEBUG ,
663
- "A pending refresh request already created, no need to proceed with refresh" );
664
659
return ;
665
660
}
661
+ logger .log (ChannelLogLevel .DEBUG ,
662
+ "[RLS Entry {0}] Cache entry is stale, refreshing" , request );
666
663
asyncRlsCall (request , /* backoffPolicy= */ null );
667
664
}
668
665
}
@@ -943,13 +940,10 @@ private final class BackoffRefreshListener implements ChildLbStatusListener {
943
940
944
941
@ Override
945
942
public void onStatusChanged (ConnectivityState newState ) {
946
- logger .log (ChannelLogLevel .DEBUG , "LB status changed to: {0}" , newState );
947
943
if (prevState == ConnectivityState .TRANSIENT_FAILURE
948
944
&& newState == ConnectivityState .READY ) {
949
945
logger .log (ChannelLogLevel .DEBUG , "Transitioning from TRANSIENT_FAILURE to READY" );
950
- logger .log (ChannelLogLevel .DEBUG , "Acquiring lock force refresh backoff cache entries" );
951
946
synchronized (lock ) {
952
- logger .log (ChannelLogLevel .DEBUG , "Lock acquired for refreshing backoff cache entries" );
953
947
for (CacheEntry value : linkedHashLruCache .values ()) {
954
948
if (value instanceof BackoffCacheEntry ) {
955
949
refreshBackoffEntry ((BackoffCacheEntry ) value );
@@ -983,31 +977,22 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
983
977
RouteLookupRequest request =
984
978
requestFactory .create (serviceName , methodName , args .getHeaders ());
985
979
final CachedRouteLookupResponse response = CachingRlsLbClient .this .get (request );
986
- logger .log (ChannelLogLevel .DEBUG ,
987
- "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}" ,
988
- new Object []{serviceName , methodName , args .getHeaders (), response });
989
980
990
981
if (response .getHeaderData () != null && !response .getHeaderData ().isEmpty ()) {
991
- logger .log (ChannelLogLevel .DEBUG , "Updating RLS metadata from the RLS response headers" );
992
982
Metadata headers = args .getHeaders ();
993
983
headers .discardAll (RLS_DATA_KEY );
994
984
headers .put (RLS_DATA_KEY , response .getHeaderData ());
995
985
}
996
986
String defaultTarget = lbPolicyConfig .getRouteLookupConfig ().defaultTarget ();
997
- logger .log (ChannelLogLevel .DEBUG , "defaultTarget = {0}" , defaultTarget );
998
987
boolean hasFallback = defaultTarget != null && !defaultTarget .isEmpty ();
999
988
if (response .hasData ()) {
1000
- logger .log (ChannelLogLevel .DEBUG , "RLS response has data, proceed with selecting a picker" );
1001
989
ChildPolicyWrapper childPolicyWrapper = response .getChildPolicyWrapper ();
1002
990
SubchannelPicker picker =
1003
991
(childPolicyWrapper != null ) ? childPolicyWrapper .getPicker () : null ;
1004
992
if (picker == null ) {
1005
- logger .log (ChannelLogLevel .DEBUG ,
1006
- "Child policy wrapper didn't return a picker, returning PickResult with no results" );
1007
993
return PickResult .withNoResult ();
1008
994
}
1009
995
// Happy path
1010
- logger .log (ChannelLogLevel .DEBUG , "Returning PickResult" );
1011
996
PickResult pickResult = picker .pickSubchannel (args );
1012
997
if (pickResult .hasResult ()) {
1013
998
helper .getMetricRecorder ().addLongCounter (TARGET_PICKS_COUNTER , 1 ,
@@ -1017,20 +1002,15 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
1017
1002
}
1018
1003
return pickResult ;
1019
1004
} else if (response .hasError ()) {
1020
- logger .log (ChannelLogLevel .DEBUG , "RLS response has errors" );
1021
1005
if (hasFallback ) {
1022
- logger .log (ChannelLogLevel .DEBUG , "Using RLS fallback" );
1023
1006
return useFallback (args );
1024
1007
}
1025
- logger .log (ChannelLogLevel .DEBUG , "No RLS fallback, returning PickResult with an error" );
1026
1008
helper .getMetricRecorder ().addLongCounter (FAILED_PICKS_COUNTER , 1 ,
1027
1009
Arrays .asList (helper .getChannelTarget (), lookupService ), Collections .emptyList ());
1028
1010
return PickResult .withError (
1029
1011
convertRlsServerStatus (response .getStatus (),
1030
1012
lbPolicyConfig .getRouteLookupConfig ().lookupService ()));
1031
1013
} else {
1032
- logger .log (ChannelLogLevel .DEBUG ,
1033
- "RLS response had no data, return a PickResult with no data" );
1034
1014
return PickResult .withNoResult ();
1035
1015
}
1036
1016
}
@@ -1067,21 +1047,18 @@ private String determineMetricsPickResult(PickResult pickResult) {
1067
1047
1068
1048
private void startFallbackChildPolicy () {
1069
1049
String defaultTarget = lbPolicyConfig .getRouteLookupConfig ().defaultTarget ();
1070
- logger .log (ChannelLogLevel .DEBUG , "starting fallback to {0}" , defaultTarget );
1071
- logger .log (ChannelLogLevel .DEBUG , "Acquiring lock to start fallback child policy" );
1072
1050
synchronized (lock ) {
1073
- logger .log (ChannelLogLevel .DEBUG , "Acquired lock for starting fallback child policy" );
1074
1051
if (fallbackChildPolicyWrapper != null ) {
1075
1052
return ;
1076
1053
}
1054
+ logger .log (ChannelLogLevel .DEBUG , "starting fallback to {0}" , defaultTarget );
1077
1055
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory .createOrGet (defaultTarget );
1078
1056
}
1079
1057
}
1080
1058
1081
1059
// GuardedBy CachingRlsLbClient.lock
1082
1060
void close () {
1083
1061
synchronized (lock ) { // Lock is already held, but ErrorProne can't tell
1084
- logger .log (ChannelLogLevel .DEBUG , "Closing RLS picker" );
1085
1062
if (fallbackChildPolicyWrapper != null ) {
1086
1063
refCountedChildPolicyWrapperFactory .release (fallbackChildPolicyWrapper );
1087
1064
}
0 commit comments