Merge pull request #2285 from maxsmythe/dynamic-informer-cache
🌱 Proposal for dynamic informer cache
k8s-ci-robot committed Oct 23, 2023
2 parents 15d7928 + f4dbd14 commit cc2a25b
Showing 9 changed files with 329 additions and 6 deletions.
5 changes: 5 additions & 0 deletions pkg/cache/cache.go
@@ -83,6 +83,9 @@ type Informers interface {
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)

// RemoveInformer removes an informer entry and stops it if it was running.
RemoveInformer(ctx context.Context, obj client.Object) error

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Start(ctx context.Context) error
@@ -121,6 +124,8 @@ type Informer interface {

// HasSynced returns true if the informer's underlying store has synced.
HasSynced() bool
// IsStopped returns true if the informer has been stopped.
IsStopped() bool
}

// AllNamespaces should be used as the map key to delineate namespace settings
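For orientation (not part of this diff), a consumer of the new surface could drop and recreate an informer roughly as in the sketch below. The package and helper names are hypothetical, and a started cache.Cache is assumed.

// Package example is a hypothetical home for this sketch; it is not controller-runtime code.
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// restartPodInformer assumes "c" was built with cache.New and has already been started.
func restartPodInformer(ctx context.Context, c cache.Cache) (cache.Informer, error) {
	pod := &corev1.Pod{}
	// RemoveInformer stops the running Pod informer (if any) and drops it from the cache;
	// the old Informer's IsStopped() will eventually report true.
	if err := c.RemoveInformer(ctx, pod); err != nil {
		return nil, err
	}
	// Asking again creates and starts a fresh informer for the same type.
	return c.GetInformer(ctx, pod)
}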
78 changes: 78 additions & 0 deletions pkg/cache/cache_test.go
@@ -1887,6 +1887,42 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
By("verifying the object is received on the channel")
Eventually(out).Should(Receive(Equal(pod)))
})
It("should be able to stop and restart informers", func() {
By("getting a shared index informer for a pod")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "informer-obj",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("removing the existing informer")
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())

By("recreating the informer")

sii2, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii2).NotTo(BeNil())
Expect(sii2.HasSynced()).To(BeTrue())

By("validating the two informers are in different states")
Expect(sii.IsStopped()).To(BeTrue())
Expect(sii2.IsStopped()).To(BeFalse())
})
It("should be able to get an informer by group/version/kind", func() {
By("getting an shared index informer for gvk = core/v1/pod")
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -2116,6 +2152,48 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
})

It("should be able to stop and restart informers", func() {
By("getting a shared index informer for a pod")
pod := &unstructured.Unstructured{
Object: map[string]interface{}{
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "nginx",
"image": "nginx",
},
},
},
},
}
pod.SetName("informer-obj2")
pod.SetNamespace("default")
pod.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Pod",
})
sii, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("removing the existing informer")
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())

By("recreating the informer")

sii2, err := informerCache.GetInformer(context.TODO(), pod)
Expect(err).NotTo(HaveOccurred())
Expect(sii2).NotTo(BeNil())
Expect(sii2.HasSynced()).To(BeTrue())

By("validating the two informers are in different states")
Expect(sii.IsStopped()).To(BeTrue())
Expect(sii2.IsStopped()).To(BeFalse())
})

It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
8 changes: 8 additions & 0 deletions pkg/cache/delegating_by_gvk_cache.go
@@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}

func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return err
}
return cache.RemoveInformer(ctx, obj)
}

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {
11 changes: 11 additions & 0 deletions pkg/cache/informer_cache.go
@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}

// RemoveInformer deactivates and removes the informer from the cache.
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}

ic.Informers.Remove(gvk, obj)
return nil
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {
14 changes: 14 additions & 0 deletions pkg/cache/informertest/fake_cache.go
@@ -73,6 +73,20 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts
return c.informerFor(gvk, obj)
}

// RemoveInformer implements Informers.
func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) error {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
gvks, _, err := c.Scheme.ObjectKinds(obj)
if err != nil {
return err
}
gvk := gvks[0]
delete(c.InformersByGVK, gvk)
return nil
}

// WaitForCacheSync implements Informers.
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
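For context, a unit test against the fake cache might look roughly like the sketch below; the test name and file are hypothetical and rely only on the exported informertest API shown above.

package informertest_test

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
)

// TestRemoveInformer is a hypothetical sketch, not a test added by this PR.
func TestRemoveInformer(t *testing.T) {
	fake := &informertest.FakeInformers{}
	// GetInformer lazily registers a fake informer entry for the Pod GVK.
	if _, err := fake.GetInformer(context.TODO(), &corev1.Pod{}); err != nil {
		t.Fatal(err)
	}
	// RemoveInformer on the fake simply forgets the entry; nothing is running to stop.
	if err := fake.RemoveInformer(context.TODO(), &corev1.Pod{}); err != nil {
		t.Fatal(err)
	}
}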
43 changes: 37 additions & 6 deletions pkg/cache/internal/informers.go
@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)

// InformersOpts configures an InformerMap.
@@ -88,6 +89,20 @@ type Cache struct {

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader

// Stop can be used to stop this individual informer.
stop chan struct{}
}

// Start starts the informer managed by this Cache entry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop when either the whole map stops or just this informer is removed.
internalStop, cancel := syncs.MergeChans(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}

type tracker struct {
@@ -180,13 +195,13 @@ func (ip *Informers) Start(ctx context.Context) error {

// Start each informer
for _, i := range ip.tracker.Structured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Unstructured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Metadata {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}

// Set started to true so we immediately start any informers added later.
@@ -201,7 +216,7 @@ func (ip *Informers) Start(ctx context.Context) error {
return nil
}

func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
@@ -212,7 +227,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
cacheEntry.Start(ip.ctx.Done())
}()
}

@@ -288,6 +303,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
return started, i, nil
}

// Remove removes an informer entry and stops it if it was running.
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
ip.mu.Lock()
defer ip.mu.Unlock()

informerMap := ip.informersByType(obj)

entry, ok := informerMap[gvk]
if !ok {
return
}
close(entry.stop)
delete(informerMap, gvk)
}

func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
switch obj.(type) {
case runtime.Unstructured:
@@ -356,13 +386,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
},
stop: make(chan struct{}),
}
ip.informersByType(obj)[gvk] = i

// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
return i, ip.started, nil
}
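The mechanism above boils down to one pattern: each cache entry owns a private stop channel, Start races that channel against the map-wide stop, and Remove closes the private one. Below is a minimal standalone sketch of that pattern with all names invented for illustration; the real code merges the two channels with MergeChans because SharedIndexInformer.Run accepts only a single stop channel, and the plain select here is the conceptual equivalent.

package example

import "fmt"

// entry mirrors the role of internal.Cache: it owns a stop channel that
// shuts down only this worker.
type entry struct {
	stop chan struct{}
}

// run blocks until either the global stop or this entry's own stop closes,
// which is what Cache.Start achieves by merging the two channels.
func (e *entry) run(globalStop <-chan struct{}) {
	select {
	case <-globalStop:
		fmt.Println("stopped with the whole map")
	case <-e.stop:
		fmt.Println("stopped individually")
	}
}

// remove plays the part of Informers.Remove: closing the private channel makes
// the goroutine running run() return without touching the global stop.
func remove(e *entry) {
	close(e.stop)
}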
31 changes: 31 additions & 0 deletions pkg/cache/multi_namespace_cache.go
@@ -109,6 +109,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
}

func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
if !isNamespaced {
return c.clusterCache.RemoveInformer(ctx, obj)
}

for _, cache := range c.namespaceToCache {
err := cache.RemoveInformer(ctx, obj)
if err != nil {
return err
}
}

return nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
@@ -391,3 +412,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
}
return true
}

// IsStopped checks whether each namespaced informer has stopped; it returns false if any are still running.
func (i *multiNamespaceInformer) IsStopped() bool {
for _, informer := range i.namespaceToInformer {
if stopped := informer.IsStopped(); !stopped {
return false
}
}
return true
}
38 changes: 38 additions & 0 deletions pkg/internal/syncs/syncs.go
@@ -0,0 +1,38 @@
package syncs

import (
"context"
"reflect"
"sync"
)

// MergeChans returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
var once sync.Once
out := make(chan T)
cancel := make(chan T)
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
<-out
}
cases := make([]reflect.SelectCase, len(chans)+1)
for i := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chans[i]),
}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cancel),
}
go func() {
defer close(out)
_, _, _ = reflect.Select(cases)
}()

return out, cancelFunc
}
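
A usage sketch for MergeChans follows. Since pkg/internal/syncs is an internal package, external callers cannot actually import it; treat the demo function and its names as a hypothetical illustration of the behavior.

package syncs_test

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)

// demo shows that closing either input channel closes the merged channel.
func demo() {
	globalStop := make(chan struct{})
	localStop := make(chan struct{})

	// The explicit type argument just keeps inference out of the sketch.
	merged, cancel := syncs.MergeChans[struct{}](globalStop, localStop)
	defer cancel() // always call cancel so the goroutine inside MergeChans exits

	done := make(chan struct{})
	go func() {
		<-merged // unblocks as soon as either input channel is closed
		fmt.Println("worker stopped")
		close(done)
	}()

	close(localStop) // stopping "just this one" is enough to release the worker
	<-done
}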
