-
Notifications
You must be signed in to change notification settings - Fork 4
/
pubsub_test.go
62 lines (48 loc) · 1.38 KB
/
pubsub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package kafka
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestE2EPubSub(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}
const e2eTestTopic = "e2eTestTopic"
producer := newTestProducer(t, e2eTestTopic)
require.NoError(t, producer.ConnectivityCheck())
producedMessages := []FTMessage{
{
Headers: map[string]string{},
Body: "message 1",
Topic: e2eTestTopic,
},
{
Headers: map[string]string{},
Body: "message 2",
Topic: e2eTestTopic,
},
}
consumedMessagesLock := &sync.RWMutex{}
consumedMessages := []FTMessage{}
messageHandler := func(message FTMessage) {
consumedMessagesLock.Lock()
consumedMessages = append(consumedMessages, message)
consumedMessagesLock.Unlock()
}
consumer := newTestConsumer(t, e2eTestTopic)
require.NoError(t, consumer.ConnectivityCheck())
consumer.Start(messageHandler)
time.Sleep(5 * time.Second) // Let partition claiming take place.
for _, message := range producedMessages {
assert.NoError(t, producer.SendMessage(message))
}
time.Sleep(5 * time.Second) // Let message handling take place.
consumedMessagesLock.RLock()
assert.Equal(t, producedMessages, consumedMessages)
consumedMessagesLock.RUnlock()
assert.NoError(t, producer.Close())
assert.NoError(t, consumer.Close())
}