Skip to content

Commit

Permalink
Use a single cache for all dynamic controllers
Browse files Browse the repository at this point in the history
Crossplane uses a controller engine to dynamically start claim and XR
controllers when a new XRD is installed.

Before this commit, each controller gets at least one cache. This is
because when I built this functionality, you couldn't stop a single
informer within a cache (a cache is basically a map of informers by
GVK).

When realtime composition is enabled, there are even more caches. One
per composed resource GVK. A GVK routed cache routes cache lookups to
these various delegate caches.

Meanwhile, controller-runtime recently made it possible to stop an
informer within a cache. It's also been possible to remove an event
handler from an informer for some time (since Kubernetes 1.26).

kubernetes-sigs/controller-runtime#2285
kubernetes-sigs/controller-runtime#2046

This commit uses a single client, backed by a single cache, across all
dynamic controllers (specifically the definition, offered, claim, and
XR controllers).

Compared to the current implementation, this commit:

* Takes fewer global locks when realtime compositions are enabled.
  Locking is now mostly at the controller scope.
* Works with the breaking changes to source.Source introduced in
  controller-runtime v0.18. :)

I think this makes the realtime composition code a little easier to
follow by consolidating it into the ControllerEngine, but that's
pretty subjective.

Signed-off-by: Nic Cope <nicc@rk0n.org>
  • Loading branch information
negz committed May 8, 2024
1 parent a9c3a2f commit 0f50200
Show file tree
Hide file tree
Showing 21 changed files with 2,340 additions and 2,669 deletions.
88 changes: 86 additions & 2 deletions cmd/crossplane/core/core.go
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"os"
"path/filepath"
"time"
Expand All @@ -30,6 +31,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -47,6 +49,7 @@ import (

"github.com/crossplane/crossplane/internal/controller/apiextensions"
apiextensionscontroller "github.com/crossplane/crossplane/internal/controller/apiextensions/controller"
"github.com/crossplane/crossplane/internal/controller/engine"
"github.com/crossplane/crossplane/internal/controller/pkg"
pkgcontroller "github.com/crossplane/crossplane/internal/controller/pkg/controller"
"github.com/crossplane/crossplane/internal/features"
Expand Down Expand Up @@ -134,6 +137,8 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
Deduplicate: true,
})

