Skip to content

Commit

Permalink
add support or concurrent MD upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
ykakarap committed Apr 3, 2023
1 parent a2318d1 commit bc42715
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 63 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
// will not be completed until the annotation is removed and all MachineDeployments are upgraded.
ClusterTopologyDeferUpgradeAnnotation = "topology.cluster.x-k8s.io/defer-upgrade"

// ClusterTopologyUpgradeConcurrency can be set on a Classy Cluster to define the maximum concurrency while
// upgrading MachineDeployments of a Classy Cluster.
ClusterTopologyUpgradeConcurrency = "topology.cluster.x-k8s.io/upgrade-concurrency"

// ClusterTopologyUnsafeUpdateClassNameAnnotation can be used to disable the webhook check on
// update that disallows a pre-existing Cluster to be populated with Topology information and Class.
ClusterTopologyUnsafeUpdateClassNameAnnotation = "unsafe.topology.cluster.x-k8s.io/disable-update-class-name-check"
Expand Down
8 changes: 0 additions & 8 deletions internal/controllers/topology/cluster/desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,6 @@ func computeMachineDeploymentVersion(s *scope.Scope, machineDeploymentTopology c
return currentVersion, nil
}

// At this point the control plane is stable (not scaling, not upgrading, not being upgraded).
// Checking to see if the machine deployments are also stable.
// If any of the MachineDeployments is rolling out, do not upgrade the machine deployment yet.
if s.Current.MachineDeployments.IsAnyRollingOut() {
s.UpgradeTracker.MachineDeployments.MarkPendingUpgrade(currentMDState.Object.Name)
return currentVersion, nil
}

// Control plane and machine deployments are stable.
// Ready to pick up the topology version.
s.UpgradeTracker.MachineDeployments.MarkRollingOut(currentMDState.Object.Name)
Expand Down
130 changes: 87 additions & 43 deletions internal/controllers/topology/cluster/desired_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,23 +1635,25 @@ func TestComputeMachineDeployment(t *testing.T) {
}).
Build()

machineDeploymentStable := builder.MachineDeployment("test-namespace", "md-1").
machineDeploymentStable := builder.MachineDeployment("test-namespace", "md-stable").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 2,
ReadyReplicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
}).
Build()

machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-1").
machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-rolling").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 1,
ReadyReplicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
}).
Expand All @@ -1669,28 +1671,46 @@ func TestComputeMachineDeployment(t *testing.T) {
name string
machineDeploymentsState scope.MachineDeploymentsStateMap
currentMDVersion *string
upgradeConcurrency string
topologyVersion string
expectedVersion string
}{
{
name: "use cluster.spec.topology.version if creating a new machine deployment",
machineDeploymentsState: nil,
upgradeConcurrency: "1",
currentMDVersion: nil,
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.3",
},
{
name: "use machine deployment's spec.template.spec.version if one of the machine deployments is rolling out",
name: "use machine deployment's spec.template.spec.version if one of the machine deployments is rolling out, concurrency limit reached",
machineDeploymentsState: machineDeploymentsStateRollingOut,
upgradeConcurrency: "1",
currentMDVersion: pointer.String("v1.2.2"),
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
{
name: "use cluster.spec.topology.version if one of the machine deployments is rolling out, concurrency limit not reached",
machineDeploymentsState: machineDeploymentsStateRollingOut,
upgradeConcurrency: "2",
currentMDVersion: pointer.String("v1.2.2"),
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.3",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
s := scope.New(cluster)

testCluster := cluster.DeepCopy()
if testCluster.Annotations == nil {
testCluster.Annotations = map[string]string{}
}
testCluster.Annotations[clusterv1.ClusterTopologyUpgradeConcurrency] = tt.upgradeConcurrency

s := scope.New(testCluster)
s.Blueprint = blueprint
s.Blueprint.Topology.Version = tt.topologyVersion
s.Blueprint.Topology.ControlPlane = clusterv1.ControlPlaneTopology{
Expand All @@ -1709,6 +1729,7 @@ func TestComputeMachineDeployment(t *testing.T) {
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 2,
ReadyReplicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
}).
Expand All @@ -1722,6 +1743,7 @@ func TestComputeMachineDeployment(t *testing.T) {
s.Current.ControlPlane = &scope.ControlPlaneState{
Object: controlPlaneStable123,
}
s.UpgradeTracker.MachineDeployments.MarkRollingOut(s.Current.MachineDeployments.RollingOut()...)
desiredControlPlaneState := &scope.ControlPlaneState{
Object: controlPlaneStable123,
}
Expand Down Expand Up @@ -1828,45 +1850,55 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
//
// A machine deployment is considered upgrading if any of the above conditions
// is false.
machineDeploymentStable := builder.MachineDeployment("test-namespace", "md-1").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
ReadyReplicas: 2,
UnavailableReplicas: 0,
}).
Build()
machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-2").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
ReadyReplicas: 1,
UnavailableReplicas: 1,
}).
Build()
stableMachineDeployment := func(ns, name string) *clusterv1.MachineDeployment {
return builder.MachineDeployment(ns, name).
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
ReadyReplicas: 2,
UnavailableReplicas: 0,
}).
Build()
}

rollingMachineDeployment := func(ns, name string) *clusterv1.MachineDeployment {
return builder.MachineDeployment(ns, name).
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
ReadyReplicas: 1,
UnavailableReplicas: 1,
}).
Build()
}

