rls: fix a data race involving the LRU cache #5925

Merged (2 commits) on Jan 13, 2023

Changes from 1 commit
26 changes: 18 additions & 8 deletions balancer/rls/balancer.go
@@ -125,8 +125,11 @@ type rlsBalancer struct {
// fact that in places where we need to acquire both the locks, we always
// start off reading the cache.

// cacheMu guards access to the data cache and pending requests map.
cacheMu sync.RWMutex
// cacheMu guards access to the data cache and pending requests map. We
// cannot use an RWMutex here since even a operation like
Member:

*an operation

Contributor Author:

Done.

// dataCache.getEntry() modifies the underlying LRU, which is implemented as
// a doubly linked list.
cacheMu sync.Mutex
dataCache *dataCache // Cache of RLS data.
pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.
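As an illustrative aside, here is a minimal, hypothetical sketch (the lruSketch type is invented for illustration, not taken from this package) of why an LRU lookup is effectively a write: recording recency moves the element within the underlying doubly linked list, so a sync.RWMutex read lock is not enough and the PR switches cacheMu to a plain sync.Mutex.

import "container/list"

// lruSketch is a toy LRU: the front of the list is the least recently used key.
type lruSketch struct {
	ll *list.List               // doubly linked list of keys
	m  map[string]*list.Element // key -> element in ll
}

func newLRUSketch() *lruSketch {
	return &lruSketch{ll: list.New(), m: make(map[string]*list.Element)}
}

func (l *lruSketch) add(key string) {
	l.m[key] = l.ll.PushBack(key)
}

// getEntry is logically a read, but on a hit it mutates the list by moving
// the element to the back (most recently used). Two goroutines doing this
// concurrently under a read lock would race on the list pointers.
func (l *lruSketch) getEntry(key string) bool {
	e, ok := l.m[key]
	if !ok {
		return false
	}
	l.ll.MoveToBack(e)
	return true
}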

@@ -263,17 +266,16 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// channels, we also swap out the throttling state.
b.handleControlChannelUpdate(newCfg)

// If the new config changes the size of the data cache, we might have to
// evict entries to get the cache size down to the newly specified size.
if newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes {
b.dataCache.resize(newCfg.cacheSizeBytes)
}

// Any changes to child policy name or configuration needs to be handled by
// either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, &ccs)

resizeCache := false
if newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes {
resizeCache = true
}
Member:

// Resize the cache if the size in the config has changed.
resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes

(Comment optional, but explains what could be seen as "more confusing logic", and still uses fewer lines.)

Contributor Author:

Done. Thanks.


// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg

@@ -284,6 +286,14 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done

if resizeCache {
Member:

Why not do this operation above, where we compute it, and remove the bool entirely?

Contributor Author:

cacheMu needs to be acquired before stateMu, which is why we can't do this operation up there. Added a comment explaining this.
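As a hedged illustration of the ordering rule mentioned here (the variable names below are invented, and this is a generic sketch rather than the balancer's actual code paths): two mutexes are safe only if every code path that holds both acquires them in the same order; mixing the order on different paths can deadlock.

import "sync"

var (
	cacheMu sync.Mutex // guards the data cache and pending request map
	stateMu sync.Mutex // guards picker/connectivity state
)

// touchCacheAndState acquires both locks in the one agreed order: cacheMu
// first, then stateMu. If another path took stateMu first and then tried to
// take cacheMu, the two paths could deadlock waiting on each other.
func touchCacheAndState() {
	cacheMu.Lock()
	defer cacheMu.Unlock()

	stateMu.Lock()
	defer stateMu.Unlock()

	// ... work that needs both the cache and the picker state ...
}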

// If the new config changes the size of the data cache, we might have to
Member:

"If the new config reduces the size"...?

Contributor Author:

Done.

// evict entries to get the cache size down to the newly specified size.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
return nil
}
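The body of dataCache.resize is not shown in this diff; as a rough, hypothetical sketch of what shrinking a size-bounded LRU cache generally involves (the real method in cache.go also reports whether any backoff was cancelled, per its backoffCancelled return value, which this sketch omits):

// entrySketch and dataCacheSketch are invented for illustration only.
type entrySketch struct{ size int64 }

type dataCacheSketch struct {
	maxSize     int64
	currentSize int64
	order       []string // cache keys, least recently used first
	entries     map[string]*entrySketch
}

// resize updates the size limit and evicts least-recently-used entries
// until the cached bytes fit under the new limit.
func (dc *dataCacheSketch) resize(newSize int64) {
	dc.maxSize = newSize
	for dc.currentSize > dc.maxSize && len(dc.order) > 0 {
		key := dc.order[0]
		dc.order = dc.order[1:]
		dc.currentSize -= dc.entries[key].size
		delete(dc.entries, key)
	}
}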

66 changes: 13 additions & 53 deletions balancer/rls/cache.go
@@ -91,8 +91,6 @@ type cacheEntry struct {
// size stores the size of this cache entry. Used to enforce the cache size
// specified in the LB policy configuration.
size int64
// onEvict is the callback to be invoked when this cache entry is evicted.
onEvict func()
}

// backoffState wraps all backoff related state associated with a cache entry.
@@ -156,20 +154,6 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
return e.Value.(cacheKey)
}

// iterateAndRun traverses the lru in least-recently-used order and calls the
// provided function for every element.
//
// Callers may delete the cache entry associated with the cacheKey passed into
// f, but they may not perform any other operation which reorders the elements
// in the lru.
func (l *lru) iterateAndRun(f func(cacheKey)) {
var next *list.Element
for e := l.ll.Front(); e != nil; e = next {
next = e.Next()
f(e.Value.(cacheKey))
}
}

// dataCache contains a cache of RLS data used by the LB policy to make routing
// decisions.
//
@@ -252,29 +236,22 @@ func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
// The return value indicates if any expired entries were evicted.
//
// The LB policy invokes this method periodically to purge expired entries.
func (dc *dataCache) evictExpiredEntries() (evicted bool) {
func (dc *dataCache) evictExpiredEntries() bool {
if dc.shutdown.HasFired() {
return false
}

evicted = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

evicted := false
for key, entry := range dc.entries {
// Only evict entries for which both the data expiration time and
// backoff expiration time fields are in the past.
now := time.Now()
if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
return
continue
}
evicted = true
dc.deleteAndcleanup(key, entry)
})
evicted = true
}
return evicted
}

@@ -285,22 +262,15 @@ func (dc *dataCache) evictExpiredEntries() (evicted bool) {
// The LB policy invokes this method when the control channel moves from READY
// to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
// the `controlChannel` type for more details.
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) {
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool {
if dc.shutdown.HasFired() {
return false
}

backoffReset = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}

backoffReset := false
for _, entry := range dc.entries {
if entry.backoffState == nil {
return
continue
}
if entry.backoffState.timer != nil {
entry.backoffState.timer.Stop()
@@ -310,7 +280,7 @@ func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffRe
entry.backoffTime = time.Time{}
entry.backoffExpiryTime = time.Time{}
backoffReset = true
})
}
return backoffReset
}

@@ -377,25 +347,15 @@ func (dc *dataCache) removeEntryForTesting(key cacheKey) {
// - the entry is removed from the map of entries
// - current size of the data cache is updated
// - the key is removed from the LRU
// - onEvict is invoked in a separate goroutine
func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
if entry.onEvict != nil {
go entry.onEvict()
}
}

func (dc *dataCache) stop() {
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key)
return
}
for key, entry := range dc.entries {
dc.deleteAndcleanup(key, entry)
})
}
dc.shutdown.Fire()
}
33 changes: 0 additions & 33 deletions balancer/rls/cache_test.go
@@ -117,39 +117,6 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
}
}

func (s) TestLRU_IterateAndRun(t *testing.T) {
initCacheEntries()
// Create an LRU and add some entries to it.
lru := newLRU()
for _, k := range cacheKeys {
lru.addEntry(k)
}

// Iterate through the lru to make sure that entries are returned in the
// least recently used order.
var gotKeys []cacheKey
lru.iterateAndRun(func(key cacheKey) {
gotKeys = append(gotKeys, key)
})
if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) {
t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys)
}

// Make sure that removing entries from the lru while iterating through it
// is a safe operation.
lru.iterateAndRun(func(key cacheKey) {
lru.removeEntry(key)
})

// Check the lru internals to make sure we freed up all the memory.
if len := lru.ll.Len(); len != 0 {
t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len)
}
if len := len(lru.m); len != 0 {
t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len)
}
}

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
118 changes: 20 additions & 98 deletions balancer/rls/picker.go
@@ -84,10 +84,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
md, _ := metadata.FromOutgoingContext(info.Ctx)
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

// Grab a read-lock to perform a cache lookup. If it so happens that we need
// to write to the cache (if we have to send out an RLS request), we will
// release the read-lock and acquire a write-lock.
p.lb.cacheMu.RLock()
p.lb.cacheMu.Lock()
Member:

Can we reduce the amount of time we're holding this at all?

Contributor Author:

The following actions are happening under the lock in this method:

  • read the cache entry:
    • this is non-blocking and should be fast.
  • send an RLS request if required:
    • this is the only thing which could block. But the control channel implementation spawns a goroutine to do the actual work of sending the RLS rpc. So, this should also be fast.
  • queue the pick or delegate to child policies (or default policy):
    • this should also be non-blocking and should be fast.

So, I think we are OK to hold the lock for the whole duration of this method. If we are blocking or taking too long here, we have other problems anyway (LB policies are expected to be quick in Pick() and not block there).

What do you think?

Member:

Seems fine, I was just wondering because the defer makes it impossible to give it up early on one path vs. another, and ideally we hold this for as short a time as possible.
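As a small, hypothetical sketch of the trade-off discussed in this thread (pickerSketch is invented, not the actual rlsPicker):

import "sync"

type pickerSketch struct{ mu sync.Mutex }

// deferStyle mirrors what the PR does: lock once and defer the unlock, so
// the mutex is held until the method returns on every path. Simple, but the
// lock cannot be released early on any particular path.
func (p *pickerSketch) deferStyle() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	// ... cache lookup, possibly kick off an RLS request, pick a result ...
	return 0
}

// perPathStyle is the alternative the reviewer alludes to: unlock explicitly
// on each return path, which can release the lock earlier on fast paths at
// the cost of having to remember every path by hand.
func (p *pickerSketch) perPathStyle(hit bool) int {
	p.mu.Lock()
	if hit {
		p.mu.Unlock() // released early on this path
		return 1
	}
	p.mu.Unlock()
	// ... slower work that no longer needs the cache ...
	return 0
}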

defer p.lb.cacheMu.Unlock()

// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
@@ -98,75 +96,62 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
switch {
// No data cache entry. No pending request.
case dcEntry == nil && pendingEntry == nil:
p.lb.cacheMu.RUnlock()
bs := &backoffState{bs: defaultBackoffStrategy}
return p.sendRequestAndReturnPick(cacheKey, bs, reqKeys.Map, info)
throttled := p.sendRouteLookupRequest(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// No data cache entry. Pending request exists.
case dcEntry == nil && pendingEntry != nil:
p.lb.cacheMu.RUnlock()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. No pending request.
case dcEntry != nil && pendingEntry == nil:
if dcEntry.expiryTime.After(now) {
if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
// Executing the proactive cache refresh in a goroutine simplifies
// acquiring and releasing of locks.
go func(bs *backoffState) {
p.lb.cacheMu.Lock()
// It is OK to ignore the return value which indicates if this request
// was throttled. This is an attempt to proactively refresh the cache,
// and it is OK for it to fail.
p.sendRouteLookupRequest(cacheKey, bs, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
p.lb.cacheMu.Unlock()
}(dcEntry.backoffState)
p.sendRouteLookupRequest(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPolicies(dcEntry, info)
p.lb.cacheMu.RUnlock()
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
}

// We get here only if the data cache entry has expired. If entry is in
// backoff, delegate to default target or fail the pick.
if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
st := dcEntry.status
p.lb.cacheMu.RUnlock()

// Avoid propagating the status code received on control plane RPCs to the
// data plane which can lead to unexpected outcomes as we do not control
// the status code sent by the control plane. Propagating the status
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
}

// We get here only if the entry has expired and is not in backoff.
bs := *dcEntry.backoffState
p.lb.cacheMu.RUnlock()
return p.sendRequestAndReturnPick(cacheKey, &bs, reqKeys.Map, info)
throttled := p.sendRouteLookupRequest(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPolicies(dcEntry, info)
p.lb.cacheMu.RUnlock()
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
}
// Data cache entry has expired and pending request exists. Queue pick.
p.lb.cacheMu.RUnlock()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}

// delegateToChildPolicies is a helper function which iterates through the list
// of child policy wrappers in a cache entry and attempts to find a child policy
// to which this RPC can be routed to. If all child policies are in
// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
@@ -194,69 +179,6 @@ func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.P
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
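As a rough, hypothetical sketch of just the fallback rule described in the delegateToChildPoliciesLocked comment above (the real function also handles intermediate connectivity states and attaches the x-google-rls-data header, which this sketch ignores):

import "google.golang.org/grpc/connectivity"

// pickChildIndex walks the child policies in order and returns the first one
// that is not in TRANSIENT_FAILURE; if every child is in TRANSIENT_FAILURE,
// it falls back to the last child arbitrarily.
func pickChildIndex(states []connectivity.State) int {
	for i, s := range states {
		if s != connectivity.TransientFailure {
			return i
		}
	}
	return len(states) - 1
}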

// sendRequestAndReturnPick is called to send out an RLS request on the control
// channel. Since sending out an RLS request entails creating an entry in the
// pending request map, this method needs to acquire the write-lock on the
// cache. This also means that the caller must release the read-lock that they
// could have been holding. This means that things could have happened in
// between and therefore a fresh lookup on the cache needs to be performed here
// with the write-lock and all cases need to be handled.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, info balancer.PickInfo) (balancer.PickResult, error) {
p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()

// We need to perform another cache lookup to ensure that things haven't
// changed since the last lookup.
dcEntry := p.lb.dataCache.getEntry(cacheKey)
pendingEntry := p.lb.pendingMap[cacheKey]

// Existence of a pending map entry indicates that someone sent out a request
// before us and the response is pending. Skip sending a new request.
// Piggyback on the existing one by queueing the pick.
if pendingEntry != nil {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// If no data cache entry exists, it means that no one jumped in front of us.
// We need to send out an RLS request and queue the pick.
if dcEntry == nil {
throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// Existence of a data cache entry indicates either that someone sent out a
// request before us and received a response, or we got here in the first
// place because we found an expired entry in the data cache.
now := time.Now()
switch {
// Valid data cache entry. Delegate to its child policies.
case dcEntry.expiryTime.After(now):
return p.delegateToChildPolicies(dcEntry, info)

// Entry is in backoff. Delegate to default target or fail the pick.
case dcEntry.backoffState != nil && dcEntry.backoffTime.After(now):
// Avoid propagating the status code received on control plane RPCs to the
// data plane which can lead to unexpected outcomes as we do not control
// the status code sent by the control plane. Propagating the status
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", dcEntry.status.Error())))

// Entry has expired, but is not in backoff. Send request and queue pick.
default:
throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {