当前位置:   article > 正文

消息队列kafka基础,基于go代码举例_go kafka消费消息

go kafka消费消息

基础概念

broker

broker就是单个kafka实例。Kafka集群中,一个kafka服务器就是一个broker。

topic

一类消息的集合。在kafka中,消息以主题为单位进行归类,producer负责将消息发送到指定的主题,而consumer负责订阅主题并进行消费。在生产者向kafka发送数据以及消费者订阅数据都要指定具体的topic来消费。

partition

topic是一个逻辑上的概念,而partition是物理层的概念。一个topic可以分为多个分区,并且可以分配在多个kafka服务器上。一个分区只属于单个topic,很多时候也会把分区称为主题分区(Topic-Partition)。分区可以提高kafka的并发效率。如多个消费者可以同时消费一个topic下的不同分区,但不保证topic消费的顺序性。(同一个分区下可以保证消息消费的顺序性,而针对一个主topic消费是无序的)
 

副本 partition分区的备份

partition的副本,从leader replica同步数据,当kafka服务挂了之后,防止数据丢失。副本的数量不能超过broker的数量,否则创建主题时会失败。

所有副本的统称(Assigned Repllicas),AR = ISR + OSR。ISR:表示和Leader保持同步(默认30s)的follower集合。OSR:表示Follower与Leader副本同步时,延迟过多的副本。

producer生产者

生产者,生产并发送消息的一方。可以同步发送和异步发送。在创建生产者示例的时候需要传入配置项和kafka实例地址等消息。比如生产者的ACK机制,生产者写入分区策略(向分区发送数据的规则等)

consumer 消费者和消费者组

消费者,接收订阅消费消息的一方。

kafka消费者组(Consumer Group)是kafka提供的可扩展且具有容错性的消费者机制。
  它是一个组,所以内部有可以有多个消费者,这些消费者共用一个ID(Group ID),一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者组中的一个消费者消费。

消费者组的特性

作用:可以同时消费多个分区,kafka自动提交offset基于消费者组id。
一个消费者组可以有多个消费者。
Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。

消费者组中的消费者最好等于topic下面的分区个数,可以实现效率最高的消费。分区数可以大于消费者的数量,但是消费者数量不能多于分区数,会造成多余的消费者没有分区可以消费的情况(一个消费者组的消费者只能消费一个topic下的一个分区)。消费者和分区的分配策略可以通过设置消费者策略来实现。主要包括三种情况,详情可以下面的消费者策略

Range范围分配策略:

Range范围分配策略是kafka默认的分配策略, 它可以确保每个消费者消费的分区数量是均衡的;

RoundRobin轮询策略:

RoundRobinAssignor轮询策略是将消费者组所有消费者以及消费者所订阅的所有的topic的partition按照字典顺序排序(topic和partition的hashcode进行排序), 然后通过轮询方式逐个将分区一次分配给每个消费者;

stricky粘性分配策略:

从kafak 0.11.x开始, 引入此类分配策略; 主要目的:

  • 分区分配尽可能均匀;
  • 在发生rebalance的时候, 分区的分配尽可能与上一次分配保持相同;
    • 没有发生rebalance时, stricky粘性分配策略和RoundRobin分配策略类似;

offset

  消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号;重点:分区上的偏移量。在生产者发送消息的时候,可以通过kafka返回来查看offset的值。

offset的作用是记录数据的位置,并且kafka提供主动提交offset机制(会保存消费者组上次消费的offset和分区以及topic),可以避免消费者宕机以后再重新消费数据。出了kafka自动提交offset,kafka也提供了API可以让消费者主动提交offset。

  消费过程中可以通过创建单个消费者(不指定消费者组id),这种情况下如果配置消费的offset设置为为最早的offset(最新的就是从offset最大的地方开始消费,这里讨论没有意义),消费者挂了以后,每次消费会重新消费最早的数据。而不是上次的数据

  如果用消费者组消费的话并且设置为最早的offset,kafka会自动帮我们保存上次消费的位置,当消费者宕机了以后,重启消费是消费上次的位置。(当然也可能会造成数据的丢失或者重复,即kafka还没来得及保存offset挂了,重启后从没保存的offset开始消费,或者消费者还没来及的保存,kafka已经保存了offset:丢失)

进阶

生产者写入策略

轮询分区

设置方法 :

原理

