diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index 1ea44d9b83..fa4b0d3dfc 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -81,6 +81,9 @@ type Informers interface {
 	// of the underlying object.
 	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
 
+	// RemoveInformer removes an informer entry and stops it if it was running.
+	RemoveInformer(ctx context.Context, obj client.Object) error
+
 	// Start runs all the informers known to this cache until the context is closed.
 	// It blocks.
 	Start(ctx context.Context) error
@@ -119,6 +122,8 @@ type Informer interface {
 
 	// HasSynced return true if the informers underlying store has synced.
 	HasSynced() bool
+	// IsStopped returns true if the informer has been stopped.
+	IsStopped() bool
 }
 
 // Options are the optional arguments for creating a new Cache object.
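Note: the two interface additions above are the entire public surface of this change. Below is a minimal sketch of the intended call pattern; the helper name is hypothetical, and the `cache.Cache` value is assumed to come from an already-started cache or manager.

```go
package cacheexample

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// restartPodInformer is a hypothetical helper: it tears down the Pod
// informer (stopping its watch) and lazily recreates it via GetInformer.
func restartPodInformer(ctx context.Context, c cache.Cache) error {
	pod := &corev1.Pod{}

	// Stops the informer and deletes it from the cache's internal map;
	// handles previously returned by GetInformer report IsStopped() == true.
	if err := c.RemoveInformer(ctx, pod); err != nil {
		return err
	}

	// A fresh informer is created and started on the next request.
	inf, err := c.GetInformer(ctx, pod)
	if err != nil {
		return err
	}
	_ = inf.IsStopped() // false for the recreated informer
	return nil
}
```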
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index d98f5f92ee..6f89168a16 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -1758,6 +1758,42 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
 				By("verifying the object is received on the channel")
 				Eventually(out).Should(Receive(Equal(pod)))
 			})
+			It("should be able to stop and restart informers", func() {
+				By("getting a shared index informer for a pod")
+				pod := &corev1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "informer-obj",
+						Namespace: "default",
+					},
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Name:  "nginx",
+								Image: "nginx",
+							},
+						},
+					},
+				}
+				sii, err := informerCache.GetInformer(context.TODO(), pod)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(sii).NotTo(BeNil())
+				Expect(sii.HasSynced()).To(BeTrue())
+
+				By("removing the existing informer")
+				Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
+				Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
+
+				By("recreating the informer")
+
+				sii2, err := informerCache.GetInformer(context.TODO(), pod)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(sii2).NotTo(BeNil())
+				Expect(sii2.HasSynced()).To(BeTrue())
+
+				By("validating the two informers are in different states")
+				Expect(sii.IsStopped()).To(BeTrue())
+				Expect(sii2.IsStopped()).To(BeFalse())
+			})
 			It("should be able to get an informer by group/version/kind", func() {
 				By("getting an shared index informer for gvk = core/v1/pod")
 				gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -1942,6 +1978,48 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
 				Eventually(out).Should(Receive(Equal(pod)))
 			})
 
+			It("should be able to stop and restart informers", func() {
+				By("getting a shared index informer for a pod")
+				pod := &unstructured.Unstructured{
+					Object: map[string]interface{}{
+						"spec": map[string]interface{}{
+							"containers": []map[string]interface{}{
+								{
+									"name":  "nginx",
+									"image": "nginx",
+								},
+							},
+						},
+					},
+				}
+				pod.SetName("informer-obj2")
+				pod.SetNamespace("default")
+				pod.SetGroupVersionKind(schema.GroupVersionKind{
+					Group:   "",
+					Version: "v1",
+					Kind:    "Pod",
+				})
+				sii, err := informerCache.GetInformer(context.TODO(), pod)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(sii).NotTo(BeNil())
+				Expect(sii.HasSynced()).To(BeTrue())
+
+				By("removing the existing informer")
+				Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
+				Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
+
+				By("recreating the informer")
+
+				sii2, err := informerCache.GetInformer(context.TODO(), pod)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(sii2).NotTo(BeNil())
+				Expect(sii2.HasSynced()).To(BeTrue())
+
+				By("validating the two informers are in different states")
+				Expect(sii.IsStopped()).To(BeTrue())
+				Expect(sii2.IsStopped()).To(BeFalse())
+			})
+
 			It("should be able to index an object field then retrieve objects by that field", func() {
 				By("creating the cache")
 				informer, err := cache.New(cfg, cache.Options{})
diff --git a/pkg/cache/delegating_by_gvk_cache.go b/pkg/cache/delegating_by_gvk_cache.go
index f3fa4800d2..4db8208a63 100644
--- a/pkg/cache/delegating_by_gvk_cache.go
+++ b/pkg/cache/delegating_by_gvk_cache.go
@@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
 	return cache.List(ctx, list, opts...)
 }
 
+func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
+	cache, err := dbt.cacheForObject(obj)
+	if err != nil {
+		return err
+	}
+	return cache.RemoveInformer(ctx, obj)
+}
+
 func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
 	cache, err := dbt.cacheForObject(obj)
 	if err != nil {
diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go
index 0f1b4e93d2..091667b7fa 100644
--- a/pkg/cache/informer_cache.go
+++ b/pkg/cache/informer_cache.go
@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
 	return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
 }
 
+// RemoveInformer deactivates and removes the informer from the cache.
+func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
+	gvk, err := apiutil.GVKForObject(obj, ic.scheme)
+	if err != nil {
+		return err
+	}
+
+	ic.Informers.Remove(gvk, obj)
+	return nil
+}
+
 // NeedLeaderElection implements the LeaderElectionRunnable interface
 // to indicate that this can be started without requiring the leader lock.
 func (ic *informerCache) NeedLeaderElection() bool {
diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go
index 171117698a..a1a442316f 100644
--- a/pkg/cache/informertest/fake_cache.go
+++ b/pkg/cache/informertest/fake_cache.go
@@ -73,6 +73,20 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts
 	return c.informerFor(gvk, obj)
 }
 
+// RemoveInformer implements Informers.
+func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) error {
+	if c.Scheme == nil {
+		c.Scheme = scheme.Scheme
+	}
+	gvks, _, err := c.Scheme.ObjectKinds(obj)
+	if err != nil {
+		return err
+	}
+	gvk := gvks[0]
+	delete(c.InformersByGVK, gvk)
+	return nil
+}
+
 // WaitForCacheSync implements Informers.
 func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
 	if c.Synced == nil {
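Note: the fake cache mirrors the real removal semantics by deleting the entry from its bookkeeping map, which keeps unit tests honest. A hypothetical test sketch against `informertest.FakeInformers` (test name and assertions are illustrative):

```go
package cacheexample

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
)

// TestRemoveInformerFake shows the expected bookkeeping: after
// RemoveInformer, the fake's InformersByGVK map no longer holds
// an entry for the Pod GVK.
func TestRemoveInformerFake(t *testing.T) {
	c := &informertest.FakeInformers{}

	if _, err := c.GetInformer(context.Background(), &corev1.Pod{}); err != nil {
		t.Fatal(err)
	}
	if err := c.RemoveInformer(context.Background(), &corev1.Pod{}); err != nil {
		t.Fatal(err)
	}
	if len(c.InformersByGVK) != 0 {
		t.Fatalf("expected no informers, found %d", len(c.InformersByGVK))
	}
}
```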
diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go
index 1d2c9ce2b4..b01f566fb6 100644
--- a/pkg/cache/internal/informers.go
+++ b/pkg/cache/internal/informers.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+	"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
 )
 
 // InformersOpts configures an InformerMap.
@@ -86,6 +87,20 @@ type Cache struct {
 
 	// CacheReader wraps Informer and implements the CacheReader interface for a single type
 	Reader CacheReader
+
+	// Stop can be used to stop this individual informer.
+	stop chan struct{}
+}
+
+// Start starts the informer managed by a MapEntry.
+// Blocks until the informer stops. The informer can be stopped
+// either individually (via the entry's stop channel) or globally
+// via the provided stop argument.
+func (c *Cache) Start(stop <-chan struct{}) {
+	// Stop on either the whole map stopping or just this informer being removed.
+	internalStop, cancel := syncs.MergeChans(stop, c.stop)
+	defer cancel()
+	c.Informer.Run(internalStop)
 }
 
 type tracker struct {
@@ -173,13 +188,13 @@ func (ip *Informers) Start(ctx context.Context) error {
 
 		// Start each informer
 		for _, i := range ip.tracker.Structured {
-			ip.startInformerLocked(i.Informer)
+			ip.startInformerLocked(i)
 		}
 		for _, i := range ip.tracker.Unstructured {
-			ip.startInformerLocked(i.Informer)
+			ip.startInformerLocked(i)
 		}
 		for _, i := range ip.tracker.Metadata {
-			ip.startInformerLocked(i.Informer)
+			ip.startInformerLocked(i)
 		}
 
 		// Set started to true so we immediately start any informers added later.
@@ -194,7 +209,7 @@
 	return nil
 }
 
-func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
+func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
 	// Don't start the informer in case we are already waiting for the items in
 	// the waitGroup to finish, since waitGroups don't support waiting and adding
 	// at the same time.
@@ -205,7 +220,7 @@
 	ip.waitGroup.Add(1)
 	go func() {
 		defer ip.waitGroup.Done()
-		informer.Run(ip.ctx.Done())
+		cacheEntry.Start(ip.ctx.Done())
 	}()
 }
 
@@ -281,6 +296,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
 	return started, i, nil
 }
 
+// Remove removes an informer entry and stops it if it was running.
+func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
+	ip.mu.Lock()
+	defer ip.mu.Unlock()
+
+	informerMap := ip.informersByType(obj)
+
+	entry, ok := informerMap[gvk]
+	if !ok {
+		return
+	}
+	close(entry.stop)
+	delete(informerMap, gvk)
+}
+
 func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
 	switch obj.(type) {
 	case runtime.Unstructured:
@@ -342,13 +372,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
 			scopeName:       mapping.Scope.Name(),
 			disableDeepCopy: ip.unsafeDisableDeepCopy,
 		},
+		stop: make(chan struct{}),
 	}
 	ip.informersByType(obj)[gvk] = i
 
 	// Start the informer in case the InformersMap has started, otherwise it will be
 	// started when the InformersMap starts.
 	if ip.started {
-		ip.startInformerLocked(i.Informer)
+		ip.startInformerLocked(i)
 	}
 	return i, ip.started, nil
 }
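Note: the per-entry `stop` channel is the heart of this change: `Cache.Start` runs the informer until either the map-wide context or the entry's own channel closes. The sketch below is a simplified two-channel rendering of that pattern; the helper name is hypothetical, and the real code delegates to `syncs.MergeChans` instead of an inline `select`.

```go
package cacheexample

// runUntilEither is a simplified, two-channel version of what Cache.Start
// does via syncs.MergeChans: run fn until either the map-wide stop channel
// or this entry's private stop channel closes.
func runUntilEither(global, private <-chan struct{}, fn func(stop <-chan struct{})) {
	merged := make(chan struct{})
	go func() {
		defer close(merged)
		select {
		case <-global: // the whole informer map is shutting down
		case <-private: // just this informer is being removed
		}
	}()
	fn(merged) // e.g. a SharedIndexInformer's Run method
}
```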
diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go
index 5b20195d77..9d73fc3c2d 100644
--- a/pkg/cache/multi_namespace_cache.go
+++ b/pkg/cache/multi_namespace_cache.go
@@ -108,6 +108,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
 	return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
 }
 
+func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
+	// If the object is cluster scoped, get the informer from clusterCache,
+	// if not use the namespaced caches.
+	isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
+	if err != nil {
+		return err
+	}
+	if !isNamespaced {
+		return c.clusterCache.RemoveInformer(ctx, obj)
+	}
+
+	for _, cache := range c.namespaceToCache {
+		err := cache.RemoveInformer(ctx, obj)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
 	// If the object is cluster scoped, get the informer from clusterCache,
 	// if not use the namespaced caches.
@@ -387,3 +408,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
 	}
 	return true
 }
+
+// IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
+func (i *multiNamespaceInformer) IsStopped() bool {
+	for _, informer := range i.namespaceToInformer {
+		if stopped := informer.IsStopped(); !stopped {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/internal/syncs/syncs.go b/pkg/internal/syncs/syncs.go
new file mode 100644
index 0000000000..c78a30377a
--- /dev/null
+++ b/pkg/internal/syncs/syncs.go
@@ -0,0 +1,38 @@
+package syncs
+
+import (
+	"context"
+	"reflect"
+	"sync"
+)
+
+// MergeChans returns a channel that is closed when any of the input channels are signaled.
+// The caller must call the returned CancelFunc to ensure no resources are leaked.
+func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
+	var once sync.Once
+	out := make(chan T)
+	cancel := make(chan T)
+	cancelFunc := func() {
+		once.Do(func() {
+			close(cancel)
+		})
+		<-out
+	}
+	cases := make([]reflect.SelectCase, len(chans)+1)
+	for i := range chans {
+		cases[i] = reflect.SelectCase{
+			Dir:  reflect.SelectRecv,
+			Chan: reflect.ValueOf(chans[i]),
+		}
+	}
+	cases[len(cases)-1] = reflect.SelectCase{
+		Dir:  reflect.SelectRecv,
+		Chan: reflect.ValueOf(cancel),
+	}
+	go func() {
+		defer close(out)
+		_, _, _ = reflect.Select(cases)
+	}()
+
+	return out, cancelFunc
+}
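Note: a sketch of how `MergeChans` is meant to be consumed, written as a Go testable example (the surrounding setup is illustrative). Closing any input closes the merged channel, and the returned cancel func must always be called so the background goroutine exits.

```go
package syncs

import (
	"context"
	"fmt"
	"time"
)

// ExampleMergeChans merges a context's Done channel with a per-resource
// stop channel, the same pairing Cache.Start relies on.
func ExampleMergeChans() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	stop := make(chan struct{})
	merged, cancel := MergeChans(ctx.Done(), stop)
	defer cancel() // reaps the goroutine even if no input ever fires

	go func() {
		time.Sleep(10 * time.Millisecond)
		close(stop) // closing either input closes the merged channel
	}()

	<-merged
	fmt.Println("merged closed")
	// Output: merged closed
}
```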
diff --git a/pkg/internal/syncs/syncs_test.go b/pkg/internal/syncs/syncs_test.go
new file mode 100644
index 0000000000..7bf7d598a0
--- /dev/null
+++ b/pkg/internal/syncs/syncs_test.go
@@ -0,0 +1,107 @@
+package syncs
+
+import (
+	"testing"
+	"time"
+
+	// This appears to be needed so that the prow test runner won't fail.
+	_ "github.com/onsi/ginkgo/v2"
+	_ "github.com/onsi/gomega"
+)
+
+func TestMergeChans(t *testing.T) {
+	tests := []struct {
+		name   string
+		count  int
+		signal int
+	}{
+		{
+			name:   "single channel, close 0",
+			count:  1,
+			signal: 0,
+		},
+		{
+			name:   "double channel, close 0",
+			count:  2,
+			signal: 0,
+		},
+		{
+			name:   "five channel, close 0",
+			count:  5,
+			signal: 0,
+		},
+		{
+			name:   "five channel, close 1",
+			count:  5,
+			signal: 1,
+		},
+		{
+			name:   "five channel, close 2",
+			count:  5,
+			signal: 2,
+		},
+		{
+			name:   "five channel, close 3",
+			count:  5,
+			signal: 3,
+		},
+		{
+			name:   "five channel, close 4",
+			count:  5,
+			signal: 4,
+		},
+		{
+			name:   "single channel, cancel",
+			count:  1,
+			signal: -1,
+		},
+		{
+			name:   "double channel, cancel",
+			count:  2,
+			signal: -1,
+		},
+		{
+			name:   "five channel, cancel",
+			count:  5,
+			signal: -1,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			if callAndClose(test.count, test.signal, 1) {
+				t.Error("timeout before merged channel closed")
+			}
+		})
+	}
+}
+
+func callAndClose(numChans, signalChan, timeoutSeconds int) bool {
+	chans := make([]chan struct{}, numChans)
+	readOnlyChans := make([]<-chan struct{}, numChans)
+	for i := range chans {
+		chans[i] = make(chan struct{})
+		readOnlyChans[i] = chans[i]
+	}
+	defer func() {
+		for i := range chans {
+			close(chans[i])
+		}
+	}()
+
+	merged, cancel := MergeChans(readOnlyChans...)
+	defer cancel()
+
+	timer := time.NewTimer(time.Duration(timeoutSeconds) * time.Second)
+
+	if signalChan >= 0 {
+		chans[signalChan] <- struct{}{}
+	} else {
+		cancel()
+	}
+	select {
+	case <-merged:
+		return false
+	case <-timer.C:
+		return true
+	}
+}
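Note: putting it all together, a hypothetical end-to-end call from a running manager (function name illustrative); `Manager.GetCache` returns the `cache.Cache` that, with this patch, supports removal.

```go
package cacheexample

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// dropPodInformer is an illustrative sketch: a running manager's cache can
// now shed the informer (and its watch, and cached objects) for a kind the
// process no longer needs to observe.
func dropPodInformer(ctx context.Context, mgr manager.Manager) error {
	return mgr.GetCache().RemoveInformer(ctx, &corev1.Pod{})
}
```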