Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: Add RPC event for blocking for a picker update #6422

Merged
merged 17 commits into from
Jul 18, 2023
6 changes: 5 additions & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
Expand Down Expand Up @@ -349,7 +350,7 @@ func (cc *ClientConn) exitIdleMode() error {
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper()
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
Expand Down Expand Up @@ -743,6 +744,9 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
}
select {
case <-cc.firstResolveEvent.Done():
for _, sh := range cc.dopts.copts.StatsHandlers {
sh.HandleRPC(ctx, &stats.InitialResolverResult{})
}
return nil
case <-ctx.Done():
return status.FromContextError(ctx.Err()).Err()
Expand Down
23 changes: 16 additions & 7 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
statsHandlers []stats.Handler
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
statsHandlers: statsHandlers,
}
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Expand Down Expand Up @@ -125,6 +130,10 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
for _, sh := range pw.statsHandlers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here or where statsHandler field is defined about what this is for and how this is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment on field: statsHandlers []stats.Handler // to record blocking picker calls

sh.HandleRPC(ctx, &stats.PickerUpdated{})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: nix newline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah whoops. Deleted newline.

}
continue
}
Expand Down
10 changes: 5 additions & 5 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error
}

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
Expand All @@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
}

func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
for i := goroutineCount; i > 0; i-- {
Expand All @@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) {
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no subConn available.
Expand All @@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because picker returns transientFailure and
Expand All @@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because subConn is not ready.
Expand Down
6 changes: 5 additions & 1 deletion stats/opencensus/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) {
trace.BoolAttribute("Client", rs.Client),
trace.BoolAttribute("FailFast", rs.FailFast),
)
case *stats.InitialResolverResult:
span.Annotate(nil, "Delayed name resolution complete")
case *stats.PickerUpdated:
span.Annotate(nil, "Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
// from 1 one for sent messages and one for received messages."
// from one for sent messages and one for received messages."
mi := atomic.AddUint32(&ti.countRecvMsg, 1)
span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength))
case *stats.OutPayload:
Expand Down
21 changes: 21 additions & 0 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// InitialResolverResult represents an event that the resolver finished
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved->resolver

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched.

// resolving if the ClientConn blocks on resolver resolution while performing an
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"...if the RPC occurred before the initial resolver result."?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My string is still technically correct, but yours falls under the subset of my language and is more specific :). Switched.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an external docstring, so we should find something that is as clear and concise as possible.

"blocks on resolver resolution" is confusing to read and technically the ClientConn doesn't block -- the RPC does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I agree. Will push once I finish tests.

// RPC.
type InitialResolverResult struct{}

// IsClient reports whether this stats event comes from the client side. Only
// the client side interfaces with a resolver, so the answer is always true.
func (*InitialResolverResult) IsClient() bool {
	return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit for all these methods. Since the receiver is not used, we don't even have to name it. So, this can simply be:

func (*InitialResolverResult) IsClient() bool { return true }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmm ok I was actually wondering that too. Deleted, thanks.


// isRPCStats is an intentionally empty, unexported method; presumably it marks
// InitialResolverResult as an RPC stats type (verify against the RPCStats
// interface definition).
func (*InitialResolverResult) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"PickerUpdated indicates that the LB Policy's picker was updated after the previous pick attempt returned ErrNoSubConnAvailable." Or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the only case that this gets triggered (see list of 4 in https://github.com/grpc/grpc-go/blob/master/picker_wrapper.go#L87). Any other ideas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. I was trying to keep it simple. The alternative would be something more vague like "PickerUpdated indicates that the LB policy provided a new picker while the RPC was waiting for one."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do the more vague one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.. "picker finished picking if the CC blocks on picker picking" makes no sense to me 😛

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update to something like PickerUpdated indicates that the LB policy provided a new picker while the RPC was waiting for one.

type PickerUpdated struct{}

// IsClient reports whether this stats event comes from the client side. Only
// the client side interfaces with a Picker, so the answer is always true.
func (*PickerUpdated) IsClient() bool {
	return true
}

// isRPCStats is an intentionally empty, unexported method with no behavior.
func (*PickerUpdated) isRPCStats() {}

// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Expand Down
14 changes: 10 additions & 4 deletions test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ func (s) TestRetryStats(t *testing.T) {
handler.mu.Lock()
want := []stats.RPCStats{
&stats.Begin{},
&stats.PickerUpdated{},
&stats.PickerUpdated{},
&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
&stats.OutPayload{WireLength: 5},
&stats.End{},
Expand Down Expand Up @@ -579,7 +581,7 @@ func (s) TestRetryStats(t *testing.T) {
// There is a race between receiving the payload (triggered by the
// application / gRPC library) and receiving the trailer (triggered at the
// transport layer). Adjust the received stats accordingly if necessary.
const tIdx, pIdx = 13, 14
const tIdx, pIdx = 15, 16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing tests should probably filter out PickerUpdated events entirely, unless we know they are deterministic in those tests. And we should have a new test with a custom LB policy that confirms those events work as intended.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

_, okT := handler.s[tIdx].(*stats.InTrailer)
_, okP := handler.s[pIdx].(*stats.InPayload)
if okT && okP {
Expand All @@ -594,7 +596,11 @@ func (s) TestRetryStats(t *testing.T) {
t.Fatalf("at position %v: got %T; want %T", i, s, w)
}
wv, sv := reflect.ValueOf(w).Elem(), reflect.ValueOf(s).Elem()

if sv.NumField() == 0 {
// Resolver blocking and Picker blocking events have no fields;
// their IsClient() method always returns true.
continue
}
// Validate that Client is always true
if sv.FieldByName("Client").Interface().(bool) != true {
t.Fatalf("at position %v: got Client=false; want true", i)
Expand All @@ -620,8 +626,8 @@ func (s) TestRetryStats(t *testing.T) {
}

// Validate timings between last Begin and preceding End.
end := handler.s[8].(*stats.End)
begin := handler.s[9].(*stats.Begin)
end := handler.s[10].(*stats.End)
begin := handler.s[11].(*stats.Begin)
diff := begin.BeginTime.Sub(end.EndTime)
if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
Expand Down