轮询方法就是按照分区0,1,2,0,1,2的方式轮询写入数据

随机策略

设置方法

字面意思,随机写入,一般不用,容易造成数据倾斜(某个分区写入了大量数据)

按照key值来分

设置方法

配置文件配置

对想发送到同一个分区的消息设置相同的key值

自定义策略

消费者消费策略

消费者策略主要是定义对于多个消费者,分区如何分配的问题。以及解决发生reblance再平衡时,如何进行分区和消费者之间的分配。主要包括range范围分配,Round轮询策略(类似于生产者策略中的Round,Sticky粘性分配策略)

什么时候会发生reblance

消费者个数发生变化

订阅主题发生变化

消费分区的个数发生变化

以上都会造成分区数和消费者数之间的重新分配,所以要设置分配策略

range范围分配

配置方法

具体原理:

Round轮询分配

配置方法

Sticky粘性分配

粘性分配和Round轮询分配在没有发生reblance的时候都是一样的。

不同点是在发生reblance的时候,sticky策略会尽可能保存发生变化之前的架构,例如将属于挂到的consumer的分区均匀分配给剩下的consumer。而前面的2个策略是根据现有的分区数和消费者数量再次执行一遍它们的策略。显而易见,sticky策略会减少资源的消耗。

配置方法

kafka消费提交机制

提交就是Offset提交,分为kafka自动提交和消费者主动提交

自动提交是默认的,在代码中默认是true

而消费者主动提交需要我们调用kafka提供的api进行提交

kafka生产端ACK机制

ack机制是生产者向kafka发送数据避免消息丢失的一种解决办法,可以通过设置0,-1,1代表不同的确认程度。

ack为0

生产端不等kafka发送确认消息,直接发送下一条(leader和副本数据都可能丢失)

ack为1

生产端只等leader副本写入,不管其他副本是否写入,发送下一条。(副本数据可能丢失)

ack为-1

生产端等所有副本写入再发送下一条。

kafka生产端的幂等性和事务

幂等性

幂等性开启方法

大体概念:

PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。但是如果重新连接会产生一个新的pid(宕机后重启变化)

Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number

分区号:写入对应的分区

Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。

缺点:幂等性只能保证一个Producer会话的消息不重复,宕机重启还会造成重复,所以kafka提供了事务。

事务

生产者创建时,设置全局唯一的事务ID --- TransactionID;事务ID与PID绑定,当producer重启后,会根据事务ID查找PID,因此能够保证全局at-exactly-once语义

配置方法:transactional.id(注:使用事务的前提是必须开启幂等性)

解决全局精确唯一语义是kafka事务引入的初衷,但是后期又引入的新的功能:producer生产消息的batch在一个原子单元内完成;

代码示例

生产者同步发送

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/IBM/sarama"
  6. "time"
  7. )
  8. type Tem struct {
  9. Temperature int `json:"temperature"`
  10. Humidity int `json:"humidity"`
  11. Name string `json:"name"`
  12. }
  13. func main() {
  14. config := sarama.NewConfig()
  15. // 生产者配置
  16. //确认ACK all代表所有副本全部写入
  17. config.Producer.RequiredAcks = sarama.WaitForAll
  18. //random 写入策略 随机写入一般不用 容易引起数据倾斜
  19. // config.Producer.Partitioner = sarama.NewRandomPartitioner
  20. //返回结果 同步发送必须设置为true 不然SendMessage无法返回结果
  21. config.Producer.Return.Successes = true
  22. //轮询写入
  23. config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
  24. //按照key写入 需要在kafka发送结构体中接入key参数 如下
  25. // config.Producer.Partitioner = sarama.NewHashPartitioner
  26. // 封装消息
  27. msg := &sarama.ProducerMessage{}
  28. msg.Topic = "devices123"
  29. // 设置key key写入的时候用到的参数 想写入同一个分区就用同一个key
  30. msg.Key = sarama.StringEncoder("my-key")
  31. //time_str := time.Now().Format("2006-01-02 15:04:05")
  32. tem1 := Tem{
  33. Temperature: 180,
  34. Humidity: 20,
  35. Name: "kafka",
  36. }
  37. marshal, _ := json.Marshal(tem1)
  38. msg.Value = sarama.StringEncoder(string(marshal))
  39. // 连接kafka
  40. client, err := sarama.NewSyncProducer([]string{"192.168.174.128:9092"}, config)
  41. if err != nil {
  42. fmt.Println("producer closed", err)
  43. return
  44. }
  45. defer client.Close()
  46. for {
  47. select {
  48. case <-time.Tick(1 * time.Second):
  49. partition, offset, err := client.SendMessage(msg)
  50. if err != nil {
  51. fmt.Println("send failed", err)
  52. return
  53. }
  54. fmt.Println("partition:", partition, "offset:%v", offset)
  55. //}
  56. // 发送消息
  57. }
  58. }
  59. }

