Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: Add RPC event for blocking for a picker update #6422

Merged
merged 17 commits into from Jul 18, 2023
2 changes: 1 addition & 1 deletion clientconn.go
Expand Up @@ -349,7 +349,7 @@ func (cc *ClientConn) exitIdleMode() error {
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper()
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
Expand Down
22 changes: 15 additions & 7 deletions picker_wrapper.go
Expand Up @@ -28,21 +28,26 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks when there is a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
statsHandlers []stats.Handler // to record blocking picker calls
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
statsHandlers: statsHandlers,
}
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Expand Down Expand Up @@ -125,6 +130,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
for _, sh := range pw.statsHandlers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here or where statsHandler field is defined about what this is for and how this is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment on field: statsHandlers []stats.Handler // to record blocking picker calls

sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
}
continue
}
Expand Down
10 changes: 5 additions & 5 deletions picker_wrapper_test.go
Expand Up @@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error
}

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
Expand All @@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
}

func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
for i := goroutineCount; i > 0; i-- {
Expand All @@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) {
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no subConn available.
Expand All @@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because picker returns transientFailure and
Expand All @@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because subConn is not ready.
Expand Down
4 changes: 3 additions & 1 deletion stats/opencensus/trace.go
Expand Up @@ -99,9 +99,11 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) {
trace.BoolAttribute("Client", rs.Client),
trace.BoolAttribute("FailFast", rs.FailFast),
)
case *stats.PickerUpdated:
span.Annotate(nil, "Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
// from 1 one for sent messages and one for received messages."
// from one for sent messages and one for received messages."
mi := atomic.AddUint32(&ti.countRecvMsg, 1)
span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength))
case *stats.OutPayload:
Expand Down
10 changes: 10 additions & 0 deletions stats/stats.go
Expand Up @@ -59,6 +59,16 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"PickerUpdated indicates that the LB Policy's picker was updated after the previous pick attempt returned ErrNoSubConnAvailable." Or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the only case that this gets triggered (see list of 4 in https://github.com/grpc/grpc-go/blob/master/picker_wrapper.go#L87). Any other ideas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. I was trying to keep it simple. The alternative would be something more vague like "PickerUpdated indicates that the LB policy provided a new picker while the RPC was waiting for one."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do the more vague one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.. "picker finished picking if the CC blocks on picker picking" makes no sense to me 😛

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update to something like PickerUpdated indicates that the LB policy provided a new picker while the RPC was waiting for one.

type PickerUpdated struct{}

// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }

func (*PickerUpdated) isRPCStats() {}

// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Expand Down
167 changes: 167 additions & 0 deletions test/end2end_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
Expand All @@ -44,6 +45,8 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand All @@ -62,6 +65,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
Expand All @@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health"

func init() {
channelz.TurnOn()
balancer.Register(blockingPickerBalancerBuilder{})
}

type s struct {
Expand Down Expand Up @@ -6362,3 +6367,165 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
}
}

// statsHandlerRecordEvents is a stats.Handler that records every RPC-level
// stats event it receives so a test can later inspect which events fired.
type statsHandlerRecordEvents struct {
	mu sync.Mutex       // guards s
	s  []stats.RPCStats // all RPCStats events observed, in arrival order
}

// TagRPC implements stats.Handler. It attaches no per-RPC tags and returns
// the context unchanged.
func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	return ctx
}
func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
h.mu.Lock()
h.s = append(h.s, s)
h.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: defer as a matter of course is probably a good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
// TagConn implements stats.Handler. It attaches no per-connection tags and
// returns the context unchanged.
func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}
func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}

type blockingPicker struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called blocking? Is that because of it queues?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Induces a block. I'll call it triggerRPCBlockPicker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also switched the balancer type names accordingly.

pickDone func()
}

