Validate SchedulingConfig when starting the scheduler (#3191)
zuqq committed Dec 12, 2023
1 parent 2cf9798 commit d255ffc
Showing 11 changed files with 117 additions and 52 deletions.
8 changes: 2 additions & 6 deletions cmd/scheduler/cmd/root.go
@@ -39,12 +39,8 @@ func RootCmd() *cobra.Command {

func loadConfig() (schedulerconfig.Configuration, error) {
var config schedulerconfig.Configuration
userSpecifiedConfigs := viper.GetStringSlice(CustomConfigLocation)

common.LoadConfig(&config, "./config/scheduler", userSpecifiedConfigs)

// TODO: once we're happy with this we can move it to common app startup
err := commonconfig.Validate(config)
common.LoadConfig(&config, "./config/scheduler", viper.GetStringSlice(CustomConfigLocation))
err := config.Validate()
if err != nil {
commonconfig.LogValidationErrors(err)
}
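
The net effect of this file's change is that validation now happens once, when the scheduler loads its configuration, instead of being deferred to individual components. A minimal, self-contained sketch of that pattern — the config type and the os.Exit handling here are illustrative stand-ins, not the Armada code — looks like this:

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/go-playground/validator/v10"
)

// config is a stand-in for the scheduler's Configuration type; in the real
// code it is populated by common.LoadConfig/viper before being validated.
type config struct {
	PulsarSendTimeout time.Duration `validate:"required"`
}

// Validate mirrors the method introduced in this commit: the configuration
// type owns its own validation, so callers only call Validate() at startup.
func (c config) Validate() error {
	validate := validator.New()
	return validate.Struct(c)
}

func main() {
	var c config // zero value: PulsarSendTimeout is unset, so validation fails
	if err := c.Validate(); err != nil {
		// The real code logs these via commonconfig.LogValidationErrors;
		// exiting here is just for the sketch.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("configuration is valid")
}
```
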
41 changes: 38 additions & 3 deletions internal/armada/configuration/types.go
@@ -1,9 +1,11 @@
package configuration

import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/go-playground/validator/v10"
"github.com/go-redis/redis"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -207,7 +209,7 @@ type SchedulingConfig struct {
IndexedTaints []string
// WellKnownNodeTypes defines a set of well-known node types; these are used
// to define "home" and "away" nodes for a given priority class.
WellKnownNodeTypes []WellKnownNodeType
WellKnownNodeTypes []WellKnownNodeType `validate:"dive"`
// Default value of GangNodeUniformityLabelAnnotation if none is provided.
DefaultGangNodeUniformityLabel string
// Kubernetes pods may specify a termination grace period.
@@ -252,6 +254,39 @@ type SchedulingConfig struct {
EnableNewPreemptionStrategy bool
}

const (
DuplicateWellKnownNodeTypeErrorMessage = "duplicate well-known node type name"
AwayNodeTypesWithoutPreemptionErrorMessage = "priority class has away node types but is not preemptible"
UnknownWellKnownNodeTypeErrorMessage = "priority class refers to unknown well-known node type"
)

func SchedulingConfigValidation(sl validator.StructLevel) {
c := sl.Current().Interface().(SchedulingConfig)

wellKnownNodeTypes := make(map[string]bool)
for i, wellKnownNodeType := range c.WellKnownNodeTypes {
if wellKnownNodeTypes[wellKnownNodeType.Name] {
fieldName := fmt.Sprintf("WellKnownNodeTypes[%d].Name", i)
sl.ReportError(wellKnownNodeType.Name, fieldName, "", DuplicateWellKnownNodeTypeErrorMessage, "")
}
wellKnownNodeTypes[wellKnownNodeType.Name] = true
}

for priorityClassName, priorityClass := range c.Preemption.PriorityClasses {
if len(priorityClass.AwayNodeTypes) > 0 && !priorityClass.Preemptible {
fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].Preemptible", priorityClassName)
sl.ReportError(priorityClass.Preemptible, fieldName, "", AwayNodeTypesWithoutPreemptionErrorMessage, "")
}

for i, awayNodeType := range priorityClass.AwayNodeTypes {
if !wellKnownNodeTypes[awayNodeType.WellKnownNodeTypeName] {
fieldName := fmt.Sprintf("Preemption.PriorityClasses[%s].AwayNodeTypes[%d].WellKnownNodeTypeName", priorityClassName, i)
sl.ReportError(awayNodeType.WellKnownNodeTypeName, fieldName, "", UnknownWellKnownNodeTypeErrorMessage, "")
}
}
}
}

// FairnessModel controls how fairness is computed.
// More specifically, each queue has a cost associated with it and the next job to schedule
// is taken from the queue with smallest cost. FairnessModel determines how that cost is computed.
@@ -276,7 +311,7 @@ type IndexedResource struct {
// A WellKnownNodeType defines a set of nodes; see AwayNodeType.
type WellKnownNodeType struct {
// Name is the unique identifier for this node type.
Name string
Name string `validate:"required"`
// Taints is the set of taints that characterizes this node type; a node is
// part of this node type if and only if it has all of these taints.
Taints []v1.Taint
@@ -311,7 +346,7 @@ type PreemptionConfig struct {
// Map from priority class names to priority classes.
// Must be consistent with Kubernetes priority classes.
// I.e., priority classes defined here must be defined in all executor clusters and should map to the same priority.
PriorityClasses map[string]types.PriorityClass
PriorityClasses map[string]types.PriorityClass `validate:"dive"`
// Priority class assigned to pods that do not specify one.
// Must be an entry in PriorityClasses above.
DefaultPriorityClass string
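
SchedulingConfigValidation above is a struct-level validation hook: it receives the whole SchedulingConfig, so it can express cross-field rules (duplicate well-known node type names, away node types on non-preemptible priority classes) that plain field tags cannot. The following is a simplified, self-contained sketch of the same mechanism; the types and the error-message tag are illustrative, not the Armada definitions:

```go
package main

import (
	"fmt"

	"github.com/go-playground/validator/v10"
)

// nodeType and schedulingLikeConfig are simplified stand-ins for
// WellKnownNodeType and SchedulingConfig.
type nodeType struct {
	Name string
}

type schedulingLikeConfig struct {
	NodeTypes []nodeType
}

// nodeTypeValidation reports an error for every node type whose name has
// already been seen, analogous to the duplicate check above.
func nodeTypeValidation(sl validator.StructLevel) {
	c := sl.Current().Interface().(schedulingLikeConfig)
	seen := make(map[string]bool)
	for i, nt := range c.NodeTypes {
		if seen[nt.Name] {
			// The second-to-last argument becomes the "tag" in the resulting
			// validator.FieldError, which is how the message surfaces.
			sl.ReportError(nt.Name, fmt.Sprintf("NodeTypes[%d].Name", i), "", "duplicate node type name", "")
		}
		seen[nt.Name] = true
	}
}

func main() {
	validate := validator.New()
	validate.RegisterStructValidation(nodeTypeValidation, schedulingLikeConfig{})
	err := validate.Struct(schedulingLikeConfig{NodeTypes: []nodeType{{Name: "gpu"}, {Name: "gpu"}}})
	fmt.Println(err) // the error text contains "duplicate node type name"
}
```
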
5 changes: 1 addition & 4 deletions internal/armada/server/lease.go
@@ -531,7 +531,7 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
schedulerobjects.ResourceList{Resources: req.MinimumJobSize},
q.schedulingConfig,
)
sch, err := scheduler.NewPreemptingQueueScheduler(
sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
q.schedulingConfig.Preemption.NodeEvictionProbability,
@@ -545,9 +545,6 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str
jobIdsByGangId,
gangIdByJobId,
)
if err != nil {
return nil, err
}
if q.schedulingConfig.AlwaysAttemptScheduling {
sch.SkipUnsuccessfulSchedulingKeyCheck()
}
10 changes: 3 additions & 7 deletions internal/common/config/validation.go
@@ -7,20 +7,16 @@ import (
log "github.com/sirupsen/logrus"
)

func Validate(config interface{}) error {
validate := validator.New()
return validate.Struct(config)
}

func LogValidationErrors(err error) {
if err != nil {
for _, err := range err.(validator.ValidationErrors) {
fieldName := stripPrefix(err.Namespace())
switch err.Tag() {
tag := err.Tag()
switch tag {
case "required":
log.Errorf("ConfigError: Field %s is required but was not found", fieldName)
default:
log.Errorf("ConfigError: %s is not a valid value for %s", err.Value(), fieldName)
log.Errorf("ConfigError: Field %s has invalid value %s: %s", fieldName, err.Value(), tag)
}
}
}
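
LogValidationErrors distinguishes only two cases: fields failing the required tag, and everything else, whose message now includes the offending value and the tag name. A small sketch of what drives each branch, using stand-in types rather than anything from the codebase:

```go
package main

import (
	"github.com/go-playground/validator/v10"
	log "github.com/sirupsen/logrus"
)

// exampleConfig exercises both branches: Name is left empty (fails
// "required") and Timeout is negative (fails "gte=1").
type exampleConfig struct {
	Name    string `validate:"required"`
	Timeout int    `validate:"gte=1"`
}

func main() {
	validate := validator.New()
	if err := validate.Struct(exampleConfig{Timeout: -5}); err != nil {
		for _, fieldErr := range err.(validator.ValidationErrors) {
			switch fieldErr.Tag() {
			case "required":
				log.Errorf("ConfigError: Field %s is required but was not found", fieldErr.Namespace())
			default:
				log.Errorf("ConfigError: Field %s has invalid value %v: %s", fieldErr.Namespace(), fieldErr.Value(), fieldErr.Tag())
			}
		}
	}
}
```
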
8 changes: 4 additions & 4 deletions internal/common/types/scheduling.go
@@ -10,13 +10,13 @@ type AwayNodeType struct {
// when scheduling "away" jobs of this priority class on the node type
// referenced by WellKnownNodeTypeName; it overrides the Priority field of
// PriorityClass.
Priority int32
Priority int32 `validate:"gte=0"`
// WellKnownNodeTypeName is the Name of the WellKnownNodeType in question.
WellKnownNodeTypeName string
WellKnownNodeTypeName string `validate:"required"`
}

type PriorityClass struct {
Priority int32
Priority int32 `validate:"gte=0"`
// If true, Armada may preempt jobs of this class to improve fairness.
Preemptible bool
// Limits resources assigned to jobs of this priority class.
@@ -30,7 +30,7 @@ type PriorityClass struct {
//
// The scheduler first tries to schedule jobs of this priority class as
// "home" jobs, and then tries the elements of this slice in order.
AwayNodeTypes []AwayNodeType
AwayNodeTypes []AwayNodeType `validate:"dive"`
}

func (priorityClass PriorityClass) Equal(other PriorityClass) bool {
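
The dive tags added in this commit (on WellKnownNodeTypes, PriorityClasses, and AwayNodeTypes) tell the validator to descend into the slice or map and apply each element's own tags; without them, the required and gte=0 constraints on nested fields would never be evaluated. A self-contained sketch with simplified stand-in types:

```go
package main

import (
	"fmt"

	"github.com/go-playground/validator/v10"
)

// awayNodeType and priorityClass are simplified stand-ins for the types above.
type awayNodeType struct {
	Priority              int32  `validate:"gte=0"`
	WellKnownNodeTypeName string `validate:"required"`
}

type priorityClass struct {
	Priority      int32          `validate:"gte=0"`
	AwayNodeTypes []awayNodeType `validate:"dive"`
}

func main() {
	validate := validator.New()
	pc := priorityClass{
		Priority: 100,
		AwayNodeTypes: []awayNodeType{
			// Both fields are invalid: negative priority and an empty name.
			{Priority: -1, WellKnownNodeTypeName: ""},
		},
	}
	// With `dive`, each slice element is validated against its own tags;
	// without it, only priorityClass's top-level fields would be checked.
	fmt.Println(validate.Struct(pc))
}
```
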
7 changes: 7 additions & 0 deletions internal/scheduler/configuration/configuration.go
@@ -3,6 +3,7 @@ package configuration
import (
"time"

"github.com/go-playground/validator/v10"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
@@ -57,6 +58,12 @@ type Configuration struct {
PulsarSendTimeout time.Duration `validate:"required"`
}

func (c Configuration) Validate() error {
validate := validator.New()
validate.RegisterStructValidation(configuration.SchedulingConfigValidation, configuration.SchedulingConfig{})
return validate.Struct(c)
}

type MetricsConfig struct {
// If true, disable metric collection and publishing.
Disabled bool
55 changes: 55 additions & 0 deletions internal/scheduler/configuration/configuration_test.go
@@ -0,0 +1,55 @@
package configuration

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/types"
)

func TestSchedulingConfigValidate(t *testing.T) {
c := Configuration{
Scheduling: configuration.SchedulingConfig{
WellKnownNodeTypes: []configuration.WellKnownNodeType{
{
Name: "gpu",
Taints: []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}},
},
{
Name: "gpu",
Taints: []v1.Taint{{Key: "other-key", Value: "true", Effect: v1.TaintEffectNoSchedule}},
},
},
Preemption: configuration.PreemptionConfig{
PriorityClasses: map[string]types.PriorityClass{
"armada-preemptible-away": {
Priority: 100,

AwayNodeTypes: []types.AwayNodeType{
{
Priority: 50,
WellKnownNodeTypeName: "not-gpu",
},
},
},
},
},
},
}
expected := []string{
configuration.DuplicateWellKnownNodeTypeErrorMessage,
configuration.AwayNodeTypesWithoutPreemptionErrorMessage,
configuration.UnknownWellKnownNodeTypeErrorMessage,
}

err := c.Validate()

assert.Error(t, err)
s := err.Error()
for _, expected := range expected {
assert.Contains(t, s, expected)
}
}
16 changes: 2 additions & 14 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -1,7 +1,6 @@
package scheduler

import (
"fmt"
"math/rand"
"reflect"
"time"
@@ -13,7 +12,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
@@ -64,16 +62,7 @@ func NewPreemptingQueueScheduler(
initialNodeIdByJobId map[string]string,
initialJobIdsByGangId map[string]map[string]bool,
initialGangIdByJobId map[string]string,
) (*PreemptingQueueScheduler, error) {
for priorityClassName, priorityClass := range sctx.PriorityClasses {
if len(priorityClass.AwayNodeTypes) > 0 && !priorityClass.Preemptible {
return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "priorityClasses",
Value: sctx.PriorityClasses,
Message: fmt.Sprintf("only preemptible priority classes can have away node types; priority class %s violates this invariant", priorityClassName),
})
}
}
) *PreemptingQueueScheduler {
if initialNodeIdByJobId == nil {
initialNodeIdByJobId = make(map[string]string)
}
@@ -87,7 +76,7 @@
for gangId, jobIds := range initialJobIdsByGangId {
initialJobIdsByGangId[gangId] = maps.Clone(jobIds)
}
scheduler := PreemptingQueueScheduler{
return &PreemptingQueueScheduler{
schedulingContext: sctx,
constraints: constraints,
nodeEvictionProbability: nodeEvictionProbability,
@@ -99,7 +88,6 @@
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
}
return &scheduler, nil
}

func (sch *PreemptingQueueScheduler) EnableAssertions() {
9 changes: 3 additions & 6 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -1875,7 +1875,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
schedulerobjects.ResourceList{Resources: tc.MinimumJobSize},
tc.SchedulingConfig,
)
sch, err := NewPreemptingQueueScheduler(
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.Preemption.NodeEvictionProbability,
Expand All @@ -1887,7 +1887,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
jobIdsByGangId,
gangIdByJobId,
)
require.NoError(t, err)
sch.EnableAssertions()
if tc.SchedulingConfig.EnableNewPreemptionStrategy {
sch.EnableNewPreemptionStrategy()
@@ -2209,7 +2208,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
schedulerobjects.ResourceList{Resources: tc.MinimumJobSize},
tc.SchedulingConfig,
)
sch, err := NewPreemptingQueueScheduler(
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.Preemption.NodeEvictionProbability,
@@ -2221,7 +2220,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
nil,
nil,
)
require.NoError(b, err)
result, err := sch.Schedule(ctx)
require.NoError(b, err)
require.Equal(b, 0, len(result.PreemptedJobs))
@@ -2265,7 +2263,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], limiterByQueue[queue])
require.NoError(b, err)
}
sch, err := NewPreemptingQueueScheduler(
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.Preemption.NodeEvictionProbability,
@@ -2277,7 +2275,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
nil,
nil,
)
require.NoError(b, err)
result, err := sch.Schedule(ctx)
require.NoError(b, err)

5 changes: 1 addition & 4 deletions internal/scheduler/scheduling_algo.go
@@ -418,7 +418,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
minimumJobSize,
l.schedulingConfig,
)
scheduler, err := NewPreemptingQueueScheduler(
scheduler := NewPreemptingQueueScheduler(
sctx,
constraints,
l.schedulingConfig.Preemption.NodeEvictionProbability,
@@ -430,9 +430,6 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
fsctx.jobIdsByGangId,
fsctx.gangIdByJobId,
)
if err != nil {
return nil, nil, err
}
if l.schedulingConfig.AlwaysAttemptScheduling {
scheduler.SkipUnsuccessfulSchedulingKeyCheck()
}
5 changes: 1 addition & 4 deletions internal/scheduler/simulator/simulator.go
@@ -459,7 +459,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
schedulerobjects.ResourceList{},
s.schedulingConfig,
)
sch, err := scheduler.NewPreemptingQueueScheduler(
sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
s.schedulingConfig.Preemption.NodeEvictionProbability,
@@ -472,9 +472,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
nil,
nil,
)
if err != nil {
return err
}
if s.schedulingConfig.EnableNewPreemptionStrategy {
sch.EnableNewPreemptionStrategy()
}