生产者异步发送

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/IBM/sarama"
  6. "log"
  7. "sync"
  8. "sync/atomic"
  9. )
  10. type Tem struct {
  11. Temperature int `json:"temperature"`
  12. Humidity int `json:"humidity"`
  13. Name string `json:"name"`
  14. }
  15. func main() {
  16. config := sarama.NewConfig()
  17. // 异步生产者不建议把 Errors 和 Successes 都开启,一般开启 Errors 就行
  18. // 同步生产者就必须都开启,因为会同步返回发送成功或者失败
  19. config.Producer.Return.Errors = false // 设定需要返回错误信息
  20. config.Producer.Return.Successes = true // 设定需要返回成功信息
  21. //创建producer NewAsyncProducer异步工厂
  22. producer, err := sarama.NewAsyncProducer([]string{"192.168.174.128:9092"}, config)
  23. if err != nil {
  24. log.Fatal("NewSyncProducer err:", err)
  25. }
  26. tem1 := Tem{
  27. Temperature: 180,
  28. Humidity: 20,
  29. Name: "kafka",
  30. }
  31. marshal, err := json.Marshal(tem1)
  32. //创建信息
  33. str := sarama.StringEncoder(string(marshal))
  34. // 异步关闭
  35. defer producer.AsyncClose()
  36. // 创建消息发送完关闭管道
  37. finColese := make(chan bool)
  38. wg := &sync.WaitGroup{}
  39. wg.Add(1)
  40. //创建协程监听 写入返回的成功信息 或者错误信息
  41. go func() {
  42. // [!important] 异步生产者发送后必须把返回值从 Errors 或者 Successes 中读出来 不然会阻塞 sarama 内部处理逻辑 导致只能发出去一条消息
  43. for {
  44. select {
  45. case s := <-producer.Successes():
  46. log.Printf("[Producer] key:%v msg:%+v \n", s.Key, s.Value)
  47. case e := <-producer.Errors():
  48. if e != nil {
  49. log.Printf("[Producer] err:%v msg:%+v \n", e.Msg, e.Err)
  50. }
  51. case <-finColese:
  52. fmt.Println("收到关闭信号准备关闭协程")
  53. wg.Done()
  54. return
  55. }
  56. }
  57. }()
  58. var count int64
  59. // 异步发送
  60. for i := 0; i < 20; i++ {
  61. msg := &sarama.ProducerMessage{Topic: "devices123", Key: nil, Value: sarama.StringEncoder(str)}
  62. // 异步发送只是写入内存了就返回了,并没有真正发送出去
  63. // sarama 库中用的是一个 channel 来接收,后台 goroutine 异步从该 channel 中取出消息并真正发送
  64. producer.Input() <- msg
  65. atomic.AddInt64(&count, 1)
  66. }
  67. finColese <- true
  68. wg.Wait()
  69. log.Printf("发送完毕 总发送消息数:%v\n", count)
  70. }

