package kafka

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/Financial-Times/go-logger/v2"
	"github.com/IBM/sarama"
)
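
// ErrUnknownConsumerStatus is returned by consumerStatus when the health of the
// consumer group cannot currently be determined, e.g. after repeated offset fetch failures.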
var ErrUnknownConsumerStatus = fmt.Errorf("consumer status is unknown")

const (
	maxOffsetFetchInterval         = 10 * time.Minute
	defaultOffsetFetchInterval     = 3 * time.Minute
	maxOffsetFetchFailureCount     = 10
	defaultOffsetFetchFailureCount = 5
	uncommittedOffsetValue         = -1
)
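
// topicOffsetFetcher retrieves the offset at the given position
// (e.g. sarama.OffsetNewest) for a topic partition.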
type topicOffsetFetcher interface {
	GetOffset(topic string, partitionID int32, position int64) (int64, error)
}
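
// consumerOffsetFetcher retrieves the offsets committed by a consumer group
// for a set of topic partitions.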
type consumerOffsetFetcher interface {
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
	Close() error
}
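
// offset holds the committed consumer group offset and the newest topic offset
// for a single partition.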
type offset struct {
	Partition int32
	// Last committed offset of the consumer group.
	Consumer int64
	// Next offset to be produced in the topic.
	Topic int64
}
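
// fetcherScheduler controls the offset fetch cadence. After a failed fetch the
// ticker is switched to the shortened interval, and consecutive failures are
// counted against maxFailureCount.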
type fetcherScheduler struct {
	ticker            *time.Ticker
	standardInterval  time.Duration
	shortenedInterval time.Duration
	failureCount      int
	maxFailureCount   int
}
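
// consumerMonitor periodically compares the offsets committed by the consumer
// group with the newest offsets of the subscribed topic partitions and records
// the per-partition lag reported by consumerStatus.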
type consumerMonitor struct {
	connectionString        string
	consumerGroup           string
	consumerOffsetFetcher   consumerOffsetFetcher
	topicOffsetFetcher      topicOffsetFetcher
	scheduler               fetcherScheduler
	connectionResetDisabled bool
	// Key is Topic. Values are Partitions.
	subscriptions map[string][]int32
	topicsLock    *sync.RWMutex
	topics        []*Topic
	unknownStatus bool
	logger        *logger.UPPLogger
}
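
// newConsumerMonitor builds a consumerMonitor from the given consumer
// configuration, falling back to the default fetch interval and failure count
// when the configured values are non-positive or exceed the allowed maximums.
//
// A minimal wiring sketch (an illustrative assumption; the fetchers are created
// with newConsumerGroupOffsetFetchers, as also done when run resets the
// monitoring connection):
//
//	consumerFetcher, topicFetcher, err := newConsumerGroupOffsetFetchers(config.BrokersConnectionString)
//	if err != nil {
//		// handle the connection error
//	}
//	monitor := newConsumerMonitor(config, consumerFetcher, topicFetcher, topics, log)
//	go monitor.run(ctx, subscriptionEvents)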
func newConsumerMonitor(config ConsumerConfig, consumerFetcher consumerOffsetFetcher, topicFetcher topicOffsetFetcher, topics []*Topic, logger *logger.UPPLogger) *consumerMonitor {
	offsetFetchInterval := config.OffsetFetchInterval
	if offsetFetchInterval <= 0 || offsetFetchInterval > maxOffsetFetchInterval {
		offsetFetchInterval = defaultOffsetFetchInterval
	}

	maxFailureCount := config.OffsetFetchMaxFailureCount
	if maxFailureCount <= 0 || maxFailureCount > maxOffsetFetchFailureCount {
		maxFailureCount = defaultOffsetFetchFailureCount
	}

	return &consumerMonitor{
		connectionString: config.BrokersConnectionString,
		consumerGroup:    config.ConsumerGroup,
		scheduler: fetcherScheduler{
			standardInterval:  offsetFetchInterval,
			shortenedInterval: offsetFetchInterval / 3,
			maxFailureCount:   maxFailureCount,
		},
		consumerOffsetFetcher:   consumerFetcher,
		topicOffsetFetcher:      topicFetcher,
		connectionResetDisabled: config.DisableMonitoringConnectionReset,
		subscriptions:           map[string][]int32{},
		topicsLock:              &sync.RWMutex{},
		topics:                  topics,
		logger:                  logger,
	}
}
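
// run is the monitoring loop. It updates the tracked subscriptions from
// subscriptionEvents, fetches offsets on every ticker tick and refreshes the
// consumer status until the context is cancelled. Repeated fetch failures
// trigger a reset of the monitoring connection unless that is disabled.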
func (m *consumerMonitor) run(ctx context.Context, subscriptionEvents chan *subscriptionEvent) {
	m.scheduler.ticker = time.NewTicker(m.scheduler.standardInterval)
	defer m.scheduler.ticker.Stop()

	log := m.logger.WithField("process", "ConsumerMonitor")

	for {
		select {
		case <-ctx.Done():
			// Terminate the fetchers and exit.
			// Note that both fetchers share the same connection, so calling Close() once is enough.
			if err := m.consumerOffsetFetcher.Close(); err != nil {
				log.WithError(err).Error("Failed to close offset fetcher connections")
			}

			log.Info("Terminating consumer monitor...")
			return

		case event := <-subscriptionEvents:
			log.
				WithField("subscribed", event.subscribed).
				WithField("topic", event.topic).
				WithField("partition", event.partition).
				Info("Subscription event received")

			m.updateSubscriptions(event)

		case <-m.scheduler.ticker.C:
			if len(m.subscriptions) == 0 {
				log.Warn("Consumer is not currently subscribed to any topics")
				continue
			}

			offsets, err := m.fetchOffsets()
			if err != nil {
				log.WithError(err).Warn("Failed to fetch consumer offsets")

				m.scheduler.ticker.Reset(m.scheduler.shortenedInterval)
				m.scheduler.failureCount++
				if m.scheduler.failureCount < m.scheduler.maxFailureCount {
					continue
				}

				log.Warnf("Fetching offsets failed %d times in a row. Consumer status data is stale.",
					m.scheduler.failureCount)

				if m.connectionResetDisabled {
					m.clearConsumerStatus()
					continue
				}

				// It's necessary to restart the connection due to:
				// * a bug producing broken pipe errors: https://github.com/Shopify/sarama/issues/1796;
				// * consumer rebalancing causing coordinator errors;
				// * other issues unknown at this point in time.
				log.Info("Attempting to re-establish monitoring connection...")

				consumerOffsetFetcher, topicOffsetFetcher, err := newConsumerGroupOffsetFetchers(m.connectionString)
				if err != nil {
					log.WithError(err).Error("Failed to establish new monitoring connection")
					m.clearConsumerStatus()
					continue
				}

				// Terminate the old connection and replace it.
				_ = m.consumerOffsetFetcher.Close()
				m.consumerOffsetFetcher = consumerOffsetFetcher
				m.topicOffsetFetcher = topicOffsetFetcher

				log.Info("Established new monitoring connection")

				// Re-attempt to fetch offsets and clear the status on failure.
				offsets, err = m.fetchOffsets()
				if err != nil {
					log.WithError(err).Error("Failed to fetch consumer offsets after resetting the connection")
					m.clearConsumerStatus()
					continue
				}
			}

			log.WithField("offsets", offsets).Debug("Offsets fetched successfully")

			m.updateConsumerStatus(offsets)

			if m.scheduler.failureCount != 0 {
				// Revert the scheduling change.
				m.scheduler.failureCount = 0
				m.scheduler.ticker.Reset(m.scheduler.standardInterval)
			}
		}
	}
}
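
// updateSubscriptions adds or removes a topic partition from the tracked
// subscriptions based on the received event.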
func (m *consumerMonitor) updateSubscriptions(event *subscriptionEvent) {
	if event.subscribed {
		m.subscriptions[event.topic] = append(m.subscriptions[event.topic], event.partition)
		return
	}

	partitions, ok := m.subscriptions[event.topic]
	if !ok {
		return
	}

	for i, p := range partitions {
		if p == event.partition {
			partitions = append(partitions[:i], partitions[i+1:]...)
			break
		}
	}

	if len(partitions) == 0 {
		delete(m.subscriptions, event.topic)
		return
	}

	m.subscriptions[event.topic] = partitions
}

// Fetch and return the latest offsets committed by the consumer group
// as well as the newest offsets of the respective topic partitions.
func (m *consumerMonitor) fetchOffsets() (map[string][]offset, error) {
	fetchedOffsets, err := m.consumerOffsetFetcher.ListConsumerGroupOffsets(m.consumerGroup, m.subscriptions)
	if err != nil {
		return nil, fmt.Errorf("error fetching consumer group offsets from client: %w", err)
	}
	if fetchedOffsets.Err != sarama.ErrNoError {
		return nil, fmt.Errorf("error fetching consumer group offsets from server: %w", fetchedOffsets.Err)
	}

	topicOffsets := map[string][]offset{}

	for topic := range m.subscriptions {
		partitions, ok := fetchedOffsets.Blocks[topic]
		if !ok {
			return nil, fmt.Errorf("requested consumer offsets for topic %q were not fetched", topic)
		}

		var offsets []offset
		for partition, block := range partitions {
			if block.Err != sarama.ErrNoError {
				return nil, fmt.Errorf("error fetching consumer group offsets for partition %d of topic %q from server: %w",
					partition, topic, block.Err)
			}

			topicOffset, err := m.topicOffsetFetcher.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				return nil, fmt.Errorf("error fetching topic offset for partition %d of topic %q: %w",
					partition, topic, err)
			}

			offsets = append(offsets, offset{
				Partition: partition,
				Consumer:  block.Offset,
				Topic:     topicOffset,
			})
		}

		topicOffsets[topic] = offsets
	}

	return topicOffsets, nil
}
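
// updateConsumerStatus records the lag of lagging or indeterminate partitions
// and removes the entries of healthy ones, based on the freshly fetched offsets.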
func (m *consumerMonitor) updateConsumerStatus(fetchedOffsets map[string][]offset) {
	m.topicsLock.Lock()
	defer m.topicsLock.Unlock()

	for _, topic := range m.topics {
		offsets, ok := fetchedOffsets[topic.Name]
		if !ok {
			// No active subscriptions for the given topic at this time.
			topic.partitionLag = make(map[int32]*int64)
			continue
		}

		for _, offset := range offsets {
			// Only store the lag if it exceeds the configured tolerance or is invalid.
			// The next offset in the topic is always greater than or equal to the last committed offset of the consumer.
			// A nil lag is stored when the consumer group hasn't committed any offsets yet (the returned value is -1)
			// to prevent reporting a misleading lag value to the client.
			if offset.Consumer == uncommittedOffsetValue {
				topic.partitionLag[offset.Partition] = nil
			} else {
				lag := offset.Topic - offset.Consumer
				if lag > topic.lagTolerance || lag < 0 {
					topic.partitionLag[offset.Partition] = &lag
				} else {
					// Clear the latest lag entry to flag the partition as healthy.
					delete(topic.partitionLag, offset.Partition)
				}
			}
		}
	}

	m.unknownStatus = false
}
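
// clearConsumerStatus flags the consumer status as unknown and discards all
// recorded partition lag data.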
func (m *consumerMonitor) clearConsumerStatus() {
	m.topicsLock.Lock()
	defer m.topicsLock.Unlock()

	m.unknownStatus = true
	for _, topic := range m.topics {
		topic.partitionLag = make(map[int32]*int64)
	}
}
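
// consumerStatus reports the current health of the consumer. It returns
// ErrUnknownConsumerStatus when the status cannot be determined, an error
// describing lagging or indeterminate partitions when there are any, and nil
// when the consumer is healthy.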
func (m *consumerMonitor) consumerStatus() error {
	m.topicsLock.RLock()
	defer m.topicsLock.RUnlock()

	if m.unknownStatus {
		return ErrUnknownConsumerStatus
	}

	var statusMessages []string
	for _, topic := range m.topics {
		for partition, lag := range topic.partitionLag {
			var message string
			if lag == nil {
				message = fmt.Sprintf("could not determine lag for partition %d of topic %q due to uncompleted initial offset commit", partition, topic.Name)
			} else if *lag < 0 {
				message = fmt.Sprintf("could not determine lag for partition %d of topic %q", partition, topic.Name)
			} else {
				message = fmt.Sprintf("consumer is lagging behind for partition %d of topic %q with %d messages", partition, topic.Name, *lag)
			}

			statusMessages = append(statusMessages, message)
		}
	}

	if len(statusMessages) == 0 {
		return nil
	}

	sort.Strings(statusMessages)
	return fmt.Errorf("consumer is not healthy: %s", strings.Join(statusMessages, " ; "))
}