🌱 Proposal for dynamic informer cache #2285

Merged 2 commits on Oct 23, 2023
5 changes: 5 additions & 0 deletions pkg/cache/cache.go
@@ -81,6 +81,9 @@ type Informers interface {
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)

// RemoveInformer removes an informer entry and stops it if it was running.
RemoveInformer(ctx context.Context, obj client.Object) error

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Start(ctx context.Context) error
@@ -119,6 +122,8 @@ type Informer interface {

// HasSynced returns true if the informer's underlying store has synced.
HasSynced() bool
// IsStopped returns true if the informer has been stopped.
IsStopped() bool
}

// Options are the optional arguments for creating a new Cache object.
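For orientation, a minimal sketch of how a caller might exercise the new API, assuming a reachable cluster config and a started cache; the helper name removeAndWait and the polling loop are illustrative, not part of this PR:

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// removeAndWait is a hypothetical helper: it fetches an informer,
// removes it from the cache, and waits for it to shut down.
func removeAndWait(ctx context.Context, cfg *rest.Config) error {
	c, err := cache.New(cfg, cache.Options{})
	if err != nil {
		return err
	}
	go func() { _ = c.Start(ctx) }() // Start blocks until ctx is done.

	pod := &corev1.Pod{}
	inf, err := c.GetInformer(ctx, pod)
	if err != nil {
		return err
	}

	// RemoveInformer stops the informer and drops its cache entry.
	if err := c.RemoveInformer(ctx, pod); err != nil {
		return err
	}

	// Shutdown is asynchronous, so poll IsStopped to observe it.
	for !inf.IsStopped() {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("informer stopped")
	return nil
}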
78 changes: 78 additions & 0 deletions pkg/cache/cache_test.go
@@ -1758,6 +1758,42 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
By("verifying the object is received on the channel")
Eventually(out).Should(Receive(Equal(pod)))
})
It("should be able to stop and restart informers", func() {
By("getting a shared index informer for a pod")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "informer-obj",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("removing the existing informer")
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())

By("recreating the informer")

sii2, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii2).NotTo(BeNil())
Expect(sii2.HasSynced()).To(BeTrue())

By("validating the two informers are in different states")
Expect(sii.IsStopped()).To(BeTrue())
Expect(sii2.IsStopped()).To(BeFalse())
})
It("should be able to get an informer by group/version/kind", func() {
By("getting an shared index informer for gvk = core/v1/pod")
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -1942,6 +1978,48 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
})

It("should be able to stop and restart informers", func() {
By("getting a shared index informer for a pod")
pod := &unstructured.Unstructured{
Object: map[string]interface{}{
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "nginx",
"image": "nginx",
},
},
},
},
}
pod.SetName("informer-obj2")
pod.SetNamespace("default")
pod.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
})
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("removing the existing informer")
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())

By("recreating the informer")

sii2, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii2).NotTo(BeNil())
Expect(sii2.HasSynced()).To(BeTrue())

By("validating the two informers are in different states")
Expect(sii.IsStopped()).To(BeTrue())
Expect(sii2.IsStopped()).To(BeFalse())
})

It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
8 changes: 8 additions & 0 deletions pkg/cache/delegating_by_gvk_cache.go
@@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}

func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return err
}
return cache.RemoveInformer(ctx, obj)
}

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {
11 changes: 11 additions & 0 deletions pkg/cache/informer_cache.go
@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}

// RemoveInformer deactivates and removes the informer from the cache.
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}

ic.Informers.Remove(gvk, obj)
return nil
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {
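As a side note, the GVK resolution above relies on apiutil.GVKForObject; a minimal sketch of that lookup against the standard client-go scheme (the printed output is illustrative):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

func main() {
	// Resolve the GroupVersionKind for a typed object, the same step
	// RemoveInformer performs before delegating to Informers.Remove.
	gvk, err := apiutil.GVKForObject(&corev1.Pod{}, scheme.Scheme)
	if err != nil {
		panic(err)
	}
	fmt.Println(gvk) // prints "/v1, Kind=Pod"
}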
14 changes: 14 additions & 0 deletions pkg/cache/informertest/fake_cache.go
@@ -73,6 +73,20 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts
return c.informerFor(gvk, obj)
}

// RemoveInformer implements Informers.
func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) error {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
gvks, _, err := c.Scheme.ObjectKinds(obj)
if err != nil {
return err
}
gvk := gvks[0]
delete(c.InformersByGVK, gvk)
return nil
}

// WaitForCacheSync implements Informers.
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
43 changes: 37 additions & 6 deletions pkg/cache/internal/informers.go
@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)

// InformersOpts configures an InformerMap.
@@ -86,6 +87,20 @@ type Cache struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// Stop can be used to stop this individual informer.
stop chan struct{}
}

// Start starts the informer managed by this Cache entry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := syncs.MergeChans(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}

type tracker struct {
@@ -173,13 +188,13 @@ func (ip *Informers) Start(ctx context.Context) error {

// Start each informer
for _, i := range ip.tracker.Structured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Unstructured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Metadata {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}

// Set started to true so we immediately start any informers added later.
@@ -194,7 +209,7 @@ func (ip *Informers) Start(ctx context.Context) error {
return nil
}

func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
@@ -205,7 +220,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
cacheEntry.Start(ip.ctx.Done())
}()
}

@@ -281,6 +296,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
return started, i, nil
}

// Remove removes an informer entry and stops it if it was running.
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
ip.mu.Lock()
defer ip.mu.Unlock()

informerMap := ip.informersByType(obj)

entry, ok := informerMap[gvk]
if !ok {
return
}
close(entry.stop)
delete(informerMap, gvk)
}

func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
switch obj.(type) {
case runtime.Unstructured:
@@ -342,13 +372,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
},
stop: make(chan struct{}),
}
ip.informersByType(obj)[gvk] = i

// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
return i, ip.started, nil
}
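The essence of the change in this file is the per-entry stop channel: each informer now races a private channel against the map-wide one. A standalone sketch of that dual-stop pattern, independent of the controller-runtime types (all names here are illustrative):

package main

import "fmt"

// entry mirrors the stop channel added to Cache above: closing it
// stops just this worker, while the global channel stops everything.
type entry struct {
	stop chan struct{}
}

func run(global <-chan struct{}, e *entry, done chan<- struct{}) {
	defer close(done)
	// The same dual-stop semantics as Cache.Start: exit on whichever
	// of the two channels is signaled first.
	select {
	case <-global:
		fmt.Println("global shutdown")
	case <-e.stop:
		fmt.Println("entry removed")
	}
}

func main() {
	global := make(chan struct{})
	e := &entry{stop: make(chan struct{})}
	done := make(chan struct{})
	go run(global, e, done)
	close(e.stop) // analogous to Informers.Remove closing entry.stop
	<-done
}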
31 changes: 31 additions & 0 deletions pkg/cache/multi_namespace_cache.go
@@ -108,6 +108,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
}

func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
if !isNamespaced {
return c.clusterCache.RemoveInformer(ctx, obj)
}

for _, cache := range c.namespaceToCache {
err := cache.RemoveInformer(ctx, obj)
if err != nil {
return err
}
}

return nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
@@ -387,3 +408,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
}
return true
}

// IsStopped checks whether each namespaced informer has stopped; it returns false if any are still running.
func (i *multiNamespaceInformer) IsStopped() bool {
for _, informer := range i.namespaceToInformer {
if stopped := informer.IsStopped(); !stopped {
return false
}
}
return true
}
38 changes: 38 additions & 0 deletions pkg/internal/syncs/syncs.go
@@ -0,0 +1,38 @@
package syncs

import (
"context"
"reflect"
"sync"
)

// MergeChans returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
var once sync.Once
out := make(chan T)
cancel := make(chan T)
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
<-out
}
cases := make([]reflect.SelectCase, len(chans)+1)
for i := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chans[i]),
}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cancel),
}
go func() {
defer close(out)
_, _, _ = reflect.Select(cases)
}()

return out, cancelFunc
}
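A short usage sketch for MergeChans; note the package is internal to controller-runtime, so this compiles only within the module, and the channel names here are illustrative:

package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)

func main() {
	global := make(chan struct{})
	perInformer := make(chan struct{})

	// merged closes as soon as either input channel is signaled.
	merged, cancel := syncs.MergeChans[struct{}](global, perInformer)
	defer cancel() // releases the goroutine spawned by MergeChans

	close(perInformer) // signal one input
	<-merged           // observe the merged close
	fmt.Println("merged channel closed")
}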