Skip to content

Commit

Permalink
Fix race conditions in the memory alerts store (#3648)
Browse files Browse the repository at this point in the history
* Fix race conditions in the memory alerts store

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Expose the GC method from store.Alerts

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Use RLock/Unlock on read path

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Resolve conflicts

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Release locks by using defer

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Revert the RWMutex back to Mutex

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

---------

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
  • Loading branch information
damnever committed May 16, 2024
1 parent df70411 commit 91a94f0
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 29 deletions.
79 changes: 52 additions & 27 deletions provider/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ const alertChannelLength = 200
type Alerts struct {
cancel context.CancelFunc

mtx sync.Mutex

alerts *store.Alerts
marker types.AlertMarker

mtx sync.Mutex
listeners map[int]listeningAlerts
next int

Expand Down Expand Up @@ -100,37 +101,53 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
logger: log.With(l, "component", "provider"),
callback: alertCallback,
}
a.alerts.SetGCCallback(func(alerts []types.Alert) {
for _, alert := range alerts {
// As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
m.Delete(alert.Fingerprint())
a.callback.PostDelete(&alert)
}

a.mtx.Lock()
for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
a.mtx.Unlock()
})

if r != nil {
a.registerMetrics(r)
}

go a.alerts.Run(ctx, intervalGC)
go a.gcLoop(ctx, intervalGC)

return a, nil
}

// gcLoop periodically garbage-collects resolved alerts by invoking gc at
// the given interval. It returns when ctx is canceled.
func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			a.gc()
		case <-ctx.Done():
			return
		}
	}
}

// gc drops resolved alerts from the store and prunes listeners whose
// subscribers are done. The provider mutex is held for the entire pass so
// marker deletion, callback notification, and listener cleanup appear
// atomic to concurrent Put and subscribe calls.
func (a *Alerts) gc() {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	for _, alert := range a.alerts.GC() {
		// Alerts are not persisted, so a resolved alert is forgotten here.
		// Aggregation groups hold redundant copies in memory for any
		// pending resolved notifications.
		a.marker.Delete(alert.Fingerprint())
		a.callback.PostDelete(&alert)
	}

	for id, listener := range a.listeners {
		select {
		case <-listener.done:
			// Subscriber signaled completion; remove and close its channel.
			delete(a.listeners, id)
			close(listener.alerts)
		default:
			// Listener is still active; keep it.
		}
	}
}

// Close the alert provider.
func (a *Alerts) Close() {
if a.cancel != nil {
Expand Down Expand Up @@ -174,11 +191,13 @@ func (a *Alerts) GetPending() provider.AlertIterator {
ch = make(chan *types.Alert, alertChannelLength)
done = make(chan struct{})
)
a.mtx.Lock()
defer a.mtx.Unlock()
alerts := a.alerts.List()

go func() {
defer close(ch)

for _, a := range a.alerts.List() {
for _, a := range alerts {
select {
case ch <- a:
case <-done:
Expand All @@ -192,11 +211,16 @@ func (a *Alerts) GetPending() provider.AlertIterator {

// Get returns the alert stored for the given fingerprint, or an error if
// no alert with that fingerprint exists.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	alert, err := a.alerts.Get(fp)
	return alert, err
}

// Put adds the given alert to the set.
func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Lock()
defer a.mtx.Unlock()

for _, alert := range alerts {
fp := alert.Fingerprint()

Expand Down Expand Up @@ -226,21 +250,22 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {

a.callback.PostStore(alert, existing)

a.mtx.Lock()
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()
}

return nil
}

// count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
func (a *Alerts) count(state types.AlertState) int {
a.mtx.Lock()
defer a.mtx.Unlock()

var count int
for _, alert := range a.alerts.List() {
if alert.Resolved() {
Expand Down
60 changes: 60 additions & 0 deletions provider/mem/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package mem

import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -561,3 +562,62 @@ func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
func (l *limitCountCallback) PostDelete(_ *types.Alert) {
l.alerts.Dec()
}

// TestAlertsConcurrently stress-tests the provider for data races: 100
// goroutines hammer Put for ~2 seconds while a very short GC interval
// (time.Millisecond) forces the collector to run concurrently. Intended to
// be run with -race; it also verifies that every stored alert is eventually
// collected and the PostDelete callback balances out the PostStore count.
func TestAlertsConcurrently(t *testing.T) {
	// Cap the store at 100 alerts so Put regularly returns errTooManyAlerts,
	// exercising the error path as well as the success path.
	callback := &limitCountCallback{limit: 100}
	a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, log.NewNopLogger(), nil)
	require.NoError(t, err)

	// stopc ends the stress run after 2s; failc signals an unexpected error
	// from any writer goroutine.
	stopc := make(chan struct{})
	failc := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second)
		close(stopc)
	}()
	// Alerts resolve 10ms after they start, so the GC (1ms interval) is
	// constantly reaping while writers are inserting.
	expire := 10 * time.Millisecond
	wg := sync.WaitGroup{}
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			j := 0
			for {
				// Bail out if another goroutine failed or the run is over.
				select {
				case <-failc:
					return
				case <-stopc:
					return
				default:
				}
				now := time.Now()
				err := a.Put(&types.Alert{
					Alert: model.Alert{
						Labels:   model.LabelSet{"bar": model.LabelValue(strconv.Itoa(j))},
						StartsAt: now,
						EndsAt:   now.Add(expire),
					},
					UpdatedAt: now,
				})
				// errTooManyAlerts is expected once the limit callback
				// rejects inserts; anything else is a real failure.
				if err != nil && !errors.Is(err, errTooManyAlerts) {
					close(failc)
					return
				}
				j++
			}
		}()
	}
	wg.Wait()
	// Non-blocking check: fail the test if any writer hit an unexpected error.
	select {
	case <-failc:
		t.Fatalf("unexpected error happened")
	default:
	}

	// Give the last inserted alerts time to pass their EndsAt, then wait for
	// the GC to reap them all.
	time.Sleep(expire)
	require.Eventually(t, func() bool {
		// Once an alert expires it is considered resolved and is no longer
		// counted as active.
		return a.count(types.AlertStateActive) == 0
	}, 2*expire, expire)
	// Every PostStore must have been matched by a PostDelete.
	require.Equal(t, int32(0), callback.alerts.Load())
}
6 changes: 4 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case <-t.C:
a.gc()
a.GC()
}
}
}

func (a *Alerts) gc() {
// GC deletes resolved alerts and returns them.
func (a *Alerts) GC() []types.Alert {
a.Lock()
var resolved []types.Alert
for fp, alert := range a.c {
Expand All @@ -90,6 +91,7 @@ func (a *Alerts) gc() {
}
a.Unlock()
a.cb(resolved)
return resolved
}

// Get returns the Alert with the matching fingerprint, or an error if it is
Expand Down

0 comments on commit 91a94f0

Please sign in to comment.