oldme 博客

回首向来萧瑟处,归去,也无风雨也无晴

Go 使用 sarama 包操作 kafka

oldme create: 2023-07-08

在网上看了许多关于 Go kafka 的基础使用教程,都不是很满意,有的写的太深如解决重复消费,有的写的太浅就复制了一段代码就结束了。所以我打算自己整理出一篇详细的基础使用教程,附上详细的注释与代码,以供自己复习,也方便读者查阅观看。

前言

本教程会使用 sarama 包来操作 kafka,sarama 是一个完全由 Go 实现的 kafka 客户端,在 github 上 star 很多,建议使用。安装 sarama:

go get github.com/Shopify/sarama

在开始之前,我们先提出几个要解决的问题,然后在下文来一一处理:

  1. 创建一个 kafka 客户端;
  2. 生产一条信息;
  3. 消费一条信息;
  4. 使用消费者组;
  5. 消息体 Key 的使用;
  6. 异步生产消息。

全局常量

const (
	topic = "kafka_one"
	group = "kafka_one_1"
	host  = "192.168.10.43:9092,192.168.10.43:9093,192.168.10.43:9094"
)

我们先约定好三个全局常量,topic 是我们要使用的主题名,group 是我们要使用的消费者组名,host 是 kafka breker 节点地址。

创建 kafka 客户端

// 获取 kafka 节点信息
func getAddr() []string {
	return strings.Split(host, ",")
}

// 创建 kafka 客户端连接的配置
func getConf() *sarama.Config {
	conf := sarama.NewConfig()
	// 生产消息后是否需要通知生产者
	// 同步模式会直接返回
	// 异步模式会返回到Successes和Errors通道中
	conf.Producer.Return.Successes = true
	return conf
}

// 创建 kafka 客户端,并返回客户端链接供生产和消费使用
func newClient() sarama.Client {
	client, err := sarama.NewClient(getAddr(), getConf())
	if err != nil {
		panic(err)
	}
	return client
}

通过 newClient 函数即可创建 kafka 的客户端链接。

生产一条信息

func TestProducer(t *testing.T) {
	client := newClient()

	// 创建一个同步模式的生产者链接
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	// 构建一个消息体,准备发送至 kafka
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("hello kafka"),
	}

	// 发送消息
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("发送消息成功: topic=%s, partition=%d, offset=%d\n", topic, partition, offset)
}

// 结果
2023/07/09 00:43:59 发送消息成功: topic=kafka_one, partition=1, offset=0

可以看到,我们发送的消息落在分区1上,偏移量是0,接下来我们启用一个消费者,来消费 kafka_one topic 分区1中的数据。

消费信息

func TestConsumer(t *testing.T) {
	client := newClient()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	// 消费分区=1 offset=0 的数据
	consumePartition, err := consumer.ConsumePartition(topic, int32(1), 0)
	if err != nil {
		return
	}

	// 启用一个协程,持续监听队列中的数据,队列中的数据会通过 Messages 发过来
	go func() {
		for msg := range consumePartition.Messages() {
			fmt.Printf("接收topic=%s, partition=%d, offset=%d, value=%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		}
	}()

	for {
		
	}
}

// 结果
接收topic=kafka_one, partition=1, offset=0, value=hello kafka

执行此函数后,这个消费者就会持续监听分区1中的数据,我们使用生产者生产到分区1上的数据会被其监听到,如果生产到其他分区,此消费者就无能为力了。这个时候我们需要消费者组来消费一个 topic 上所有分区的数据。

消费者组

使用消费者组前,我们必须要构建一个结构体实现 sarama 提供的 ConsumerGroupHandler 接口,用来处理消费者组的创建、销毁与监听。其接口定义如下:

type ConsumerGroupHandler interface {
	// 回调函数,在消费者启动前执行一些操作
	Setup(ConsumerGroupSession) error

	// 回调函数,在消费者关闭时执行的操作
	Cleanup(ConsumerGroupSession) error

	// 回调函数,当队列中有消息时会触发,是我们处理消息的地方
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

定义一个结构体,实现 ConsumerGroupHandler:

type ConsumerGrp struct {
}

func (c *ConsumerGrp) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("消费者启动")
	return nil
}

func (c *ConsumerGrp) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("消费者关闭")
	return nil
}

func (c *ConsumerGrp) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("接收topic=%s, partition=%d, offset=%d, value=%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	}
	return nil
}

启用消费者组,监听数据:

func TestConsumerGrp(t *testing.T) {
	client := newClient()
	ctx := context.Background()
	consumerGroup, err := sarama.NewConsumerGroupFromClient(group, client)
	if err != nil {
		return
	}
	defer consumerGroup.Close()

	consumer := &Consumer{}
	go func() {
		for {
			err := consumerGroup.Consume(ctx, []string{topic}, consumer)
			if err != nil {
				log.Fatal(err)
			}
		}
	}()

	for {

	}
}

此时,我们就可以使用消费组来监听所有分区的数据了。

消息体的 Key

我们在使用生产者发送消息时,消息会落在不同的分区上,如果我们需要将一类消息发送到一个分区时,就需要使用到消息体的 key,修改生产者消息体,加上 key:

func TestProducer(t *testing.T) {
	client := newClient()

	// 创建一个同步模式的生产者链接
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	// 构建一个消息体,准备发送至 kafka
	msg := &sarama.ProducerMessage{
		Topic: topic,
		// 加上key
		Key:   sarama.StringEncoder("my_key"),
		Value: sarama.StringEncoder("hello kafka"),
	}

	// 发送消息
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("发送消息成功: topic=%s, partition=%d, offset=%d\n", topic, partition, offset)
}

同一个 key 发送的消息体会落在同一个分区上。我们可以多发送几次测试:

2023/07/09 16:47:21 发送消息成功: topic=kafka_one, partition=3, offset=8
2023/07/09 16:47:24 发送消息成功: topic=kafka_one, partition=3, offset=8
2023/07/09 16:47:27 发送消息成功: topic=kafka_one, partition=3, offset=8

异步生产信息

异步生产相比同步生产可以提高吞吐量降低延迟,但存在丢失消息的风险,会增加错误处理的复杂性,对于要求消息可靠的场景下,建议使用同步模式,对吞吐和延迟比较在意的话,可以考虑使用异步模式。异步模式通过通道来确认消息是否发送成功/失败,实例代码:

// 异步发送消息
func TestAsyncProducer(t *testing.T) {
	client := newClient()
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.AsyncClose()

	go func() {
		for {
			select {
			// 从通道中监听成功信息
			case msg := <-producer.Successes():
				fmt.Printf("发送成功, topic=%s, partition=%d, offset=%d\n", msg.Topic, msg.Partition, msg.Offset)
			// 从通道中监听失败信息信息
			case err := <-producer.Errors():
				fmt.Printf("发送消息失败: %s\n", err.Error())
			}
		}
	}()

	// 构建消息体发送
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("async value"),
	}

	producer.Input() <- msg

	// 等待1秒,确保消息发送成功
	time.Sleep(1 * time.Second)
}

到这里,sarama 的使用就告一段落了,希望能对你有些帮助!

评论

欢迎您的回复 取消回复

您的邮箱不会显示出来,*必填

本文目录