🐛 Add node watcher to MachinePool controller #8443

Merged
10 changes: 10 additions & 0 deletions api/v1beta1/index/index.go
@@ -41,5 +41,15 @@ func AddDefaultIndexes(ctx context.Context, mgr ctrl.Manager) error {
}
}

if feature.Gates.Enabled(feature.MachinePool) {
if err := ByMachinePoolNode(ctx, mgr); err != nil {
return err
}

if err := ByMachinePoolProviderID(ctx, mgr); err != nil {
return err
}
}

return nil
}
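
For context, a minimal sketch of where AddDefaultIndexes is called during manager setup. This wiring is illustrative and not part of the diff; the scheme registration and error handling are assumptions, and the MachinePool indexes are only registered when the MachinePool feature gate is enabled in the running process.

```go
package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/api/v1beta1/index"
	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func main() {
	scheme := runtime.NewScheme()
	_ = clusterv1.AddToScheme(scheme)
	_ = expv1.AddToScheme(scheme)

	ctx := ctrl.SetupSignalHandler()

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// Registers the default indexes on the manager's cache; the two MachinePool
	// indexes added in this PR are included only when the MachinePool feature
	// gate is enabled.
	if err := index.AddDefaultIndexes(ctx, mgr); err != nil {
		panic(err)
	}

	if err := mgr.Start(ctx); err != nil {
		panic(err)
	}
}
```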
105 changes: 105 additions & 0 deletions api/v1beta1/index/machinepool.go
@@ -0,0 +1,105 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package index

import (
"context"
"fmt"

"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cluster-api/controllers/noderefutil"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

const (
// MachinePoolNodeNameField is used by the MachinePool Controller to index MachinePools by Node name, and add a watch on Nodes.
MachinePoolNodeNameField = "status.nodeRefs.name"

// MachinePoolProviderIDField is used to index MachinePools by ProviderID. It's useful to find MachinePools
// in a management cluster from Nodes in a workload cluster.
MachinePoolProviderIDField = "spec.providerIDList"
)

// ByMachinePoolNode adds the machinepool node name index to the
// manager's cache.
func ByMachinePoolNode(ctx context.Context, mgr ctrl.Manager) error {
if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
MachinePoolNodeNameField,
MachinePoolByNodeName,
); err != nil {
return errors.Wrap(err, "error setting index field")
}

return nil
}

// MachinePoolByNodeName contains the logic to index MachinePools by Node name.
func MachinePoolByNodeName(o client.Object) []string {
machinepool, ok := o.(*expv1.MachinePool)
if !ok {
panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
}

if len(machinepool.Status.NodeRefs) == 0 {
return nil
}

nodeNames := make([]string, 0, len(machinepool.Status.NodeRefs))
for _, ref := range machinepool.Status.NodeRefs {
nodeNames = append(nodeNames, ref.Name)
}
return nodeNames
}

// ByMachinePoolProviderID adds the machinepool providerID index to the
// manager's cache.
func ByMachinePoolProviderID(ctx context.Context, mgr ctrl.Manager) error {
if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
MachinePoolProviderIDField,
machinePoolByProviderID,
); err != nil {
return errors.Wrap(err, "error setting index field")
}

return nil
}

func machinePoolByProviderID(o client.Object) []string {
machinepool, ok := o.(*expv1.MachinePool)
if !ok {
panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
}

if len(machinepool.Spec.ProviderIDList) == 0 {
return nil
}

providerIDs := make([]string, 0, len(machinepool.Spec.ProviderIDList))
for _, id := range machinepool.Spec.ProviderIDList {
providerID, err := noderefutil.NewProviderID(id)
if err != nil {
// Failed to create providerID, skipping.
continue
}
providerIDs = append(providerIDs, providerID.IndexKey())
}

return providerIDs
}
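
As a usage sketch (not part of the PR), a caller holding a cached client could resolve a raw provider ID to its owning MachinePools through the new index roughly as follows. The helper name is hypothetical; the key must be the normalized form produced by noderefutil.NewProviderID, matching what machinePoolByProviderID stores.

```go
package index

import (
	"context"

	"github.com/pkg/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/cluster-api/controllers/noderefutil"
	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

// machinePoolsForProviderID is a hypothetical helper showing how the index is queried.
func machinePoolsForProviderID(ctx context.Context, c client.Client, rawProviderID string) (*expv1.MachinePoolList, error) {
	// Normalize the raw ID the same way machinePoolByProviderID does, so the lookup key matches.
	providerID, err := noderefutil.NewProviderID(rawProviderID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse provider ID")
	}

	mpList := &expv1.MachinePoolList{}
	if err := c.List(ctx, mpList,
		client.MatchingFields{MachinePoolProviderIDField: providerID.IndexKey()}); err != nil {
		return nil, err
	}
	return mpList, nil
}
```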
112 changes: 112 additions & 0 deletions api/v1beta1/index/machinepool_test.go
@@ -0,0 +1,112 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package index

import (
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cluster-api/controllers/noderefutil"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func TestIndexMachinePoolByNodeName(t *testing.T) {
testCases := []struct {
name string
object client.Object
expected []string
}{
{
name: "when the machinepool has no NodeRef",
object: &expv1.MachinePool{},
expected: []string{},
},
{
name: "when the machinepool has valid NodeRefs",
object: &expv1.MachinePool{
Status: expv1.MachinePoolStatus{
NodeRefs: []corev1.ObjectReference{
{
Name: "node1",
},
{
Name: "node2",
},
},
},
},
expected: []string{"node1", "node2"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)
got := MachinePoolByNodeName(tc.object)
g.Expect(got).To(ConsistOf(tc.expected))
})
}
}

func TestIndexMachinePoolByProviderID(t *testing.T) {
g := NewWithT(t)
validProviderID, err := noderefutil.NewProviderID("aws://region/zone/1")
g.Expect(err).ToNot(HaveOccurred())
otherValidProviderID, err := noderefutil.NewProviderID("aws://region/zone/2")
g.Expect(err).ToNot(HaveOccurred())

testCases := []struct {
name string
object client.Object
expected []string
}{
{
name: "MachinePool has no providerID",
object: &expv1.MachinePool{},
expected: nil,
},
{
name: "MachinePool has invalid providerID",
object: &expv1.MachinePool{
Spec: expv1.MachinePoolSpec{
ProviderIDList: []string{"invalid"},
},
},
expected: []string{},
},
{
name: "MachinePool has valid providerIDs",
object: &expv1.MachinePool{
Spec: expv1.MachinePoolSpec{
ProviderIDList: []string{validProviderID.String(), otherValidProviderID.String()},
},
},
expected: []string{validProviderID.IndexKey(), otherValidProviderID.IndexKey()},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)
got := machinePoolByProviderID(tc.object)
g.Expect(got).To(BeEquivalentTo(tc.expected))
})
}
}
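
The provider-ID test cases above hinge on how noderefutil.NewProviderID handles well-formed versus malformed IDs. A small standalone sketch of that behavior follows; the exact IndexKey format is left out, since it is an implementation detail of noderefutil.

```go
package main

import (
	"fmt"

	"sigs.k8s.io/cluster-api/controllers/noderefutil"
)

func main() {
	// A well-formed provider ID parses; IndexKey() yields the normalized value
	// that machinePoolByProviderID stores in the cache index.
	pid, err := noderefutil.NewProviderID("aws://region/zone/1")
	if err == nil {
		fmt.Println("index key:", pid.IndexKey())
	}

	// A malformed ID returns an error, which is why the indexer skips it and the
	// "invalid providerID" test case above expects an empty result.
	if _, err := noderefutil.NewProviderID("invalid"); err != nil {
		fmt.Println("invalid provider IDs are not indexed:", err)
	}
}
```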
3 changes: 3 additions & 0 deletions exp/controllers/alias.go
@@ -23,13 +23,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

"sigs.k8s.io/cluster-api/controllers/remote"
machinepool "sigs.k8s.io/cluster-api/exp/internal/controllers"
)

// MachinePoolReconciler reconciles a MachinePool object.
type MachinePoolReconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
@@ -39,6 +41,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
return (&machinepool.MachinePoolReconciler{
Client: r.Client,
APIReader: r.APIReader,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
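
Because the exported reconciler now carries a Tracker field, whoever wires the controller has to construct a ClusterCacheTracker and pass it in. A rough sketch of that wiring, assuming remote.NewClusterCacheTracker with default options is available (as in cluster-api's own manager setup); names here are illustrative:

```go
package main

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	"sigs.k8s.io/cluster-api/controllers/remote"
	expcontrollers "sigs.k8s.io/cluster-api/exp/controllers"
)

func setupMachinePoolController(ctx context.Context, mgr ctrl.Manager) error {
	// The tracker maintains cached clients and watches against workload clusters;
	// the MachinePool controller uses it to add the Node watch introduced here.
	tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{})
	if err != nil {
		return err
	}

	return (&expcontrollers.MachinePoolReconciler{
		Client:    mgr.GetClient(),
		APIReader: mgr.GetAPIReader(),
		Tracker:   tracker,
	}).SetupWithManager(ctx, mgr, controller.Options{})
}
```

In cluster-api's actual setup a remote.ClusterCacheReconciler is also registered so tracker accessors get cleaned up when clusters are deleted; that is omitted from this sketch.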
82 changes: 82 additions & 0 deletions exp/internal/controllers/machinepool_controller.go
@@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"
@@ -34,10 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
@@ -62,6 +66,7 @@ const (
type MachinePoolReconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
@@ -289,3 +294,80 @@ func (r *MachinePoolReconciler) reconcileDeleteExternal(ctx context.Context, m *
// Return true if there are no more external objects.
return len(objects) == 0, nil
}

func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
log := ctrl.LoggerFrom(ctx)

if !conditions.IsTrue(cluster, clusterv1.ControlPlaneInitializedCondition) {
log.V(5).Info("Skipping node watching setup because control plane is not initialized")
return nil
}

// If there is no tracker, don't watch remote nodes
if r.Tracker == nil {
return nil
}

return r.Tracker.Watch(ctx, remote.WatchInput{
Name: "machinepool-watchNodes",
Cluster: util.ObjectKey(cluster),
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
})
}

func (r *MachinePoolReconciler) nodeToMachinePool(o client.Object) []reconcile.Request {
node, ok := o.(*corev1.Node)
if !ok {
panic(fmt.Sprintf("Expected a Node but got a %T", o))
}

var filters []client.ListOption
// Match by clusterName when the node has the annotation.
if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok {
filters = append(filters, client.MatchingLabels{
clusterv1.ClusterNameLabel: clusterName,
})
}

// Match by namespace when the node has the annotation.
if namespace, ok := node.GetAnnotations()[clusterv1.ClusterNamespaceAnnotation]; ok {
filters = append(filters, client.InNamespace(namespace))
}

// Match by nodeName and status.nodeRef.name.
machinePoolList := &expv1.MachinePoolList{}
if err := r.Client.List(
context.TODO(),
machinePoolList,
append(filters, client.MatchingFields{index.MachinePoolNodeNameField: node.Name})...); err != nil {
Contributor Author: I'm not entirely sure how to get indexing to work for MachinePools, given there are multiple nodes per MachinePool... The extract func here returns a list of all node names associated with the MachinePool. What should this filter be?

Contributor: From the documentation it looks like using the filter this way should work, since MatchingFields also supports cache indices.

Indexing with multiple "keys" is apparently supported as well; however, it lacks compatibility with the Kubernetes API server (not sure what that means or what the impact would be). See the sketch after this file's diff for an illustration of a multi-key index.

return nil
}

// There should be exactly 1 MachinePool for the node.
if len(machinePoolList.Items) == 1 {
return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
}

// Otherwise, match by providerID. This is useful when, e.g., the NodeRef has not been set yet.
// Match by providerID
nodeProviderID, err := noderefutil.NewProviderID(node.Spec.ProviderID)
if err != nil {
return nil
}
machinePoolList = &expv1.MachinePoolList{}
if err := r.Client.List(
context.TODO(),
machinePoolList,
append(filters, client.MatchingFields{index.MachinePoolProviderIDField: nodeProviderID.IndexKey()})...); err != nil {
return nil
}

// There should be exactly 1 MachinePool for the node.
if len(machinePoolList.Items) == 1 {
return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
}

return nil
}
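
To make the multi-key indexing discussed in the review thread above concrete, here is a minimal, hypothetical sketch using controller-runtime's fake client. It assumes a controller-runtime version whose fake client builder supports WithIndex (v0.15+), and all object names are illustrative.

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"sigs.k8s.io/cluster-api/api/v1beta1/index"
	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func main() {
	scheme := runtime.NewScheme()
	_ = expv1.AddToScheme(scheme)

	// One MachinePool whose status references two Nodes: the indexer emits two
	// keys ("node-1" and "node-2") for this single object.
	mp := &expv1.MachinePool{
		ObjectMeta: metav1.ObjectMeta{Name: "mp-0", Namespace: "default"},
		Status: expv1.MachinePoolStatus{
			NodeRefs: []corev1.ObjectReference{{Name: "node-1"}, {Name: "node-2"}},
		},
	}

	// Fake client carrying the same index this PR registers on the manager cache.
	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(mp).
		WithIndex(&expv1.MachinePool{}, index.MachinePoolNodeNameField, index.MachinePoolByNodeName).
		Build()

	// Listing with MatchingFields on either node name finds the same MachinePool.
	mpList := &expv1.MachinePoolList{}
	if err := c.List(context.Background(), mpList,
		client.MatchingFields{index.MachinePoolNodeNameField: "node-2"}); err != nil {
		panic(err)
	}
	fmt.Println("matches:", len(mpList.Items)) // expected: 1
}
```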
9 changes: 8 additions & 1 deletion exp/internal/controllers/machinepool_controller_noderef.go
@@ -49,6 +49,12 @@ type getNodeReferencesResult struct {

func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Create a watch on the nodes in the Cluster.
if err := r.watchClusterNodes(ctx, cluster); err != nil {
return ctrl.Result{}, err
}

// Check that the MachinePool hasn't been deleted and isn't in the process of being deleted.
if !mp.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
@@ -80,7 +86,8 @@ if err != nil {
if err != nil {
if err == errNoAvailableNodes {
log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
// No need to requeue here. Nodes emit an event that triggers reconciliation.
return ctrl.Result{}, nil
}
r.recorder.Event(mp, corev1.EventTypeWarning, "FailedSetNodeRef", err.Error())
return ctrl.Result{}, errors.Wrapf(err, "failed to get node references")