Skip to content

Commit

Permalink
Move mergeChan; add functional opts
Browse files Browse the repository at this point in the history
Signed-off-by: Max Smythe <smythe@google.com>
  • Loading branch information
maxsmythe committed May 13, 2023
1 parent 3792912 commit f0339e3
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 79 deletions.
48 changes: 25 additions & 23 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ var (
_ Cache = &informerCache{}
)

type InformerGetOption func(*internal.GetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *internal.GetOptions) {
opts.DoNotBlockUntilSynced = !shouldBlock
}
}

// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
type ErrCacheNotStarted struct{}

Expand All @@ -60,7 +70,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ic.Informers.Get(ctx, gvk, out)
started, cache, err := ic.Informers.Get(ctx, gvk, out, &internal.GetOptions{})
if err != nil {
return err
}
Expand All @@ -78,7 +88,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj, &internal.GetOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -125,50 +135,42 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
}

// GetInformerForKind returns the informer for the GroupVersionKind.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}

_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
if err != nil {
return nil, err
cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}
return i.Informer, err
}

// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync.
func (ic *informerCache) GetInformerNonBlocking(ctx context.Context, obj client.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
if err != nil {
return nil, err
}

// Use a canceled context to signal non-blocking
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()

_, i, err := ic.Informers.Get(canceledCtx, gvk, obj)
if err != nil && !apierrors.IsTimeout(err) {
return nil, err
}
return i.Informer, nil
return i.Informer, err
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand Down
70 changes: 14 additions & 56 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)

// InformersOpts configures an InformerMap.
Expand Down Expand Up @@ -95,7 +96,7 @@ type Cache struct {
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := eitherChan(stop, c.stop)
internalStop, cancel := syncs.MergeChans(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}
Expand All @@ -106,6 +107,14 @@ type tracker struct {
Metadata map[schema.GroupVersionKind]*Cache
}

// GetOptions provides configuration to customize the behavior when
// getting an informer.
type GetOptions struct {
// DoNotBlockUntilSynced tells Get() to return the informer immediately,
// without waiting for its cache to sync.
DoNotBlockUntilSynced bool
}

// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
Expand Down Expand Up @@ -294,7 +303,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.get(gvk, obj)
if !ok {
Expand All @@ -304,14 +313,10 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}
}

if started && !i.Informer.HasSynced() {
// Cancel for context, informer stopping, or entire map stopping.
syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.ctx.Done())
defer cancel()

if started && !i.Informer.HasSynced() && !opts.DoNotBlockUntilSynced {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) {
return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}

Expand Down Expand Up @@ -592,50 +597,3 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
}
return ""
}

// eitherChan returns a channel that is closed when either of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func eitherChan(a, b <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-cancel:
}
}()

return out, cancelFunc
}

// mergeChan returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
var once sync.Once
out := make(chan struct{})
cancel := make(chan struct{})
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
go func() {
defer close(out)
select {
case <-a:
case <-b:
case <-c:
case <-cancel:
}
}()

return out, cancelFunc
}
37 changes: 37 additions & 0 deletions pkg/internal/syncs/syncs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package syncs

import (
"context"
"reflect"
"sync"
)

// MergeChans returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
var once sync.Once
out := make(chan T)
cancel := make(chan T)
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
}
cases := make([]reflect.SelectCase, len(chans)+1)
for i := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chans[i]),
}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cancel),
}
go func() {
defer close(out)
_, _, _ = reflect.Select(cases)
}()

return out, cancelFunc
}
103 changes: 103 additions & 0 deletions pkg/internal/syncs/syncs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package syncs

import (
"testing"
"time"
)

func TestMergeChans(t *testing.T) {
tests := []struct {
name string
count int
signal int
}{
{
name: "single channel",
count: 1,
signal: 0,
},
{
name: "double channel",
count: 2,
signal: 0,
},
{
name: "five channel, close 0",
count: 5,
signal: 0,
},
{
name: "five channel, close 1",
count: 5,
signal: 1,
},
{
name: "five channel, close 2",
count: 5,
signal: 2,
},
{
name: "five channel, close 3",
count: 5,
signal: 3,
},
{
name: "five channel, close 4",
count: 5,
signal: 4,
},
{
name: "single channel, cancel",
count: 1,
signal: -1,
},
{
name: "double channel, cancel",
count: 2,
signal: -1,
},
{
name: "five channel, cancel",
count: 5,
signal: -1,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if callAndClose(test.count, test.signal, 1) {
t.Error("timeout before merged channel closed")
}
})
}
}

func callAndClose(numChans, signalChan, timeoutSeconds int) bool {
chans := make([]chan struct{}, numChans)
readOnlyChans := make([]<-chan struct{}, numChans)
for i := range chans {
chans[i] = make(chan struct{})
readOnlyChans[i] = chans[i]
}
defer func() {
for i := range chans {
close(chans[i])
}
}()

merged, cancel := MergeChans(readOnlyChans...)
defer cancel()

timer := time.NewTimer(time.Duration(timeoutSeconds) * time.Second)

if signalChan >= 0 {
chans[signalChan] <- struct{}{}
} else {
cancel()
}
select {
case <-merged:
return false
case <-timer.C:
return true
}
}

0 comments on commit f0339e3

Please sign in to comment.