赞
踩
环境和工具:
golang
介绍:
要在 Go 中使用 Kafka,您可以使用 sarama 这样的第三方库。sarama 是 Kafka 的 Go 客户端,提供了与 Kafka 集群进行通信所需的功能。
1、安装 sarama 库
首先,您需要安装 sarama 库。您可以使用 go get
命令来安装:
go get github.com/Shopify/sarama
2、生产消息:
以下是一个简单的示例代码,用于在 Go 中生产消息:
- package main
-
- import (
- "log"
- "os"
- "os/signal"
-
- "github.com/Shopify/sarama"
- )
-
- func main() {
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Retry.Max = 5
- config.Producer.Return.Successes = true
-
- producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
- if err != nil {
- log.Fatalf("Error creating producer: %v", err)
- }
- defer producer.Close()
-
- topic := "test-topic"
- message := &sarama.ProducerMessage{
- Topic: topic,
- Value: sarama.StringEncoder("Hello, Kafka!"),
- }
-
- _, _, err = producer.SendMessage(message)
- if err != nil {
- log.Fatalf("Failed to send message: %v", err)
- }
-
- log.Println("Message sent successfully")
-
- // Handle graceful shutdown
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- <-signals
- }

这个程序将连接到本地 Kafka 服务器(localhost:9092),向名为 test-topic
的主题发送一条消息。
3、消费消息:
以下是一个简单的示例代码,用于在 Go 中消费消息:
- package main
-
- import (
- "log"
- "os"
- "os/signal"
-
- "github.com/Shopify/sarama"
- )
-
- func main() {
- config := sarama.NewConfig()
- config.Consumer.Return.Errors = true
-
- consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
- if err != nil {
- log.Fatalf("Error creating consumer: %v", err)
- }
- defer consumer.Close()
-
- topic := "test-topic"
- partition := int32(0)
-
- partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
- if err != nil {
- log.Fatalf("Error creating partition consumer: %v", err)
- }
- defer partitionConsumer.Close()
-
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
-
- doneCh := make(chan struct{})
- go func() {
- for {
- select {
- case err := <-partitionConsumer.Errors():
- log.Printf("Consumer error: %v", err)
- case msg := <-partitionConsumer.Messages():
- log.Printf("Received message: %s", string(msg.Value))
- case <-signals:
- log.Println("Interrupt signal received, shutting down consumer...")
- close(doneCh)
- return
- }
- }
- }()
-
- <-doneCh
- }

这个程序将连接到本地 Kafka 服务器(localhost:9092),消费名为 test-topic
的主题中的消息。
以上示例提供了 Go 中使用 Kafka 的基本功能。您可以根据实际需求进行调整和扩展。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。