Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #97 from thedadams/multi-client-improvements
Browse files Browse the repository at this point in the history
Improve the multi-client implementation
  • Loading branch information
ibuildthecloud committed Aug 1, 2023
2 parents ef06bde + 19fbc79 commit 6807d93
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 88 deletions.
12 changes: 12 additions & 0 deletions pkg/runtime/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (c *cacheClient) List(ctx context.Context, list kclient.ObjectList, opts ..
}

func (c *cacheClient) Create(ctx context.Context, obj kclient.Object, opts ...kclient.CreateOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Create(ctx, u.Object, opts...)
}
err := c.cached.Create(ctx, obj, opts...)
if err != nil {
return err
Expand All @@ -158,6 +161,9 @@ func (c *cacheClient) Create(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Delete(ctx context.Context, obj kclient.Object, opts ...kclient.DeleteOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Delete(ctx, u.Object, opts...)
}
err := c.cached.Delete(ctx, obj, opts...)
if err != nil {
return err
Expand All @@ -167,6 +173,9 @@ func (c *cacheClient) Delete(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Update(ctx context.Context, obj kclient.Object, opts ...kclient.UpdateOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Update(ctx, u.Object, opts...)
}
err := c.cached.Update(ctx, obj, opts...)
if err != nil {
return err
Expand All @@ -176,6 +185,9 @@ func (c *cacheClient) Update(ctx context.Context, obj kclient.Object, opts ...kc
}

func (c *cacheClient) Patch(ctx context.Context, obj kclient.Object, patch kclient.Patch, opts ...kclient.PatchOption) error {
if u, ok := obj.(*uncached.Holder); ok {
return c.uncached.Patch(ctx, u.Object, patch, opts...)
}
err := c.cached.Patch(ctx, obj, patch, opts...)
if err != nil {
return err
Expand Down
38 changes: 16 additions & 22 deletions pkg/runtime/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,45 @@ type Runtime struct {

type Config struct {
Rest *rest.Config
Scheme *runtime.Scheme
Namespace string
}

func NewRuntime(cfg *rest.Config, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfig(Config{
Rest: cfg,
Scheme: scheme,
})
return NewRuntimeWithConfig(Config{Rest: cfg}, scheme)
}

func NewRuntimeForNamespace(cfg *rest.Config, namespace string, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfigs(Config{Rest: cfg, Scheme: scheme, Namespace: namespace}, nil)
return NewRuntimeWithConfigs(Config{Rest: cfg, Namespace: namespace}, nil, scheme)
}

func NewRuntimeWithConfig(cfg Config) (*Runtime, error) {
return NewRuntimeWithConfigs(cfg, nil)
func NewRuntimeWithConfig(cfg Config, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfigs(cfg, nil, scheme)
}

func NewRuntimeWithConfigs(defaultConfig Config, cfgs map[string]Config) (*Runtime, error) {
clients := make(map[string]client.Client, len(cfgs))
cachedClients := make(map[string]client.Client, len(cfgs))
caches := make(map[string]cache.Cache, len(cfgs))
schemes := make(map[string]*runtime.Scheme, len(cfgs))
func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Config, scheme *runtime.Scheme) (*Runtime, error) {
clients := make(map[string]client.Client, len(apiGroupConfigs))
cachedClients := make(map[string]client.Client, len(apiGroupConfigs))
caches := make(map[string]cache.Cache, len(apiGroupConfigs))

for key, cfg := range cfgs {
uncachedClient, cachedClient, theCache, err := getClients(cfg)
for key, cfg := range apiGroupConfigs {
uncachedClient, cachedClient, theCache, err := getClients(cfg, scheme)
if err != nil {
return nil, err
}

clients[key] = uncachedClient
caches[key] = theCache
schemes[key] = cfg.Scheme
cachedClients[key] = cachedClient
}

uncachedClient, cachedClient, theCache, err := getClients(defaultConfig)
uncachedClient, cachedClient, theCache, err := getClients(defaultConfig, scheme)
if err != nil {
return nil, err
}

aggUncachedClient := multi.NewClient(uncachedClient, clients)
aggCachedClient := multi.NewClient(cachedClient, cachedClients)
aggCache := multi.NewCache(theCache, defaultConfig.Scheme, caches, schemes)
aggCache := multi.NewCache(scheme, theCache, caches)

factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{
// In baaah this is only invoked when a key fails to process
Expand All @@ -76,9 +70,9 @@ func NewRuntimeWithConfigs(defaultConfig Config, cfgs map[string]Config) (*Runti
}, nil
}

func getClients(cfg Config) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) {
func getClients(cfg Config, scheme *runtime.Scheme) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) {
uncachedClient, err = client.NewWithWatch(cfg.Rest, client.Options{
Scheme: cfg.Scheme,
Scheme: scheme,
})
if err != nil {
return nil, nil, nil, err
Expand All @@ -90,15 +84,15 @@ func getClients(cfg Config) (uncachedClient client.WithWatch, cachedClient clien
}

theCache, err = cache.New(cfg.Rest, cache.Options{
Scheme: cfg.Scheme,
Scheme: scheme,
Namespaces: namespaces,
})
if err != nil {
return nil, nil, nil, err
}

cachedClient, err = client.New(cfg.Rest, client.Options{
Scheme: cfg.Scheme,
Scheme: scheme,
Cache: &client.CacheOptions{
Reader: theCache,
},
Expand Down
29 changes: 2 additions & 27 deletions pkg/runtime/multi/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multi
import (
"context"
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -24,35 +23,11 @@ func (c *CacheNotFoundError) Error() string {
return fmt.Sprintf("cache for group %s not found", c.group)
}

func NewCache(defaultCache cache.Cache, defaultScheme *runtime.Scheme, caches map[string]cache.Cache, schemes map[string]*runtime.Scheme) cache.Cache {
newScheme := runtime.NewScheme()
gvksSeen := make(map[schema.GroupVersionKind]struct{})
groups := make(map[string]struct{})
for group := range schemes {
groups[group] = struct{}{}
}

for group, scheme := range schemes {
_, inGroups := groups[group]
for key, val := range scheme.AllKnownTypes() {
if _, ok := gvksSeen[key]; !ok && inGroups && key.Group == group {
newScheme.AddKnownTypeWithName(key, reflect.New(val).Interface().(runtime.Object))
gvksSeen[key] = struct{}{}
}
}
}

for key, val := range defaultScheme.AllKnownTypes() {
if _, ok := gvksSeen[key]; !ok {
newScheme.AddKnownTypeWithName(key, reflect.New(val).Interface().(runtime.Object))
gvksSeen[key] = struct{}{}
}
}

func NewCache(scheme *runtime.Scheme, defaultCache cache.Cache, caches map[string]cache.Cache) cache.Cache {
return multiCache{
defaultCache: defaultCache,
caches: caches,
scheme: newScheme,
scheme: scheme,
}
}

Expand Down
35 changes: 5 additions & 30 deletions pkg/runtime/multi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multi
import (
"context"
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -28,7 +27,6 @@ func (c *ClientNotFoundError) Error() string {
type multiClient struct {
defaultClient kclient.WithWatch
clients map[string]kclient.WithWatch
scheme *runtime.Scheme
}

type clientWithFakeWatch struct {
Expand All @@ -42,6 +40,7 @@ func (c *clientWithFakeWatch) Watch(context.Context, kclient.ObjectList, ...kcli
// NewClient returns a client that will use the client for the API groups it knows about.
// The default client is used for any unspecified API groups. If a client cannot be found, and a default doesn't exist,
// then every method will return a ClientNotFound error.
// Note that defaultClient's scheme should be valid for all clients.
func NewClient(defaultClient kclient.Client, clients map[string]kclient.Client) kclient.Client {
fakeWatchClients := make(map[string]kclient.WithWatch, len(clients))

Expand All @@ -54,35 +53,11 @@ func NewClient(defaultClient kclient.Client, clients map[string]kclient.Client)
// NewWithWatch returns a client WithWatch that will use the client for the API groups it knows about.
// The default client is used for any unspecified API groups. If a client cannot be found, and a default doesn't exist,
// then every method will return a ClientNotFound error.
// Note that defaultClient's scheme should be valid for all clients.
func NewWithWatch(defaultClient kclient.WithWatch, clients map[string]kclient.WithWatch) kclient.WithWatch {
newScheme := runtime.NewScheme()
gvksSeen := make(map[schema.GroupVersionKind]struct{})
groups := make(map[string]struct{})
for group := range clients {
groups[group] = struct{}{}
}

for group, c := range clients {
_, inGroups := groups[group]
for key, val := range c.Scheme().AllKnownTypes() {
if _, ok := gvksSeen[key]; !ok && inGroups && key.Group == group {
newScheme.AddKnownTypeWithName(key, reflect.New(val).Interface().(runtime.Object))
gvksSeen[key] = struct{}{}
}
}
}

for key, val := range defaultClient.Scheme().AllKnownTypes() {
if _, ok := gvksSeen[key]; !ok {
newScheme.AddKnownTypeWithName(key, reflect.New(val).Interface().(runtime.Object))
gvksSeen[key] = struct{}{}
}
}

return &multiClient{
defaultClient: defaultClient,
clients: clients,
scheme: newScheme,
}
}

Expand Down Expand Up @@ -151,15 +126,15 @@ func (m multiClient) SubResource(subResource string) kclient.SubResourceClient {
}

func (m multiClient) Scheme() *runtime.Scheme {
return m.scheme
return m.defaultClient.Scheme()
}

func (m multiClient) RESTMapper() meta.RESTMapper {
return multiRestMapper{m}
}

func (m multiClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return apiutil.GVKForObject(obj, m.scheme)
return apiutil.GVKForObject(obj, m.defaultClient.Scheme())
}

func (m multiClient) IsObjectNamespaced(obj runtime.Object) (bool, error) {
Expand All @@ -179,7 +154,7 @@ func (m multiClient) Watch(ctx context.Context, obj kclient.ObjectList, opts ...
}

func (m multiClient) getClient(obj runtime.Object) (kclient.WithWatch, error) {
gvk, err := apiutil.GVKForObject(obj, m.scheme)
gvk, err := m.GroupVersionKindFor(obj)
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
const defaultHealthzPort = 8888

type Options struct {
Backend backend.Backend
// If the backend is nil, then DefaultRESTConfig, DefaultNamespace, and Scheme are used to create a backend.
Backend backend.Backend
// If a Backend is provided, then this is ignored. If not provided and needed, then a default is created with Scheme.
DefaultRESTConfig *rest.Config
DefaultScheme *runtime.Scheme
DefaultNamespace string
// SpecialConfigs are keyed by an API group. This indicates to the router that all actions on this group should use the
// If a Backend is provided, then this is ignored.
DefaultNamespace string
// If a Backend is provided, then this is ignored.
Scheme *runtime.Scheme
// APIGroupConfigs are keyed by an API group. This indicates to the router that all actions on this group should use the
// given Config. This is useful for routers that watch different objects on different API servers.
SpecialConfigs map[string]bruntime.Config
APIGroupConfigs map[string]bruntime.Config
// ElectionConfig being nil represents no leader election for the router.
ElectionConfig *leader.ElectionConfig
// Defaults to 8888
Expand All @@ -42,14 +46,14 @@ func (o *Options) complete() (*Options, error) {

if result.DefaultRESTConfig == nil {
var err error
result.DefaultRESTConfig, err = restconfig.New(result.DefaultScheme)
result.DefaultRESTConfig, err = restconfig.New(result.Scheme)
if err != nil {
return nil, err
}
}

defaultConfig := bruntime.Config{Rest: result.DefaultRESTConfig, Scheme: result.DefaultScheme, Namespace: result.DefaultNamespace}
backend, err := bruntime.NewRuntimeWithConfigs(defaultConfig, result.SpecialConfigs)
defaultConfig := bruntime.Config{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}
backend, err := bruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +77,7 @@ func DefaultOptions(routerName string, scheme *runtime.Scheme) (*Options, error)
return &Options{
Backend: rt.Backend,
DefaultRESTConfig: cfg,
DefaultScheme: scheme,
Scheme: scheme,
ElectionConfig: leader.NewDefaultElectionConfig("", routerName, cfg),
HealthzPort: defaultHealthzPort,
}, nil
Expand Down

0 comments on commit 6807d93

Please sign in to comment.