消费者组消费数据

  1. package main
  2. // SIGUSR1 toggle the pause/resume consumption
  3. import (
  4. "context"
  5. "errors"
  6. "flag"
  7. "log"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "github.com/IBM/sarama"
  14. )
  15. // Sarama configuration options
  16. var (
  17. brokers = ""
  18. version = ""
  19. group = ""
  20. topics = ""
  21. assignor = ""
  22. oldest = true
  23. verbose = false
  24. )
  25. func init() {
  26. flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
  27. flag.StringVar(&group, "group", "", "Kafka consumer group definition")
  28. flag.StringVar(&version, "version", sarama.DefaultVersion.String(), "Kafka cluster version")
  29. flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
  30. flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
  31. flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
  32. flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
  33. flag.Parse()
  34. if len(brokers) == 0 {
  35. panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
  36. }
  37. if len(topics) == 0 {
  38. panic("no topics given to be consumed, please set the -topics flag")
  39. }
  40. if len(group) == 0 {
  41. panic("no Kafka consumer group defined, please set the -group flag")
  42. }
  43. }
  44. func main() {
  45. keepRunning := true
  46. log.Println("Starting a new Sarama consumer")
  47. if verbose {
  48. sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  49. }
  50. version, err := sarama.ParseKafkaVersion(version)
  51. if err != nil {
  52. log.Panicf("Error parsing Kafka version: %v", err)
  53. }
  54. /**
  55. * Construct a new Sarama configuration.
  56. * The Kafka cluster version has to be defined before the consumer/producer is initialized.
  57. */
  58. config := sarama.NewConfig()
  59. config.Version = version
  60. switch assignor {
  61. case "sticky":
  62. config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
  63. case "roundrobin":
  64. config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
  65. case "range":
  66. config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
  67. default:
  68. log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
  69. }
  70. if oldest {
  71. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  72. }
  73. /**
  74. * Setup a new Sarama consumer group
  75. */
  76. consumer := Consumer{
  77. ready: make(chan bool),
  78. }
  79. ctx, cancel := context.WithCancel(context.Background())
  80. client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
  81. if err != nil {
  82. log.Panicf("Error creating consumer group client: %v", err)
  83. }
  84. consumptionIsPaused := false
  85. wg := &sync.WaitGroup{}
  86. wg.Add(1)
  87. go func() {
  88. defer wg.Done()
  89. for {
  90. // `Consume` should be called inside an infinite loop, when a
  91. // server-side rebalance happens, the consumer session will need to be
  92. // recreated to get the new claims
  93. if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
  94. if errors.Is(err, sarama.ErrClosedConsumerGroup) {
  95. return
  96. }
  97. log.Panicf("Error from consumer: %v", err)
  98. }
  99. // check if context was cancelled, signaling that the consumer should stop
  100. if ctx.Err() != nil {
  101. return
  102. }
  103. consumer.ready = make(chan bool)
  104. }
  105. }()
  106. <-consumer.ready // Await till the consumer has been set up
  107. log.Println("Sarama consumer up and running!...")
  108. sigusr1 := make(chan os.Signal, 1)
  109. signal.Notify(sigusr1, syscall.SIGUSR1)
  110. sigterm := make(chan os.Signal, 1)
  111. signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
  112. for keepRunning {
  113. select {
  114. case <-ctx.Done():
  115. log.Println("terminating: context cancelled")
  116. keepRunning = false
  117. case <-sigterm:
  118. log.Println("terminating: via signal")
  119. keepRunning = false
  120. case <-sigusr1:
  121. toggleConsumptionFlow(client, &consumptionIsPaused)
  122. }
  123. }
  124. cancel()
  125. wg.Wait()
  126. if err = client.Close(); err != nil {
  127. log.Panicf("Error closing client: %v", err)
  128. }
  129. }
  130. func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
  131. if *isPaused {
  132. client.ResumeAll()
  133. log.Println("Resuming consumption")
  134. } else {
  135. client.PauseAll()
  136. log.Println("Pausing consumption")
  137. }
  138. *isPaused = !*isPaused
  139. }
  140. // Consumer represents a Sarama consumer group consumer
  141. type Consumer struct {
  142. ready chan bool
  143. }
  144. // Setup is run at the beginning of a new session, before ConsumeClaim
  145. func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
  146. // Mark the consumer as ready
  147. close(consumer.ready)
  148. return nil
  149. }
  150. // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
  151. func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
  152. return nil
  153. }
  154. // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
  155. // Once the Messages() channel is closed, the Handler must finish its processing
  156. // loop and exit.
  157. func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  158. // NOTE:
  159. // Do not move the code below to a goroutine.
  160. // The `ConsumeClaim` itself is called within a goroutine, see:
  161. // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
  162. for {
  163. select {
  164. case message, ok := <-claim.Messages():
  165. if !ok {
  166. log.Printf("message channel was closed")
  167. return nil
  168. }
  169. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
  170. session.MarkMessage(message, "")
  171. // Should return when `session.Context()` is done.
  172. // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
  173. // https://github.com/IBM/sarama/issues/1192
  174. case <-session.Context().Done():
  175. return nil
  176. }
  177. }
  178. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/913025
推荐阅读
相关标签
  

闽ICP备14008679号