Skip to content

Commit

Permalink
Merge pull request #2982 from roidelapluie/expiresoon
Browse files Browse the repository at this point in the history
Expires notify log sooner when possible
  • Loading branch information
roidelapluie committed Oct 19, 2022
2 parents 1045dc0 + b044302 commit 21ca295
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
9 changes: 7 additions & 2 deletions nflog/nflog.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}

func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
Expand All @@ -415,6 +415,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
}
}

expiresAt := now.Add(l.retention)
if expiry > 0 && l.retention > expiry {
expiresAt = now.Add(expiry)
}

e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
Expand All @@ -423,7 +428,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
},
ExpiresAt: now.Add(l.retention),
ExpiresAt: expiresAt,
}

b, err := marshalMeshEntry(e)
Expand Down
2 changes: 1 addition & 1 deletion nflog/nflog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}

err = nl.Log(recv, "key", firingAlerts, resolvedAlerts)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
require.NoError(t, err, "logging notification failed")

entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
Expand Down
10 changes: 8 additions & 2 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler
}

type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}

Expand Down Expand Up @@ -785,7 +785,13 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
return ctx, nil, errors.New("resolved alerts missing")
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
repeat, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
expiry := 2 * repeat

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
}

type timeStage struct {
Expand Down
13 changes: 8 additions & 5 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error

logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
}

func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}

func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry)
}

func (l *testNflog) GC() (int, error) {
Expand Down Expand Up @@ -553,12 +553,14 @@ func TestSetNotifiesStage(t *testing.T) {
require.NotNil(t, resctx)

ctx = WithResolvedAlerts(ctx, []uint64{})
ctx = WithRepeatInterval(ctx, time.Hour)

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand All @@ -569,11 +571,12 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})

tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
require.Equal(t, 2*time.Hour, expiry)
return nil
}
resctx, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
Expand Down

0 comments on commit 21ca295

Please sign in to comment.