当前位置:   article > 正文

go环境使用kafka简单示例_go项目如何引入 kafka

go项目如何引入 kafka

环境和工具:

kafka

golang

介绍:

要在 Go 中使用 Kafka,您可以使用 sarama 这样的第三方库。sarama 是 Kafka 的 Go 客户端,提供了与 Kafka 集群进行通信所需的功能。

1、安装 sarama 库
首先,您需要安装 sarama 库。您可以使用 go get 命令来安装:

go get github.com/Shopify/sarama

2、生产消息:
以下是一个简单的示例代码,用于在 Go 中生产消息:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "github.com/Shopify/sarama"
  7. )
  8. func main() {
  9. config := sarama.NewConfig()
  10. config.Producer.RequiredAcks = sarama.WaitForAll
  11. config.Producer.Retry.Max = 5
  12. config.Producer.Return.Successes = true
  13. producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  14. if err != nil {
  15. log.Fatalf("Error creating producer: %v", err)
  16. }
  17. defer producer.Close()
  18. topic := "test-topic"
  19. message := &sarama.ProducerMessage{
  20. Topic: topic,
  21. Value: sarama.StringEncoder("Hello, Kafka!"),
  22. }
  23. _, _, err = producer.SendMessage(message)
  24. if err != nil {
  25. log.Fatalf("Failed to send message: %v", err)
  26. }
  27. log.Println("Message sent successfully")
  28. // Handle graceful shutdown
  29. signals := make(chan os.Signal, 1)
  30. signal.Notify(signals, os.Interrupt)
  31. <-signals
  32. }

这个程序将连接到本地 Kafka 服务器(localhost:9092),向名为 test-topic 的主题发送一条消息。 

3、消费消息:
以下是一个简单的示例代码,用于在 Go 中消费消息:

  1. package main
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "github.com/Shopify/sarama"
  7. )
  8. func main() {
  9. config := sarama.NewConfig()
  10. config.Consumer.Return.Errors = true
  11. consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
  12. if err != nil {
  13. log.Fatalf("Error creating consumer: %v", err)
  14. }
  15. defer consumer.Close()
  16. topic := "test-topic"
  17. partition := int32(0)
  18. partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
  19. if err != nil {
  20. log.Fatalf("Error creating partition consumer: %v", err)
  21. }
  22. defer partitionConsumer.Close()
  23. signals := make(chan os.Signal, 1)
  24. signal.Notify(signals, os.Interrupt)
  25. doneCh := make(chan struct{})
  26. go func() {
  27. for {
  28. select {
  29. case err := <-partitionConsumer.Errors():
  30. log.Printf("Consumer error: %v", err)
  31. case msg := <-partitionConsumer.Messages():
  32. log.Printf("Received message: %s", string(msg.Value))
  33. case <-signals:
  34. log.Println("Interrupt signal received, shutting down consumer...")
  35. close(doneCh)
  36. return
  37. }
  38. }
  39. }()
  40. <-doneCh
  41. }

这个程序将连接到本地 Kafka 服务器(localhost:9092),消费名为 test-topic 的主题中的消息。

以上示例提供了 Go 中使用 Kafka 的基本功能。您可以根据实际需求进行调整和扩展。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/913030
推荐阅读
相关标签
  

闽ICP备14008679号