Skip to content

Commit

Permalink
Refactor functional parameters
Browse files Browse the repository at this point in the history
Signed-off-by: Max Smythe <smythe@google.com>
  • Loading branch information
maxsmythe committed May 13, 2023
1 parent f0339e3 commit 55353dc
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 36 deletions.
8 changes: 6 additions & 2 deletions pkg/cache/cache.go
Expand Up @@ -41,6 +41,10 @@ import (
var (
log = logf.RuntimeLog.WithName("object-cache")
defaultSyncPeriod = 10 * time.Hour

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
BlockUntilSynced = internal.BlockUntilSynced
)

// Cache knows how to load Kubernetes objects, fetch informers to request
Expand All @@ -60,11 +64,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
GetInformer(ctx context.Context, obj client.Object, opts ...internal.InformerGetOption) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...internal.InformerGetOption) (Informer, error)

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Expand Down
33 changes: 6 additions & 27 deletions pkg/cache/informer_cache.go
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -39,16 +38,6 @@ var (
_ Cache = &informerCache{}
)

type InformerGetOption func(*internal.GetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *internal.GetOptions) {
opts.DoNotBlockUntilSynced = !shouldBlock
}
}

// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
type ErrCacheNotStarted struct{}

Expand All @@ -70,7 +59,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ic.Informers.Get(ctx, gvk, out, &internal.GetOptions{})
started, cache, err := ic.Informers.Get(ctx, gvk, out)
if err != nil {
return err
}
Expand All @@ -88,7 +77,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj, &internal.GetOptions{})
started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -135,38 +124,28 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
}

// GetInformerForKind returns the informer for the GroupVersionKind.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...internal.InformerGetOption) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}

_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
_, i, err := ic.Informers.Get(ctx, gvk, obj, opts...)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...internal.InformerGetOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}

_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
_, i, err := ic.Informers.Get(ctx, gvk, obj, opts...)
if err != nil {
return nil, err
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/cache/internal/informers.go
Expand Up @@ -115,6 +115,16 @@ type GetOptions struct {
DoNotBlockUntilSynced bool
}

type InformerGetOption func(*GetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *GetOptions) {
opts.DoNotBlockUntilSynced = !shouldBlock
}
}

// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
Expand Down Expand Up @@ -303,7 +313,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts ...InformerGetOption) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.get(gvk, obj)
if !ok {
Expand All @@ -313,7 +323,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}
}

if started && !i.Informer.HasSynced() && !opts.DoNotBlockUntilSynced {
cfg := &GetOptions{}
for _, opt := range opts {
opt(cfg)
}

if started && !i.Informer.HasSynced() && !cfg.DoNotBlockUntilSynced {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
Expand Down
11 changes: 6 additions & 5 deletions pkg/cache/multi_namespace_cache.go
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
Expand Down Expand Up @@ -93,7 +94,7 @@ type multiNamespaceCache struct {
var _ Cache = &multiNamespaceCache{}

// Methods for multiNamespaceCache to conform to the Informers interface.
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object, opts ...internal.InformerGetOption) (Informer, error) {
informers := map[string]Informer{}

// If the object is clusterscoped, get the informer from clusterCache,
Expand All @@ -103,7 +104,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return nil, err
}
if !isNamespaced {
clusterCacheInf, err := c.clusterCache.GetInformer(ctx, obj)
clusterCacheInf, err := c.clusterCache.GetInformer(ctx, obj, opts...)
if err != nil {
return nil, err
}
Expand All @@ -123,7 +124,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...internal.InformerGetOption) (Informer, error) {
informers := map[string]Informer{}

// If the object is clusterscoped, get the informer from clusterCache,
Expand All @@ -133,7 +134,7 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
return nil, err
}
if !isNamespaced {
clusterCacheInf, err := c.clusterCache.GetInformerForKind(ctx, gvk)
clusterCacheInf, err := c.clusterCache.GetInformerForKind(ctx, gvk, opts...)
if err != nil {
return nil, err
}
Expand All @@ -143,7 +144,7 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
}

for ns, cache := range c.namespaceToCache {
informer, err := cache.GetInformerForKind(ctx, gvk)
informer, err := cache.GetInformerForKind(ctx, gvk, opts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 55353dc

Please sign in to comment.