Skip to content

Commit

Permalink
Move realtime composition event handlers into their own file
Browse files Browse the repository at this point in the history
Signed-off-by: Nic Cope <nicc@rk0n.org>
  • Loading branch information
negz committed May 8, 2024
1 parent be9f41d commit a9c3a2f
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 289 deletions.
48 changes: 0 additions & 48 deletions internal/controller/apiextensions/composite/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ import (

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kunstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimeevent "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -731,47 +727,3 @@ func getComposerResourcesNames(cds []ComposedResource) []string {
}
return names
}

// EnqueueForCompositionRevisionFunc returns a function that enqueues (the
// related) XRs when a new CompositionRevision is created. This speeds up
// reconciliation of XRs on changes to the Composition by not having to wait for
// the 60s sync period, but be instant.
func EnqueueForCompositionRevisionFunc(of resource.CompositeKind, list func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error, log logging.Logger) func(ctx context.Context, createEvent runtimeevent.TypedCreateEvent[*v1.CompositionRevision], q workqueue.RateLimitingInterface) {
return func(ctx context.Context, createEvent runtimeevent.TypedCreateEvent[*v1.CompositionRevision], q workqueue.RateLimitingInterface) {
rev := createEvent.Object

// get all XRs
xrs := kunstructured.UnstructuredList{}
xrs.SetGroupVersionKind(schema.GroupVersionKind(of))
xrs.SetKind(schema.GroupVersionKind(of).Kind + "List")
if err := list(ctx, &xrs); err != nil {
// logging is most we can do here. This is a programming error if it happens.
log.Info("cannot list in CompositionRevision handler", "type", schema.GroupVersionKind(of).String(), "error", err)
return
}

// enqueue all those that reference the Composition of this revision
compName := rev.Labels[v1.LabelCompositionName]
if compName == "" {
return
}
for _, u := range xrs.Items {
xr := composite.Unstructured{Unstructured: u}

// only automatic
if pol := xr.GetCompositionUpdatePolicy(); pol != nil && *pol == xpv1.UpdateManual {
continue
}

// only those that reference the right Composition
if ref := xr.GetCompositionReference(); ref == nil || ref.Name != compName {
continue
}

q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: xr.GetName(),
Namespace: xr.GetNamespace(),
}})
}
}
}
217 changes: 0 additions & 217 deletions internal/controller/apiextensions/composite/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package composite

import (
"context"
"reflect"
"testing"
"time"

Expand All @@ -27,19 +26,14 @@ import (
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kunstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimeevent "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/errors"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
Expand Down Expand Up @@ -843,214 +837,3 @@ func TestFilterToXRPatches(t *testing.T) {
})
}
}

