golang高性能Apache Kafka客户端库插件sarama的使用
Golang高性能Apache Kafka客户端库插件Sarama的使用
Sarama简介
Sarama是一个MIT许可的Go语言客户端库,用于Apache Kafka。
快速开始
以下是使用Sarama库的基本示例代码:
1. 安装Sarama
go get github.com/IBM/sarama
2. 生产者示例
package main
import (
"log"
"os"
"os/signal"
"github.com/IBM/sarama"
)
func main() {
// 配置生产者
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 创建生产者
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln("Failed to start producer:", err)
}
defer producer.Close()
// 构造消息
msg := &sarama.ProducerMessage{
Topic: "test_topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Failed to send message: %v\n", err)
} else {
log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}
// 处理中断信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
}
3. 消费者示例
package main
import (
"log"
"os"
"os/signal"
"github.com/IBM/sarama"
)
func main() {
// 配置消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// 创建消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln("Failed to start consumer:", err)
}
defer consumer.Close()
// 订阅主题
partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalln("Failed to start partition consumer:", err)
}
defer partitionConsumer.Close()
// 处理中断信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// 消费消息
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message: %s at offset %d\n", msg.Value, msg.Offset)
case err := <-partitionConsumer.Errors():
log.Printf("Error: %v\n", err)
case <-signals:
return
}
}
}
兼容性和API稳定性
Sarama提供"2个发布版本+2个月"的兼容性保证:我们支持Kafka和Go的两个最新稳定版本,并为旧版本提供两个月的宽限期。
Sarama遵循语义版本控制,并通过标准的Go模块版本编号方案提供API稳定性。
高级用法
消费者组示例
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"github.com/IBM/sarama"
)
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("Message topic:%q partition:%d offset:%d value:%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "")
}
return nil
}
func main() {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0 // 指定Kafka版本
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_group", config)
if err != nil {
log.Fatalln("Failed to create consumer group:", err)
}
defer group.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := group.Consume(ctx, []string{"test_topic"}, ConsumerGroupHandler{}); err != nil {
log.Printf("Error from consumer: %v", err)
return
}
if ctx.Err() != nil {
return
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, os.Interrupt)
<-sigterm
cancel()
wg.Wait()
}
贡献
如果你想为Sarama项目做贡献,可以查看项目的贡献指南。Sarama wiki包含了更多技术和设计细节。
更多关于golang高性能Apache Kafka客户端库插件sarama的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高性能Apache Kafka客户端库插件sarama的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Sarama - Go语言高性能Kafka客户端库
Sarama是Go语言实现的一个Apache Kafka客户端库,具有高性能、稳定性和易用性等特点。下面我将详细介绍Sarama的使用方法。
安装Sarama
go get github.com/Shopify/sarama
生产者(Producer)使用示例
同步生产者
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
config.Producer.Retry.Max = 5 // 重试次数
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln("Failed to start producer:", err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test_topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("Failed to send message: %v\n", err)
} else {
log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}
}
异步生产者
package main
import (
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
// 捕获中断信号优雅关闭
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var enqueued, successes, errors int
go func() {
for {
select {
case producer.Input() <- &sarama.ProducerMessage{
Topic: "test_topic",
Value: sarama.StringEncoder("Async Message"),
}:
enqueued++
case <-signals:
producer.AsyncClose()
return
}
}
}()
// 处理成功和错误
go func() {
for range producer.Successes() {
successes++
}
}()
go func() {
for err := range producer.Errors() {
log.Println(err)
errors++
}
}()
<-signals
log.Printf("Enqueued: %d; Successes: %d; Errors: %d\n", enqueued, successes, errors)
}
消费者(Consumer)使用示例
独立消费者
package main
import (
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer partitionConsumer.Close()
// 捕获中断信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumed := 0
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d: %s\n", msg.Offset, string(msg.Value))
consumed++
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
}
消费者组(Consumer Group)
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"github.com/Shopify/sarama"
)
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("Message topic:%q partition:%d offset:%d value:%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "")
}
return nil
}
func main() {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0 // 指定Kafka版本
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_group", config)
if err != nil {
panic(err)
}
defer group.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := group.Consume(ctx, []string{"test_topic"}, ConsumerGroupHandler{}); err != nil {
log.Printf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, os.Interrupt)
<-sigterm
cancel()
wg.Wait()
}
高级配置
config := sarama.NewConfig()
// 生产者配置
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 5
// 消费者配置
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// 网络配置
config.Net.DialTimeout = 30 * time.Second
config.Net.ReadTimeout = 30 * time.Second
config.Net.WriteTimeout = 30 * time.Second
性能优化建议
-
批量发送:调整
Producer.Flush
相关参数优化批量发送config.Producer.Flush.Messages = 1000 config.Producer.Flush.Frequency = 500 * time.Millisecond
-
压缩:启用消息压缩减少网络传输
config.Producer.Compression = sarama.CompressionSnappy
-
适当增加并发:
config.Producer.MaxMessageBytes = 1000000 config.ChannelBufferSize = 256
-
调整消费者参数:
config.Consumer.Fetch.Min = 1 config.Consumer.Fetch.Default = 1024 * 1024
Sarama提供了丰富的功能和灵活的配置选项,可以满足大多数Kafka使用场景。根据实际需求选择合适的配置参数,可以在性能和可靠性之间取得平衡。