From 0c894a47e4bfe73d7c83d251896a181ac9c11cc6 Mon Sep 17 00:00:00 2001 From: Troy Connor Date: Thu, 14 Sep 2023 11:30:18 -0400 Subject: [PATCH 1/2] add SetErrorWatchHandler to informers Signed-off-by: Troy Connor --- pkg/cache/internal/informers.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 1d2c9ce2b4..ac9ee3f36e 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -49,14 +49,19 @@ type InformersOpts struct { Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool + WatchErrorHandler cache.WatchErrorHandler } // NewInformers creates a new InformersMap that can create informers under the hood. func NewInformers(config *rest.Config, options *InformersOpts) *Informers { newInformer := cache.NewSharedIndexInformer + watchErrorHandler := cache.DefaultWatchErrorHandler if options.NewInformer != nil { newInformer = *options.NewInformer } + if options.WatchErrorHandler != nil { + watchErrorHandler = options.WatchErrorHandler + } return &Informers{ config: config, httpClient: options.HTTPClient, @@ -76,6 +81,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, + watchErrorHandler: watchErrorHandler, } } @@ -159,6 +165,8 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + + watchErrorHandler cache.WatchErrorHandler } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -323,6 +331,11 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) + // Set WatchErrorHandler on SharedIndexInformer + if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + return nil, false, err + } + // Check to see if there is a transformer for this gvk if err := sharedIndexInformer.SetTransform(ip.transform); err != nil { return nil, false, err From 897e04cda7dd93163e086d2371aaf70d0d2378c9 Mon Sep 17 00:00:00 2001 From: Troy Connor Date: Mon, 18 Sep 2023 10:02:29 -0400 Subject: [PATCH 2/2] pass watchErrorHandler through Options struct Signed-off-by: Troy Connor --- pkg/cache/cache.go | 7 +++++++ pkg/cache/internal/informers.go | 17 +++++++++-------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d8446e85b3..b8e29248bb 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -188,6 +188,12 @@ type Options struct { // unless there is already one set in ByObject or DefaultNamespaces. DefaultTransform toolscache.TransformFunc + // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called + // whenever ListAndWatch drops the connection with an error. + // + // After calling this handler, the informer will backoff and retry. + DefaultWatchErrorHandler toolscache.WatchErrorHandler + // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy // for everything that doesn't specify this. // @@ -353,6 +359,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Field: config.FieldSelector, }, Transform: config.Transform, + WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, }), diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index ac9ee3f36e..6c2fb374e1 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -55,13 +55,9 @@ type InformersOpts struct { // NewInformers creates a new InformersMap that can create informers under the hood. func NewInformers(config *rest.Config, options *InformersOpts) *Informers { newInformer := cache.NewSharedIndexInformer - watchErrorHandler := cache.DefaultWatchErrorHandler if options.NewInformer != nil { newInformer = *options.NewInformer } - if options.WatchErrorHandler != nil { - watchErrorHandler = options.WatchErrorHandler - } return &Informers{ config: config, httpClient: options.HTTPClient, @@ -81,7 +77,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, - watchErrorHandler: watchErrorHandler, + watchErrorHandler: options.WatchErrorHandler, } } @@ -166,6 +162,9 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + // WatchErrorHandler allows the shared index informer's + // watchErrorHandler to be set by overriding the options + // or to use the default watchErrorHandler watchErrorHandler cache.WatchErrorHandler } @@ -331,9 +330,11 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) - // Set WatchErrorHandler on SharedIndexInformer - if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { - return nil, false, err + // Set WatchErrorHandler on SharedIndexInformer if set + if ip.watchErrorHandler != nil { + if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + return nil, false, err + } } // Check to see if there is a transformer for this gvk