From d29324bfcfd7339583392c5ad3fa159fc7627e01 Mon Sep 17 00:00:00 2001 From: Simone Tiraboschi Date: Thu, 1 Feb 2024 06:25:42 +0100 Subject: [PATCH] Bump controller-runtime to v0.17.0 (#2785) - Bump controller-runtime to v0.17.0 - Introduce a few minor fixes for new fakeclient behaviour, see: https://github.com/kubernetes-sigs/controller-runtime/pull/2633 Signed-off-by: Simone Tiraboschi --- controllers/commontestutils/testClient.go | 2 +- .../hyperconverged_controller_test.go | 2 +- controllers/hyperconverged/testUtils_test.go | 1 - go.mod | 2 +- go.sum | 6 +- pkg/util/util.go | 34 +++++-- .../pkg/util/util.go | 34 +++++-- vendor/modules.txt | 5 +- .../controller-runtime/.golangci.yml | 4 +- .../sigs.k8s.io/controller-runtime/RELEASE.md | 14 ++- .../controller-runtime/pkg/cache/cache.go | 18 +++- .../pkg/cache/delegating_by_gvk_cache.go | 8 ++ .../pkg/cache/informer_cache.go | 11 +++ .../pkg/cache/internal/cache_reader.go | 55 ++++++++++- .../pkg/cache/internal/informers.go | 57 +++++++++-- .../pkg/cache/multi_namespace_cache.go | 31 ++++++ .../pkg/client/apiutil/apimachinery.go | 21 ---- .../pkg/client/apiutil/restmapper.go | 24 +++-- .../controller-runtime/pkg/client/client.go | 18 +++- .../pkg/client/fake/client.go | 99 ++++++++++--------- .../controller-runtime/pkg/client/options.go | 5 +- .../controllerutil/controllerutil.go | 94 +++++++++++++++++- .../pkg/handler/eventhandler.go | 2 +- .../pkg/internal/field/selector/utils.go | 16 +-- .../pkg/internal/syncs/syncs.go | 38 +++++++ .../controller-runtime/pkg/manager/manager.go | 6 +- .../pkg/metrics/workqueue.go | 4 +- .../pkg/reconcile/reconcile.go | 34 ++++++- .../pkg/webhook/admission/defaulter.go | 2 + .../pkg/webhook/admission/http.go | 52 +++++++--- .../pkg/webhook/admission/validator.go | 2 + .../pkg/webhook/admission/validator_custom.go | 1 - .../controller-runtime/pkg/webhook/alias.go | 2 + 33 files changed, 534 insertions(+), 170 deletions(-) create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/internal/syncs/syncs.go diff --git a/controllers/commontestutils/testClient.go b/controllers/commontestutils/testClient.go index 412e6f414..141a4be3e 100644 --- a/controllers/commontestutils/testClient.go +++ b/controllers/commontestutils/testClient.go @@ -128,7 +128,7 @@ func (c *HcoTestClient) InitiateCreateErrors(f FakeWriteErrorGenerator) { } func (c *HcoTestClient) Scheme() *runtime.Scheme { - return &runtime.Scheme{} + return c.client.Scheme() } func (c *HcoTestClient) RESTMapper() meta.RESTMapper { diff --git a/controllers/hyperconverged/hyperconverged_controller_test.go b/controllers/hyperconverged/hyperconverged_controller_test.go index 81d105010..1f2b7a4c5 100644 --- a/controllers/hyperconverged/hyperconverged_controller_test.go +++ b/controllers/hyperconverged/hyperconverged_controller_test.go @@ -726,7 +726,7 @@ var _ = Describe("HyperconvergedController", func() { hco.Spec.Workloads = hcov1beta1.HyperConvergedConfig{NodePlacement: commontestutils.NewNodePlacement()} existingResource, err := operands.NewKubeVirt(hco, namespace) Expect(err).ToNot(HaveOccurred()) - existingResource.Kind = kubevirtcorev1.KubeVirtGroupVersionKind.Kind // necessary for metrics + existingResource.APIVersion, existingResource.Kind = kubevirtcorev1.KubeVirtGroupVersionKind.ToAPIVersionAndKind() // necessary for metrics // now, modify KV's node placement existingResource.Spec.Infra.NodePlacement.Tolerations = append(hco.Spec.Infra.NodePlacement.Tolerations, corev1.Toleration{ diff --git a/controllers/hyperconverged/testUtils_test.go b/controllers/hyperconverged/testUtils_test.go index 5ca812673..794eb6499 100644 --- a/controllers/hyperconverged/testUtils_test.go +++ b/controllers/hyperconverged/testUtils_test.go @@ -242,7 +242,6 @@ func getBasicDeployment() *BasicExpected { expectedCDI, err := operands.NewCDI(hco) ExpectWithOffset(1, err).ToNot(HaveOccurred()) expectedCDI.Status.Conditions = getGenericCompletedConditions() - expectedCDI.Kind = cdiv1beta1.CDIGroupVersionKind.Kind res.cdi = expectedCDI expectedCNA, err := operands.NewNetworkAddons(hco) diff --git a/go.mod b/go.mod index 53554f17b..deafb69f4 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( kubevirt.io/controller-lifecycle-operator-sdk/api v0.2.4 kubevirt.io/managed-tenant-quota v1.2.0 kubevirt.io/ssp-operator/api v0.19.0 - sigs.k8s.io/controller-runtime v0.16.3 + sigs.k8s.io/controller-runtime v0.17.0 sigs.k8s.io/controller-tools v0.14.0 ) diff --git a/go.sum b/go.sum index 00b8b1454..db18e785d 100644 --- a/go.sum +++ b/go.sum @@ -260,7 +260,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= @@ -465,8 +465,8 @@ kubevirt.io/managed-tenant-quota v1.2.0 h1:AE+rmmVfwzvUrWlBfplr/Mhw60YDDEI3j0/Ty kubevirt.io/managed-tenant-quota v1.2.0/go.mod h1:oiTB7mMAOXacveW9rbZYoWieMxKvnF+vEyueEN+KCEQ= kubevirt.io/ssp-operator/api v0.19.0 h1:K1sBY1nIdQO1CoUkYRuz6CepZU97u4dlhqv1ldRyaxs= kubevirt.io/ssp-operator/api v0.19.0/go.mod h1:7lHBbaTsT5pBgrvipn+ZaoQR4t6TQHMlbcq2Wh5o714= -sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= -sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= +sigs.k8s.io/controller-runtime v0.17.0 h1:fjJQf8Ukya+VjogLO6/bNX9HE6Y2xpsO5+fyS26ur/s= +sigs.k8s.io/controller-runtime v0.17.0/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= sigs.k8s.io/controller-tools v0.14.0 h1:rnNoCC5wSXlrNoBKKzL70LNJKIQKEzT6lloG6/LF73A= sigs.k8s.io/controller-tools v0.14.0/go.mod h1:TV7uOtNNnnR72SpzhStvPkoS/U5ir0nMudrkrC4M9Sc= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/util/util.go b/pkg/util/util.go index a5fbfe18e..131144ca3 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,7 +2,6 @@ package util import ( "context" - "encoding/json" "errors" "fmt" "os" @@ -23,6 +22,7 @@ import ( "k8s.io/client-go/tools/reference" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) // ForceRunModeEnv indicates if the operator should be forced to run in either local @@ -79,13 +79,26 @@ var GetOperatorNamespace = func(logger logr.Logger) (string, error) { // ToUnstructured converts an arbitrary object (which MUST obey the // k8s object conventions) to an Unstructured -func ToUnstructured(obj interface{}) (*unstructured.Unstructured, error) { - b, err := json.Marshal(obj) - if err != nil { - return nil, err +func ToUnstructured(obj runtime.Object, c client.Client) (*unstructured.Unstructured, error) { + apiVersion, kind := obj.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind() + if apiVersion == "" || kind == "" { + gvk, err := apiutil.GVKForObject(obj, c.Scheme()) + if err != nil { + return nil, err + } + ta, err := meta.TypeAccessor(obj) + if err != nil { + return nil, err + } + ta.SetKind(gvk.Kind) + ta.SetAPIVersion(gvk.GroupVersion().String()) } + u := &unstructured.Unstructured{} - if err := json.Unmarshal(b, u); err != nil { + var err error + u.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + + if err != nil { return nil, err } return u, nil @@ -99,15 +112,16 @@ func GetRuntimeObject(ctx context.Context, c client.Client, obj client.Object) e // ComponentResourceRemoval removes the resource `obj` if it exists and belongs to the HCO // with wait=true it will wait, (util ctx timeout, please set it!) for the resource to be effectively deleted -func ComponentResourceRemoval(ctx context.Context, c client.Client, obj interface{}, hcoName string, logger logr.Logger, dryRun bool, wait bool, protectNonHCOObjects bool) (bool, error) { - resource, err := ToUnstructured(obj) +func ComponentResourceRemoval(ctx context.Context, c client.Client, obj client.Object, hcoName string, logger logr.Logger, dryRun bool, wait bool, protectNonHCOObjects bool) (bool, error) { + + logger.Info("Removing resource", "name", obj.GetName(), "namespace", obj.GetNamespace(), "GVK", obj.GetObjectKind().GroupVersionKind(), "dryRun", dryRun) + + resource, err := ToUnstructured(obj, c) if err != nil { logger.Error(err, "Failed to convert object to Unstructured") return false, err } - logger.Info("Removing resource", "name", resource.GetName(), "namespace", resource.GetNamespace(), "GVK", resource.GetObjectKind().GroupVersionKind(), "dryRun", dryRun) - ok, err := getResourceForDeletion(ctx, c, resource, logger) if !ok { return false, err diff --git a/tests/vendor/github.com/kubevirt/hyperconverged-cluster-operator/pkg/util/util.go b/tests/vendor/github.com/kubevirt/hyperconverged-cluster-operator/pkg/util/util.go index a5fbfe18e..131144ca3 100644 --- a/tests/vendor/github.com/kubevirt/hyperconverged-cluster-operator/pkg/util/util.go +++ b/tests/vendor/github.com/kubevirt/hyperconverged-cluster-operator/pkg/util/util.go @@ -2,7 +2,6 @@ package util import ( "context" - "encoding/json" "errors" "fmt" "os" @@ -23,6 +22,7 @@ import ( "k8s.io/client-go/tools/reference" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) // ForceRunModeEnv indicates if the operator should be forced to run in either local @@ -79,13 +79,26 @@ var GetOperatorNamespace = func(logger logr.Logger) (string, error) { // ToUnstructured converts an arbitrary object (which MUST obey the // k8s object conventions) to an Unstructured -func ToUnstructured(obj interface{}) (*unstructured.Unstructured, error) { - b, err := json.Marshal(obj) - if err != nil { - return nil, err +func ToUnstructured(obj runtime.Object, c client.Client) (*unstructured.Unstructured, error) { + apiVersion, kind := obj.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind() + if apiVersion == "" || kind == "" { + gvk, err := apiutil.GVKForObject(obj, c.Scheme()) + if err != nil { + return nil, err + } + ta, err := meta.TypeAccessor(obj) + if err != nil { + return nil, err + } + ta.SetKind(gvk.Kind) + ta.SetAPIVersion(gvk.GroupVersion().String()) } + u := &unstructured.Unstructured{} - if err := json.Unmarshal(b, u); err != nil { + var err error + u.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + + if err != nil { return nil, err } return u, nil @@ -99,15 +112,16 @@ func GetRuntimeObject(ctx context.Context, c client.Client, obj client.Object) e // ComponentResourceRemoval removes the resource `obj` if it exists and belongs to the HCO // with wait=true it will wait, (util ctx timeout, please set it!) for the resource to be effectively deleted -func ComponentResourceRemoval(ctx context.Context, c client.Client, obj interface{}, hcoName string, logger logr.Logger, dryRun bool, wait bool, protectNonHCOObjects bool) (bool, error) { - resource, err := ToUnstructured(obj) +func ComponentResourceRemoval(ctx context.Context, c client.Client, obj client.Object, hcoName string, logger logr.Logger, dryRun bool, wait bool, protectNonHCOObjects bool) (bool, error) { + + logger.Info("Removing resource", "name", obj.GetName(), "namespace", obj.GetNamespace(), "GVK", obj.GetObjectKind().GroupVersionKind(), "dryRun", dryRun) + + resource, err := ToUnstructured(obj, c) if err != nil { logger.Error(err, "Failed to convert object to Unstructured") return false, err } - logger.Info("Removing resource", "name", resource.GetName(), "namespace", resource.GetNamespace(), "GVK", resource.GetObjectKind().GroupVersionKind(), "dryRun", dryRun) - ok, err := getResourceForDeletion(ctx, c, resource, logger) if !ok { return false, err diff --git a/vendor/modules.txt b/vendor/modules.txt index 025fc200f..f59f92ca9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -692,8 +692,8 @@ kubevirt.io/managed-tenant-quota/staging/src/kubevirt.io/managed-tenant-quota-ap # kubevirt.io/ssp-operator/api v0.19.0 ## explicit; go 1.20 kubevirt.io/ssp-operator/api/v1beta2 -# sigs.k8s.io/controller-runtime v0.16.3 -## explicit; go 1.20 +# sigs.k8s.io/controller-runtime v0.17.0 +## explicit; go 1.21 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder sigs.k8s.io/controller-runtime/pkg/cache @@ -722,6 +722,7 @@ sigs.k8s.io/controller-runtime/pkg/internal/log sigs.k8s.io/controller-runtime/pkg/internal/objectutil sigs.k8s.io/controller-runtime/pkg/internal/recorder sigs.k8s.io/controller-runtime/pkg/internal/source +sigs.k8s.io/controller-runtime/pkg/internal/syncs sigs.k8s.io/controller-runtime/pkg/leaderelection sigs.k8s.io/controller-runtime/pkg/log sigs.k8s.io/controller-runtime/pkg/log/zap diff --git a/vendor/sigs.k8s.io/controller-runtime/.golangci.yml b/vendor/sigs.k8s.io/controller-runtime/.golangci.yml index deb6a783d..a95c15b2c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/.golangci.yml +++ b/vendor/sigs.k8s.io/controller-runtime/.golangci.yml @@ -59,9 +59,9 @@ linters-settings: - pkg: sigs.k8s.io/controller-runtime alias: ctrl staticcheck: - go: "1.20" + go: "1.21" stylecheck: - go: "1.20" + go: "1.21" revive: rules: # The following rules are recommended https://github.com/mgechev/revive#recommended-configuration diff --git a/vendor/sigs.k8s.io/controller-runtime/RELEASE.md b/vendor/sigs.k8s.io/controller-runtime/RELEASE.md index f234494fe..2a857b976 100644 --- a/vendor/sigs.k8s.io/controller-runtime/RELEASE.md +++ b/vendor/sigs.k8s.io/controller-runtime/RELEASE.md @@ -4,9 +4,9 @@ The Kubernetes controller-runtime Project is released on an as-needed basis. The **Note:** Releases are done from the `release-MAJOR.MINOR` branches. For PATCH releases is not required to create a new branch you will just need to ensure that all big fixes are cherry-picked into the respective -`release-MAJOR.MINOR` branch. To know more about versioning check https://semver.org/. +`release-MAJOR.MINOR` branch. To know more about versioning check https://semver.org/. -## How to do a release +## How to do a release ### Create the new branch and the release tag @@ -15,7 +15,7 @@ to create a new branch you will just need to ensure that all big fixes are cherr ### Now, let's generate the changelog -1. Create the changelog from the new branch `release-` (`git checkout release-`). +1. Create the changelog from the new branch `release-` (`git checkout release-`). You will need to use the [kubebuilder-release-tools][kubebuilder-release-tools] to generate the notes. See [here][release-notes-generation] > **Note** @@ -24,12 +24,12 @@ You will need to use the [kubebuilder-release-tools][kubebuilder-release-tools] ### Draft a new release from GitHub -1. Create a new tag with the correct version from the new `release-` branch +1. Create a new tag with the correct version from the new `release-` branch 2. Add the changelog on it and publish. Now, the code source is released ! ### Add a new Prow test the for the new branch release -1. Create a new prow test under [github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime](https://github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime) +1. Create a new prow test under [github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime](https://github.com/kubernetes/test-infra/tree/master/config/jobs/kubernetes-sigs/controller-runtime) for the new `release-` branch. (i.e. for the `0.11.0` release see the PR: https://github.com/kubernetes/test-infra/pull/25205) 2. Ping the infra PR in the controller-runtime slack channel for reviews. @@ -45,3 +45,7 @@ For more info, see the release page: https://github.com/kubernetes-sigs/controll ```` 2. An announcement email is sent to `kubebuilder@googlegroups.com` with the subject `[ANNOUNCE] Controller-Runtime $VERSION is released` + +[kubebuilder-release-tools]: https://github.com/kubernetes-sigs/kubebuilder-release-tools +[release-notes-generation]: https://github.com/kubernetes-sigs/kubebuilder-release-tools/blob/master/README.md#release-notes-generation +[release-process]: https://github.com/kubernetes-sigs/kubebuilder/blob/master/VERSIONING.md#releasing diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go index 5410e1cdd..1cecf88e5 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go @@ -33,7 +33,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" toolscache "k8s.io/client-go/tools/cache" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" @@ -83,6 +83,9 @@ type Informers interface { // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) + // RemoveInformer removes an informer entry and stops it if it was running. + RemoveInformer(ctx context.Context, obj client.Object) error + // Start runs all the informers known to this cache until the context is closed. // It blocks. Start(ctx context.Context) error @@ -121,6 +124,8 @@ type Informer interface { // HasSynced return true if the informers underlying store has synced. HasSynced() bool + // IsStopped returns true if the informer has been stopped. + IsStopped() bool } // AllNamespaces should be used as the map key to deliminate namespace settings @@ -199,6 +204,12 @@ type Options struct { // unless there is already one set in ByObject or DefaultNamespaces. DefaultTransform toolscache.TransformFunc + // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called + // whenever ListAndWatch drops the connection with an error. + // + // After calling this handler, the informer will backoff and retry. + DefaultWatchErrorHandler toolscache.WatchErrorHandler + // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy // for everything that doesn't specify this. // @@ -369,7 +380,8 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Field: config.FieldSelector, }, Transform: config.Transform, - UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), + WatchErrorHandler: opts.DefaultWatchErrorHandler, + UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, @@ -400,7 +412,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { // Construct a new Mapper if unset if opts.Mapper == nil { var err error - opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config, opts.HTTPClient) + opts.Mapper, err = apiutil.NewDynamicRESTMapper(config, opts.HTTPClient) if err != nil { return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go index f3fa4800d..4db8208a6 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go @@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis return cache.List(ctx, list, opts...) } +func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error { + cache, err := dbt.cacheForObject(obj) + if err != nil { + return err + } + return cache.RemoveInformer(ctx, obj) +} + func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) { cache, err := dbt.cacheForObject(obj) if err != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go index 0f1b4e93d..091667b7f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go @@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{}) } +// RemoveInformer deactivates and removes the informer from the cache. +func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error { + gvk, err := apiutil.GVKForObject(obj, ic.scheme) + if err != nil { + return err + } + + ic.Informers.Remove(gvk, obj) + return nil +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock. func (ic *informerCache) NeedLeaderElection() bool { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go index eb941f034..2e4f5ce52 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go @@ -23,6 +23,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -117,16 +118,14 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli switch { case listOpts.FieldSelector != nil: - // TODO(directxman12): support more complicated field selectors by - // combining multiple indices, GetIndexers, etc - field, val, requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector) + requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector) if !requiresExact { return fmt.Errorf("non-exact field matches are not supported by the cache") } // list all objects by the field selector. If this is namespaced and we have one, ask for the // namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces" // namespace. - objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val)) + objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace) case listOpts.Namespace != "": objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace) default: @@ -178,6 +177,54 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli return apimeta.SetList(out, runtimeObjs) } +func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) { + var ( + err error + objs []interface{} + vals []string + ) + indexers := indexer.GetIndexers() + for idx, req := range requires { + indexName := FieldIndexName(req.Field) + indexedValue := KeyToNamespacedKey(namespace, req.Value) + if idx == 0 { + // we use first require to get snapshot data + // TODO(halfcrazy): use complicated index when client-go provides byIndexes + // https://github.com/kubernetes/kubernetes/issues/109329 + objs, err = indexer.ByIndex(indexName, indexedValue) + if err != nil { + return nil, err + } + if len(objs) == 0 { + return nil, nil + } + continue + } + fn, exist := indexers[indexName] + if !exist { + return nil, fmt.Errorf("index with name %s does not exist", indexName) + } + filteredObjects := make([]interface{}, 0, len(objs)) + for _, obj := range objs { + vals, err = fn(obj) + if err != nil { + return nil, err + } + for _, val := range vals { + if val == indexedValue { + filteredObjects = append(filteredObjects, obj) + break + } + } + } + if len(filteredObjects) == 0 { + return nil, nil + } + objs = filteredObjects + } + return objs, nil +} + // objectKeyToStorageKey converts an object key to store key. // It's akin to MetaNamespaceKeyFunc. It's separate from // String to allow keeping the key format easily in sync with diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go index 1d2c9ce2b..c270e809c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) // InformersOpts configures an InformerMap. @@ -49,6 +50,7 @@ type InformersOpts struct { Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool + WatchErrorHandler cache.WatchErrorHandler } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -76,6 +78,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, + watchErrorHandler: options.WatchErrorHandler, } } @@ -86,6 +89,20 @@ type Cache struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // Stop can be used to stop this individual informer. + stop chan struct{} +} + +// Start starts the informer managed by a MapEntry. +// Blocks until the informer stops. The informer can be stopped +// either individually (via the entry's stop channel) or globally +// via the provided stop argument. +func (c *Cache) Start(stop <-chan struct{}) { + // Stop on either the whole map stopping or just this informer being removed. + internalStop, cancel := syncs.MergeChans(stop, c.stop) + defer cancel() + c.Informer.Run(internalStop) } type tracker struct { @@ -159,6 +176,11 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + + // WatchErrorHandler allows the shared index informer's + // watchErrorHandler to be set by overriding the options + // or to use the default watchErrorHandler + watchErrorHandler cache.WatchErrorHandler } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -173,13 +195,13 @@ func (ip *Informers) Start(ctx context.Context) error { // Start each informer for _, i := range ip.tracker.Structured { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } for _, i := range ip.tracker.Unstructured { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } for _, i := range ip.tracker.Metadata { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } // Set started to true so we immediately start any informers added later. @@ -194,7 +216,7 @@ func (ip *Informers) Start(ctx context.Context) error { return nil } -func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) { +func (ip *Informers) startInformerLocked(cacheEntry *Cache) { // Don't start the informer in case we are already waiting for the items in // the waitGroup to finish, since waitGroups don't support waiting and adding // at the same time. @@ -205,7 +227,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) { ip.waitGroup.Add(1) go func() { defer ip.waitGroup.Done() - informer.Run(ip.ctx.Done()) + cacheEntry.Start(ip.ctx.Done()) }() } @@ -281,6 +303,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r return started, i, nil } +// Remove removes an informer entry and stops it if it was running. +func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) { + ip.mu.Lock() + defer ip.mu.Unlock() + + informerMap := ip.informersByType(obj) + + entry, ok := informerMap[gvk] + if !ok { + return + } + close(entry.stop) + delete(informerMap, gvk) +} + func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache { switch obj.(type) { case runtime.Unstructured: @@ -323,6 +360,13 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) + // Set WatchErrorHandler on SharedIndexInformer if set + if ip.watchErrorHandler != nil { + if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + return nil, false, err + } + } + // Check to see if there is a transformer for this gvk if err := sharedIndexInformer.SetTransform(ip.transform); err != nil { return nil, false, err @@ -342,13 +386,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O scopeName: mapping.Scope.Name(), disableDeepCopy: ip.unsafeDisableDeepCopy, }, + stop: make(chan struct{}), } ip.informersByType(obj)[gvk] = i // Start the informer in case the InformersMap has started, otherwise it will be // started when the InformersMap starts. if ip.started { - ip.startInformerLocked(i.Informer) + ip.startInformerLocked(i) } return i, ip.started, nil } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go index 87c31a7c0..e38da1455 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go @@ -109,6 +109,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil } +func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error { + // If the object is clusterscoped, get the informer from clusterCache, + // if not use the namespaced caches. + isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper) + if err != nil { + return err + } + if !isNamespaced { + return c.clusterCache.RemoveInformer(ctx, obj) + } + + for _, cache := range c.namespaceToCache { + err := cache.RemoveInformer(ctx, obj) + if err != nil { + return err + } + } + + return nil +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { // If the object is cluster scoped, get the informer from clusterCache, // if not use the namespaced caches. @@ -391,3 +412,13 @@ func (i *multiNamespaceInformer) HasSynced() bool { } return true } + +// IsStopped checks if each namespaced informer has stopped, returns false if any are still running. +func (i *multiNamespaceInformer) IsStopped() bool { + for _, informer := range i.namespaceToInformer { + if stopped := informer.IsStopped(); !stopped { + return false + } + } + return true +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go index 6a1bfb546..3c0206bea 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/apimachinery.go @@ -31,11 +31,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" ) var ( @@ -60,25 +58,6 @@ func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error { return addToScheme(protobufScheme) } -// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery -// information fetched by a new client with the given config. -func NewDiscoveryRESTMapper(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { - if httpClient == nil { - return nil, fmt.Errorf("httpClient must not be nil, consider using rest.HTTPClientFor(c) to create a client") - } - - // Get a mapper - dc, err := discovery.NewDiscoveryClientForConfigAndClient(c, httpClient) - if err != nil { - return nil, err - } - gr, err := restmapper.GetAPIGroupResources(dc) - if err != nil { - return nil, err - } - return restmapper.NewDiscoveryRESTMapper(gr), nil -} - // IsObjectNamespaced returns true if the object is namespace scoped. // For unstructured objects the gvk is found from the object itself. func IsObjectNamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper meta.RESTMapper) (bool, error) { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go index d5e03b2b1..5af02063b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go @@ -21,6 +21,7 @@ import ( "net/http" "sync" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -166,8 +167,10 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er if err != nil { return err } - for _, version := range apiGroup.Versions { - versions = append(versions, version.Version) + if apiGroup != nil { + for _, version := range apiGroup.Versions { + versions = append(versions, version.Version) + } } } @@ -254,17 +257,12 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error) m.mu.Unlock() // Looking in the cache again. - { - m.mu.RLock() - group, ok := m.apiGroups[groupName] - m.mu.RUnlock() - if ok { - return group, nil - } - } + m.mu.RLock() + defer m.mu.RUnlock() - // If there is still nothing, return an error. - return nil, fmt.Errorf("failed to find API group %q", groupName) + // Don't return an error here if the API group is not present. + // The reloaded RESTMapper will take care of returning a NoMatchError. + return m.apiGroups[groupName], nil } // fetchGroupVersionResources fetches the resources for the specified group and its versions. @@ -276,7 +274,7 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string groupVersion := schema.GroupVersion{Group: groupName, Version: version} apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String()) - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { failedGroups[groupVersion] = err } if apiResourceList != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go index 2fb0acb7b..c0ebb39e3 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go @@ -90,11 +90,18 @@ type CacheOptions struct { type NewClientFunc func(config *rest.Config, options Options) (Client, error) // New returns a new Client using the provided config and Options. -// The returned client reads *and* writes directly from the server -// (it doesn't use object caches). It understands how to work with -// normal types (both custom resources and aggregated/built-in resources), -// as well as unstructured types. // +// The client's read behavior is determined by Options.Cache. +// If either Options.Cache or Options.Cache.Reader is nil, +// the client reads directly from the API server. +// If both Options.Cache and Options.Cache.Reader are non-nil, +// the client reads from a local cache. However, specific +// resources can still be configured to bypass the cache based +// on Options.Cache.Unstructured and Options.Cache.DisableFor. +// Write operations are always performed directly on the API server. +// +// The client understands how to work with normal types (both custom resources +// and aggregated/built-in resources), as well as unstructured types. // In the case of normal types, the scheme will be used to look up the // corresponding group, version, and kind for the given type. In the // case of unstructured types, the group, version, and kind will be extracted @@ -210,7 +217,8 @@ func newClient(config *rest.Config, options Options) (*client, error) { var _ Client = &client{} -// client is a client.Client that reads and writes directly from/to an API server. +// client is a client.Client configured to either read from a local cache or directly from the API server. +// Write operations are always performed directly on the API server. // It lazily initializes new clients at the time they are used. type client struct { typedClient typedClient diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go index 9deb6756c..790a1faab 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/fake/client.go @@ -334,10 +334,12 @@ func (t versionedTracker) Create(gvr schema.GroupVersionResource, obj runtime.Ob // tries to assign whatever it finds into a ListType it gets from schema.New() - Thus we have to ensure // we save as the very same type, otherwise subsequent List requests will fail. func convertFromUnstructuredIfNecessary(s *runtime.Scheme, o runtime.Object) (runtime.Object, error) { - gvk := o.GetObjectKind().GroupVersionKind() - u, isUnstructured := o.(runtime.Unstructured) - if !isUnstructured || !s.Recognizes(gvk) { + if !isUnstructured { + return o, nil + } + gvk := o.GetObjectKind().GroupVersionKind() + if !s.Recognizes(gvk) { return o, nil } @@ -380,12 +382,9 @@ func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Ob field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) } - gvk := obj.GetObjectKind().GroupVersionKind() - if gvk.Empty() { - gvk, err = apiutil.GVKForObject(obj, t.scheme) - if err != nil { - return err - } + gvk, err := apiutil.GVKForObject(obj, t.scheme) + if err != nil { + return err } oldObject, err := t.ObjectTracker.Get(gvr, ns, accessor.GetName()) @@ -464,25 +463,25 @@ func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.O return err } - gvk, err := apiutil.GVKForObject(obj, c.scheme) - if err != nil { - return err - } - ta, err := meta.TypeAccessor(o) - if err != nil { - return err + if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return err + } + ta, err := meta.TypeAccessor(o) + if err != nil { + return err + } + ta.SetKind(gvk.Kind) + ta.SetAPIVersion(gvk.GroupVersion().String()) } - ta.SetKind(gvk.Kind) - ta.SetAPIVersion(gvk.GroupVersion().String()) j, err := json.Marshal(o) if err != nil { return err } - decoder := scheme.Codecs.UniversalDecoder() zero(obj) - _, _, err = decoder.Decode(j, nil, obj) - return err + return json.Unmarshal(j, obj) } func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { @@ -527,21 +526,21 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl return err } - ta, err := meta.TypeAccessor(o) - if err != nil { - return err + if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { + ta, err := meta.TypeAccessor(o) + if err != nil { + return err + } + ta.SetKind(originalKind) + ta.SetAPIVersion(gvk.GroupVersion().String()) } - ta.SetKind(originalKind) - ta.SetAPIVersion(gvk.GroupVersion().String()) j, err := json.Marshal(o) if err != nil { return err } - decoder := scheme.Codecs.UniversalDecoder() zero(obj) - _, _, err = decoder.Decode(j, nil, obj) - if err != nil { + if err := json.Unmarshal(j, obj); err != nil { return err } @@ -588,9 +587,7 @@ func (c *fakeClient) filterList(list []runtime.Object, gvk schema.GroupVersionKi } func (c *fakeClient) filterWithFields(list []runtime.Object, gvk schema.GroupVersionKind, fs fields.Selector) ([]runtime.Object, error) { - // We only allow filtering on the basis of a single field to ensure consistency with the - // behavior of the cache reader (which we're faking here). - fieldKey, fieldVal, requiresExact := selector.RequiresExactMatch(fs) + requiresExact := selector.RequiresExactMatch(fs) if !requiresExact { return nil, fmt.Errorf("field selector %s is not in one of the two supported forms \"key==val\" or \"key=val\"", fs) @@ -599,15 +596,24 @@ func (c *fakeClient) filterWithFields(list []runtime.Object, gvk schema.GroupVer // Field selection is mimicked via indexes, so there's no sane answer this function can give // if there are no indexes registered for the GroupVersionKind of the objects in the list. indexes := c.indexes[gvk] - if len(indexes) == 0 || indexes[fieldKey] == nil { - return nil, fmt.Errorf("List on GroupVersionKind %v specifies selector on field %s, but no "+ - "index with name %s has been registered for GroupVersionKind %v", gvk, fieldKey, fieldKey, gvk) + for _, req := range fs.Requirements() { + if len(indexes) == 0 || indexes[req.Field] == nil { + return nil, fmt.Errorf("List on GroupVersionKind %v specifies selector on field %s, but no "+ + "index with name %s has been registered for GroupVersionKind %v", gvk, req.Field, req.Field, gvk) + } } - indexExtractor := indexes[fieldKey] filteredList := make([]runtime.Object, 0, len(list)) for _, obj := range list { - if c.objMatchesFieldSelector(obj, indexExtractor, fieldVal) { + matches := true + for _, req := range fs.Requirements() { + indexExtractor := indexes[req.Field] + if !c.objMatchesFieldSelector(obj, indexExtractor, req.Value) { + matches = false + break + } + } + if matches { filteredList = append(filteredList, obj) } } @@ -862,21 +868,22 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client if !handled { panic("tracker could not handle patch method") } - ta, err := meta.TypeAccessor(o) - if err != nil { - return err + + if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { + ta, err := meta.TypeAccessor(o) + if err != nil { + return err + } + ta.SetKind(gvk.Kind) + ta.SetAPIVersion(gvk.GroupVersion().String()) } - ta.SetKind(gvk.Kind) - ta.SetAPIVersion(gvk.GroupVersion().String()) j, err := json.Marshal(o) if err != nil { return err } - decoder := scheme.Codecs.UniversalDecoder() zero(obj) - _, _, err = decoder.Decode(j, nil, obj) - return err + return json.Unmarshal(j, obj) } // Applying a patch results in a deletionTimestamp that is truncated to the nearest second. @@ -1247,6 +1254,8 @@ func inTreeResourcesWithStatus() []schema.GroupVersionKind { {Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "FlowSchema"}, {Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "PriorityLevelConfiguration"}, + {Group: "flowcontrol.apiserver.k8s.io", Version: "v1", Kind: "FlowSchema"}, + {Group: "flowcontrol.apiserver.k8s.io", Version: "v1", Kind: "PriorityLevelConfiguration"}, } } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go index d81bf25de..798506f48 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/options.go @@ -419,7 +419,7 @@ type ListOptions struct { LabelSelector labels.Selector // FieldSelector filters results by a particular field. In order // to use this with cache-based implementations, restrict usage to - // a single field-value pair that's been added to the indexers. + // exact match field-value pair that's been added to the indexers. FieldSelector fields.Selector // Namespace represents the namespace to list for, or empty for @@ -514,7 +514,8 @@ type MatchingLabels map[string]string func (m MatchingLabels) ApplyToList(opts *ListOptions) { // TODO(directxman12): can we avoid reserializing this over and over? if opts.LabelSelector == nil { - opts.LabelSelector = labels.NewSelector() + opts.LabelSelector = labels.SelectorFromValidatedSet(map[string]string(m)) + return } // If there's already a selector, we need to AND the two together. noValidSel := labels.SelectorFromValidatedSet(map[string]string(m)) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go index f76e012ea..05153f74c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go @@ -27,7 +27,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -77,8 +77,8 @@ func SetControllerReference(owner, controlled metav1.Object, scheme *runtime.Sch Kind: gvk.Kind, Name: owner.GetName(), UID: owner.GetUID(), - BlockOwnerDeletion: pointer.Bool(true), - Controller: pointer.Bool(true), + BlockOwnerDeletion: ptr.To(true), + Controller: ptr.To(true), } // Return early with an error if the object is already controlled. @@ -121,6 +121,84 @@ func SetOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) erro return nil } +// RemoveOwnerReference is a helper method to make sure the given object removes an owner reference to the object provided. +// This allows you to remove the owner to establish a new owner of the object in a subsequent call. +func RemoveOwnerReference(owner, object metav1.Object, scheme *runtime.Scheme) error { + owners := object.GetOwnerReferences() + length := len(owners) + if length < 1 { + return fmt.Errorf("%T does not have any owner references", object) + } + ro, ok := owner.(runtime.Object) + if !ok { + return fmt.Errorf("%T is not a runtime.Object, cannot call RemoveOwnerReference", owner) + } + gvk, err := apiutil.GVKForObject(ro, scheme) + if err != nil { + return err + } + + index := indexOwnerRef(owners, metav1.OwnerReference{ + APIVersion: gvk.GroupVersion().String(), + Name: owner.GetName(), + Kind: gvk.Kind, + }) + if index == -1 { + return fmt.Errorf("%T does not have an owner reference for %T", object, owner) + } + + owners = append(owners[:index], owners[index+1:]...) + object.SetOwnerReferences(owners) + return nil +} + +// HasControllerReference returns true if the object +// has an owner ref with controller equal to true +func HasControllerReference(object metav1.Object) bool { + owners := object.GetOwnerReferences() + for _, owner := range owners { + isTrue := owner.Controller + if owner.Controller != nil && *isTrue { + return true + } + } + return false +} + +// RemoveControllerReference removes an owner reference where the controller +// equals true +func RemoveControllerReference(owner, object metav1.Object, scheme *runtime.Scheme) error { + if ok := HasControllerReference(object); !ok { + return fmt.Errorf("%T does not have a owner reference with controller equals true", object) + } + ro, ok := owner.(runtime.Object) + if !ok { + return fmt.Errorf("%T is not a runtime.Object, cannot call RemoveControllerReference", owner) + } + gvk, err := apiutil.GVKForObject(ro, scheme) + if err != nil { + return err + } + ownerRefs := object.GetOwnerReferences() + index := indexOwnerRef(ownerRefs, metav1.OwnerReference{ + APIVersion: gvk.GroupVersion().String(), + Name: owner.GetName(), + Kind: gvk.Kind, + }) + + if index == -1 { + return fmt.Errorf("%T does not have an controller reference for %T", object, owner) + } + + if ownerRefs[index].Controller == nil || !*ownerRefs[index].Controller { + return fmt.Errorf("%T owner is not the controller reference for %T", owner, object) + } + + ownerRefs = append(ownerRefs[:index], ownerRefs[index+1:]...) + object.SetOwnerReferences(ownerRefs) + return nil +} + func upsertOwnerRef(ref metav1.OwnerReference, object metav1.Object) { owners := object.GetOwnerReferences() if idx := indexOwnerRef(owners, ref); idx == -1 { @@ -166,7 +244,6 @@ func referSameObject(a, b metav1.OwnerReference) bool { if err != nil { return false } - return aGV.Group == bGV.Group && a.Kind == b.Kind && a.Name == b.Name } @@ -193,6 +270,9 @@ const ( // They should complete the sentence "Deployment default/foo has been .. // The MutateFn is called regardless of creating or updating an object. // // It returns the executed operation and an error. +// +// Note: changes made by MutateFn to any sub-resource (status...), will be +// discarded. func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) { key := client.ObjectKeyFromObject(obj) if err := c.Get(ctx, key, obj); err != nil { @@ -230,6 +310,12 @@ func CreateOrUpdate(ctx context.Context, c client.Client, obj client.Object, f M // The MutateFn is called regardless of creating or updating an object. // // It returns the executed operation and an error. +// +// Note: changes to any sub-resource other than status will be ignored. +// Changes to the status sub-resource will only be applied if the object +// already exist. To change the status on object creation, the easiest +// way is to requeue the object in the controller if OperationResult is +// OperationResultCreated func CreateOrPatch(ctx context.Context, c client.Client, obj client.Object, f MutateFn) (OperationResult, error) { key := client.ObjectKeyFromObject(obj) if err := c.Get(ctx, key, obj); err != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go index 2f380f4fc..ff2f3e80b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go @@ -42,7 +42,7 @@ import ( // Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface. // Most users shouldn't need to implement their own EventHandler. type EventHandler interface { - // Create is called in response to an create event - e.g. Pod Creation. + // Create is called in response to a create event - e.g. Pod Creation. Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) // Update is called in response to an update event - e.g. Pod Updated. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/field/selector/utils.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/field/selector/utils.go index 4f6d08431..8f6dc71ed 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/field/selector/utils.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/field/selector/utils.go @@ -22,14 +22,16 @@ import ( ) // RequiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`. -func RequiresExactMatch(sel fields.Selector) (field, val string, required bool) { +func RequiresExactMatch(sel fields.Selector) bool { reqs := sel.Requirements() - if len(reqs) != 1 { - return "", "", false + if len(reqs) == 0 { + return false } - req := reqs[0] - if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals { - return "", "", false + + for _, req := range reqs { + if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals { + return false + } } - return req.Field, req.Value, true + return true } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/syncs/syncs.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/syncs/syncs.go new file mode 100644 index 000000000..c78a30377 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/syncs/syncs.go @@ -0,0 +1,38 @@ +package syncs + +import ( + "context" + "reflect" + "sync" +) + +// MergeChans returns a channel that is closed when any of the input channels are signaled. +// The caller must call the returned CancelFunc to ensure no resources are leaked. +func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) { + var once sync.Once + out := make(chan T) + cancel := make(chan T) + cancelFunc := func() { + once.Do(func() { + close(cancel) + }) + <-out + } + cases := make([]reflect.SelectCase, len(chans)+1) + for i := range chans { + cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(chans[i]), + } + } + cases[len(cases)-1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(cancel), + } + go func() { + defer close(out) + _, _, _ = reflect.Select(cases) + }() + + return out, cancelFunc +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go index 708a9cc16..25c3c7375 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go @@ -34,7 +34,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -409,10 +409,10 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, fmt.Errorf("failed to new pprof listener: %w", err) } - errChan := make(chan error) + errChan := make(chan error, 1) runnables := newRunnables(options.BaseContext, errChan) return &controllerManager{ - stopProcedureEngaged: pointer.Int64(0), + stopProcedureEngaged: ptr.To(int64(0)), cluster: cluster, runnables: runnables, errChan: errChan, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go index 277b87881..cff1de4c1 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go @@ -54,14 +54,14 @@ var ( Subsystem: WorkQueueSubsystem, Name: QueueLatencyKey, Help: "How long in seconds an item stays in workqueue before being requested", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), }, []string{"name"}) workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: WorkQueueSubsystem, Name: WorkDurationKey, Help: "How long in seconds processing an item from workqueue takes.", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10), + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), }, []string{"name"}) unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go b/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go index 0f4e7e16b..f1cce87c8 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go @@ -19,9 +19,11 @@ package reconcile import ( "context" "errors" + "reflect" "time" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) // Result contains the result of a Reconciler invocation. @@ -97,7 +99,7 @@ type Reconciler interface { // If the error is nil and the returned Result has a non-zero result.RequeueAfter, the request // will be requeued after the specified duration. // - // If the error is nil and result.RequeueAfter is zero and result.Reque is true, the request + // If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request // will be requeued using exponential backoff. Reconcile(context.Context, Request) (Result, error) } @@ -110,6 +112,36 @@ var _ Reconciler = Func(nil) // Reconcile implements Reconciler. func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) } +// ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation +// event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in +// Builder.Complete by calling AsReconciler. See Reconciler for more details. +type ObjectReconciler[T client.Object] interface { + Reconcile(context.Context, T) (Result, error) +} + +// AsReconciler creates a Reconciler based on the given ObjectReconciler. +func AsReconciler[T client.Object](client client.Client, rec ObjectReconciler[T]) Reconciler { + return &objectReconcilerAdapter[T]{ + objReconciler: rec, + client: client, + } +} + +type objectReconcilerAdapter[T client.Object] struct { + objReconciler ObjectReconciler[T] + client client.Client +} + +// Reconcile implements Reconciler. +func (a *objectReconcilerAdapter[T]) Reconcile(ctx context.Context, req Request) (Result, error) { + o := reflect.New(reflect.TypeOf(*new(T)).Elem()).Interface().(T) + if err := a.client.Get(ctx, req.NamespacedName, o); err != nil { + return Result{}, client.IgnoreNotFound(err) + } + + return a.objReconciler.Reconcile(ctx, o) +} + // TerminalError is an error that will not be retried but still be logged // and recorded in metrics. func TerminalError(wrapped error) error { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go index a3b720716..c9662ce1c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go @@ -27,12 +27,14 @@ import ( ) // Defaulter defines functions for setting defaults on resources. +// Deprecated: Ue CustomDefaulter instead. type Defaulter interface { runtime.Object Default() } // DefaultingWebhookFor creates a new Webhook for Defaulting the provided type. +// Deprecated: Use WithCustomDefaulter instead. func DefaultingWebhookFor(scheme *runtime.Scheme, defaulter Defaulter) *Webhook { return &Webhook{ Handler: &mutatingHandler{defaulter: defaulter, decoder: NewDecoder(scheme)}, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go index 57e465abb..f049fb66e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go @@ -34,6 +34,26 @@ import ( var admissionScheme = runtime.NewScheme() var admissionCodecs = serializer.NewCodecFactory(admissionScheme) +// adapted from https://github.com/kubernetes/kubernetes/blob/c28c2009181fcc44c5f6b47e10e62dacf53e4da0/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go +// +// From https://github.com/kubernetes/apiserver/blob/d6876a0600de06fef75968c4641c64d7da499f25/pkg/server/config.go#L433-L442C5: +// +// 1.5MB is the recommended client request size in byte +// the etcd server should accept. See +// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56. +// A request body might be encoded in json, and is converted to +// proto when persisted in etcd, so we allow 2x as the largest request +// body size to be accepted and decoded in a write request. +// +// For the admission request, we can infer that it contains at most two objects +// (the old and new versions of the object being admitted), each of which can +// be at most 3MB in size. For the rest of the request, we can assume that +// it will be less than 1MB in size. Therefore, we can set the max request +// size to 7MB. +// If your use case requires larger max request sizes, please +// open an issue (https://github.com/kubernetes-sigs/controller-runtime/issues/new). +const maxRequestSize = int64(7 * 1024 * 1024) + func init() { utilruntime.Must(v1.AddToScheme(admissionScheme)) utilruntime.Must(v1beta1.AddToScheme(admissionScheme)) @@ -42,27 +62,30 @@ func init() { var _ http.Handler = &Webhook{} func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var body []byte - var err error ctx := r.Context() if wh.WithContextFunc != nil { ctx = wh.WithContextFunc(ctx, r) } - var reviewResponse Response - if r.Body == nil { - err = errors.New("request body is empty") + if r.Body == nil || r.Body == http.NoBody { + err := errors.New("request body is empty") wh.getLogger(nil).Error(err, "bad request") - reviewResponse = Errored(http.StatusBadRequest, err) - wh.writeResponse(w, reviewResponse) + wh.writeResponse(w, Errored(http.StatusBadRequest, err)) return } defer r.Body.Close() - if body, err = io.ReadAll(r.Body); err != nil { + limitedReader := &io.LimitedReader{R: r.Body, N: maxRequestSize} + body, err := io.ReadAll(limitedReader) + if err != nil { wh.getLogger(nil).Error(err, "unable to read the body from the incoming request") - reviewResponse = Errored(http.StatusBadRequest, err) - wh.writeResponse(w, reviewResponse) + wh.writeResponse(w, Errored(http.StatusBadRequest, err)) + return + } + if limitedReader.N <= 0 { + err := fmt.Errorf("request entity is too large; limit is %d bytes", maxRequestSize) + wh.getLogger(nil).Error(err, "unable to read the body from the incoming request; limit reached") + wh.writeResponse(w, Errored(http.StatusRequestEntityTooLarge, err)) return } @@ -70,8 +93,7 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { if contentType := r.Header.Get("Content-Type"); contentType != "application/json" { err = fmt.Errorf("contentType=%s, expected application/json", contentType) wh.getLogger(nil).Error(err, "unable to process a request with unknown content type") - reviewResponse = Errored(http.StatusBadRequest, err) - wh.writeResponse(w, reviewResponse) + wh.writeResponse(w, Errored(http.StatusBadRequest, err)) return } @@ -89,14 +111,12 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) { _, actualAdmRevGVK, err := admissionCodecs.UniversalDeserializer().Decode(body, nil, &ar) if err != nil { wh.getLogger(nil).Error(err, "unable to decode the request") - reviewResponse = Errored(http.StatusBadRequest, err) - wh.writeResponse(w, reviewResponse) + wh.writeResponse(w, Errored(http.StatusBadRequest, err)) return } wh.getLogger(&req).V(5).Info("received request") - reviewResponse = wh.Handle(ctx, req) - wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK) + wh.writeResponseTyped(w, wh.Handle(ctx, req), actualAdmRevGVK) } // writeResponse writes response to w generically, i.e. without encoding GVK information. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go index 00bda8a4c..fa42217bd 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go @@ -33,6 +33,7 @@ type Warnings []string // Validator defines functions for validating an operation. // The custom resource kind which implements this interface can validate itself. // To validate the custom resource with another specific struct, use CustomValidator instead. +// Deprecated: Use CustomValidator instead. type Validator interface { runtime.Object @@ -53,6 +54,7 @@ type Validator interface { } // ValidatingWebhookFor creates a new Webhook for validating the provided type. +// Deprecated: Use WithCustomValidator instead. func ValidatingWebhookFor(scheme *runtime.Scheme, validator Validator) *Webhook { return &Webhook{ Handler: &validatingHandler{validator: validator, decoder: NewDecoder(scheme)}, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go index e99fbd8a8..07650aa60 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go @@ -30,7 +30,6 @@ import ( // CustomValidator defines functions for validating an operation. // The object to be validated is passed into methods as a parameter. type CustomValidator interface { - // ValidateCreate validates the object on creation. // The optional warnings will be added to the response as warning messages. // Return an error if the object is invalid. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go index 293137db4..e8439e2ea 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go @@ -24,9 +24,11 @@ import ( // define some aliases for common bits of the webhook functionality // Defaulter defines functions for setting defaults on resources. +// Deprecated: Use CustomDefaulter instead. type Defaulter = admission.Defaulter // Validator defines functions for validating an operation. +// Deprecated: Use CustomValidator instead. type Validator = admission.Validator // CustomDefaulter defines functions for setting defaults on resources.