Skip to content

Commit

Permalink
leastrequest: fix data race in leastrequest picker (#6606)
Browse files Browse the repository at this point in the history
Co-authored-by: Huang Chong <hchtgh315@gmail.com>
  • Loading branch information
arvindbr8 and huangchong94 committed Sep 5, 2023
1 parent e26457d commit 5d1c0ae
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (bb) Name() string {
}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
Expand All @@ -92,7 +92,7 @@ type leastRequestBalancer struct {
balancer.Balancer

choiceCount uint32
scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}

func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
Expand All @@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat

type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *int32
numRPCs *atomic.Int32
}

func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
Expand All @@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(int32)
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}

Expand Down Expand Up @@ -162,18 +162,18 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
pickedSC = &sc
continue
}
if *sc.numRPCs < *pickedSC.numRPCs {
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
pickedSC = &sc
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
atomic.AddInt32(pickedSC.numRPCs, 1)
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
atomic.AddInt32(pickedSC.numRPCs, -1)
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Expand Down

0 comments on commit 5d1c0ae

Please sign in to comment.