Golang Kafka实战教程
最近在学习Golang操作Kafka,想请教几个问题:
- 如何用golang创建Kafka生产者和消费者?
- 在Golang中如何处理Kafka消息的序列化和反序列化?
- 有没有推荐的Golang Kafka客户端库?Sarama和confluent-kafka-go哪个更适合生产环境?
- 在Golang中实现Kafka消息重试机制的最佳实践是什么?
- 如何用Golang监控Kafka的生产消费延迟等指标?
2 回复
推荐使用Sarama库实现Golang与Kafka交互。步骤:1. 安装Sarama;2. 配置生产者/消费者;3. 实现消息发送与接收。示例代码简洁,支持异步发送和分区消费。注意处理错误和连接关闭。
更多关于Golang Kafka实战教程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Kafka实战教程
1. 环境准备
首先安装必要的依赖:
go get github.com/segmentio/kafka-go
2. 生产者示例
package main
import (
"context"
"fmt"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
// 配置Kafka连接
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "test-topic",
Balancer: &kafka.LeastBytes{},
}
defer writer.Close()
// 发送消息
err := writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key1"),
Value: []byte("Hello Kafka!"),
},
kafka.Message{
Key: []byte("Key2"),
Value: []byte("Second message"),
},
)
if err != nil {
fmt.Printf("发送失败: %v\n", err)
} else {
fmt.Println("消息发送成功!")
}
}
3. 消费者示例
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
// 创建消费者
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer reader.Close()
fmt.Println("开始消费消息...")
for {
// 读取消息
msg, err := reader.ReadMessage(context.Background())
if err != nil {
fmt.Printf("消费错误: %v\n", err)
break
}
fmt.Printf("收到消息: key=%s, value=%s, offset=%d\n",
string(msg.Key), string(msg.Value), msg.Offset)
}
}
4. 高级配置
异步生产者
func asyncProducer() {
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "async-topic",
BatchSize: 100, // 批量大小
BatchTimeout: time.Second, // 批量超时
Async: true, // 异步模式
}
}
消费者组
func consumerGroup() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-1", // 消费者组ID
Topic: "test-topic",
MinBytes: 10e3,
MaxBytes: 10e6,
})
}
5. 错误处理
func handleErrors() {
// 配置重试机制
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "test-topic",
MaxAttempts: 3, // 最大重试次数
}
}
6. 运行测试
- 启动Zookeeper和Kafka服务
- 创建topic:
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 - 运行生产者和消费者程序
这个教程涵盖了Kafka在Golang中的基本使用,包括消息生产、消费、错误处理等核心功能。

