Skip to content

Commit

Permalink
xds/ringhash: cache connectivity state of subchannels inside picker (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jun 7, 2023
1 parent 1b66663 commit 761c084
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 32 deletions.
5 changes: 1 addition & 4 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
// to TRANSIENT_FAILURE upon connection failure.
client.EmptyCall(ctx, &testpb.Empty{})
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.TransientFailure {
break
}
for state := cc.GetState(); state != connectivity.TransientFailure && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
Expand Down
15 changes: 10 additions & 5 deletions xds/internal/balancer/ringhash/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ import (
)

type picker struct {
ring *ring
logger *grpclog.PrefixLogger
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
}

func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
return &picker{ring: ring, logger: logger}
states := make(map[*subConn]connectivity.State)
for _, e := range ring.items {
states[e.sc] = e.sc.effectiveState()
}
return &picker{ring: ring, logger: logger, subConnStates: states}
}

// handleRICSResult is the return type of handleRICS. It's needed to wrap the
Expand All @@ -54,7 +59,7 @@ type handleRICSResult struct {
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
switch state := e.sc.effectiveState(); state {
switch state := p.subConnStates[e.sc]; state {
case connectivity.Ready:
return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
case connectivity.Idle:
Expand Down Expand Up @@ -118,7 +123,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
// but don't not trigger Connect() on the other SubConns.
var firstNonFailedFound bool
for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) {
scState := ee.sc.effectiveState()
scState := p.subConnStates[ee.sc]
if scState == connectivity.Ready {
return balancer.PickResult{SubConn: ee.sc.sc}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions xds/internal/balancer/ringhash/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/testutils"
)

Expand Down Expand Up @@ -96,7 +98,7 @@ func (s) TestPickerPickFirstTwo(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &picker{ring: tt.ring}
p := newPicker(tt.ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
got, err := p.Pick(balancer.PickInfo{
Ctx: SetRequestHash(context.Background(), tt.hash),
})
Expand Down Expand Up @@ -126,7 +128,7 @@ func (s) TestPickerPickTriggerTFConnect(t *testing.T) {
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err == nil {
t.Fatalf("Pick() error = %v, want non-nil", err)
Expand Down Expand Up @@ -156,7 +158,7 @@ func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err != nil {
t.Fatalf("Pick() error = %v, want nil", err)
Expand All @@ -182,7 +184,7 @@ func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err == balancer.ErrNoSubConnAvailable {
t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable)
Expand Down
24 changes: 5 additions & 19 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,37 +347,23 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
newSCState := scs.effectiveState()
b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

var sendUpdate bool
oldBalancerState := b.state
b.state = b.csEvltr.recordTransition(oldSCState, newSCState)
if oldBalancerState != b.state {
sendUpdate = true
}

switch s {
case connectivity.Idle:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// We need to regenerate the picker even if the ring has not changed
// because we could be moving from TRANSIENT_FAILURE to READY, in which
// case, we need to update the error picker returned earlier.
b.regeneratePicker()
sendUpdate = true
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
b.regeneratePicker()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}

if sendUpdate {
if oldSCState != newSCState {
// Because the picker caches the state of the subconns, we always
// regenerate and update the picker when the effective SubConn state
// changes.
b.regeneratePicker()
b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
Expand Down

0 comments on commit 761c084

Please sign in to comment.