Skip to content

Commit

Permalink
:sparkling: Add terminal error
Browse files Browse the repository at this point in the history
This change adds a terminal error into the reconcile package. The
purpose of that error is to still be logged and used in metrics but
avoid retrying in situations where it is known that that will not help.
It also adds a new metric for just terminal errors, as they often
indicate that some kind of human intervention is needed.
  • Loading branch information
alvaroaleman committed May 16, 2023
1 parent 92646a5 commit d10ae95
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 24 deletions.
16 changes: 11 additions & 5 deletions pkg/controller/controllertest/testing.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllertest

import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,28 +36,33 @@ func (ErrorType) GetObjectKind() schema.ObjectKind { return nil }
// DeepCopyObject implements runtime.Object.
func (ErrorType) DeepCopyObject() runtime.Object { return nil }

var _ workqueue.RateLimitingInterface = Queue{}
var _ workqueue.RateLimitingInterface = &Queue{}

// Queue implements a RateLimiting queue as a non-ratelimited queue for testing.
// This helps testing by having functions that use a RateLimiting queue synchronously add items to the queue.
type Queue struct {
workqueue.Interface
AddedRateLimitedLock sync.Mutex
AddedRatelimited []any
}

// AddAfter implements RateLimitingInterface.
func (q Queue) AddAfter(item interface{}, duration time.Duration) {
func (q *Queue) AddAfter(item interface{}, duration time.Duration) {
q.Add(item)
}

// AddRateLimited implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) AddRateLimited(item interface{}) {
func (q *Queue) AddRateLimited(item interface{}) {
q.AddedRateLimitedLock.Lock()
q.AddedRatelimited = append(q.AddedRatelimited, item)
q.AddedRateLimitedLock.Unlock()
q.Add(item)
}

// Forget implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) Forget(item interface{}) {}
func (q *Queue) Forget(item interface{}) {}

// NumRequeues implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) NumRequeues(item interface{}) int {
func (q *Queue) NumRequeues(item interface{}) int {
return 0
}
2 changes: 1 addition & 1 deletion pkg/handler/eventhandler_test.go
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Eventhandler", func() {
var pod *corev1.Pod
var mapper meta.RESTMapper
BeforeEach(func() {
q = controllertest.Queue{Interface: workqueue.New()}
q = &controllertest.Queue{Interface: workqueue.New()}
pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "biz", Name: "baz"},
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/controller/controller.go
Expand Up @@ -314,7 +314,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
if errors.Is(err, reconcile.TerminalError(nil)) {
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
} else {
c.Queue.AddRateLimited(req)
}
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
Expand Down
25 changes: 24 additions & 1 deletion pkg/internal/controller/controller_test.go
Expand Up @@ -359,7 +359,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -372,6 +371,9 @@ var _ = Describe("controller", func() {
By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(Equal([]any{request}))
queue.AddedRateLimitedLock.Unlock()

By("Invoking Reconciler a second time without error")
fakeReconcile.AddResult(reconcile.Result{}, nil)
Expand All @@ -382,6 +384,27 @@ var _ = Describe("controller", func() {
Eventually(func() int { return queue.NumRequeues(request) }, 1.0).Should(Equal(0))
})

It("should not requeue a Request if there is a terminal error", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

queue.Add(request)

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("expected error: reconcile")))
Expect(<-reconciled).To(Equal(request))

queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(BeEmpty())
queue.AddedRateLimitedLock.Unlock()

Expect(queue.Len()).Should(Equal(0))
})

// TODO(directxman12): we should ensure that backoff occurrs with error requeue

It("should not reset backoff until there's a non-error result", func() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/internal/controller/metrics/metrics.go
Expand Up @@ -39,6 +39,13 @@ var (
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})

// TerminalReconcileErrors is a prometheus counter metrics which holds the total
// number of terminal errors from the Reconciler.
TerminalReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_terminal_reconcile_errors_total",
Help: "Total number of terminal reconciliation errors per controller",
}, []string{"controller"})

// ReconcileTime is a prometheus metric which keeps track of the duration
// of reconciliations.
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -67,6 +74,7 @@ func init() {
metrics.Registry.MustRegister(
ReconcileTotal,
ReconcileErrors,
TerminalReconcileErrors,
ReconcileTime,
WorkerCount,
ActiveWorkers,
Expand Down
32 changes: 16 additions & 16 deletions pkg/internal/source/internal_test.go
Expand Up @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() {
set = true
},
}
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, funcs, nil)
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil)
})

Describe("EventHandler", func() {
Expand All @@ -99,38 +99,38 @@ var _ = Describe("Internal", func() {
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -157,37 +157,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter UpdateEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand Down Expand Up @@ -215,37 +215,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter DeleteEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
Expand Down
24 changes: 24 additions & 0 deletions pkg/reconcile/reconcile.go
Expand Up @@ -18,6 +18,7 @@ package reconcile

import (
"context"
"errors"
"time"

"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -100,3 +101,26 @@ var _ Reconciler = Func(nil)

// Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) }

// TerminalError is an error that will not be retried but still be logged
// and recorded in metrics.
func TerminalError(wrapped error) error {
return &terminalError{err: wrapped}
}

type terminalError struct {
err error
}

func (te *terminalError) Unwrap() error {
return te.err
}

func (te *terminalError) Error() string {
return "terminal error: " + te.err.Error()
}

func (te *terminalError) Is(target error) bool {
tp := &terminalError{}
return errors.As(target, &tp)
}
8 changes: 8 additions & 0 deletions pkg/reconcile/reconcile_test.go
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -88,5 +89,12 @@ var _ = Describe("reconcile", func() {
Expect(actualResult).To(Equal(result))
Expect(actualErr).To(Equal(err))
})

It("should allow unwrapping inner error from terminal error", func() {
inner := apierrors.NewGone("")
terminalError := reconcile.TerminalError(inner)

Expect(apierrors.IsGone(terminalError)).To(BeTrue())
})
})
})

0 comments on commit d10ae95

Please sign in to comment.