Re-use the last desired patch on empty batch #3453

Merged
merged 3 commits into from
May 17, 2024
85 changes: 50 additions & 35 deletions cmd/ghalistener/worker/worker.go
@@ -38,20 +38,20 @@ type Config struct {
// The Worker's role is to process the messages it receives from the listener.
// It then initiates Kubernetes API requests to carry out the necessary actions.
type Worker struct {
- clientset *kubernetes.Clientset
- config Config
- lastPatch int
- lastPatchID int
- logger *logr.Logger
+ clientset *kubernetes.Clientset
+ config Config
+ lastPatch int
+ patchSeq int
+ logger *logr.Logger
}

var _ listener.Handler = (*Worker)(nil)

func New(config Config, options ...Option) (*Worker, error) {
w := &Worker{
- config: config,
- lastPatch: -1,
- lastPatchID: -1,
+ config: config,
+ lastPatch: -1,
+ patchSeq: -1,
}

conf, err := rest.InClusterConfig()
@@ -163,27 +163,8 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
// The function then scales the ephemeral runner set by applying the merge patch.
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
// If any error occurs during the process, it returns an error with a descriptive message.
- func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
- // Max runners should always be set by the resource builder either to the configured value,
- // or the maximum int32 (resourcebuilder.newAutoScalingListener()).
- targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
-
- logValues := []any{
- "assigned job", count,
- "decision", targetRunnerCount,
- "min", w.config.MinRunners,
- "max", w.config.MaxRunners,
- "currentRunnerCount", w.lastPatch,
- "jobsCompleted", jobsCompleted,
- }
-
- if count == 0 && jobsCompleted == 0 {
- w.lastPatchID = 0
- } else {
- w.lastPatchID++
- }
-
- w.lastPatch = targetRunnerCount
+ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) {
+ patchID := w.setDesiredWorkerState(count, jobsCompleted)

original, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
@@ -200,8 +181,8 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
patch, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
- Replicas: targetRunnerCount,
- PatchID: w.lastPatchID,
+ Replicas: w.lastPatch,
+ PatchID: patchID,
},
},
)
@@ -210,14 +191,13 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
return 0, err
}

w.logger.Info("Compare", "original", string(original), "patch", string(patch))
mergePatch, err := jsonpatch.CreateMergePatch(original, patch)
if err != nil {
return 0, fmt.Errorf("failed to create merge patch json for ephemeral runner set: %w", err)
}

w.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch))

w.logger.Info("Scaling ephemeral runner set", logValues...)
w.logger.Info("Preparing EphemeralRunnerSet update", "json", string(mergePatch))

patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{}
err = w.clientset.RESTClient().
@@ -238,5 +218,40 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
"name", w.config.EphemeralRunnerSetName,
"replicas", patchedEphemeralRunnerSet.Spec.Replicas,
)
- return targetRunnerCount, nil
+ return w.lastPatch, nil
}

// setDesiredWorkerState calculates the desired state of the worker based on the desired count and the number of jobs completed. It updates the worker's last patch and patch sequence, and returns the patch ID to apply.
func (w *Worker) setDesiredWorkerState(count, jobsCompleted int) int {
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
w.patchSeq++
desiredPatchID := w.patchSeq

if count == 0 && jobsCompleted == 0 { // empty batch
targetRunnerCount = max(w.lastPatch, targetRunnerCount)
if targetRunnerCount == w.config.MinRunners {
// We have an empty batch, and the last patch was the min runners.
// Since this is an empty batch and we are at the min runners, they should all be idle.
// If the controller accidentally created a few extra pods (during scale-down events),
// this situation allows the controller to scale back down to the min runners.
// However, it is important to keep the patch sequence increasing so we don't ignore a batch.
desiredPatchID = 0
}
Comment on lines +234 to +241
It seems to me that this code is unnecessary. We want to trigger a reconcile in case we receive an empty batch and targetRunnerCount == w.config.MinRunners, by changing the PatchID value, but this will happen anyway because of this code:

w.patchSeq++
desiredPatchID := w.patchSeq

Why reset this counter to zero?

Member Author

The purpose of setting patchID to 0 is to remove ephemeral runners that were created but shouldn't have been.
There is a rare case where the listener receives a scale-down event. Let's say the desired state was 5 and 1 job has finished.
While that patch is being applied, 2 ephemeral runners finish.
This situation would cause the ephemeral runner set controller to create 1 more pod (we match the current state of the cluster, which is 3, while the patch says we need a state of 4). It's a very rare case, but it can happen when runners finish at the same time. At that point, the ephemeral runner set will create 1 more runner.

If the cluster is busy, this situation can actually improve performance: on the next job you already have a runner ready to pick it up, so it is re-used.
But when your cluster becomes idle, that is when we really have to remove these runners that were created due to the timing difference. So to self-correct, we force the min runners once they are all idle.

}

w.lastPatch = targetRunnerCount

w.logger.Info(
"Calculated target runner count",
"assigned job", count,
"decision", targetRunnerCount,
"min", w.config.MinRunners,
"max", w.config.MaxRunners,
"currentRunnerCount", w.lastPatch,
"jobsCompleted", jobsCompleted,
)

return desiredPatchID
}
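
To make the new sequencing easier to follow, here is a minimal, self-contained Go sketch of the same decision logic. The sketchWorker type, the setDesiredState method name, and the minRunners: 1 / maxRunners: 5 values are illustrative assumptions and not part of this PR; the rules themselves mirror setDesiredWorkerState above. It needs Go 1.21+ for the built-in min and max.

package main

import "fmt"

// sketchWorker mirrors only the fields setDesiredWorkerState touches;
// it is an illustration, not the PR's Worker type.
type sketchWorker struct {
	minRunners, maxRunners int
	lastPatch              int
	patchSeq               int
}

// setDesiredState reproduces the rules shown in the diff above: on an empty
// batch the previous target is re-used, and the patch ID is forced to 0 only
// when the target is already at min runners (the idle self-correction case).
func (w *sketchWorker) setDesiredState(count, jobsCompleted int) (target, patchID int) {
	target = min(w.minRunners+count, w.maxRunners)
	w.patchSeq++
	patchID = w.patchSeq

	if count == 0 && jobsCompleted == 0 { // empty batch
		target = max(w.lastPatch, target)
		if target == w.minRunners {
			// Force the controller to reconcile down to min runners while idle.
			patchID = 0
		}
	}

	w.lastPatch = target
	return target, patchID
}

func main() {
	// The real worker also starts with lastPatch and patchSeq at -1 (see New above).
	w := &sketchWorker{minRunners: 1, maxRunners: 5, lastPatch: -1, patchSeq: -1}

	steps := []struct {
		count, jobsCompleted int
		note                 string
	}{
		{3, 0, "3 jobs assigned"},
		{0, 0, "empty batch: previous target of 4 is re-used"},
		{0, 3, "all jobs completed: back to min runners"},
		{0, 0, "empty batch while idle: patch ID forced to 0"},
	}

	for _, s := range steps {
		target, patchID := w.setDesiredState(s.count, s.jobsCompleted)
		fmt.Printf("%-48s target=%d patchID=%d\n", s.note, target, patchID)
	}
	// Prints, in order: target=4 patchID=0, target=4 patchID=1,
	// target=1 patchID=2, target=1 patchID=0.
}

The second call shows the empty-batch re-use this PR introduces, and the last call shows the patchID reset to 0 that the review thread above discusses.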