machineDeploymentsStateStable := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"md2": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"md1": &scope.MachineDeploymentState{Object: stableMachineDeployment("test1", "md1")},
"md2": &scope.MachineDeploymentState{Object: stableMachineDeployment("test1", "md2")},
}
oneStableOneRollingMachineDeploymentState := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: stableMachineDeployment("test1", "md1")},
"md2": &scope.MachineDeploymentState{Object: rollingMachineDeployment("test1", "md2")},
}
machineDeploymentsStateRollingOut := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"md2": &scope.MachineDeploymentState{Object: machineDeploymentRollingOut},
twoRollingMachineDeploymentState := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: rollingMachineDeployment("test1", "md1")},
"md2": &scope.MachineDeploymentState{Object: rollingMachineDeployment("test1", "md2")},
}

tests := []struct {
name string
machineDeploymentTopology clusterv1.MachineDeploymentTopology
currentMachineDeploymentState *scope.MachineDeploymentState
machineDeploymentsStateMap scope.MachineDeploymentsStateMap
upgradeConcurrency int
currentControlPlane *unstructured.Unstructured
desiredControlPlane *unstructured.Unstructured
topologyVersion string
Expand Down Expand Up @@ -1895,15 +1927,6 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
{
name: "should return machine deployment's spec.template.spec.version if any one of the machine deployments is rolling out",
currentMachineDeploymentState: &scope.MachineDeploymentState{Object: builder.MachineDeployment("test1", "md-current").WithVersion("v1.2.2").Build()},
machineDeploymentsStateMap: machineDeploymentsStateRollingOut,
currentControlPlane: controlPlaneStable123,
desiredControlPlane: controlPlaneDesired,
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
{
// Control plane is considered upgrading if the control plane's spec.version and status.version is not equal.
name: "should return machine deployment's spec.template.spec.version if control plane is upgrading",
Expand Down Expand Up @@ -1941,6 +1964,26 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.3",
},
{
name: "should return cluster.spec.topology.version if control plane is stable, other machine deployments are rolling out, concurrency limit not reached",
currentMachineDeploymentState: &scope.MachineDeploymentState{Object: builder.MachineDeployment("test1", "md-current").WithVersion("v1.2.2").Build()},
machineDeploymentsStateMap: oneStableOneRollingMachineDeploymentState,
upgradeConcurrency: 2,
currentControlPlane: controlPlaneStable123,
desiredControlPlane: controlPlaneDesired,
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.3",
},
{
name: "should return machine deployment's spec.template.spec.version if control plane is stable, other machine deployments are rolling out, concurrency limit reached",
currentMachineDeploymentState: &scope.MachineDeploymentState{Object: builder.MachineDeployment("test1", "md-current").WithVersion("v1.2.2").Build()},
machineDeploymentsStateMap: twoRollingMachineDeploymentState,
upgradeConcurrency: 2,
currentControlPlane: controlPlaneStable123,
desiredControlPlane: controlPlaneDesired,
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
}

for _, tt := range tests {
Expand All @@ -1959,9 +2002,10 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
ControlPlane: &scope.ControlPlaneState{Object: tt.currentControlPlane},
MachineDeployments: tt.machineDeploymentsStateMap,
},
UpgradeTracker: scope.NewUpgradeTracker(),
UpgradeTracker: scope.NewUpgradeTracker(scope.MaxMDUpgradeConcurrency(tt.upgradeConcurrency)),
}
desiredControlPlaneState := &scope.ControlPlaneState{Object: tt.desiredControlPlane}
s.UpgradeTracker.MachineDeployments.MarkRollingOut(s.Current.MachineDeployments.RollingOut()...)
version, err := computeMachineDeploymentVersion(s, tt.machineDeploymentTopology, desiredControlPlaneState, tt.currentMachineDeploymentState)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(version).To(Equal(tt.expectedVersion))
Expand Down
13 changes: 12 additions & 1 deletion internal/controllers/topology/cluster/scope/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package scope

import (
"strconv"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

Expand Down Expand Up @@ -45,12 +47,21 @@ func New(cluster *clusterv1.Cluster) *Scope {
// enforce TypeMeta values in the Cluster object so we can assume it is always set during reconciliation.
cluster.APIVersion = clusterv1.GroupVersion.String()
cluster.Kind = "Cluster"

// Determine the maximum upgrade concurrency from the annotation on the cluster.
maxMDUpgradeConcurrency := 1
if concurrency, ok := cluster.Annotations[clusterv1.ClusterTopologyUpgradeConcurrency]; ok {
// The error can be ignored because the webhook ensures that the value is a positive integer.
maxMDUpgradeConcurrency, _ = strconv.Atoi(concurrency)
}
return &Scope{
Blueprint: &ClusterBlueprint{},
Current: &ClusterState{
Cluster: cluster,
},
UpgradeTracker: NewUpgradeTracker(),
UpgradeTracker: NewUpgradeTracker(
MaxMDUpgradeConcurrency(maxMDUpgradeConcurrency),
),
HookResponseTracker: NewHookResponseTracker(),
}
}
59 changes: 59 additions & 0 deletions internal/controllers/topology/cluster/scope/scope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scope

import (
"testing"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

func TestNew(t *testing.T) {
t.Run("should set the right maxMachineDeploymentUpgradeConcurrency in UpgradeTracker from the cluster annotation", func(t *testing.T) {
tests := []struct {
name string
cluster *clusterv1.Cluster
want int
}{
{
name: "if the annotation is not present the concurrency should default to 1",
cluster: &clusterv1.Cluster{},
want: 1,
},
{
name: "if the cluster has the annotation it should set the upgrade concurrency value",
cluster: &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
clusterv1.ClusterTopologyUpgradeConcurrency: "2",
},
},
},
want: 2,
},
}

for _, tt := range tests {
g := NewWithT(t)
s := New(tt.cluster)
g.Expect(s.UpgradeTracker.MachineDeployments.maxMachineDeploymentUpgradeConcurrency).To(Equal(tt.want))
}
})
}

0 comments on commit bc42715

Please sign in to comment.