Skip to content

Commit

Permalink
Fix lock races in the controller engine
Browse files Browse the repository at this point in the history
In a bunch of places I was:

1. Taking a read lock, reading state, releasing the read lock.
2. Computing something based on the read state.
3. Taking a write lock and using the computation to mutate the state.

The issue with this approach is that Goroutine B can take the write
lock and mutate state between when Goroutine A reads state and when it
writes new state based on what it read. Goroutine A is operating on
stale state.

I think there's some benefit in using an RWMutex to avoid blocking
everyone when there's no work to do, but a better approach is to:

1. Take the read lock, figure out what work needs to be done, release
   the read lock and return early if there's no work to be done.
2. If there's work to be done, take the write lock, figure out again
   what work there is to be done, and do it.

Signed-off-by: Nic Cope <nicc@rk0n.org>
  • Loading branch information
negz committed May 9, 2024
1 parent c600e60 commit f2f3384
Showing 1 changed file with 62 additions and 42 deletions.
104 changes: 62 additions & 42 deletions internal/controller/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ func (e *ControllerEngine) GetFieldIndexer() client.FieldIndexer {

// Start a new controller.
func (e *ControllerEngine) Start(name string, o ...ControllerOption) error {
co := &ControllerOptions{}
for _, fn := range o {
fn(co)
}
e.mx.Lock()
defer e.mx.Unlock()

// Start is a no-op if the controller is already running.
e.mx.RLock()
_, ok := e.controllers[name]
e.mx.RUnlock()
if ok {
if _, running := e.controllers[name]; running {
return nil
}

co := &ControllerOptions{}
for _, fn := range o {
fn(co)
}

c, err := kcontroller.NewUnmanaged(name, e.mgr, co.runtime)
if err != nil {
return errors.Wrap(err, "cannot create new controller")
Expand Down Expand Up @@ -208,9 +208,7 @@ func (e *ControllerEngine) Start(name string, o ...ControllerOption) error {
sources: make(map[schema.GroupVersionKind]*StoppableSource),
}

e.mx.Lock()
e.controllers[name] = r
e.mx.Unlock()

return nil
}
Expand Down Expand Up @@ -241,9 +239,9 @@ func (e *ControllerEngine) Stop(ctx context.Context, name string) error {

// Finally, stop and delete the controller.
e.mx.Lock()
defer e.mx.Unlock()
c.cancel()
delete(e.controllers, name)
e.mx.Unlock()

e.log.Debug("Stopped controller", "controller", name)
return nil
Expand All @@ -252,8 +250,8 @@ func (e *ControllerEngine) Stop(ctx context.Context, name string) error {
// IsRunning returns true if the named controller is running.
func (e *ControllerEngine) IsRunning(name string) bool {
e.mx.RLock()
defer e.mx.RUnlock()
_, running := e.controllers[name]
e.mx.RUnlock()
return running
}

Expand Down Expand Up @@ -284,58 +282,74 @@ func (e *ControllerEngine) StartWatches(name string, ws ...Watch) error {
return errors.Errorf("controller %q is not running", name)
}

watchExists := make(map[schema.GroupVersionKind]bool)
c.mx.RLock()
for gvk := range c.sources {
watchExists[gvk] = true
// Make sure we can get GVKs for all the watches before we take locks.
gvks := make([]schema.GroupVersionKind, len(ws))
for i := range ws {
gvk, err := apiutil.GVKForObject(ws[i].kind, e.mgr.GetScheme())
if err != nil {
return errors.Wrapf(err, "cannot determine group, version, and kind for %T", ws[i].kind)
}
gvks[i] = gvk
}
c.mx.RUnlock()

// It's possible that we didn't explicitly stop a watch, but its backing
// informer was removed. This implicitly stops the watch by deleting its
// backing listener. If a watch exists but doesn't have an active informer,
// we want to restart the watch (and, implicitly, the informer).
//
// There's a potential race here. Another Goroutine could remove an informer
// between where we build the map and where we read it to check whether an
// informer is active. We wouldn't start a watch when we should. If the
// controller calls StartWatches repeatedly (e.g. an XR controller) this
// will eventually self-correct.
a := e.infs.ActiveInformers()
activeInformer := make(map[schema.GroupVersionKind]bool, len(a))
for _, gvk := range a {
activeInformer[gvk] = true
}

// Using a map here deduplicates watches by GVK. If we're asked to start
// several watches for the same GVK at in the same call, we'll only start
// the last one.
start := make(map[schema.GroupVersionKind]Watch, len(ws))
for _, w := range ws {
gvk, err := apiutil.GVKForObject(w.kind, e.mgr.GetScheme())
if err != nil {
return errors.Wrapf(err, "cannot determine group, version, and kind for %T", w.kind)
}

// Some controllers will call StartWatches on every reconcile. Most calls
// won't actually need to start a new watch. For example an XR controller
// would only need to start a new watch if an XR composed a new kind of
// resource that no other XR it controls already composes. So, we try to
// avoid taking a write lock and blocking all reconciles unless we need to.
c.mx.RLock()
start := false
for _, gvk := range gvks {
// We've already created this watch and the informer backing it is still
// running. We don't need to create a new watch.
if watchExists[gvk] && activeInformer[gvk] {
if _, watchExists := c.sources[gvk]; watchExists && activeInformer[gvk] {
e.log.Debug("Watch exists for GVK, not starting a new one", "controller", name, "watched-gvk", gvk)
continue
}

start[gvk] = w
start = true
break
}
c.mx.RUnlock()

// Don't take any write locks if there's no watches to start.
if len(start) == 0 {
// Nothing to start.
if !start {
return nil
}

// TODO(negz): If this blocks too much, we could alleviate it a bit by
// reading watches to start from a buffered channel.

// Take the write lock. This will block any other callers that want to
// update the watches for the same controller.
// We have at least one watch to start - take the write lock. It's possible
// another Goroutine updated this controller's watches since we released the
// read lock, so we compute everything again.
c.mx.Lock()
defer c.mx.Unlock()

// Start new sources.
for gvk, w := range start {
for i, w := range ws {
gvk := gvks[i]

// We've already created this watch and the informer backing it is still
// running. We don't need to create a new watch. We don't debug log this
// one - we'll have logged it above unless the watch was added between
// releasing the read lock and taking the write lock.
if _, watchExists := c.sources[gvk]; watchExists && activeInformer[gvk] {
continue
}

// The controller's Watch method just calls the StoppableSource's Start
// method, passing in its private work queue as an argument. This
// will start an informer for the watched kind if there isn't one
Expand Down Expand Up @@ -371,20 +385,23 @@ func (e *ControllerEngine) StopWatches(ctx context.Context, name string, keep ..
return 0, errors.Errorf("controller %q is not running", name)
}

// Don't take the write lock if we actually want to keep all watches.
c.mx.RLock()
stop := sets.KeySet(c.sources).Difference(sets.New(keep...))
stop := sets.KeySet(c.sources).IsSuperset(sets.New(keep...))
c.mx.RUnlock()

// Don't take the write lock if we actually want to keep all watches.
if len(stop) == 0 {
if !stop {
return 0, nil
}

// We have at least one watch to stop - take the write lock. It's possible
// another Goroutine updated this controller's watches since we released the
// read lock, so we compute everything again.
c.mx.Lock()
defer c.mx.Unlock()

stopped := 0
for gvk := range stop {
for gvk := range sets.KeySet(c.sources).Difference(sets.New(keep...)) {
if err := c.sources[gvk].Stop(ctx); err != nil {
return stopped, errors.Wrapf(err, "cannot stop watch for %q", gvk)
}
Expand Down Expand Up @@ -424,6 +441,9 @@ func (e *ControllerEngine) RemoveUnwatchedInformers(ctx context.Context) error {
u := &unstructured.Unstructured{}
u.SetAPIVersion(gvk.GroupVersion().String())
u.SetKind(gvk.Kind)

// It's fine to remove an informer that doesn't exist (e.g. because
// someone removed it since we called ActiveInformers).
if err := e.infs.RemoveInformer(ctx, u); err != nil {
return errors.Wrapf(err, "cannot remove informer for %q", gvk)
}
Expand Down

0 comments on commit f2f3384

Please sign in to comment.