Skip to content

Commit

Permalink
Expires notify log sooner when possible
Browse files Browse the repository at this point in the history
It seems useless to keep the notifications in the nflog for longer than
twice the repeat interval. This should help reduce memory usage of
clustered alertmanagers.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
  • Loading branch information
roidelapluie committed Oct 14, 2022
1 parent d034f11 commit b044302
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
9 changes: 7 additions & 2 deletions nflog/nflog.go
Expand Up @@ -399,7 +399,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}

func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
Expand All @@ -415,6 +415,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
}
}

expiresAt := now.Add(l.retention)
if expiry > 0 && l.retention > expiry {
expiresAt = now.Add(expiry)
}

e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
Expand All @@ -423,7 +428,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
},
ExpiresAt: now.Add(l.retention),
ExpiresAt: expiresAt,
}

b, err := marshalMeshEntry(e)
Expand Down
2 changes: 1 addition & 1 deletion nflog/nflog_test.go
Expand Up @@ -322,7 +322,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}

err = nl.Log(recv, "key", firingAlerts, resolvedAlerts)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
require.NoError(t, err, "logging notification failed")

entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
Expand Down
10 changes: 8 additions & 2 deletions notify/notify.go
Expand Up @@ -239,7 +239,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler
}

type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}

Expand Down Expand Up @@ -785,7 +785,13 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
return ctx, nil, errors.New("resolved alerts missing")
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
repeat, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
expiry := 2 * repeat

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
}

type timeStage struct {
Expand Down
13 changes: 8 additions & 5 deletions notify/notify_test.go
Expand Up @@ -58,15 +58,15 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error

logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
}

func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}

func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry)
}

func (l *testNflog) GC() (int, error) {
Expand Down Expand Up @@ -553,12 +553,14 @@ func TestSetNotifiesStage(t *testing.T) {
require.NotNil(t, resctx)

ctx = WithResolvedAlerts(ctx, []uint64{})
ctx = WithRepeatInterval(ctx, time.Hour)

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand All @@ -569,11 +571,12 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand Down

0 comments on commit b044302

Please sign in to comment.