func (bp *blockingPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
defer bp.pickDone()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW defer doesn't actually do anything different than just calling it directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that guarantee the first pick hits err sub conn available before the client conn processes the new picker? And thus deterministically induce this event?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched anyway. But this worries me. I guess the client conn has already called into the picker so the new one won't overwrite that call into component.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not saying you must change it, but noting that the functionally it is identical. defer x(); return <no function call> is the same as x(); return <no function call>. If you did:

defer x()
return y()

...it would be the same as:

blah := y()
x()
return blah

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I had a feeling. So does the fact that another picker comes before the ErrNoSubConnAvailable logically cause any problems? Does that make it deterministic or is it guaranteed to finish the processing from the picker callout?

return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// name is the balancer name used to register and select this test LB policy
// via service config.
const name = "blockingPickerBalancer"

// blockingPickerBalancerBuilder builds blockingPickerBalancer instances and
// parses their (empty) LB configuration.
type blockingPickerBalancerBuilder struct{}

func (blockingPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &blockingPickerBalancer{
ClientConn: cc,
}
// round_robin child to complete balancer tree with a usable leaf policy and
// have RPCs actually work.
builder := balancer.Get(roundrobin.Name)
if builder == nil {
// Shouldn't happen, defensive programming. Registered from import of
// roundrobin package.
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

panic here instead of returning nil and maybe invoking some other panic later. And below.

Actually this one isn't needed as builder.Build will just panic if builder is nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Why does this matter philosophically though? Test crashes closer to root cause of the crash?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if it fails then you want it to fail as close to where the error is as possible. If you cause another failure somewhere else it's harder to debug.

}
rr := builder.Build(b, bOpts)
if rr == nil {
// Shouldn't happen, defensive programming.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: generally, commenting a panic with "this shouldn't happen" isn't helpful. Obviously it shouldn't happen or else we wouldn't have made it panic. You can say why it shouldn't happen if it's interesting, or just leave the comment out if it's obvious or if the panic explains it (which is better than having a comment explaining it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, whoops, this was left over from when this wasn't an explicit panic. Deleted comment.

return nil
}
b.Balancer = rr
return b
}

// ParseConfig implements balancer.ConfigParser. The test balancer takes no
// options, so any JSON input yields an empty config.
func (blockingPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	return &bpbConfig{}, nil
}

// Name returns the name under which this test balancer is registered.
func (blockingPickerBalancerBuilder) Name() string {
	return name
}

// bpbConfig is the (empty) LB config for blockingPickerBalancer; it exists
// only to satisfy the serviceconfig.LoadBalancingConfig interface.
type bpbConfig struct {
	serviceconfig.LoadBalancingConfig
}

// blockingPickerBalancer wraps a child balancer and first installs a picker
// that returns ErrNoSubConnAvailable, forcing RPCs to block until a picker
// update arrives — letting tests observe the resulting stats event.
type blockingPickerBalancer struct {
	stateMu    sync.Mutex     // guards childState
	childState balancer.State // most recent state reported by the child balancer

	blockingPickerDone *grpcsync.Event // fired once the blocking picker has been called
	// embed a ClientConn to wrap only UpdateState() operation
	balancer.ClientConn
	// embed a Balancer to wrap only UpdateClientConnState() operation
	balancer.Balancer
}

func (bpb *blockingPickerBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
bpb.blockingPickerDone = grpcsync.NewEvent()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this per-updateCCS instead of created in Build?

err := bpb.Balancer.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass s directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to eat the config and send down nil, but I guess this gets nil as the config. Switched.

bpb.ClientConn.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &blockingPicker{
pickDone: func() {
bpb.blockingPickerDone.Fire()
bpb.stateMu.Lock()
cs := bpb.childState
bpb.stateMu.Unlock()
bpb.ClientConn.UpdateState(cs)
},
},
})
return err
}

// UpdateState caches the child's latest state and forwards it to the real
// ClientConn only after the blocking picker has been exercised.
func (bpb *blockingPickerBalancer) UpdateState(state balancer.State) {
	bpb.stateMu.Lock()
	bpb.childState = state
	bpb.stateMu.Unlock()
	// Swallow child updates until the first (blocking) picker has been
	// invoked; this guarantees the first pick observes ErrNoSubConnAvailable.
	// Afterwards, forward updates so RPCs can complete on the child's picker.
	if !bpb.blockingPickerDone.HasFired() {
		return
	}
	bpb.ClientConn.UpdateState(state)
}

// TestPickerBlockingStatsCall tests the emission of a stats handler call that
// represents the picker had to block due to ErrNoSubConnAvailable being
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Terminology nit: the picker doesn't block. I guess the blockingPicker blocks. But more accurately, the RPC blocks (or in C++/Java: is queued) waiting for a new picker from the LB policy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched language accordingly :).

// returned from the first picker call.
func (s) TestPickerBlockingStatsCall(t *testing.T) {
sh := &statsHandlerRecordEvents{}
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}

if err := ss.StartServer(); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"blockingPickerBalancer": {}
}
]
}`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix formatting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks solid on local. Idk why it's showing up like this on Github diff. Let me mess with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I messed with it and I think it looks better now. Let me know what you think.


sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
defer mr.Close()
print("ss.Address when setting: ", ss.Address)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove or change to t.Log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, sorry, deleted.

mr.InitialState(resolver.State{
Addresses: []resolver.Address{
{Addr: ss.Address},
},
ServiceConfig: sc,
})

cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}

var pickerUpdatedCount uint
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
}
}
if pickerUpdatedCount == 0 {
t.Fatalf("sh.pickerUpdated count: %v, want: >0", pickerUpdatedCount)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be able to be deterministic and more specific... == 1 or == 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it was determinisitc at 2 but switched to this to make sure. Switched back to 2.

}
4 changes: 4 additions & 0 deletions test/retry_test.go
Expand Up @@ -474,6 +474,10 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
return ctx
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// These calls come in nondeterministically, so just ignore them.
if _, ok := s.(*stats.PickerUpdated); ok {
return
}
h.mu.Lock()
h.s = append(h.s, s)
h.mu.Unlock()
Expand Down