Library for producing and consuming messages directly from Kafka.
The library does NOT use ZooKeeper to connect to Kafka under the hood.
Importing:
import "github.com/Financial-Times/kafka-client-go/v4"
Creating a producer:
config := kafka.ProducerConfig{
    BrokersConnectionString: "", // Comma-separated list of Kafka brokers
    Topic:                   "", // Topic to publish to
}
producer, err := kafka.NewProducer(config)
// Error handling
Failing to establish a connection to Kafka will result in an error.
Sending a message:
headers := map[string]string{}
body := ""
message := kafka.NewFTMessage(headers, body)
err := producer.SendMessage(message)
// Error handling
The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:
err := producer.ConnectivityCheck()
// Error handling
Connections should be closed by the client:
err := producer.Close()
// Error handling
Creating a consumer:
config := kafka.ConsumerConfig{
    BrokersConnectionString: "", // Comma-separated list of Kafka brokers
    ConsumerGroup:           "", // Unique name of a consumer group
}
topics := []*kafka.Topic{
    kafka.NewTopic(""),                             // Topic to consume from
    kafka.NewTopic("", kafka.WithLagTolerance(50)), // Topic to consume from, with custom lag tolerance used for monitoring
}
logger := logger.NewUPPLogger(...)
consumer, err := kafka.NewConsumer(config, topics, logger)
// Error handling
Failing to establish a connection to Kafka will result in an error.
Consuming messages:
handler := func(message kafka.FTMessage) {
// Message handling
}
consumer.Start(handler)
The health of the Kafka cluster can be checked by attempting to establish a separate connection with the provided configuration:
err := consumer.ConnectivityCheck()
// Error handling
The health of the consumer process is also monitored, and its status can be accessed:
err := consumer.MonitorCheck()
// Error handling
Connections should be closed by the client:
err := consumer.Close()
// Error handling
go test -race -v ./...
Nota bene: Some tests in this project require a local Kafka instance (listening on port 29092).
Use the -short flag to skip those tests.
Alternatively, before running the tests locally, start a Kafka cluster in Docker containers to satisfy the test requirement:
docker compose -f docker-compose.yml up -d --build
Then map the host unknown to a valid IP address (e.g. 127.0.0.1) in your local /etc/hosts file, as the tests require this host name to resolve:
127.0.0.1 unknown