Skip to content

Commit

Permalink
Expires notify log sooner when possible
Browse files Browse the repository at this point in the history
It seems useless to keep the notifications in the nflog for longer than
twice the repeat interval. This should help reduce memory usage of
clustered alertmanagers.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
  • Loading branch information
roidelapluie committed Jul 18, 2022
1 parent da6de1f commit 7d9c41b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
9 changes: 7 additions & 2 deletions nflog/nflog.go
Expand Up @@ -399,7 +399,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}

func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
Expand All @@ -415,6 +415,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
}
}

expiresAt := now.Add(l.retention)
if expiry > 0 && l.retention > expiry {
expiresAt = now.Add(expiry)
}

e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
Expand All @@ -423,7 +428,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
},
ExpiresAt: now.Add(l.retention),
ExpiresAt: expiresAt,
}

b, err := marshalMeshEntry(e)
Expand Down
2 changes: 1 addition & 1 deletion nflog/nflog_test.go
Expand Up @@ -322,7 +322,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}

err = nl.Log(recv, "key", firingAlerts, resolvedAlerts)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
require.NoError(t, err, "logging notification failed")

entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
Expand Down
9 changes: 7 additions & 2 deletions notify/notify.go
Expand Up @@ -239,7 +239,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler
}

type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}

Expand Down Expand Up @@ -785,7 +785,12 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
return ctx, nil, errors.New("resolved alerts missing")
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
var expiry time.Duration
if n, ok := RepeatInterval(ctx); ok {
expiry = 2 * n
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
}

type timeStage struct {
Expand Down
13 changes: 8 additions & 5 deletions notify/notify_test.go
Expand Up @@ -58,15 +58,15 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error

logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
}

func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}

func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry)
}

func (l *testNflog) GC() (int, error) {
Expand Down Expand Up @@ -553,12 +553,14 @@ func TestSetNotifiesStage(t *testing.T) {
require.NotNil(t, resctx)

ctx = WithResolvedAlerts(ctx, []uint64{})
ctx = WithRepeatInterval(ctx, time.Hour)

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand All @@ -569,11 +571,12 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand Down

0 comments on commit 7d9c41b

Please sign in to comment.