Go 使用 sarama 包操作 kafka
在网上看了许多关于 Go kafka 的基础使用教程,都不是很满意,有的写的太深如解决重复消费,有的写的太浅就复制了一段代码就结束了。所以我打算自己整理出一篇详细的基础使用教程,附上详细的注释与代码,以供自己复习,也方便读者查阅观看。
前言
本教程会使用 sarama 包来操作 kafka,sarama 是一个完全由 Go 实现的 kafka 客户端,在 github 上 star 很多,建议使用。安装 sarama:
go get github.com/Shopify/sarama
在开始之前,我们先提出几个要解决的问题,然后在下文来一一处理:
- 创建一个 kafka 客户端;
- 生产一条信息;
- 消费一条信息;
- 使用消费者组;
- 消息体 Key 的使用;
- 异步生产消息。
全局常量
const (
topic = "kafka_one"
group = "kafka_one_1"
host = "192.168.10.43:9092,192.168.10.43:9093,192.168.10.43:9094"
)
我们先约定好三个全局常量,topic 是我们要使用的主题名,group 是我们要使用的消费者组名,host 是 kafka breker 节点地址。
创建 kafka 客户端
// 获取 kafka 节点信息
func getAddr() []string {
return strings.Split(host, ",")
}
// 创建 kafka 客户端连接的配置
func getConf() *sarama.Config {
conf := sarama.NewConfig()
// 生产消息后是否需要通知生产者
// 同步模式会直接返回
// 异步模式会返回到Successes和Errors通道中
conf.Producer.Return.Successes = true
return conf
}
// 创建 kafka 客户端,并返回客户端链接供生产和消费使用
func newClient() sarama.Client {
client, err := sarama.NewClient(getAddr(), getConf())
if err != nil {
panic(err)
}
return client
}
通过 newClient 函数即可创建 kafka 的客户端链接。
生产一条信息
func TestProducer(t *testing.T) {
client := newClient()
// 创建一个同步模式的生产者链接
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}
defer producer.Close()
// 构建一个消息体,准备发送至 kafka
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("hello kafka"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
if err != nil {
t.Fatal(err)
}
log.Printf("发送消息成功: topic=%s, partition=%d, offset=%d\n", topic, partition, offset)
}
// 结果
2023/07/09 00:43:59 发送消息成功: topic=kafka_one, partition=1, offset=0
可以看到,我们发送的消息落在分区1上,偏移量是0,接下来我们启用一个消费者,来消费 kafka_one topic 分区1中的数据。
消费信息
func TestConsumer(t *testing.T) {
client := newClient()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
// 消费分区=1 offset=0 的数据
consumePartition, err := consumer.ConsumePartition(topic, int32(1), 0)
if err != nil {
return
}
// 启用一个协程,持续监听队列中的数据,队列中的数据会通过 Messages 发过来
go func() {
for msg := range consumePartition.Messages() {
fmt.Printf("接收topic=%s, partition=%d, offset=%d, value=%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
}
}()
for {
}
}
// 结果
接收topic=kafka_one, partition=1, offset=0, value=hello kafka
执行此函数后,这个消费者就会持续监听分区1中的数据,我们使用生产者生产到分区1上的数据会被其监听到,如果生产到其他分区,此消费者就无能为力了。这个时候我们需要消费者组来消费一个 topic 上所有分区的数据。
消费者组
使用消费者组前,我们必须要构建一个结构体实现 sarama 提供的 ConsumerGroupHandler 接口,用来处理消费者组的创建、销毁与监听。其接口定义如下:
type ConsumerGroupHandler interface {
// 回调函数,在消费者启动前执行一些操作
Setup(ConsumerGroupSession) error
// 回调函数,在消费者关闭时执行的操作
Cleanup(ConsumerGroupSession) error
// 回调函数,当队列中有消息时会触发,是我们处理消息的地方
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
定义一个结构体,实现 ConsumerGroupHandler:
type ConsumerGrp struct {
}
func (c *ConsumerGrp) Setup(session sarama.ConsumerGroupSession) error {
fmt.Println("消费者启动")
return nil
}
func (c *ConsumerGrp) Cleanup(session sarama.ConsumerGroupSession) error {
fmt.Println("消费者关闭")
return nil
}
func (c *ConsumerGrp) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("接收topic=%s, partition=%d, offset=%d, value=%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
}
return nil
}
启用消费者组,监听数据:
func TestConsumerGrp(t *testing.T) {
client := newClient()
ctx := context.Background()
consumerGroup, err := sarama.NewConsumerGroupFromClient(group, client)
if err != nil {
return
}
defer consumerGroup.Close()
consumer := &Consumer{}
go func() {
for {
err := consumerGroup.Consume(ctx, []string{topic}, consumer)
if err != nil {
log.Fatal(err)
}
}
}()
for {
}
}
此时,我们就可以使用消费组来监听所有分区的数据了。
消息体的 Key
我们在使用生产者发送消息时,消息会落在不同的分区上,如果我们需要将一类消息发送到一个分区时,就需要使用到消息体的 key,修改生产者消息体,加上 key:
func TestProducer(t *testing.T) {
client := newClient()
// 创建一个同步模式的生产者链接
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}
defer producer.Close()
// 构建一个消息体,准备发送至 kafka
msg := &sarama.ProducerMessage{
Topic: topic,
// 加上key
Key: sarama.StringEncoder("my_key"),
Value: sarama.StringEncoder("hello kafka"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
if err != nil {
t.Fatal(err)
}
log.Printf("发送消息成功: topic=%s, partition=%d, offset=%d\n", topic, partition, offset)
}
同一个 key 发送的消息体会落在同一个分区上。我们可以多发送几次测试:
2023/07/09 16:47:21 发送消息成功: topic=kafka_one, partition=3, offset=8
2023/07/09 16:47:24 发送消息成功: topic=kafka_one, partition=3, offset=8
2023/07/09 16:47:27 发送消息成功: topic=kafka_one, partition=3, offset=8
异步生产信息
异步生产相比同步生产可以提高吞吐量降低延迟,但存在丢失消息的风险,会增加错误处理的复杂性,对于要求消息可靠的场景下,建议使用同步模式,对吞吐和延迟比较在意的话,可以考虑使用异步模式。异步模式通过通道来确认消息是否发送成功/失败,实例代码:
// 异步发送消息
func TestAsyncProducer(t *testing.T) {
client := newClient()
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
t.Fatal(err)
}
defer producer.AsyncClose()
go func() {
for {
select {
// 从通道中监听成功信息
case msg := <-producer.Successes():
fmt.Printf("发送成功, topic=%s, partition=%d, offset=%d\n", msg.Topic, msg.Partition, msg.Offset)
// 从通道中监听失败信息信息
case err := <-producer.Errors():
fmt.Printf("发送消息失败: %s\n", err.Error())
}
}
}()
// 构建消息体发送
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("async value"),
}
producer.Input() <- msg
// 等待1秒,确保消息发送成功
time.Sleep(1 * time.Second)
}
到这里,sarama 的使用就告一段落了,希望能对你有些帮助!
本文目录