func TestEnqueueForCompositionRevisionFunc(t *testing.T) {
type args struct {
of schema.GroupVersionKind
list func(_ context.Context, list client.ObjectList, opts ...client.ListOption) error
event runtimeevent.TypedCreateEvent[*v1.CompositionRevision]
}
type want struct {
added []interface{}
}

dog := schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Dog"}
dogList := dog.GroupVersion().WithKind("DogList")

tests := []struct {
name string
args args
want want
}{
{
name: "empty",
args: args{
of: dog,
list: func(_ context.Context, list client.ObjectList, opts ...client.ListOption) error {
// test parameters only here, not in the later tests for brevity.
u, ok := list.(*kunstructured.UnstructuredList)
if !ok {
t.Errorf("list was not an UnstructuredList")
} else if got := u.GroupVersionKind(); got != dogList {
t.Errorf("list was not a DogList, got: %s", got)
}
if len(opts) != 0 {
t.Errorf("unexpected list options: %#v", opts)
}
return nil
},
event: runtimeevent.TypedCreateEvent[*v1.CompositionRevision]{
Object: &v1.CompositionRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "dachshund-sadfa8",
Labels: map[string]string{
v1.LabelCompositionName: "dachshund",
},
},
},
},
},
},
{
name: "automatic management policy",
args: args{
of: dog,
list: func(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
var obj1 composite.Unstructured
obj1.SetNamespace("ns")
obj1.SetName("obj1")
policy := xpv1.UpdateAutomatic
obj1.SetCompositionUpdatePolicy(&policy)
obj1.SetCompositionReference(&corev1.ObjectReference{Name: "dachshund"})

list.(*kunstructured.UnstructuredList).Items = []kunstructured.Unstructured{obj1.Unstructured}

return nil
},
event: runtimeevent.TypedCreateEvent[*v1.CompositionRevision]{
Object: &v1.CompositionRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "dachshund-sadfa8",
Labels: map[string]string{
v1.LabelCompositionName: "dachshund",
},
},
},
},
},
want: want{
added: []interface{}{reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: "ns",
Name: "obj1",
}}},
},
},
{
name: "manual management policy",
args: args{
of: dog,
list: func(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
var obj1 composite.Unstructured
obj1.SetNamespace("ns")
obj1.SetName("obj1")
policy := xpv1.UpdateManual
obj1.SetCompositionUpdatePolicy(&policy)
obj1.SetCompositionReference(&corev1.ObjectReference{Name: "dachshund"})

list.(*kunstructured.UnstructuredList).Items = []kunstructured.Unstructured{obj1.Unstructured}

return nil
},
event: runtimeevent.TypedCreateEvent[*v1.CompositionRevision]{
Object: &v1.CompositionRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "dachshund-sadfa8",
Labels: map[string]string{
v1.LabelCompositionName: "dachshund",
},
},
},
},
},
want: want{},
},
{
name: "other composition",
args: args{
of: dog,
list: func(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
var obj1 composite.Unstructured
obj1.SetNamespace("ns")
obj1.SetName("obj1")
policy := xpv1.UpdateAutomatic
obj1.SetCompositionUpdatePolicy(&policy)
obj1.SetCompositionReference(&corev1.ObjectReference{Name: "bernese"})

list.(*kunstructured.UnstructuredList).Items = []kunstructured.Unstructured{obj1.Unstructured}

return nil
},
event: runtimeevent.TypedCreateEvent[*v1.CompositionRevision]{
Object: &v1.CompositionRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "dachshund-sadfa8",
Labels: map[string]string{
v1.LabelCompositionName: "dachshund",
},
},
},
},
},
want: want{},
},
{
name: "multiple",
args: args{
of: dog,
list: func(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
var obj1 composite.Unstructured
obj1.SetNamespace("ns")
obj1.SetName("obj1")
automatic := xpv1.UpdateAutomatic
obj1.SetCompositionUpdatePolicy(&automatic)
obj1.SetCompositionReference(&corev1.ObjectReference{Name: "dachshund"})

obj2 := obj1.DeepCopy()
obj2.SetName("obj2")

obj3 := obj1.DeepCopy()
obj3.SetName("obj3")
obj3.SetCompositionReference(&corev1.ObjectReference{Name: "bernese"})

obj4 := obj1.DeepCopy()
obj4.SetName("obj4")
manual := xpv1.UpdateManual
obj4.SetCompositionUpdatePolicy(&manual)

list.(*kunstructured.UnstructuredList).Items = []kunstructured.Unstructured{
obj1.Unstructured,
obj2.Unstructured,
obj3.Unstructured,
}

return nil
},
event: runtimeevent.TypedCreateEvent[*v1.CompositionRevision]{
Object: &v1.CompositionRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "dachshund-sadfa8",
Labels: map[string]string{
v1.LabelCompositionName: "dachshund",
},
},
},
},
},
want: want{
added: []interface{}{
reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "ns", Name: "obj1"}},
reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "ns", Name: "obj2"}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fn := EnqueueForCompositionRevisionFunc(resource.CompositeKind(tt.args.of), tt.args.list, logging.NewNopLogger())
q := rateLimitingQueueMock{}
fn(context.TODO(), tt.args.event, &q)
if got := q.added; !reflect.DeepEqual(got, tt.want.added) {
t.Errorf("EnqueueForCompositionRevisionFunc(...)(ctx, event, queue) = %v, want %v", got, tt.want)
}
})
}
}

type rateLimitingQueueMock struct {
workqueue.RateLimitingInterface
added []interface{}
}

func (f *rateLimitingQueueMock) Add(item interface{}) {
f.added = append(f.added, item)
}

0 comments on commit a9c3a2f

Please sign in to comment.