Skip to content

Commit

Permalink
Ensure that the framework is available using RESTMapper instead of ge…
Browse files Browse the repository at this point in the history
…tting the CRDs

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Aug 11, 2023
1 parent 7f2b00b commit b95d0fd
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 49 deletions.
7 changes: 0 additions & 7 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ rules:
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- list
- watch
- apiGroups:
- batch
resources:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc
go.uber.org/zap v1.24.0
k8s.io/api v0.27.4
k8s.io/apiextensions-apiserver v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/apiserver v0.27.4
k8s.io/client-go v0.27.4
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.27.4 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
76 changes: 35 additions & 41 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -30,10 +30,9 @@ import (
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
schedulingv1 "k8s.io/api/scheduling/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
Expand All @@ -51,7 +50,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
Expand All @@ -78,7 +76,6 @@ func init() {

utilruntime.Must(kueue.AddToScheme(scheme))
utilruntime.Must(configapi.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
// Add any additional framework integration types.
utilruntime.Must(
jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error {
Expand Down Expand Up @@ -159,7 +156,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers that register after the manager starts will start directly.
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go func() {
queues.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -196,7 +193,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
Expand All @@ -214,34 +211,46 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
os.Exit(1)
}

crds := findCustomResources(ctx, mgr)

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithKubeServerVersion(serverVersionFetcher),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) && crds.Has(name) {
if err := cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller")
return err
}
if err := cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
} else {
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err

if isFrameworkEnabled(cfg, name) {
if _, err := mgr.GetRESTMapper().RESTMapping(cb.GVK.GroupKind(), cb.GVK.Version); err != nil {
// TODO: Once the PR below is released, change how we check whether the GVK is registered.
// REF: https://github.com/kubernetes-sigs/controller-runtime/pull/2425
// if !meta.IsNoMatchError(err) {
// return err
// }
var NoMatchingErr *discovery.ErrGroupDiscoveryFailed
if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) {
return err
}
log.Info("No matching API server for job framework, skip to create controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
Expand Down Expand Up @@ -340,18 +349,3 @@ func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
}
return false
}

// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch

// findCustomResources lists the CustomResourceDefinitions through the
// manager's client and returns a set of "<group>/<singular name>" keys, one
// per listed CRD. The set is seeded with job.FrameworkName, so that entry is
// present regardless of what the list returns.
//
// If the list call fails, the error is logged and the process exits with
// status 1.
func findCustomResources(ctx context.Context, mgr ctrl.Manager) sets.Set[string] {
	var crdList apiextensionsv1.CustomResourceDefinitionList
	listErr := mgr.GetClient().List(ctx, &crdList)
	if listErr != nil {
		setupLog.Error(listErr, "Unable to get crd list")
		os.Exit(1)
	}

	found := sets.New[string](job.FrameworkName)
	for _, item := range crdList.Items {
		// Key format: API group and singular resource name joined by "/".
		key := strings.Join([]string{item.Spec.Group, item.Spec.Names.Singular}, "/")
		found.Insert(key)
	}
	return found
}
3 changes: 3 additions & 0 deletions pkg/controller/jobframework/integrationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -50,6 +51,8 @@ type IntegrationCallbacks struct {
SetupWebhook func(mgr ctrl.Manager, opts ...Option) error
// JobType holds an object of the type managed by the integration's webhook
JobType runtime.Object
// GVK is the GroupVersionKind for the job.
GVK schema.GroupVersionKind
// SetupIndexes registers any additional indexes with the controllers manager
// (this callback is optional)
SetupIndexes func(ctx context.Context, indexer client.FieldIndexer) error
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func init() {
NewReconciler: NewReconciler,
SetupWebhook: SetupWebhook,
JobType: &batchv1.Job{},
GVK: gvk,
}))
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/jobset/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
NewReconciler: NewReconciler,
SetupWebhook: SetupJobSetWebhook,
JobType: &jobsetapi.JobSet{},
GVK: gvk,
AddToScheme: jobsetapi.AddToScheme,
IsManagingObjectsOwner: isJobSet,
}))
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/mpijob/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
NewReconciler: NewReconciler,
SetupWebhook: SetupMPIJobWebhook,
JobType: &kubeflow.MPIJob{},
GVK: gvk,
AddToScheme: kubeflow.AddToScheme,
IsManagingObjectsOwner: isMPIJob,
}))
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
NewReconciler: NewReconciler,
SetupWebhook: SetupRayJobWebhook,
JobType: &rayjobapi.RayJob{},
GVK: gvk,
AddToScheme: rayjobapi.AddToScheme,
}))
}
Expand Down

0 comments on commit b95d0fd

Please sign in to comment.