Skip to content

Commit

Permalink
add support for concurrent MD upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
ykakarap committed Mar 31, 2023
1 parent a2318d1 commit 510d8a9
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 88 deletions.
6 changes: 6 additions & 0 deletions api/v1alpha4/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (src *Cluster) ConvertTo(dstRaw conversion.Hub) error {
if restored.Spec.Topology.Workers != nil {
if dst.Spec.Topology.Workers == nil {
dst.Spec.Topology.Workers = &clusterv1.WorkersTopology{}
dst.Spec.Topology.Workers.MaxUpgradeConcurrency = restored.Spec.Topology.Workers.MaxUpgradeConcurrency
}
for i := range restored.Spec.Topology.Workers.MachineDeployments {
dst.Spec.Topology.Workers.MachineDeployments[i].FailureDomain = restored.Spec.Topology.Workers.MachineDeployments[i].FailureDomain
Expand Down Expand Up @@ -375,3 +376,8 @@ func Convert_v1beta1_ClusterClass_To_v1alpha4_ClusterClass(in *clusterv1.Cluster
// ClusterClass.Status has been added in v1beta1.
return autoConvert_v1beta1_ClusterClass_To_v1alpha4_ClusterClass(in, out, s)
}

// Convert_v1beta1_WorkersTopology_To_v1alpha4_WorkersTopology converts a v1beta1 WorkersTopology
// into a v1alpha4 WorkersTopology by delegating to the generated
// autoConvert_v1beta1_WorkersTopology_To_v1alpha4_WorkersTopology function and returning its error.
func Convert_v1beta1_WorkersTopology_To_v1alpha4_WorkersTopology(in *clusterv1.WorkersTopology, out *WorkersTopology, s apiconversion.Scope) error {
// workersTopology.maxUpgradeConcurrency has been added in v1beta1.
// NOTE(review): presumably this hand-written wrapper is what allows conversion-gen to accept a
// v1beta1 field that has no v1alpha4 equivalent — confirm against the generated conversion file.
return autoConvert_v1beta1_WorkersTopology_To_v1alpha4_WorkersTopology(in, out, s)
}
16 changes: 6 additions & 10 deletions api/v1alpha4/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/v1beta1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ type ControlPlaneTopology struct {

// WorkersTopology represents the different sets of worker nodes in the cluster.
type WorkersTopology struct {
// MaxUpgradeConcurrency is the maximum number of MachineDeployments that will be upgraded
// concurrently.
// +kubebuilder:default=1
// +optional
MaxUpgradeConcurrency *int32 `json:"maxUpgradeConcurrency,omitempty"`

// MachineDeployments is a list of machine deployments in the cluster.
// +optional
MachineDeployments []MachineDeploymentTopology `json:"machineDeployments,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v1beta1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/cluster.x-k8s.io_clusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions internal/controllers/topology/cluster/desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,6 @@ func computeMachineDeploymentVersion(s *scope.Scope, machineDeploymentTopology c
return currentVersion, nil
}

// At this point the control plane is stable (not scaling, not upgrading, not being upgraded).
// Checking to see if the machine deployments are also stable.
// If any of the MachineDeployments is rolling out, do not upgrade the machine deployment yet.
if s.Current.MachineDeployments.IsAnyRollingOut() {
s.UpgradeTracker.MachineDeployments.MarkPendingUpgrade(currentMDState.Object.Name)
return currentVersion, nil
}

// Control plane and machine deployments are stable.
// Ready to pick up the topology version.
s.UpgradeTracker.MachineDeployments.MarkRollingOut(currentMDState.Object.Name)
Expand Down
118 changes: 60 additions & 58 deletions internal/controllers/topology/cluster/desired_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,32 +1635,32 @@ func TestComputeMachineDeployment(t *testing.T) {
}).
Build()

machineDeploymentStable := builder.MachineDeployment("test-namespace", "md-1").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
}).
Build()

machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-1").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
}).
Build()

machineDeploymentsStateRollingOut := scope.MachineDeploymentsStateMap{
"class-1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"class-2": &scope.MachineDeploymentState{Object: machineDeploymentRollingOut},
}
//machineDeploymentStable := builder.MachineDeployment("test-namespace", "md-1").
// WithGeneration(1).
// WithReplicas(2).
// WithStatus(clusterv1.MachineDeploymentStatus{
// ObservedGeneration: 2,
// Replicas: 2,
// UpdatedReplicas: 2,
// AvailableReplicas: 2,
// }).
// Build()

//machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-1").
// WithGeneration(1).
// WithReplicas(2).
// WithStatus(clusterv1.MachineDeploymentStatus{
// ObservedGeneration: 2,
// Replicas: 1,
// UpdatedReplicas: 1,
// AvailableReplicas: 1,
// }).
// Build()

//machineDeploymentsStateRollingOut := scope.MachineDeploymentsStateMap{
// "class-1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
// "class-2": &scope.MachineDeploymentState{Object: machineDeploymentRollingOut},
//}

// Note: in all the following tests we are setting it up so that the control plane is already
// stable at the topology version.
Expand All @@ -1679,13 +1679,14 @@ func TestComputeMachineDeployment(t *testing.T) {
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.3",
},
{
name: "use machine deployment's spec.template.spec.version if one of the machine deployments is rolling out",
machineDeploymentsState: machineDeploymentsStateRollingOut,
currentMDVersion: pointer.String("v1.2.2"),
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
// TODO(ykakarap): replace with a concurrency compatible case
//{
// name: "use machine deployment's spec.template.spec.version if one of the machine deployments is rolling out",
// machineDeploymentsState: machineDeploymentsStateRollingOut,
// currentMDVersion: pointer.String("v1.2.2"),
// topologyVersion: "v1.2.3",
// expectedVersion: "v1.2.2",
//},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -1840,27 +1841,27 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
UnavailableReplicas: 0,
}).
Build()
machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-2").
WithGeneration(1).
WithReplicas(2).
WithStatus(clusterv1.MachineDeploymentStatus{
ObservedGeneration: 2,
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
ReadyReplicas: 1,
UnavailableReplicas: 1,
}).
Build()
//machineDeploymentRollingOut := builder.MachineDeployment("test-namespace", "md-2").
// WithGeneration(1).
// WithReplicas(2).
// WithStatus(clusterv1.MachineDeploymentStatus{
// ObservedGeneration: 2,
// Replicas: 1,
// UpdatedReplicas: 1,
// AvailableReplicas: 1,
// ReadyReplicas: 1,
// UnavailableReplicas: 1,
// }).
// Build()

machineDeploymentsStateStable := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"md2": &scope.MachineDeploymentState{Object: machineDeploymentStable},
}
machineDeploymentsStateRollingOut := scope.MachineDeploymentsStateMap{
"md1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
"md2": &scope.MachineDeploymentState{Object: machineDeploymentRollingOut},
}
//machineDeploymentsStateRollingOut := scope.MachineDeploymentsStateMap{
// "md1": &scope.MachineDeploymentState{Object: machineDeploymentStable},
// "md2": &scope.MachineDeploymentState{Object: machineDeploymentRollingOut},
//}

tests := []struct {
name string
Expand Down Expand Up @@ -1895,15 +1896,16 @@ func TestComputeMachineDeploymentVersion(t *testing.T) {
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
{
name: "should return machine deployment's spec.template.spec.version if any one of the machine deployments is rolling out",
currentMachineDeploymentState: &scope.MachineDeploymentState{Object: builder.MachineDeployment("test1", "md-current").WithVersion("v1.2.2").Build()},
machineDeploymentsStateMap: machineDeploymentsStateRollingOut,
currentControlPlane: controlPlaneStable123,
desiredControlPlane: controlPlaneDesired,
topologyVersion: "v1.2.3",
expectedVersion: "v1.2.2",
},
// TODO(ykakarap): Replace this case with a concurrency-compatible test case
//{
// name: "should return machine deployment's spec.template.spec.version if any one of the machine deployments is rolling out",
// currentMachineDeploymentState: &scope.MachineDeploymentState{Object: builder.MachineDeployment("test1", "md-current").WithVersion("v1.2.2").Build()},
// machineDeploymentsStateMap: machineDeploymentsStateRollingOut,
// currentControlPlane: controlPlaneStable123,
// desiredControlPlane: controlPlaneDesired,
// topologyVersion: "v1.2.3",
// expectedVersion: "v1.2.2",
//},
{
// Control plane is considered upgrading if the control plane's spec.version and status.version is not equal.
name: "should return machine deployment's spec.template.spec.version if control plane is upgrading",
Expand Down
10 changes: 9 additions & 1 deletion internal/controllers/topology/cluster/scope/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package scope

import (
"k8s.io/utils/pointer"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

Expand Down Expand Up @@ -45,12 +47,18 @@ func New(cluster *clusterv1.Cluster) *Scope {
// enforce TypeMeta values in the Cluster object so we can assume it is always set during reconciliation.
cluster.APIVersion = clusterv1.GroupVersion.String()
cluster.Kind = "Cluster"
maxMDUpgradeConcurrency := 0
if cluster.Spec.Topology != nil && cluster.Spec.Topology.Workers != nil {
maxMDUpgradeConcurrency = int(pointer.Int32Deref(cluster.Spec.Topology.Workers.MaxUpgradeConcurrency, 1))
}
return &Scope{
Blueprint: &ClusterBlueprint{},
Current: &ClusterState{
Cluster: cluster,
},
UpgradeTracker: NewUpgradeTracker(),
UpgradeTracker: NewUpgradeTracker(
MaxMDUpgradeConcurrency(maxMDUpgradeConcurrency),
),
HookResponseTracker: NewHookResponseTracker(),
}
}
48 changes: 37 additions & 11 deletions internal/controllers/topology/cluster/scope/upgradetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package scope

import "k8s.io/apimachinery/pkg/util/sets"

const maxMachineDeploymentUpgradeConcurrency = 1

// UpgradeTracker is a helper to capture the upgrade status and make upgrade decisions.
type UpgradeTracker struct {
ControlPlane ControlPlaneUpgradeTracker
Expand All @@ -46,19 +44,47 @@ type ControlPlaneUpgradeTracker struct {
// MachineDeploymentUpgradeTracker holds the current upgrade status and makes upgrade
// decisions for MachineDeployments.
type MachineDeploymentUpgradeTracker struct {
	// pendingNames is the set of MachineDeployment names that have been marked as
	// pending an upgrade.
	pendingNames sets.Set[string]

	// deferredNames is a set of MachineDeployment names.
	// NOTE(review): presumably the MachineDeployments whose upgrade has been deferred —
	// confirm against the methods that populate it.
	deferredNames sets.Set[string]

	// rollingOutNames is the set of MachineDeployment names that have been marked as
	// rolling out. AllowUpgrade compares its size with
	// maxMachineDeploymentUpgradeConcurrency.
	rollingOutNames sets.Set[string]

	// holdUpgrades, when true, makes AllowUpgrade return false regardless of how many
	// MachineDeployments are rolling out.
	holdUpgrades bool

	// maxMachineDeploymentUpgradeConcurrency is the upper limit for the number of
	// MachineDeployments that may be rolling out at the same time. It is set by
	// NewUpgradeTracker from the options it receives.
	maxMachineDeploymentUpgradeConcurrency int
}

// UpgradeTrackerOptions collects the settings that UpgradeTrackerOption values
// apply before NewUpgradeTracker builds the tracker.
type UpgradeTrackerOptions struct {
	// maxMDUpgradeConcurrency is the requested upper limit for the number of
	// MachineDeployments upgrading concurrently; the zero value means "not set".
	maxMDUpgradeConcurrency int
}

// UpgradeTrackerOption is a single option accepted by NewUpgradeTracker.
// Implementations record their setting on the UpgradeTrackerOptions they are given.
type UpgradeTrackerOption interface {
	// ApplyToUpgradeTracker writes this option's setting into options.
	ApplyToUpgradeTracker(options *UpgradeTrackerOptions)
}

// MaxMDUpgradeConcurrency is an UpgradeTrackerOption carrying the upper limit for the
// number of MachineDeployments that can upgrade concurrently.
type MaxMDUpgradeConcurrency int

// ApplyToUpgradeTracker records the limit on the given UpgradeTrackerOptions,
// overwriting any value stored there before.
func (m MaxMDUpgradeConcurrency) ApplyToUpgradeTracker(options *UpgradeTrackerOptions) {
	limit := int(m)
	options.maxMDUpgradeConcurrency = limit
}

// NewUpgradeTracker returns an upgrade tracker with empty tracking information.
func NewUpgradeTracker() *UpgradeTracker {
func NewUpgradeTracker(opts ...UpgradeTrackerOption) *UpgradeTracker {
options := &UpgradeTrackerOptions{}
for _, o := range opts {
o.ApplyToUpgradeTracker(options)
}
if options.maxMDUpgradeConcurrency == 0 {
options.maxMDUpgradeConcurrency = 1
}
return &UpgradeTracker{
MachineDeployments: MachineDeploymentUpgradeTracker{
pendingNames: sets.Set[string]{},
deferredNames: sets.Set[string]{},
rollingOutNames: sets.Set[string]{},
pendingNames: sets.Set[string]{},
deferredNames: sets.Set[string]{},
rollingOutNames: sets.Set[string]{},
maxMachineDeploymentUpgradeConcurrency: options.maxMDUpgradeConcurrency,
},
}
}
Expand Down Expand Up @@ -96,7 +122,7 @@ func (m *MachineDeploymentUpgradeTracker) AllowUpgrade() bool {
if m.holdUpgrades {
return false
}
return m.rollingOutNames.Len() < maxMachineDeploymentUpgradeConcurrency
return m.rollingOutNames.Len() < m.maxMachineDeploymentUpgradeConcurrency
}

// MarkPendingUpgrade marks a machine deployment as in need of an upgrade.
Expand Down

0 comments on commit 510d8a9

Please sign in to comment.