Skip to content

Commit

Permalink
orca: fix race at producer startup
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 2, 2023
1 parent ed3ceba commit 0281c8e
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions orca/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ type producerBuilder struct{}
func (*producerBuilder) Build(cci interface{}) (balancer.Producer, func()) {
ctx, cancel := context.WithCancel(context.Background())
p := &producer{
client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
closed: grpcsync.NewEvent(),
intervals: make(map[time.Duration]int),
listeners: make(map[OOBListener]struct{}),
backoff: internal.DefaultBackoffFunc,
client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
closed: grpcsync.NewEvent(),
intervals: make(map[time.Duration]int),
listeners: make(map[OOBListener]struct{}),
backoff: internal.DefaultBackoffFunc,
hasIntervals: make(chan struct{}),
}
go p.run(ctx)
return p, func() {
Expand Down Expand Up @@ -100,16 +101,21 @@ type producer struct {
// reports a result.
backoff func(int) time.Duration

mu sync.Mutex
intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
listeners map[OOBListener]struct{} // set of registered listeners
mu sync.Mutex
intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
listeners map[OOBListener]struct{} // set of registered listeners
hasIntervals chan struct{} // created when intervals is empty; closed and nilled when non-empty.
}

// registerListener adds the listener and its requested report interval to the
// producer.
func (p *producer) registerListener(l OOBListener, interval time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
if p.hasIntervals != nil {
close(p.hasIntervals)
p.hasIntervals = nil
}
p.listeners[l] = struct{}{}
p.intervals[interval]++
}
Expand All @@ -124,26 +130,45 @@ func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
if p.intervals[interval] == 0 {
delete(p.intervals, interval)
}
if len(p.intervals) == 0 {
p.hasIntervals = make(chan struct{})
}
}

// minInterval returns the smallest key in p.intervals.
// minInterval returns the smallest key in p.intervals. If p.intervals is
// empty, blocks until it is non-empty or the producer is closed. Returns 0 on
// producer closure; it is the caller's duty to determine if 0 is a valid value
// or if the producer was closed.
func (p *producer) minInterval() time.Duration {
p.mu.Lock()
defer p.mu.Unlock()
var min time.Duration
first := true
for t := range p.intervals {
if t < min || first {
min = t
first = false
for !p.closed.HasFired() {
p.mu.Lock()
if len(p.intervals) == 0 {
ch := p.hasIntervals
p.mu.Unlock()
select {
case <-p.closed.Done():
case <-ch:
}
continue
}
var min time.Duration
first := true
for t := range p.intervals {
if t < min || first {
min = t
first = false
}
}
p.mu.Unlock()
return min
}
return min
return 0
}

// run manages the ORCA OOB stream on the subchannel.
func (p *producer) run(ctx context.Context) {
defer p.closed.Fire()

backoffAttempt := 0
backoffTimer := time.NewTimer(0)
for ctx.Err() == nil {
Expand Down

0 comments on commit 0281c8e

Please sign in to comment.