Skip to content

Commit

Permalink
grpc: use CallbackSerializer in balancer wrapper (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed May 5, 2023
1 parent f193ec0 commit c44f77e
Showing 1 changed file with 74 additions and 184 deletions.
258 changes: 74 additions & 184 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
Expand All @@ -49,147 +48,60 @@ import (
type ccBalancerWrapper struct {
cc *ClientConn

// Since these fields are accessed only from handleXxx() methods which are
// synchronized by the watcher goroutine, we do not need a mutex to protect
// these fields.
balancer *gracefulswitch.Balancer
curBalancerName string

updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
closed *grpcsync.Event // Indicates if close has been called.
done *grpcsync.Event // Indicates if close has completed its work.
// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled on the
// CallbackSerializer. Fields accessed *only* in serializer callbacks, can
// therefore be accessed without a mutex.
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
balancer *gracefulswitch.Balancer
curBalancerName string
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
resultCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
cc: cc,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
go ccb.watcher()
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}

// The following xxxUpdate structs wrap the arguments received as part of the
// corresponding update. The watcher goroutine uses the 'type' of the update to
// invoke the appropriate handler routine to handle the update.

type ccStateUpdate struct {
ccs *balancer.ClientConnState
}

type scStateUpdate struct {
sc balancer.SubConn
state connectivity.State
err error
}

type exitIdleUpdate struct{}

type resolverErrorUpdate struct {
err error
}

type switchToUpdate struct {
name string
}

// watcher is a long-running goroutine which reads updates from a channel and
// invokes corresponding methods on the underlying balancer. It ensures that
// these methods are invoked in a synchronous fashion. It also ensures that
// these methods are invoked in the order in which the updates were received.
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case u, ok := <-ccb.updateCh.Get():
if !ok {
break
}
ccb.updateCh.Load()
if ccb.closed.HasFired() {
break
}
switch update := u.(type) {
case *ccStateUpdate:
ccb.handleClientConnStateChange(update.ccs)
case *scStateUpdate:
ccb.handleSubConnStateChange(update)
case *exitIdleUpdate:
ccb.handleExitIdle()
case *resolverErrorUpdate:
ccb.handleResolverError(update.err)
case *switchToUpdate:
ccb.handleSwitchTo(update.name)
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update)
}
case <-ccb.closed.Done():
}

if ccb.closed.HasFired() {
ccb.handleClose()
return
}
}
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
//
// Unlike other methods invoked by grpc to push updates to the underlying
// balancer, this method cannot simply push the update onto the update channel
// and return. It needs to return the error returned by the underlying balancer
// back to grpc which propagates that to the resolver.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
errCh := make(chan error, 1)
ccb.serializer.Schedule(func(_ context.Context) {
// If the addresses specified in the update contain addresses of type
// "grpclb" and the selected LB policy is not "grpclb", these addresses
// will be filtered out and ccs will be modified with the updated
// address list.
if ccb.curBalancerName != grpclbName {
var addrs []resolver.Address
for _, addr := range ccs.ResolverState.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
ccs.ResolverState.Addresses = addrs
}
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})

var res interface{}
var ok bool
// If the balancer wrapper is closed when waiting for this state update to
// be handled, the callback serializer will be closed as well, and we can
// rely on its Done channel to ensure that we don't block here forever.
select {
case res, ok = <-ccb.resultCh.Get():
if !ok {
// The result channel is closed only when the balancer wrapper is closed.
return nil
}
ccb.resultCh.Load()
case <-ccb.closed.Done():
// Return early if the balancer wrapper is closed while we are waiting for
// the underlying balancer to process a ClientConnState update.
return nil
}
// If the returned error is nil, attempting to type assert to error leads to
// panic. So, this needs to handled separately.
if res == nil {
case err := <-errCh:
return err
case <-ccb.serializer.Done:
return nil
}
return res.(error)
}

// handleClientConnStateChange handles a ClientConnState update from the update
// channel and invokes the appropriate method on the underlying balancer.
//
// If the addresses specified in the update contain addresses of type "grpclb"
// and the selected LB policy is not "grpclb", these addresses will be filtered
// out and ccs will be modified with the updated address list.
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
if ccb.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range ccs.ResolverState.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
ccs.ResolverState.Addresses = addrs
}
ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}

// updateSubConnState is invoked by grpc to push a subConn state update to the
Expand All @@ -202,39 +114,27 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti
// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
// this function will be called with (nil, Shutdown). We don't need to call
// balancer method in this case.
//
// TODO: Suppress the above mentioned state change to Shutdown, so we don't
// have to handle it here.
if sc == nil {
return
}
ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
}

// handleSubConnStateChange handles a SubConnState update from the update
// channel and invokes the appropriate method on the underlying balancer.
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}

func (ccb *ccBalancerWrapper) exitIdle() {
ccb.updateCh.Put(&exitIdleUpdate{})
}

func (ccb *ccBalancerWrapper) handleExitIdle() {
if ccb.cc.GetState() != connectivity.Idle {
return
}
ccb.balancer.ExitIdle()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.ExitIdle()
})
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.updateCh.Put(&resolverErrorUpdate{err: err})
}

func (ccb *ccBalancerWrapper) handleResolverError(err error) {
ccb.balancer.ResolverError(err)
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.ResolverError(err)
})
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
Expand All @@ -248,49 +148,39 @@ func (ccb *ccBalancerWrapper) handleResolverError(err error) {
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.updateCh.Put(&switchToUpdate{name: name})
}

// handleSwitchTo handles a balancer switch update from the update channel. It
// calls the SwitchTo() method on the gracefulswitch.Balancer with a
// balancer.Builder corresponding to name. If no balancer.Builder is registered
// for the given name, it uses the default LB policy which is "pick_first".
func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
// TODO: Other languages use case-insensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.serializer.Schedule(func(_ context.Context) {
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}

// TODO: Ensure that name is a registered LB policy when we get here.
// We currently only validate the `loadBalancingConfig` field. We need to do
// the same for the `loadBalancingPolicy` field and reject the service config
// if the specified policy is not registered.
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
// Use the default LB policy, pick_first, if no LB policy with name is
// found in the registry.
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
})
}

func (ccb *ccBalancerWrapper) close() {
ccb.closed.Fire()
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) handleClose() {
// Close the serializer to ensure that no more calls from gRPC are sent to
// the balancer. We don't have to worry about suppressing calls from a
// closed balancer because these are handled by the ClientConn (balancer
// wrapper is only ever closed when the ClientConn is closed).
ccb.serializerCancel()
<-ccb.serializer.Done
ccb.balancer.Close()
ccb.updateCh.Close()
ccb.resultCh.Close()
ccb.done.Fire()
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down

0 comments on commit c44f77e

Please sign in to comment.