Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/ringhash: cache connectivity state of subchannels inside picker #6351

Merged
merged 3 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
// to TRANSIENT_FAILURE upon connection failure.
client.EmptyCall(ctx, &testpb.Empty{})
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.TransientFailure {
break
}
for state := cc.GetState(); state != connectivity.TransientFailure && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
Expand Down
15 changes: 10 additions & 5 deletions xds/internal/balancer/ringhash/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ import (
)

type picker struct {
ring *ring
logger *grpclog.PrefixLogger
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
}

func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
return &picker{ring: ring, logger: logger}
states := make(map[*subConn]connectivity.State)
for _, e := range ring.items {
states[e.sc] = e.sc.effectiveState()
}
return &picker{ring: ring, logger: logger, subConnStates: states}
}

// handleRICSResult is the return type of handleRICS. It's needed to wrap the
Expand All @@ -54,7 +59,7 @@ type handleRICSResult struct {
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
switch state := e.sc.effectiveState(); state {
switch state := p.subConnStates[e.sc]; state {
case connectivity.Ready:
return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
case connectivity.Idle:
Expand Down Expand Up @@ -118,7 +123,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
// but don't not trigger Connect() on the other SubConns.
var firstNonFailedFound bool
for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) {
scState := ee.sc.effectiveState()
scState := p.subConnStates[ee.sc]
if scState == connectivity.Ready {
return balancer.PickResult{SubConn: ee.sc.sc}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions xds/internal/balancer/ringhash/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/testutils"
)

Expand Down Expand Up @@ -96,7 +98,7 @@ func (s) TestPickerPickFirstTwo(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &picker{ring: tt.ring}
p := newPicker(tt.ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
got, err := p.Pick(balancer.PickInfo{
Ctx: SetRequestHash(context.Background(), tt.hash),
})
Expand Down Expand Up @@ -126,7 +128,7 @@ func (s) TestPickerPickTriggerTFConnect(t *testing.T) {
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err == nil {
t.Fatalf("Pick() error = %v, want non-nil", err)
Expand Down Expand Up @@ -156,7 +158,7 @@ func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err != nil {
t.Fatalf("Pick() error = %v, want nil", err)
Expand All @@ -182,7 +184,7 @@ func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := &picker{ring: ring}
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
if err == balancer.ErrNoSubConnAvailable {
t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable)
Expand Down
24 changes: 5 additions & 19 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,37 +347,23 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
newSCState := scs.effectiveState()
b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

var sendUpdate bool
oldBalancerState := b.state
b.state = b.csEvltr.recordTransition(oldSCState, newSCState)
if oldBalancerState != b.state {
sendUpdate = true
}

switch s {
case connectivity.Idle:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// We need to regenerate the picker even if the ring has not changed
// because we could be moving from TRANSIENT_FAILURE to READY, in which
// case, we need to update the error picker returned earlier.
b.regeneratePicker()
sendUpdate = true
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
b.regeneratePicker()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Not a part of this PR, but this scStates field I feel like is wrongly named. This is a map from balancer.SubConn to the Ring Hashes SubConn with extra info. I feel like states is only a part of the reason this map exists.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it probably was changed during development. 99% of why there exists a map is state tracking. It also does a queueConnect in rare cases.

}

if sendUpdate {
if oldSCState != newSCState {
// Because the picker caches the state of the subconns, we always
// regenerate and update the picker when the effective SubConn state
// changes.
b.regeneratePicker()
b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
Expand Down