golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用
Golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用
简介
confluent-kafka-go是Confluent为Apache Kafka和Confluent Platform提供的Golang客户端。
主要特点:
- 高性能 - 基于精心调优的C客户端librdkafka的轻量级封装
- 可靠性 - 正确处理了Apache Kafka客户端的所有细节
- 支持 - 由Confluent提供商业支持
- 面向未来 - 与Apache Kafka核心和Confluent平台组件保持同步
示例代码
高级消费者示例
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func main() {
// 创建消费者配置
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
// 订阅主题
err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
if err != nil {
panic(err)
}
// 持续消费消息
run := true
for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else if !err.(kafka.Error).IsTimeout() {
// 客户端会自动尝试从所有错误中恢复
// 超时不视为错误,因为它是ReadMessage在没有消息时引发的
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
生产者示例
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func main() {
// 创建生产者
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// 消息投递报告处理
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// 异步生产消息到主题
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// 关闭前等待消息投递完成
p.Flush(15 * 1000)
}
安装与使用
使用Go Modules
- 在代码中导入包:
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- 构建项目:
go build ./...
对于Alpine Linux (musl)构建,需要指定-tags musl
:
go build -tags musl ./...
手动安装
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
librdkafka依赖
预构建的librdkafka二进制文件已包含在Go客户端中,通常不需要单独安装。但如果需要GSSAPI/Kerberos认证支持,则需要单独安装librdkafka。
支持的平台:
- Mac OSX x64和arm64
- 基于glibc的Linux x64和arm64(如RedHat、Debian、CentOS、Ubuntu等)- 不支持GSSAPI/Kerberos
- 基于musl的Linux amd64和arm64(Alpine)- 不支持GSSAPI/Kerberos
- Windows amd64 - 不支持GSSAPI/Kerberos
静态构建
对于基于glibc的系统,如果需要静态构建,可以使用musl工具链:
CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl
许可证
Apache License v2.0
KAFKA是The Apache Software Foundation的注册商标,已授权confluent-kafka-go使用。confluent-kafka-go与The Apache Software Foundation无关,也不受其认可。
更多关于golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
confluent-kafka-go: Golang高性能Kafka客户端库使用指南
confluent-kafka-go是Confluent官方提供的Golang Kafka客户端库,基于librdkafka C库构建,提供了高性能、可靠的Kafka生产者和消费者实现。
安装
首先需要安装librdkafka库,然后安装Go客户端:
# Ubuntu/Debian
sudo apt-get install librdkafka-dev
# CentOS/RHEL
sudo yum install librdkafka-devel
# macOS
brew install librdkafka
# 然后安装Go客户端
go get -u github.com/confluentinc/confluent-kafka-go/kafka
生产者示例
package main
import (
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 生产者配置
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092", // Kafka集群地址
"client.id": "go-producer", // 客户端ID
"acks": "all", // 确保消息被所有副本确认
}
// 创建生产者
producer, err := kafka.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
// 消息投递报告通道
deliveryChan := make(chan kafka.Event)
// 异步发送消息
topic := "test-topic"
for i := 0; i < 10; i++ {
value := fmt.Sprintf("Message %d", i)
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Value: []byte(value),
}, deliveryChan)
if err != nil {
log.Printf("Failed to produce message: %s", err)
continue
}
// 等待投递报告
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to %v", m.TopicPartition)
}
}
// 确保所有消息都发送完成
producer.Flush(15 * 1000)
}
消费者示例
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 消费者配置
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "go-consumer-group",
"auto.offset.reset": "earliest", // 从最早的消息开始消费
}
// 创建消费者
consumer, err := kafka.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer consumer.Close()
// 订阅主题
topic := "test-topic"
err = consumer.SubscribeTopics([]string{topic}, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topic: %s", err)
}
// 处理中断信号
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
// 消费消息
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
// 轮询消息,超时时间100ms
ev := consumer.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
// 处理消息
fmt.Printf("Received message on %s: %s\n",
e.TopicPartition, string(e.Value))
// 手动提交偏移量
_, err := consumer.CommitMessage(e)
if err != nil {
log.Printf("Failed to commit offset: %v", err)
}
case kafka.Error:
// 处理错误
fmt.Printf("Error: %v\n", e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored event: %v\n", e)
}
}
}
}
高级配置
生产者高级配置
config := &kafka.ConfigMap{
"bootstrap.servers": "kafka1:9092,kafka2:9092",
"compression.type": "snappy", // 压缩类型
"queue.buffering.max.messages": 100000, // 队列中最大消息数
"queue.buffering.max.ms": 500, // 批量发送等待时间(ms)
"batch.num.messages": 10000, // 每批最大消息数
"linger.ms": 10, // 发送前等待时间(ms)
"retries": 5, // 重试次数
"retry.backoff.ms": 1000, // 重试间隔(ms)
}
消费者高级配置
config := &kafka.ConfigMap{
"bootstrap.servers": "kafka1:9092,kafka2:9092",
"group.id": "my-consumer-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": false, // 禁用自动提交
"max.poll.interval.ms": 300000, // 最大轮询间隔(ms)
"session.timeout.ms": 10000, // 会话超时(ms)
"heartbeat.interval.ms": 3000, // 心跳间隔(ms)
"fetch.max.bytes": 1048576, // 每次fetch最大字节数
"fetch.min.bytes": 1, // 每次fetch最小字节数
"fetch.wait.max.ms": 500, // fetch等待时间(ms)
}
性能优化建议
- 批量发送:适当调整
linger.ms
和batch.num.messages
参数可以提高吞吐量 - 压缩:启用压缩(
compression.type
)可以减少网络传输量 - 异步发送:使用异步发送模式可以提高性能
- 消费者并行度:增加消费者实例数量可以提高消费速度
- 合理设置消费者参数:调整
fetch.min.bytes
和fetch.wait.max.ms
可以减少网络请求
错误处理
// 生产者错误处理
go func() {
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v", ev.TopicPartition.Error)
}
case kafka.Error:
log.Printf("Producer error: %v", ev)
}
}
}()
confluent-kafka-go提供了丰富的功能和配置选项,可以满足大多数Kafka使用场景的需求。通过合理配置,可以实现高性能的消息生产和消费。