diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d8446e85b3..b8e29248bb 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -188,6 +188,12 @@ type Options struct { // unless there is already one set in ByObject or DefaultNamespaces. DefaultTransform toolscache.TransformFunc + // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called + // whenever ListAndWatch drops the connection with an error. + // + // After calling this handler, the informer will backoff and retry. + DefaultWatchErrorHandler toolscache.WatchErrorHandler + // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy // for everything that doesn't specify this. // @@ -353,6 +359,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Field: config.FieldSelector, }, Transform: config.Transform, + WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, }), diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 1d2c9ce2b4..6c2fb374e1 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -49,6 +49,7 @@ type InformersOpts struct { Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool + WatchErrorHandler cache.WatchErrorHandler } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -76,6 +77,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, + watchErrorHandler: options.WatchErrorHandler, } } @@ -159,6 +161,11 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + + // WatchErrorHandler allows the shared index informer's + // watchErrorHandler to be set by overriding the options + // or to use the default watchErrorHandler + watchErrorHandler cache.WatchErrorHandler } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -323,6 +330,13 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) + // Set WatchErrorHandler on SharedIndexInformer if set + if ip.watchErrorHandler != nil { + if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + return nil, false, err + } + } + // Check to see if there is a transformer for this gvk if err := sharedIndexInformer.SetTransform(ip.transform); err != nil { return nil, false, err