rls: fix a data race involving the LRU cache (#5925)
easwars committed Jan 13, 2023
1 parent be06d52 commit 9228cff
Showing 4 changed files with 59 additions and 198 deletions.
29 changes: 21 additions & 8 deletions balancer/rls/balancer.go
@@ -125,8 +125,11 @@ type rlsBalancer struct {
// fact that in places where we need to acquire both the locks, we always
// start off reading the cache.

// cacheMu guards access to the data cache and pending requests map.
cacheMu sync.RWMutex
// cacheMu guards access to the data cache and pending requests map. We
// cannot use an RWMutex here since even an operation like
// dataCache.getEntry() modifies the underlying LRU, which is implemented as
// a doubly linked list.
cacheMu sync.Mutex
dataCache *dataCache // Cache of RLS data.
pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.

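A note on the mutex change above, with a minimal sketch (the hypothetical lruSketch type below is not the actual rls implementation): an LRU lookup is not a read-only operation, because fetching an entry reorders the underlying container/list to mark it as most recently used. Two goroutines holding only sync.RWMutex read locks would still race on that list, which is why cacheMu becomes a plain sync.Mutex.

```go
package main

import (
	"container/list"
	"fmt"
)

// lruSketch is a hypothetical, stripped-down LRU: keys live in a map for
// O(1) lookup and in a doubly linked list that tracks usage order, with the
// most recently used key at the back.
type lruSketch struct {
	ll *list.List
	m  map[string]*list.Element
}

func newLRUSketch() *lruSketch {
	return &lruSketch{ll: list.New(), m: make(map[string]*list.Element)}
}

func (l *lruSketch) add(key string) {
	l.m[key] = l.ll.PushBack(key)
}

// get looks up a key and, crucially, writes to the list by moving the
// element to the back to mark it as most recently used. This write is why a
// read lock is not enough to make concurrent lookups safe.
func (l *lruSketch) get(key string) (string, bool) {
	e, ok := l.m[key]
	if !ok {
		return "", false
	}
	l.ll.MoveToBack(e)
	return e.Value.(string), true
}

func main() {
	l := newLRUSketch()
	l.add("a")
	l.add("b")
	if v, ok := l.get("a"); ok {
		fmt.Println("got", v) // "a" is now the most recently used key
	}
}
```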
@@ -263,17 +266,14 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// channels, we also swap out the throttling state.
b.handleControlChannelUpdate(newCfg)

// If the new config changes the size of the data cache, we might have to
// evict entries to get the cache size down to the newly specified size.
if newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes {
b.dataCache.resize(newCfg.cacheSizeBytes)
}

// Any changes to child policy name or configuration need to be handled by
// either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, &ccs)

// Resize the cache if the size in the config has changed.
resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes

// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg

@@ -284,6 +284,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done

if resizeCache {
// If the new config reduces the size of the data cache, we
// might have to evict entries to get the cache size down to the newly
// specified size.
//
// And we cannot do this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
return nil
}
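The comment in the new hunk pins down a lock-ordering rule: cacheMu must be acquired before stateMu whenever both are held, so the resize cannot happen inside the stateMu-protected section. Below is a minimal sketch of that pattern (the balancerSketch type and its fields are hypothetical, not the actual rls balancer): decide whether a resize is needed, finish the stateMu work, and only then take cacheMu on its own.

```go
package main

import "sync"

// balancerSketch is a hypothetical type illustrating the lock-ordering rule
// described in the commit: cacheMu is acquired before stateMu whenever both
// locks are held, so code running under stateMu must defer cache work until
// stateMu has been released.
type balancerSketch struct {
	cacheMu        sync.Mutex
	stateMu        sync.Mutex
	cacheSizeBytes int64
}

// updateConfig mirrors the shape of the change above: compute the resize
// decision, complete the stateMu-protected work, and only afterwards take
// cacheMu by itself.
func (b *balancerSketch) updateConfig(newSize int64) {
	resize := newSize != b.cacheSizeBytes
	b.cacheSizeBytes = newSize

	b.stateMu.Lock()
	// ... state updates that must not reach for cacheMu, to avoid a
	// stateMu -> cacheMu acquisition that inverts the documented order ...
	b.stateMu.Unlock()

	if resize {
		b.cacheMu.Lock()
		// ... evict entries until the cache fits within newSize ...
		b.cacheMu.Unlock()
	}
}

func main() {
	b := &balancerSketch{cacheSizeBytes: 1 << 20}
	b.updateConfig(512 << 10)
}
```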

66 changes: 13 additions & 53 deletions balancer/rls/cache.go
@@ -91,8 +91,6 @@ type cacheEntry struct {
// size stores the size of this cache entry. Used to enforce the cache size
// specified in the LB policy configuration.
size int64
// onEvict is the callback to be invoked when this cache entry is evicted.
onEvict func()
}

// backoffState wraps all backoff related state associated with a cache entry.
@@ -156,20 +154,6 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
return e.Value.(cacheKey)
}

// iterateAndRun traverses the lru in least-recently-used order and calls the
// provided function for every element.
//
// Callers may delete the cache entry associated with the cacheKey passed into
// f, but they may not perform any other operation which reorders the elements
// in the lru.
func (l *lru) iterateAndRun(f func(cacheKey)) {
var next *list.Element
for e := l.ll.Front(); e != nil; e = next {
next = e.Next()
f(e.Value.(cacheKey))
}
}

// dataCache contains a cache of RLS data used by the LB policy to make routing
// decisions.
//
@@ -252,29 +236,22 @@ func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
// The return value indicates if any expired entries were evicted.
//
// The LB policy invokes this method periodically to purge expired entries.
func (dc *dataCache) evictExpiredEntries() (evicted bool) {
func (dc *dataCache) evictExpiredEntries() bool {
if dc.shutdown.HasFired() {
return false
}

evicted = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

evicted := false
for key, entry := range dc.entries {
// Only evict entries for which both the data expiration time and
// backoff expiration time fields are in the past.
now := time.Now()
if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
return
continue
}
evicted = true
dc.deleteAndcleanup(key, entry)
})
evicted = true
}
return evicted
}
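The rewritten loop above ranges over the entries map directly instead of walking the LRU list with iterateAndRun. Go explicitly permits deleting the entry for the current key while ranging over a map, so evicting during iteration is well defined. A minimal sketch of that pattern follows (the entry type and evictExpired helper are hypothetical, simplified to a map-only cache):

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical cache entry carrying only an expiry time.
type entry struct {
	expiry time.Time
}

// evictExpired deletes expired entries while ranging over the map; deleting
// the current key during a range is explicitly allowed by the Go spec.
func evictExpired(entries map[string]*entry) bool {
	evicted := false
	now := time.Now()
	for key, e := range entries {
		if e.expiry.After(now) {
			continue // still fresh, keep it
		}
		delete(entries, key)
		evicted = true
	}
	return evicted
}

func main() {
	entries := map[string]*entry{
		"fresh":   {expiry: time.Now().Add(time.Hour)},
		"expired": {expiry: time.Now().Add(-time.Hour)},
	}
	fmt.Println("evicted:", evictExpired(entries), "remaining:", len(entries))
}
```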

@@ -285,22 +262,15 @@ func (dc *dataCache) evictExpiredEntries() (evicted bool) {
// The LB policy invokes this method when the control channel moves from READY
// to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
// the `controlChannel` type for more details.
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) {
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool {
if dc.shutdown.HasFired() {
return false
}

backoffReset = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

backoffReset := false
for _, entry := range dc.entries {
if entry.backoffState == nil {
return
continue
}
if entry.backoffState.timer != nil {
entry.backoffState.timer.Stop()
@@ -310,7 +280,7 @@ func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffRe
entry.backoffTime = time.Time{}
entry.backoffExpiryTime = time.Time{}
backoffReset = true
})
}
return backoffReset
}

@@ -377,25 +347,15 @@ func (dc *dataCache) removeEntryForTesting(key cacheKey) {
// - the entry is removed from the map of entries
// - current size of the data cache is updated
// - the key is removed from the LRU
// - onEvict is invoked in a separate goroutine
func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
if entry.onEvict != nil {
go entry.onEvict()
}
}

func (dc *dataCache) stop() {
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key)
return
}
for key, entry := range dc.entries {
dc.deleteAndcleanup(key, entry)
})
}
dc.shutdown.Fire()
}
33 changes: 0 additions & 33 deletions balancer/rls/cache_test.go
@@ -117,39 +117,6 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
}
}

func (s) TestLRU_IterateAndRun(t *testing.T) {
initCacheEntries()
// Create an LRU and add some entries to it.
lru := newLRU()
for _, k := range cacheKeys {
lru.addEntry(k)
}

// Iterate through the lru to make sure that entries are returned in the
// least recently used order.
var gotKeys []cacheKey
lru.iterateAndRun(func(key cacheKey) {
gotKeys = append(gotKeys, key)
})
if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) {
t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys)
}

// Make sure that removing entries from the lru while iterating through it
// is a safe operation.
lru.iterateAndRun(func(key cacheKey) {
lru.removeEntry(key)
})

// Check the lru internals to make sure we freed up all the memory.
if len := lru.ll.Len(); len != 0 {
t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len)
}
if len := len(lru.m); len != 0 {
t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len)
}
}

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
