Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leastrequest: fix data race in leastrequest picker #6606

Merged
merged 1 commit into from Sep 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions balancer/leastrequest/leastrequest.go
Expand Up @@ -80,7 +80,7 @@ func (bb) Name() string {
}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
Expand All @@ -92,7 +92,7 @@ type leastRequestBalancer struct {
balancer.Balancer

choiceCount uint32
scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}

func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand All @@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat

type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *int32
numRPCs *atomic.Int32
}

func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
Expand All @@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(int32)
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}

Expand Down Expand Up @@ -162,18 +162,18 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
pickedSC = &sc
continue
}
if *sc.numRPCs < *pickedSC.numRPCs {
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
pickedSC = &sc
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
atomic.AddInt32(pickedSC.numRPCs, 1)
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
atomic.AddInt32(pickedSC.numRPCs, -1)
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Expand Down