rls: fix a data race involving the LRU cache #5925

Merged · 2 commits · Jan 13, 2023
29 changes: 21 additions & 8 deletions balancer/rls/balancer.go
@@ -125,8 +125,11 @@ type rlsBalancer struct {
// fact that in places where we need to acquire both the locks, we always
// start off reading the cache.

// cacheMu guards access to the data cache and pending requests map.
cacheMu sync.RWMutex
// cacheMu guards access to the data cache and pending requests map. We
// cannot use an RWMutex here since even an operation like
// dataCache.getEntry() modifies the underlying LRU, which is implemented as
// a doubly linked list.
cacheMu sync.Mutex
dataCache *dataCache // Cache of RLS data.
pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.

@@ -263,17 +266,14 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// channels, we also swap out the throttling state.
b.handleControlChannelUpdate(newCfg)

// If the new config changes the size of the data cache, we might have to
// evict entries to get the cache size down to the newly specified size.
if newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes {
b.dataCache.resize(newCfg.cacheSizeBytes)
}

// Any changes to child policy name or configuration needs to be handled by
// either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, &ccs)

// Resize the cache if the size in the config has changed.
resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes

// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg

@@ -284,6 +284,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done

if resizeCache {
Member:

Why not do this operation above, where we compute it, and remove the bool entirely?

Contributor Author:

cacheMu needs to be grabbed before stateMu, which is why we can't do this operation up there. Added a comment for the same.

// If the new config reduces the size of the data cache, we
// might have to evict entries to get the cache size down to the newly
// specified size.
//
// And we cannot do this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
return nil
}

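The heart of the fix is visible in the hunk above: cacheMu changes from sync.RWMutex to sync.Mutex because dataCache.getEntry() promotes the looked-up entry within the LRU, which is backed by a doubly linked list, so even a logically read-only lookup mutates shared state. The following is a minimal, self-contained sketch of that pattern, assuming a container/list-backed LRU; the names (lruCache, newLRUCache, Put, Get) are illustrative and not the PR's actual types.

package main

import (
	"container/list"
	"fmt"
	"sync"
)

// lruCache is a toy LRU keyed by string. The front of ll is the most
// recently used key.
type lruCache struct {
	mu sync.Mutex // A full mutex: Get below writes to the list.
	ll *list.List
	m  map[string]*list.Element
}

func newLRUCache() *lruCache {
	return &lruCache{ll: list.New(), m: make(map[string]*list.Element)}
}

// Put inserts key, or promotes it if already present.
func (c *lruCache) Put(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.m[key]; ok {
		c.ll.MoveToFront(e)
		return
	}
	c.m[key] = c.ll.PushFront(key)
}

// Get looks read-only to callers, but promoting the key rewires the linked
// list's pointers. Under an RWMutex, concurrent RLock holders calling Get
// would race on those pointers; that is why a plain Mutex is needed.
func (c *lruCache) Get(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.m[key]
	if !ok {
		return false
	}
	c.ll.MoveToFront(e)
	return true
}

func main() {
	c := newLRUCache()
	c.Put("a")
	c.Put("b")
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Get("a") // safe: every Get holds the full mutex
		}()
	}
	wg.Wait()
	fmt.Println(c.ll.Front().Value) // "a", promoted to most recently used
}

Swapping the Mutex for an RWMutex and taking only RLock in Get would typically make the race detector (go run -race) flag this program, which is essentially the race being fixed in this PR.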
66 changes: 13 additions & 53 deletions balancer/rls/cache.go
@@ -91,8 +91,6 @@ type cacheEntry struct {
// size stores the size of this cache entry. Used to enforce the cache size
// specified in the LB policy configuration.
size int64
// onEvict is the callback to be invoked when this cache entry is evicted.
onEvict func()
}

// backoffState wraps all backoff related state associated with a cache entry.
@@ -156,20 +154,6 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
return e.Value.(cacheKey)
}

// iterateAndRun traverses the lru in least-recently-used order and calls the
// provided function for every element.
//
// Callers may delete the cache entry associated with the cacheKey passed into
// f, but they may not perform any other operation which reorders the elements
// in the lru.
func (l *lru) iterateAndRun(f func(cacheKey)) {
var next *list.Element
for e := l.ll.Front(); e != nil; e = next {
next = e.Next()
f(e.Value.(cacheKey))
}
}

// dataCache contains a cache of RLS data used by the LB policy to make routing
// decisions.
//
@@ -252,29 +236,22 @@ func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
// The return value indicates if any expired entries were evicted.
//
// The LB policy invokes this method periodically to purge expired entries.
func (dc *dataCache) evictExpiredEntries() (evicted bool) {
func (dc *dataCache) evictExpiredEntries() bool {
if dc.shutdown.HasFired() {
return false
}

evicted = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

evicted := false
for key, entry := range dc.entries {
// Only evict entries for which both the data expiration time and
// backoff expiration time fields are in the past.
now := time.Now()
if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
return
continue
}
evicted = true
dc.deleteAndcleanup(key, entry)
})
evicted = true
}
return evicted
}

@@ -285,22 +262,15 @@ func (dc *dataCache) evictExpiredEntries() (evicted bool) {
// The LB policy invokes this method when the control channel moves from READY
// to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
// the `controlChannel` type for more details.
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) {
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool {
if dc.shutdown.HasFired() {
return false
}

backoffReset = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

backoffReset := false
for _, entry := range dc.entries {
if entry.backoffState == nil {
return
continue
}
if entry.backoffState.timer != nil {
entry.backoffState.timer.Stop()
@@ -310,7 +280,7 @@ func (dc *dataCache) resetBackoffState(newBackoffState *backoffRe
entry.backoffTime = time.Time{}
entry.backoffExpiryTime = time.Time{}
backoffReset = true
})
}
return backoffReset
}

@@ -377,25 +347,15 @@ func (dc *dataCache) removeEntryForTesting(key cacheKey) {
// - the entry is removed from the map of entries
// - current size of the data cache is updated
// - the key is removed from the LRU
// - onEvict is invoked in a separate goroutine
func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
if entry.onEvict != nil {
go entry.onEvict()
}
}

func (dc *dataCache) stop() {
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key)
return
}
for key, entry := range dc.entries {
dc.deleteAndcleanup(key, entry)
})
}
dc.shutdown.Fire()
}
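With iterateAndRun gone, evictExpiredEntries and stop now range directly over the entries map and delete entries as they go (resetBackoffState similarly ranges over the map without the LRU helper); the Go spec explicitly allows deleting map entries during a range loop. A small sketch of that pattern, with hypothetical names rather than the PR's types:

package main

import (
	"fmt"
	"time"
)

type entry struct {
	expiry time.Time
}

// evictExpired deletes expired entries while ranging over the map and reports
// whether anything was evicted, mirroring the shape of the new
// evictExpiredEntries.
func evictExpired(entries map[string]*entry) bool {
	evicted := false
	now := time.Now()
	for key, e := range entries {
		if e.expiry.After(now) {
			continue // still valid, keep it
		}
		delete(entries, key) // deleting during a range over a map is safe in Go
		evicted = true
	}
	return evicted
}

func main() {
	entries := map[string]*entry{
		"stale": {expiry: time.Now().Add(-time.Minute)},
		"fresh": {expiry: time.Now().Add(time.Minute)},
	}
	fmt.Println(evictExpired(entries), len(entries)) // true 1
}

One trade-off worth noting: ranging over the map visits entries in unspecified order rather than least-recently-used order, which is fine for these methods because every entry is examined regardless of order.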
33 changes: 0 additions & 33 deletions balancer/rls/cache_test.go
@@ -117,39 +117,6 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
}
}

func (s) TestLRU_IterateAndRun(t *testing.T) {
initCacheEntries()
// Create an LRU and add some entries to it.
lru := newLRU()
for _, k := range cacheKeys {
lru.addEntry(k)
}

// Iterate through the lru to make sure that entries are returned in the
// least recently used order.
var gotKeys []cacheKey
lru.iterateAndRun(func(key cacheKey) {
gotKeys = append(gotKeys, key)
})
if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) {
t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys)
}

// Make sure that removing entries from the lru while iterating through it
// is a safe operation.
lru.iterateAndRun(func(key cacheKey) {
lru.removeEntry(key)
})

// Check the lru internals to make sure we freed up all the memory.
if len := lru.ll.Len(); len != 0 {
t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len)
}
if len := len(lru.m); len != 0 {
t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len)
}
}

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)