From 9228cffc1a0702e602baa365e984397ab63fa295 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Thu, 12 Jan 2023 16:02:10 -0800
Subject: [PATCH] rls: fix a data race involving the LRU cache (#5925)

---
 balancer/rls/balancer.go   |  29 ++++++---
 balancer/rls/cache.go      |  66 ++++---------------
 balancer/rls/cache_test.go |  33 ----------
 balancer/rls/picker.go     | 129 +++++++------------------------
 4 files changed, 59 insertions(+), 198 deletions(-)

diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go
index f18f4531d83..f0cff9ac445 100644
--- a/balancer/rls/balancer.go
+++ b/balancer/rls/balancer.go
@@ -125,8 +125,11 @@ type rlsBalancer struct {
     // fact that in places where we need to acquire both the locks, we always
     // start off reading the cache.

-    // cacheMu guards access to the data cache and pending requests map.
-    cacheMu sync.RWMutex
+    // cacheMu guards access to the data cache and pending requests map. We
+    // cannot use an RWMutex here since even an operation like
+    // dataCache.getEntry() modifies the underlying LRU, which is implemented as
+    // a doubly linked list.
+    cacheMu sync.Mutex
     dataCache *dataCache // Cache of RLS data.
     pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.

@@ -263,17 +266,14 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
     // channels, we also swap out the throttling state.
     b.handleControlChannelUpdate(newCfg)

-    // If the new config changes the size of the data cache, we might have to
-    // evict entries to get the cache size down to the newly specified size.
-    if newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes {
-        b.dataCache.resize(newCfg.cacheSizeBytes)
-    }
-
     // Any changes to child policy name or configuration needs to be handled by
     // either creating new child policies or pushing updates to existing ones.
     b.resolverState = ccs.ResolverState
     b.handleChildPolicyConfigUpdate(newCfg, &ccs)

+    // Resize the cache if the size in the config has changed.
+    resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes
+
     // Update the copy of the config in the LB policy before releasing the lock.
     b.lbCfg = newCfg

@@ -284,6 +284,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
     b.updateCh.Put(resumePickerUpdates{done: done})
     b.stateMu.Unlock()
     <-done
+
+    if resizeCache {
+        // If the new config reduces the size of the data cache, we might have
+        // to evict entries to get the cache size down to the newly specified
+        // size.
+        //
+        // And we cannot do this operation above (where we compute the
+        // `resizeCache` boolean) because `cacheMu` needs to be grabbed before
+        // `stateMu` if we are to hold both locks at the same time.
+        b.cacheMu.Lock()
+        b.dataCache.resize(newCfg.cacheSizeBytes)
+        b.cacheMu.Unlock()
+    }
     return nil
 }
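The switch from sync.RWMutex to sync.Mutex is justified by the new comment on cacheMu: a cache lookup is not a read-only operation, because fetching an entry promotes it inside the LRU's doubly linked list. The sketch below is illustrative only (lruCache, put and get are hypothetical names, not the rls dataCache API); it shows why a lookup that calls MoveToBack cannot safely run under a shared read lock.

package lrudemo

import (
    "container/list"
    "sync"
)

// lruCache is a minimal cache whose lookups touch the recency list, so even
// "reads" write to shared state. This mirrors the property the patch comment
// describes for dataCache.getEntry().
type lruCache struct {
    mu      sync.Mutex // RWMutex.RLock would not be safe for get; see below.
    ll      *list.List // least recently used at the front, most recent at the back
    entries map[string]*list.Element
}

func newLRUCache() *lruCache {
    return &lruCache{ll: list.New(), entries: make(map[string]*list.Element)}
}

// put adds a value as the most recently used entry (no update or eviction
// handling in this sketch).
func (c *lruCache) put(key, val string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.entries[key] = c.ll.PushBack(val)
}

// get promotes the key to most recently used. Because MoveToBack rewrites the
// linked list's pointers, two goroutines doing this concurrently under a
// shared read lock would race; hence the exclusive Mutex.
func (c *lruCache) get(key string) (string, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    e, ok := c.entries[key]
    if !ok {
        return "", false
    }
    c.ll.MoveToBack(e)
    return e.Value.(string), true
}

This is the same trade-off the patch accepts for the real data cache: lookups now serialize on a single mutex, in exchange for a race-free LRU.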
diff --git a/balancer/rls/cache.go b/balancer/rls/cache.go
index 9a38072c774..d7a6a1a436c 100644
--- a/balancer/rls/cache.go
+++ b/balancer/rls/cache.go
@@ -91,8 +91,6 @@ type cacheEntry struct {
     // size stores the size of this cache entry. Used to enforce the cache size
     // specified in the LB policy configuration.
     size int64
-    // onEvict is the callback to be invoked when this cache entry is evicted.
-    onEvict func()
 }

 // backoffState wraps all backoff related state associated with a cache entry.
@@ -156,20 +154,6 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
     return e.Value.(cacheKey)
 }

-// iterateAndRun traverses the lru in least-recently-used order and calls the
-// provided function for every element.
-//
-// Callers may delete the cache entry associated with the cacheKey passed into
-// f, but they may not perform any other operation which reorders the elements
-// in the lru.
-func (l *lru) iterateAndRun(f func(cacheKey)) {
-    var next *list.Element
-    for e := l.ll.Front(); e != nil; e = next {
-        next = e.Next()
-        f(e.Value.(cacheKey))
-    }
-}
-
 // dataCache contains a cache of RLS data used by the LB policy to make routing
 // decisions.
 //
@@ -252,29 +236,22 @@ func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
 // The return value indicates if any expired entries were evicted.
 //
 // The LB policy invokes this method periodically to purge expired entries.
-func (dc *dataCache) evictExpiredEntries() (evicted bool) {
+func (dc *dataCache) evictExpiredEntries() bool {
     if dc.shutdown.HasFired() {
         return false
     }

-    evicted = false
-    dc.keys.iterateAndRun(func(key cacheKey) {
-        entry, ok := dc.entries[key]
-        if !ok {
-            // This should never happen.
-            dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
-            return
-        }
-
+    evicted := false
+    for key, entry := range dc.entries {
         // Only evict entries for which both the data expiration time and
         // backoff expiration time fields are in the past.
         now := time.Now()
         if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
-            return
+            continue
         }
-        evicted = true
         dc.deleteAndcleanup(key, entry)
-    })
+        evicted = true
+    }
     return evicted
 }

@@ -285,22 +262,15 @@
 // The LB policy invokes this method when the control channel moves from READY
 // to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
 // the `controlChannel` type for more details.
-func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) {
+func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool {
     if dc.shutdown.HasFired() {
         return false
     }

-    backoffReset = false
-    dc.keys.iterateAndRun(func(key cacheKey) {
-        entry, ok := dc.entries[key]
-        if !ok {
-            // This should never happen.
-            dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
-            return
-        }
-
+    backoffReset := false
+    for _, entry := range dc.entries {
         if entry.backoffState == nil {
-            return
+            continue
         }
         if entry.backoffState.timer != nil {
             entry.backoffState.timer.Stop()
@@ -310,7 +280,7 @@ func (dc *dataCache) resetBackoffState(newBackoffRe
         entry.backoffTime = time.Time{}
         entry.backoffExpiryTime = time.Time{}
         backoffReset = true
-    })
+    }
     return backoffReset
 }

@@ -377,25 +347,15 @@ func (dc *dataCache) removeEntryForTesting(key cacheKey) {
 // - the entry is removed from the map of entries
 // - current size of the data cache is updated
 // - the key is removed from the LRU
-// - onEvict is invoked in a separate goroutine
 func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
     delete(dc.entries, key)
     dc.currentSize -= entry.size
     dc.keys.removeEntry(key)
-    if entry.onEvict != nil {
-        go entry.onEvict()
-    }
 }

 func (dc *dataCache) stop() {
-    dc.keys.iterateAndRun(func(key cacheKey) {
-        entry, ok := dc.entries[key]
-        if !ok {
-            // This should never happen.
-            dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key)
-            return
-        }
+    for key, entry := range dc.entries {
         dc.deleteAndcleanup(key, entry)
-    })
+    }
     dc.shutdown.Fire()
 }
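Dropping lru.iterateAndRun in favor of a direct range over dc.entries leans on a documented property of Go maps: entries may be deleted while ranging, and a deleted entry is simply never produced later in the loop. Eviction of expired entries does not depend on iteration order, so the LRU-ordered walk (and its "key not found in the cache" error handling) is unnecessary. A minimal sketch of the pattern, using a hypothetical entry type rather than the real cacheEntry:

package evictdemo

import "time"

// entry stands in for the cache entry type; only the expiry matters here.
type entry struct {
    expiry time.Time
}

// evictExpired mirrors the shape of the new evictExpiredEntries: iterate the
// map and delete from it in the same loop. Per the Go spec, deleting a map
// entry during range is safe; the removed entry is just not yielded again.
func evictExpired(entries map[string]*entry, now time.Time) bool {
    evicted := false
    for key, e := range entries {
        if e.expiry.After(now) {
            continue
        }
        delete(entries, key)
        evicted = true
    }
    return evicted
}

The same pattern covers resetBackoffState and stop, which is why all three methods lose their iterateAndRun closures in this patch.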
diff --git a/balancer/rls/cache_test.go b/balancer/rls/cache_test.go
index cb9b060b59a..80185f39c92 100644
--- a/balancer/rls/cache_test.go
+++ b/balancer/rls/cache_test.go
@@ -117,39 +117,6 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
     }
 }

-func (s) TestLRU_IterateAndRun(t *testing.T) {
-    initCacheEntries()
-    // Create an LRU and add some entries to it.
-    lru := newLRU()
-    for _, k := range cacheKeys {
-        lru.addEntry(k)
-    }
-
-    // Iterate through the lru to make sure that entries are returned in the
-    // least recently used order.
-    var gotKeys []cacheKey
-    lru.iterateAndRun(func(key cacheKey) {
-        gotKeys = append(gotKeys, key)
-    })
-    if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) {
-        t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys)
-    }
-
-    // Make sure that removing entries from the lru while iterating through it
-    // is a safe operation.
-    lru.iterateAndRun(func(key cacheKey) {
-        lru.removeEntry(key)
-    })
-
-    // Check the lru internals to make sure we freed up all the memory.
-    if len := lru.ll.Len(); len != 0 {
-        t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len)
-    }
-    if len := len(lru.m); len != 0 {
-        t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len)
-    }
-}
-
 func (s) TestDataCache_BasicOperations(t *testing.T) {
     initCacheEntries()
     dc := newDataCache(5, nil)
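The deleted TestLRU_IterateAndRun only exercised the removed helper, so nothing replaces it in this patch. A regression guard for the data race itself would more likely be a concurrent-lookup test run under the race detector; the sketch below is hypothetical (it is not part of this change) and assumes it lives next to the lruCache sketch shown earlier.

package lrudemo

import (
    "strconv"
    "sync"
    "testing"
)

// TestConcurrentGets hammers the lookup path from several goroutines. With an
// exclusive Mutex it passes; if get were guarded only by RWMutex.RLock, the
// race detector would flag the concurrent MoveToBack calls.
func TestConcurrentGets(t *testing.T) {
    c := newLRUCache()
    for i := 0; i < 100; i++ {
        c.put(strconv.Itoa(i), "value")
    }

    var wg sync.WaitGroup
    for g := 0; g < 8; g++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                c.get(strconv.Itoa(i % 100))
            }
        }()
    }
    wg.Wait()
}

Run it with go test -race for the check to be meaningful.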
diff --git a/balancer/rls/picker.go b/balancer/rls/picker.go
index bd5985ad9e7..3305f4529fd 100644
--- a/balancer/rls/picker.go
+++ b/balancer/rls/picker.go
@@ -84,10 +84,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
     md, _ := metadata.FromOutgoingContext(info.Ctx)
     reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

-    // Grab a read-lock to perform a cache lookup. If it so happens that we need
-    // to write to the cache (if we have to send out an RLS request), we will
-    // release the read-lock and acquire a write-lock.
-    p.lb.cacheMu.RLock()
+    p.lb.cacheMu.Lock()
+    defer p.lb.cacheMu.Unlock()

     // Lookup data cache and pending request map using request path and keys.
     cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
@@ -98,75 +96,62 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
     switch {
     // No data cache entry. No pending request.
     case dcEntry == nil && pendingEntry == nil:
-        p.lb.cacheMu.RUnlock()
-        bs := &backoffState{bs: defaultBackoffStrategy}
-        return p.sendRequestAndReturnPick(cacheKey, bs, reqKeys.Map, info)
+        throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
+        if throttled {
+            return p.useDefaultPickIfPossible(info, errRLSThrottled)
+        }
+        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

     // No data cache entry. Pending request exists.
     case dcEntry == nil && pendingEntry != nil:
-        p.lb.cacheMu.RUnlock()
         return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

     // Data cache hit. No pending request.
     case dcEntry != nil && pendingEntry == nil:
         if dcEntry.expiryTime.After(now) {
             if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
-                // Executing the proactive cache refresh in a goroutine simplifies
-                // acquiring and releasing of locks.
-                go func(bs *backoffState) {
-                    p.lb.cacheMu.Lock()
-                    // It is OK to ignore the return value which indicates if this request
-                    // was throttled. This is an attempt to proactively refresh the cache,
-                    // and it is OK for it to fail.
-                    p.sendRouteLookupRequest(cacheKey, bs, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
-                    p.lb.cacheMu.Unlock()
-                }(dcEntry.backoffState)
+                p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
             }
             // Delegate to child policies.
-            res, err := p.delegateToChildPolicies(dcEntry, info)
-            p.lb.cacheMu.RUnlock()
+            res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
             return res, err
         }

         // We get here only if the data cache entry has expired. If entry is in
         // backoff, delegate to default target or fail the pick.
         if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
-            st := dcEntry.status
-            p.lb.cacheMu.RUnlock()
-
             // Avoid propagating the status code received on control plane RPCs to the
             // data plane which can lead to unexpected outcomes as we do not control
             // the status code sent by the control plane. Propagating the status
             // message received from the control plane is still fine, as it could be
             // useful for debugging purposes.
+            st := dcEntry.status
             return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
         }

         // We get here only if the entry has expired and is not in backoff.
-        bs := *dcEntry.backoffState
-        p.lb.cacheMu.RUnlock()
-        return p.sendRequestAndReturnPick(cacheKey, &bs, reqKeys.Map, info)
+        throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
+        if throttled {
+            return p.useDefaultPickIfPossible(info, errRLSThrottled)
+        }
+        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

     // Data cache hit. Pending request exists.
     default:
         if dcEntry.expiryTime.After(now) {
-            res, err := p.delegateToChildPolicies(dcEntry, info)
-            p.lb.cacheMu.RUnlock()
+            res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
             return res, err
         }
         // Data cache entry has expired and pending request exists. Queue pick.
-        p.lb.cacheMu.RUnlock()
         return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
     }
 }

-// delegateToChildPolicies is a helper function which iterates through the list
-// of child policy wrappers in a cache entry and attempts to find a child policy
-// to which this RPC can be routed to. If all child policies are in
+// delegateToChildPoliciesLocked is a helper function which iterates through the
+// list of child policy wrappers in a cache entry and attempts to find a child
+// policy to which this RPC can be routed to. If all child policies are in
 // TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
-//
-// Caller must hold at least a read-lock on p.lb.cacheMu.
-func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
+func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
     const rlsDataHeaderName = "x-google-rls-data"
     for i, cpw := range dcEntry.childPolicyWrappers {
         state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
@@ -194,69 +179,6 @@ func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.P
     return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
 }

-// sendRequestAndReturnPick is called to send out an RLS request on the control
-// channel. Since sending out an RLS request entails creating an entry in the
-// pending request map, this method needs to acquire the write-lock on the
-// cache. This also means that the caller must release the read-lock that they
-// could have been holding. This means that things could have happened in
-// between and therefore a fresh lookup on the cache needs to be performed here
-// with the write-lock and all cases need to be handled.
-//
-// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
-func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, info balancer.PickInfo) (balancer.PickResult, error) {
-    p.lb.cacheMu.Lock()
-    defer p.lb.cacheMu.Unlock()
-
-    // We need to perform another cache lookup to ensure that things haven't
-    // changed since the last lookup.
-    dcEntry := p.lb.dataCache.getEntry(cacheKey)
-    pendingEntry := p.lb.pendingMap[cacheKey]
-
-    // Existence of a pending map entry indicates that someone sent out a request
-    // before us and the response is pending. Skip sending a new request.
-    // Piggyback on the existing one by queueing the pick.
-    if pendingEntry != nil {
-        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
-    }
-
-    // If no data cache entry exists, it means that no one jumped in front of us.
-    // We need to send out an RLS request and queue the pick.
-    if dcEntry == nil {
-        throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
-        if throttled {
-            return p.useDefaultPickIfPossible(info, errRLSThrottled)
-        }
-        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
-    }
-
-    // Existence of a data cache entry indicates either that someone sent out a
-    // request before us and received a response, or we got here in the first
-    // place because we found an expired entry in the data cache.
-    now := time.Now()
-    switch {
-    // Valid data cache entry. Delegate to its child policies.
-    case dcEntry.expiryTime.After(now):
-        return p.delegateToChildPolicies(dcEntry, info)
-
-    // Entry is in backoff. Delegate to default target or fail the pick.
-    case dcEntry.backoffState != nil && dcEntry.backoffTime.After(now):
-        // Avoid propagating the status code received on control plane RPCs to the
-        // data plane which can lead to unexpected outcomes as we do not control
-        // the status code sent by the control plane. Propagating the status
-        // message received from the control plane is still fine, as it could be
-        // useful for debugging purposes.
-        return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", dcEntry.status.Error())))
-
-    // Entry has expired, but is not in backoff. Send request and queue pick.
-    default:
-        throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
-        if throttled {
-            return p.useDefaultPickIfPossible(info, errRLSThrottled)
-        }
-        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
-    }
-}
-
 // useDefaultPickIfPossible is a helper method which delegates to the default
 // target if one is configured, or fails the pick with the given error.
 func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
@@ -267,12 +189,11 @@ func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefa
     return balancer.PickResult{}, errOnNoDefault
 }

-// sendRouteLookupRequest adds an entry to the pending request map and sends out
-// an RLS request using the passed in arguments. Returns a value indicating if
-// the request was throttled by the client-side adaptive throttler.
-//
-// Caller must hold a write-lock on p.lb.cacheMu.
-func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
+// sendRouteLookupRequestLocked adds an entry to the pending request map and
+// sends out an RLS request using the passed in arguments. Returns a value
+// indicating if the request was throttled by the client-side adaptive
+// throttler.
+func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
     if p.lb.pendingMap[cacheKey] != nil {
         return false
     }
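The renames delegateToChildPolicies -> delegateToChildPoliciesLocked and sendRouteLookupRequest -> sendRouteLookupRequestLocked follow the common Go convention of a Locked suffix for helpers that require the caller to already hold the mutex. Pick now acquires cacheMu once and defers the unlock, which is what lets the whole sendRequestAndReturnPick detour (drop the read lock, take the write lock, re-validate the cache, handle every case again) be deleted. A generic sketch of the convention, with illustrative names rather than the real rls types:

package lockeddemo

import "sync"

// picker sketches the locking style the patch adopts: the entry point takes
// the lock for the whole operation and calls helpers whose Locked suffix
// documents that the caller already holds p.mu.
type picker struct {
    mu    sync.Mutex
    cache map[string]string
}

// Pick holds the lock end to end instead of dropping and re-acquiring it
// between the lookup and the request, so no re-validation step is needed.
func (p *picker) Pick(key string) (string, bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if v, ok := p.lookupLocked(key); ok {
        return v, true
    }
    p.startLookupLocked(key)
    return "", false
}

// lookupLocked must only be called with p.mu held.
func (p *picker) lookupLocked(key string) (string, bool) {
    v, ok := p.cache[key]
    return v, ok
}

// startLookupLocked must only be called with p.mu held. The actual lookup
// request would be kicked off asynchronously here; elided in this sketch.
func (p *picker) startLookupLocked(key string) {
    _ = key
}

Because the helpers never unlock, there is no window in which another goroutine can change the cache between the lookup and the decision, which is exactly the gap the old read-lock/write-lock dance had to re-check.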