
balancergroup: do not cache closed sub-balancers by default #6523

Merged (3 commits) on Aug 10, 2023
Changes from 2 commits
8 changes: 7 additions & 1 deletion balancer/rls/balancer.go
@@ -104,7 +104,13 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
StateAggregator: lb,
Logger: lb.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
lb.bg.Start()
go lb.run()
return lb
5 changes: 0 additions & 5 deletions balancer/rls/helpers_test.go
@@ -28,7 +28,6 @@ import (
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
@@ -48,10 +47,6 @@ const (
defaultTestShortTimeout = 100 * time.Millisecond
)

func init() {
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}

type s struct {
grpctest.Tester
}
9 changes: 8 additions & 1 deletion balancer/weightedtarget/weightedtarget.go
@@ -24,6 +24,7 @@ package weightedtarget
import (
"encoding/json"
"fmt"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
@@ -54,7 +55,13 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
b.stateAggregator.Start()
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger)
b.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: bOpts,
StateAggregator: b.stateAggregator,
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
2 changes: 0 additions & 2 deletions balancer/weightedtarget/weightedtarget_test.go
@@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/testutils"
@@ -159,7 +158,6 @@ func init() {
wtbBuilder = balancer.Get(Name)
wtbParser = wtbBuilder.(balancer.ConfigParser)

balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
NewRandomWRR = testutils.NewTestWRR
}

103 changes: 74 additions & 29 deletions internal/balancergroup/balancergroup.go
@@ -182,6 +182,29 @@ func (sbc *subBalancerWrapper) stopBalancer() {
sbc.balancer = nil
}

type balancerCache interface {
// Add adds item with key to the cache. callback is invoked when item is
// removed from the cache.
Add(key, item interface{}, callback func()) (interface{}, bool)

// Remove the item with the key from the cache.
Remove(key interface{}) (item interface{}, ok bool)

// Clear removes all entries, and runs the callbacks if runCallback is true.
Clear(runCallback bool)
}

// A no-op implementation of the balancerCache interface when caching is
// disabled due to Options.SubBalancerCloseTimeout set to 0.
type noopBalancerCache struct{}
Member:
Do you think this should go into the cache package for other places to be able to share it? And make the constructor for a timeout cache return it if the timeout is zero?
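A rough sketch of that suggestion, for illustration only: the Cache interface and nopCache type below are hypothetical and not part of the existing cache package, and newTimerCache is a placeholder for the current timer-based implementation.

package cache

import "time"

// Cache is a hypothetical interface that both the timer-based cache and a
// no-op cache would satisfy.
type Cache interface {
	Add(key, item interface{}, callback func()) (interface{}, bool)
	Remove(key interface{}) (item interface{}, ok bool)
	Clear(runCallback bool)
}

// nopCache stores nothing; Add simply invokes the callback.
type nopCache struct{}

func (nopCache) Add(_, _ interface{}, cb func()) (interface{}, bool) {
	cb()
	return nil, false
}
func (nopCache) Remove(interface{}) (interface{}, bool) { return nil, false }
func (nopCache) Clear(bool)                             {}

// NewTimeoutCache returns the no-op cache for a zero timeout, and the
// timer-based cache otherwise.
func NewTimeoutCache(timeout time.Duration) Cache {
	if timeout == 0 {
		return nopCache{}
	}
	return newTimerCache(timeout) // placeholder for the existing implementation
}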

Contributor Author:
I thought about this for a while. I feel that it does not make a whole lot of sense for a type that provides caching functionality to also provide a way to disable it.

  • I feel that doing so would unnecessarily add complexity to the package providing the cache.
    • We would need to define an interface and create multiple implementations of it.
  • Also, I don't see other usages of this cache where caching might have to be disabled.
    • In fact, we could consider making cache.NewTimeoutCache return an error if passed a timeout of 0.

Instead, explicitly handling the no-caching case in the balancergroup makes more sense, because it makes it very clear to the reader how and where caching is being done.

Member:
I'm good with the current version. I think that if there is a no-op implementation, it makes the most sense in the cache package. But just not calling the cache when it's disabled is a perfectly reasonable implementation too.
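For comparison, the "just not calling the cache" alternative would look roughly like this inside Remove, assuming deletedBalancerCache were left nil when caching is disabled. This is a sketch of that alternative, not what the PR does, and locking details are elided.

if bg.outgoingStarted {
	if bg.deletedBalancerCache == nil {
		// Caching disabled: close the sub-balancer and clean up its SubConns
		// immediately instead of parking it in the cache.
		sbToRemove.stopBalancer()
		bg.cleanupSubConns(sbToRemove)
	} else {
		bg.deletedBalancerCache.Add(id, sbToRemove, func() { /* same callback as today */ })
	}
}
delete(bg.idToBalancerConfig, id)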


func (noopBalancerCache) Add(_ interface{}, _ interface{}, cb func()) (interface{}, bool) {
cb()

Member:
Executing this callback synchronously could theoretically cause problems with a pattern like:

b.Lock()
b.cache.Add(blah, blah, func() { b.Lock(); do something; b.Unlock(); })
b.Unlock()

I wonder if we should just go cb() now, while we're looking at this, or wait until we run into that problem, which may never happen anyway.

Contributor Author:
I agree that go cb() is what the no-op implementation should have done. The problem probably doesn't happen with the current implementation of the timeout cache because the timer callback is invoked in a separate goroutine, even if the timeout value is 0.
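For reference, the asynchronous variant being discussed would be a one-line change to the no-op Add; a sketch:

func (noopBalancerCache) Add(_ interface{}, _ interface{}, cb func()) (interface{}, bool) {
	// Run the callback in its own goroutine, mirroring how the timeout cache
	// fires timer callbacks, so a caller holding its own lock while calling
	// Add cannot deadlock on a synchronous callback.
	go cb()
	return nil, false
}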

return nil, false
}
func (noopBalancerCache) Remove(key interface{}) (item interface{}, ok bool) { return nil, false }
func (noopBalancerCache) Clear(bool) {}

// BalancerGroup takes a list of balancers, and make them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
@@ -226,7 +249,7 @@ type BalancerGroup struct {
outgoingStarted bool
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed.
balancerCache *cache.TimeoutCache
deletedBalancerCache balancerCache

// incomingMu is to make sure this balancer group doesn't send updates to cc
// after it's closed.
@@ -256,24 +279,42 @@ type BalancerGroup struct {
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute
// Options wraps the arguments to be passed to the BalancerGroup ctor.
type Options struct {
// CC is a reference to the parent balancer.ClientConn.
CC balancer.ClientConn
// BuildOpts contains build options to be used when creating sub-balancers.
BuildOpts balancer.BuildOptions
// StateAggregator is an implementation of the BalancerStateAggregator
// interface to aggregate picker and connectivity states from sub-balancers.
StateAggregator BalancerStateAggregator
// Logger is a group specific prefix logger.
Logger *grpclog.PrefixLogger
// SubBalancerCloseTimeout is the amount of time deleted sub-balancers spend
// in the idle cache. A value of zero here disables caching of deleted
// sub-balancers.
SubBalancerCloseTimeout time.Duration
}

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
buildOpts: bOpts,
logger: logger,
stateAggregator: stateAggregator,
func New(opts Options) *BalancerGroup {
var bc balancerCache
if opts.SubBalancerCloseTimeout != time.Duration(0) {
bc = cache.NewTimeoutCache(opts.SubBalancerCloseTimeout)
} else {
bc = noopBalancerCache{}
}

idToBalancerConfig: make(map[string]*subBalancerWrapper),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
return &BalancerGroup{
cc: opts.CC,
buildOpts: opts.BuildOpts,
stateAggregator: opts.StateAggregator,
logger: opts.Logger,

deletedBalancerCache: bc,
idToBalancerConfig: make(map[string]*subBalancerWrapper),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
}
}

@@ -321,7 +362,7 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary.
if bg.outgoingStarted {
if old, ok := bg.balancerCache.Remove(id); ok {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
sbc, _ = old.(*subBalancerWrapper)
if sbc != nil && sbc.builder != builder {
// If the sub-balancer in cache was built with a different
@@ -395,18 +436,22 @@ func (bg *BalancerGroup) Remove(id string) {
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
bg.balancerCache.Add(id, sbToRemove, func() {
// A sub-balancer evicted from the timeout cache needs to closed
// and its subConns need to removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby
// moving it to the cache) around the same time that the
// balancergroup is closed, and by the time we get here the
// balancergroup might be closed. Check for `outgoingStarted ==
// true` at that point can lead to a leaked sub-balancer.
bg.outgoingMu.Lock()
sbToRemove.stopBalancer()
bg.outgoingMu.Unlock()
bg.cleanupSubConns(sbToRemove)
bg.deletedBalancerCache.Add(id, sbToRemove, func() {
// Spawn a goroutine to stop the sub-balancer as the callback
// could be called inline from the cache, leading to a deadlock.
go func() {
// A sub-balancer evicted from the timeout cache needs to closed
// and its subConns need to removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby
// moving it to the cache) around the same time that the
// balancergroup is closed, and by the time we get here the
// balancergroup might be closed. Check for `outgoingStarted ==
// true` at that point can lead to a leaked sub-balancer.
bg.outgoingMu.Lock()
sbToRemove.stopBalancer()
bg.outgoingMu.Unlock()
bg.cleanupSubConns(sbToRemove)
}()
})
}
delete(bg.idToBalancerConfig, id)
@@ -561,7 +606,7 @@ func (bg *BalancerGroup) Close() {

// Clear(true) runs clear function to close sub-balancers in cache. It
// must be called out of outgoing mutex.
bg.balancerCache.Clear(true)
bg.deletedBalancerCache.Clear(true)

bg.outgoingMu.Lock()
if bg.outgoingStarted {