Kafka(Go)教程(五)---Producer-Consumer API 基本使用
本文主要讲解其中的 Producer API 和 Consumer API 在 Go Client sarama 中的使基本使用以及注意事项。
1. 概述
Kakfa 相关代码见 Github
Kafka 有 5 个核心 API:
- Producer API
- Consumer API
- Stream API
- Connect API
- Admin API
在 Go sarama 客户端中暂时只实现了 Producer、Consumer、Admin 3 个API。
其中 Stream API 已经明确表示不会支持,Connect 未知。
2. Producer API
Kafka 中生产者分为同步生产者和异步生产者。
顾名思义,同步生产者每条消息都会实时发送到 Kafka,而异步生产者则为了提升性能,会等待存了一批消息或者到了指定间隔时间才会一次性发送到 Kafka。
Async Producer
sarama 中异步生产者使用 Demo 如下
func Producer(topic string, limit int) {
config := sarama.NewConfig()
// 异步生产者不建议把 Errors 和 Successes 都开启,一般开启 Errors 就行
// 同步生产者就必须都开启,因为会同步返回发送成功或者失败
config.Producer.Return.Errors = true // 设定是否需要返回错误信息
config.Producer.Return.Successes = true // 设定是否需要返回成功信息
producer, err := sarama.NewAsyncProducer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewSyncProducer err:", err)
}
var (
wg sync.WaitGroup
enqueued, timeout, successes, errors int
)
// [!important] 异步生产者发送后必须把返回值从 Errors 或者 Successes 中读出来 不然会阻塞 sarama 内部处理逻辑 导致只能发出去一条消息
wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
// log.Printf("[Producer] Success: key:%v msg:%+v \n", s.Key, s.Value)
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for e := range producer.Errors() {
log.Printf("[Producer] Errors:err:%v msg:%+v \n", e.Msg, e.Err)
errors++
}
}()
// 异步发送
for i := 0; i < limit; i++ {
str := strconv.Itoa(int(time.Now().UnixNano()))
msg := &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(str)}
// 异步发送只是写入内存了就返回了,并没有真正发送出去
// sarama 库中用的是一个 channel 来接收,后台 goroutine 异步从该 channel 中取出消息并真正发送
// select + ctx 做超时控制,防止阻塞 producer.Input() <- msg 也可能会阻塞
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
select {
case producer.Input() <- msg:
enqueued++
case <-ctx.Done():
timeout++
}
cancel()
if i%10000 == 0 && i != 0 {
log.Printf("已发送消息数:%d 超时数:%d\n", i, timeout)
}
}
// We are done
producer.AsyncClose()
wg.Wait()
log.Printf("发送完毕 总发送条数:%d enqueued:%d timeout:%d successes: %d errors: %d\n", limit, enqueued, timeout, successes, errors)
}
注意点:
异步生产者只需要将消息发送到 chan 就会返回,同样的具体的响应包括 Success 或者 Errors 也是通过 chan 异步返回的。
必须把返回值从 Errors 或者 Successes 中读出来 不然会阻塞 producer.Input()
Sync Producer
同步生产者就更简单了:
func Producer(topic string, limit int) {
config := sarama.NewConfig()
// 同步生产者必须同时开启 Return.Successes 和 Return.Errors
// 因为同步生产者在发送之后就必须返回状态,所以需要两个都返回
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true // 这个默认值就是 true 可以不用手动 赋值
// 同步生产者和异步生产者逻辑是一致的,Success或者Errors都是通过channel返回的,
// 只是同步生产者封装了一层,等channel返回之后才返回给调用者
// 具体见 sync_producer.go 文件72行 newSyncProducerFromAsyncProducer 方法
// 内部启动了两个 goroutine 分别处理Success Channel 和 Errors Channel
// 同步生产者内部就是封装的异步生产者
// type syncProducer struct {
// producer *asyncProducer
// wg sync.WaitGroup
// }
producer, err := sarama.NewSyncProducer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewSyncProducer err:", err)
}
defer producer.Close()
var successes, errors int
for i := 0; i < limit; i++ {
str := strconv.Itoa(int(time.Now().UnixNano()))
msg := &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(str)}
partition, offset, err := producer.SendMessage(msg) // 发送逻辑也是封装的异步发送逻辑,可以理解为将异步封装成了同步
if err != nil {
log.Printf("SendMessage:%d err:%v\n ", i, err)
errors++
continue
}
successes++
log.Printf("[Producer] partitionid: %d; offset:%d, value: %s\n", partition, offset, str)
}
log.Printf("发送完毕 总发送条数:%d successes: %d errors: %d\n", limit, successes, errors)
}
注意点:
必须同时开启 Return.Successes 和 Return.Errors
3. Consumer API
Kafka 中消费者分为独立消费者和消费者组。
StandaloneConsumer
// SinglePartition 单分区消费
func SinglePartition(topic string) {
config := sarama.NewConfig()
consumer, err := sarama.NewConsumer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewConsumer err: ", err)
}
defer consumer.Close()
// 参数1 指定消费哪个 topic
// 参数2 分区 这里默认消费 0 号分区 kafka 中有分区的概念,类似于ES和MongoDB中的sharding,MySQL中的分表这种
// 参数3 offset 从哪儿开始消费起走,正常情况下每次消费完都会将这次的offset提交到kafka,然后下次可以接着消费,
// 这里demo就从最新的开始消费,即该 consumer 启动之前产生的消息都无法被消费
// 如果改为 sarama.OffsetOldest 则会从最旧的消息开始消费,即每次重启 consumer 都会把该 topic 下的所有消息消费一次
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
log.Fatal("ConsumePartition err: ", err)
}
defer partitionConsumer.Close()
// 会一直阻塞在这里
for message := range partitionConsumer.Messages() {
log.Printf("[Consumer] partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, string(message.Value))
}
}
// Partitions 多分区消费
func Partitions(topic string) {
config := sarama.NewConfig()
consumer, err := sarama.NewConsumer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewConsumer err: ", err)
}
defer consumer.Close()
// 先查询该 topic 有多少分区
partitions, err := consumer.Partitions(topic)
if err != nil {
log.Fatal("Partitions err: ", err)
}
var wg sync.WaitGroup
wg.Add(len(partitions))
// 然后每个分区开一个 goroutine 来消费
for _, partitionId := range partitions {
go consumeByPartition(consumer, topic, partitionId, &wg)
}
wg.Wait()
}
func consumeByPartition(consumer sarama.Consumer, topic string, partitionId int32, wg *sync.WaitGroup) {
defer wg.Done()
partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, sarama.OffsetOldest)
if err != nil {
log.Fatal("ConsumePartition err: ", err)
}
defer partitionConsumer.Close()
for message := range partitionConsumer.Messages() {
log.Printf("[Consumer] partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, string(message.Value))
}
}
反复运行上面的 Demo 会发现,每次都会从第 1 条消息开始消费,一直到消费完全部消息。
这不是妥妥的重复消费吗?
Kafka 和其他 MQ 最大的区别在于 Kafka 中的消息在消费后不会被删除,而是会一直保留,直到过期。
为了防止每次重启消费者都从第 1 条消息开始消费,我们需要在消费消息后将 offset 提交给 Kafka。这样重启后就可以接着上次的 Offset 继续消费了。
OffsetManager
在独立消费者中没有实现提交 Offset 的功能,所以我们需要借助 OffsetManager 来完成。
func OffsetManager(topic string) {
config := sarama.NewConfig()
// 配置开启自动提交 offset,这样 samara 库会定时帮我们把最新的 offset 信息提交给 kafka
config.Consumer.Offsets.AutoCommit.Enable = true // 开启自动 commit offset
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 自动 commit时间间隔
client, err := sarama.NewClient([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewClient err: ", err)
}
defer client.Close()
// offsetManager 用于管理每个 consumerGroup的 offset
// 根据 groupID 来区分不同的 consumer,注意: 每次提交的 offset 信息也是和 groupID 关联的
offsetManager, err := sarama.NewOffsetManagerFromClient("myGroupID", client) // 偏移量管理器
if err != nil {
log.Println("NewOffsetManagerFromClient err:", err)
}
defer offsetManager.Close()
// 每个分区的 offset 也是分别管理的,demo 这里使用 0 分区,因为该 topic 只有 1 个分区
partitionOffsetManager, err := offsetManager.ManagePartition(topic, conf.DefaultPartition) // 对应分区的偏移量管理器
if err != nil {
log.Println("ManagePartition err:", err)
}
defer partitionOffsetManager.Close()
// defer 在程序结束后在 commit 一次,防止自动提交间隔之间的信息被丢掉
defer offsetManager.Commit()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Println("NewConsumerFromClient err:", err)
}
// 根据 kafka 中记录的上次消费的 offset 开始+1的位置接着消费
nextOffset, _ := partitionOffsetManager.NextOffset() // 取得下一消息的偏移量作为本次消费的起点
fmt.Println("nextOffset:", nextOffset)
pc, err := consumer.ConsumePartition(topic, conf.DefaultPartition, nextOffset)
if err != nil {
log.Println("ConsumePartition err:", err)
}
defer pc.Close()
for message := range pc.Messages() {
value := string(message.Value)
log.Printf("[Consumer] partitionid: %d; offset:%d, value: %s\n", message.Partition, message.Offset, value)
// 每次消费后都更新一次 offset,这里更新的只是程序内存中的值,需要 commit 之后才能提交到 kafka
partitionOffsetManager.MarkOffset(message.Offset+1, "modified metadata") // MarkOffset 更新最后消费的 offset
}
}
1)创建偏移量管理器
offsetManager, _ := sarama.NewOffsetManagerFromClient("myGroupID", client)
2)创建对应分区的偏移量管理器
Kafka 中每个分区的偏移量是单独管理的
partitionOffsetManager, _ := offsetManager.ManagePartition(topic, kafka.DefaultPartition)
3)记录偏移量
这里记录的是下一条要取的消息,而不是取的最后一条消息,所以需要 +1
partitionOffsetManager.MarkOffset(message.Offset+1, "modified metadata")
4)提交偏移量
sarama 中默认会自动提交偏移量,但还是建议用 defer 在程序退出的时候手动提交一次。
defer offsetManager.Commit()
ConsumerGroup
Kafka 消费者组中可以存在多个消费者,Kafka 会以 partition 为单位将消息分给各个消费者。每条消息只会被消费者组的一个消费者消费。
注意:是以分区为单位,如果消费者组中有两个消费者,但是订阅的 Topic 只有 1 个分区,那么注定有一个消费者永远消费不到任何消息。
消费者组的好处在于并发消费,Kafka 把分发逻辑已经实现了,我们只需要启动多个消费者即可。
如果只有一个消费者,我们需要手动获取消息后分发给多个 Goroutine,需要多写一段代码,而且 Offset 维护还比较麻烦。
// MyConsumerGroupHandler 实现 sarama.ConsumerGroup 接口,作为自定义ConsumerGroup
type MyConsumerGroupHandler struct {
name string
count int64
}
// Setup 执行在 获得新 session 后 的第一步, 在 ConsumeClaim() 之前
func (MyConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
// Cleanup 执行在 session 结束前, 当所有 ConsumeClaim goroutines 都退出时
func (MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim 具体的消费逻辑
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("[consumer] name:%s topic:%q partition:%d offset:%d\n", h.name, msg.Topic, msg.Partition, msg.Offset)
// 标记消息已被消费 内部会更新 consumer offset
sess.MarkMessage(msg, "")
h.count++
if h.count%10000 == 0 {
fmt.Printf("name:%s 消费数:%v\n", h.name, h.count)
}
}
return nil
}
func ConsumerGroup(topic, group, name string) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true // 指定返回错误
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cg, err := sarama.NewConsumerGroup([]string{conf.HOST}, group, config)
if err != nil {
log.Fatal("NewConsumerGroup err: ", err)
}
defer cg.Close()
var wg sync.WaitGroup
wg.Add(2)
// 错误处理
go func() {
defer wg.Done()
for err = range cg.Errors() {
fmt.Println("ERROR", err)
}
}()
go func() {
defer wg.Done()
handler := MyConsumerGroupHandler{name: name}
for {
fmt.Println("running: ", name)
/*
![important]
应该在一个无限循环中不停地调用 Consume()
因为每次 Rebalance 后需要再次执行 Consume() 来恢复连接
Consume 开始才发起 Join Group 请求 如果当前消费者加入后成为了 消费者组 leader,则还会进行 Rebalance 过程,从新分配
组内每个消费组需要消费的 topic 和 partition,最后 Sync Group 后才开始消费
具体信息见 https://github.com/lixd/kafka-go-example/issues/4
*/
err = cg.Consume(ctx, []string{topic}, handler)
if err != nil {
log.Println("Consume err: ", err)
}
// 如果 context 被 cancel 了,那么退出
if ctx.Err() != nil {
return
}
}
}()
wg.Wait()
}
注意点:
主要是实现sarama.ConsumerGroup
接口。Setup
和Cleanup
都是一些辅助性的工作,真正的逻辑在 ConsumeClaim
方法中。
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
// 标记消息已被消费 内部会更新 consumer offset
sess.MarkMessage(msg, "")
}
return nil
}
需要调用sess.MarkMessage()
方法更新 Offset。
Kakfa 相关代码见 Github
4. 小结
1)生产者
- 同步生产者
- 同步发送,效率低实时性高
- 异步生产者
- 异步发送,效率高
- 消息大小、数量达到阈值或间隔时间达到设定值时触发发送
异步生产者不会阻塞,而且会批量发送消息给 Kafka,性能上优于 同步生产者。
2)消费者
- 独立消费者
- 需要配合 OffsetManager 使用
- 消费者组
- 以分区为单位将消息分发给组里的各个消费者
- 若消费者数大于分区数,必定有消费者消费不到消息