// The claim and XR controllers don't use the manager's cache or client.
// They use their own. They're set up later in this method.
eb := record.NewBroadcaster()
mgr, err := ctrl.NewManager(ratelimiter.LimitRESTConfig(cfg, c.MaxReconcileRate), ctrl.Options{
Scheme: s,
Expand Down Expand Up @@ -270,9 +275,88 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
log.Info("Alpha feature enabled", "flag", features.EnableAlphaClaimSSA)
}

// Claim and XR controllers are started and stopped dynamically by the
// ControllerEngine below. When realtime compositions are enabled, they also
// start and stop their watches (e.g. of composed resources) dynamically. To
// do this, the ControllerEngine must have exclusive ownership of a cache.
// This allows it to track what controllers are using the cache's informers.
ca, err := cache.New(mgr.GetConfig(), cache.Options{
HTTPClient: mgr.GetHTTPClient(),
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
SyncPeriod: &c.SyncInterval,

// When a CRD is deleted, any informers for its GVKs will start trying
// to restart their watches, and fail with scary errors. This should
// only happen when realtime composition is enabled, and we should GC
// the informer within 60 seconds. This handler tries to make the error
// a little more informative, and less scary.
DefaultWatchErrorHandler: func(_ *kcache.Reflector, err error) {
	// errors.Is expects the error being inspected first and the target
	// second. With the arguments in that order a wrapped io.EOF is also
	// recognised, instead of only an err that is exactly io.EOF.
	if errors.Is(err, io.EOF) {
		// Watch closed normally.
		return
	}
	// Anything else is logged at debug level only, with a hint at the
	// likely cause, rather than surfaced as a scary error.
	log.Debug("Watch error - probably due to CRD being uninstalled", "error", err)
},
})
if err != nil {
return errors.Wrap(err, "cannot create cache for API extension controllers")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
// Don't start the cache until the manager is elected.
<-mgr.Elected()

if err := ca.Start(ctx); err != nil {
log.Info("API extensions cache returned an error", "error", err)
}

log.Info("API extensions cache stopped")
}()

cl, err := client.New(mgr.GetConfig(), client.Options{
HTTPClient: mgr.GetHTTPClient(),
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
Cache: &client.CacheOptions{
Reader: ca,

// Don't cache secrets - there may be a lot of them.
DisableFor: []client.Object{&corev1.Secret{}},

// When this is enabled the client will automatically start a cache
// informer whenever an XR controller Gets a new kind of composed
// resource.
//
// We need to stop the informer when it's not needed anymore. At
// best an unused informer wastes memory and keeps an unneeded watch
// open on the API server. At worst, the CRD the informer listens
// for is uninstalled and the informer continuously tries to
// restart its watch, logging errors the whole time.
//
// When realtime composition is enabled, controllers record what
// GVKs they watch. We can garbage collect an informer when no
// controller watches its types. When it's not enabled we don't know
// when to garbage collect informers.
//
// TODO(negz): GC informers when their CRD is deleted.
// TODO(negz): GC informers when no XR composes their GVK.
Unstructured: o.Features.Enabled(features.EnableAlphaRealtimeCompositions),
},
})
if err != nil {
return errors.Wrap(err, "cannot create client for API extension controllers")
}

ce := engine.New(mgr, engine.TrackInformers(ca, mgr.GetScheme()), engine.WithLogger(log))
ao := apiextensionscontroller.Options{
Options: o,
FunctionRunner: functionRunner,
Options: o,
ControllerClient: cl,
ControllerFieldIndexer: ca,
ControllerEngine: ce,
FunctionRunner: functionRunner,
}

if err := apiextensions.Setup(mgr, ao); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -142,7 +142,7 @@ require (
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand All @@ -154,7 +154,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-20230516205744-dbecb1de8cfa // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
13 changes: 1 addition & 12 deletions internal/controller/apiextensions/claim/reconciler.go
Expand Up @@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
Expand All @@ -37,7 +36,6 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/crossplane/crossplane-runtime/pkg/resource/unstructured"
"github.com/crossplane/crossplane-runtime/pkg/resource/unstructured/claim"
"github.com/crossplane/crossplane-runtime/pkg/resource/unstructured/composite"

Expand Down Expand Up @@ -221,14 +219,6 @@ func defaultCRClaim(c client.Client) crClaim {
// A ReconcilerOption configures a Reconciler.
type ReconcilerOption func(*Reconciler)

// WithClient returns a ReconcilerOption that makes the Reconciler use the
// supplied client to interact with the Kubernetes API.
func WithClient(c client.Client) ReconcilerOption {
	setClient := func(rec *Reconciler) {
		rec.client = c
	}
	return setClient
}

// WithManagedFieldsUpgrader specifies how the Reconciler should upgrade claim
// and composite resource (XR) managed fields from client-side apply to
// server-side apply.
Expand Down Expand Up @@ -300,8 +290,7 @@ func WithPollInterval(after time.Duration) ReconcilerOption {
// The returned Reconciler will apply only the ObjectMetaConfigurator by
// default; most callers should supply one or more CompositeConfigurators to
// configure their composite resources.
func NewReconciler(m manager.Manager, of resource.CompositeClaimKind, with resource.CompositeKind, o ...ReconcilerOption) *Reconciler {
c := unstructured.NewClient(m.GetClient())
func NewReconciler(c client.Client, of resource.CompositeClaimKind, with resource.CompositeKind, o ...ReconcilerOption) *Reconciler {
r := &Reconciler{
client: c,
gvkClaim: schema.GroupVersionKind(of),
Expand Down

0 comments on commit 0f50200

Please sign in to comment.