Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #61 from ibuildthecloud/main
Browse files Browse the repository at this point in the history
Fine tune backoff
  • Loading branch information
ibuildthecloud committed Mar 14, 2023
2 parents 66c90aa + 3aca3b5 commit 8b20d03
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions pkg/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type HandlerSet struct {

limiterLock sync.Mutex
limiters map[limiterKey]*rate.Limiter
waiting map[limiterKey]struct{}
}

type limiterKey struct {
Expand Down Expand Up @@ -188,21 +189,45 @@ func (m *HandlerSet) WatchGVK(gvks ...schema.GroupVersionKind) error {
return merr.NewErrors(watchErrs...)
}

func (m *HandlerSet) getDelay(gvk schema.GroupVersionKind, key string) time.Duration {
func (m *HandlerSet) checkDelay(gvk schema.GroupVersionKind, key string) bool {
m.limiterLock.Lock()
defer m.limiterLock.Unlock()
limit, ok := m.limiters[limiterKey{key: key, gvk: gvk}]
lKey := limiterKey{key: key, gvk: gvk}

if _, ok := m.waiting[lKey]; ok {
return false
}

limit, ok := m.limiters[lKey]
if !ok {
// Limit to once every 15 seconds with a burst of 5. This limits the
// Limit to once every 15 seconds with a burst of 10. This limits the
// overall rate at which we can process a key regardless of the key
// source (change event, trigger, error re-enqueue)
limit = rate.NewLimiter(rate.Limit(1.0/15.0), 10)
if m.limiters == nil {
m.limiters = map[limiterKey]*rate.Limiter{}
}
m.limiters[limiterKey{key: key, gvk: gvk}] = limit
m.limiters[lKey] = limit
}
return limit.Reserve().Delay()

delay := limit.Reserve().Delay()
if delay > 0 {
if m.waiting == nil {
m.waiting = map[limiterKey]struct{}{}
}
m.waiting[lKey] = struct{}{}
go func() {
logrus.Warnf("Backing off [%s] [%s] for %s", key, gvk, delay)
time.Sleep(delay)
m.limiterLock.Lock()
defer m.limiterLock.Unlock()
delete(m.waiting, lKey)
_ = m.backend.Trigger(gvk, ReplayPrefix+key, 0)
}()
return false
}

return true
}

func (m *HandlerSet) forgetBackoff(gvk schema.GroupVersionKind, key string) {
Expand All @@ -224,12 +249,10 @@ func (m *HandlerSet) onChange(gvk schema.GroupVersionKind, key string, runtimeOb
key = strings.TrimPrefix(key, ReplayPrefix)
}

if !fromReplay {
if !fromReplay && !fromTrigger {
// Process delay have key has be reassigned from the TriggerPrefix
delay := m.getDelay(gvk, key)
if delay > 0 {
logrus.Warnf("Backing off %s for key %s on GVK %s", delay, key, gvk)
return runtimeObject, m.backend.Trigger(gvk, ReplayPrefix+key, delay)
if !m.checkDelay(gvk, key) {
return runtimeObject, nil
}
}

Expand Down

0 comments on commit 8b20d03

Please sign in to comment.