Using Sarama, a High-Performance Apache Kafka Client Library for Go


Introduction to Sarama

Sarama is an MIT-licensed Go client library for Apache Kafka.

Quick Start

The following basic examples show how to use the Sarama library:

1. Install Sarama

go get github.com/IBM/sarama

2. Producer example

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

func main() {
	// Configure the producer
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // required for SyncProducer to report delivery results
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message

	// Create a synchronous producer
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to start producer:", err)
	}
	defer producer.Close()

	// Build the message
	msg := &sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("Hello, Kafka!"),
	}

	// Send the message
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v\n", err)
	} else {
		log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}

	// Block until interrupted (Ctrl+C) to keep the process alive
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}

3. Consumer example

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

func main() {
	// Configure the consumer
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	// Create the consumer
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to start consumer:", err)
	}
	defer consumer.Close()

	// Start consuming partition 0 of the topic
	partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalln("Failed to start partition consumer:", err)
	}
	defer partitionConsumer.Close()

	// Trap the interrupt signal
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Consume messages
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message: %s at offset %d\n", msg.Value, msg.Offset)
		case err := <-partitionConsumer.Errors():
			log.Printf("Error: %v\n", err)
		case <-signals:
			return
		}
	}
}
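
The consumer above reads from partition 0 only. When a topic has several partitions, each partition needs its own PartitionConsumer. The following is a minimal sketch of consuming all partitions of a topic, assuming the same local broker and test_topic as above:

package main

import (
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/IBM/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln("Failed to start consumer:", err)
	}
	defer consumer.Close()

	// Discover every partition of the topic and start one PartitionConsumer each
	partitions, err := consumer.Partitions("test_topic")
	if err != nil {
		log.Fatalln("Failed to list partitions:", err)
	}

	var wg sync.WaitGroup
	var pcs []sarama.PartitionConsumer
	for _, p := range partitions {
		pc, err := consumer.ConsumePartition("test_topic", p, sarama.OffsetNewest)
		if err != nil {
			log.Fatalln("Failed to start partition consumer:", err)
		}
		pcs = append(pcs, pc)
		wg.Add(1)
		go func(pc sarama.PartitionConsumer, p int32) {
			defer wg.Done()
			for msg := range pc.Messages() {
				log.Printf("partition %d offset %d: %s\n", p, msg.Offset, msg.Value)
			}
		}(pc, p)
	}

	// On interrupt, AsyncClose each partition consumer; its Messages channel
	// is closed once draining finishes, which ends the goroutines above
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
	for _, pc := range pcs {
		pc.AsyncClose()
	}
	wg.Wait()
}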

Compatibility and API Stability

Sarama offers a "2 releases + 2 months" compatibility guarantee: the two latest stable releases of both Kafka and Go are supported, with a two-month grace period for older versions.

Sarama follows semantic versioning and provides API stability through the standard Go module versioning scheme.
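
In practice, the supported-version policy shows up when you tell Sarama which Kafka protocol version to speak. A minimal sketch, assuming a local broker on localhost:9092 and a cluster running at least Kafka 2.5 (adjust sarama.V2_5_0_0 to whatever your brokers actually run):

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Declare the protocol version Sarama should use; it must not be newer
	// than what the brokers actually run.
	config.Version = sarama.V2_5_0_0

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to create client:", err)
	}
	defer client.Close()

	log.Println("Connected; known brokers:", len(client.Brokers()))
}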

Advanced Usage

Consumer group example

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/IBM/sarama"
)

type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("Message topic:%q partition:%d offset:%d value:%s\n", 
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0 // declare the Kafka protocol version the brokers run
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_group", config)
	if err != nil {
		log.Fatalln("Failed to create consumer group:", err)
	}
	defer group.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := group.Consume(ctx, []string{"test_topic"}, ConsumerGroupHandler{}); err != nil {
				log.Printf("Error from consumer: %v", err)
				return
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, os.Interrupt)
	<-sigterm
	cancel()
	wg.Wait()
}

Contributing

If you would like to contribute to the Sarama project, see its contribution guidelines. The Sarama wiki contains further technical and design details.


More hands-on tutorials on using the Sarama Kafka client library for Go are available at https://www.itying.com/category-94-b0.html

1 Reply

Sarama - A High-Performance Kafka Client Library for Go

Sarama is an Apache Kafka client library implemented in Go, valued for its performance, stability, and ease of use. Below is a detailed walkthrough of how to use it.

Install Sarama

go get github.com/IBM/sarama

(Older examples imported github.com/Shopify/sarama; the project has since moved to the IBM organization on GitHub.)

Producer usage examples

Synchronous producer

package main

import (
	"log"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to acknowledge
	config.Producer.Retry.Max = 5                    // maximum number of retries
	config.Producer.Return.Successes = true          // delivered messages are reported back (required for SyncProducer)

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to start producer:", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("Hello, Kafka!"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v\n", err)
	} else {
		log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}
}

Asynchronous producer

package main

import (
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to start async producer:", err)
	}

	// Trap the interrupt signal for a graceful shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var enqueued, successes, errors int

	// The Successes and Errors channels must be drained when the
	// corresponding Return options are enabled
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			successes++
		}
	}()
	go func() {
		defer wg.Done()
		for err := range producer.Errors() {
			log.Println(err)
			errors++
		}
	}()

	// Keep enqueueing messages until interrupted
ProducerLoop:
	for {
		select {
		case producer.Input() <- &sarama.ProducerMessage{
			Topic: "test_topic",
			Value: sarama.StringEncoder("Async Message"),
		}:
			enqueued++
		case <-signals:
			break ProducerLoop
		}
	}

	// AsyncClose flushes pending messages, then closes the Successes and
	// Errors channels, letting the goroutines above finish
	producer.AsyncClose()
	wg.Wait()
	log.Printf("Enqueued: %d; Successes: %d; Errors: %d\n", enqueued, successes, errors)
}

Consumer usage examples

Standalone consumer

package main

import (
	"log"
	"os"
	"os/signal"
	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	// Trap the interrupt signal
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d: %s\n", msg.Offset, string(msg.Value))
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}

Consumer Group

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"github.com/Shopify/sarama"
)

type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("Message topic:%q partition:%d offset:%d value:%s\n", 
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0 // declare the Kafka protocol version the brokers run
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_group", config)
	if err != nil {
		panic(err)
	}
	defer group.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := group.Consume(ctx, []string{"test_topic"}, ConsumerGroupHandler{}); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, os.Interrupt)
	<-sigterm
	cancel()
	wg.Wait()
}

Advanced Configuration
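
The snippet below gathers commonly tuned options in one place; the duration values assume the standard-library time package is imported alongside sarama.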

config := sarama.NewConfig()

// Producer settings
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 5

// Consumer settings
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest

// Network settings
config.Net.DialTimeout = 30 * time.Second
config.Net.ReadTimeout = 30 * time.Second
config.Net.WriteTimeout = 30 * time.Second

Performance Tuning Suggestions

  1. Batching: tune the Producer.Flush parameters to send messages in batches

    config.Producer.Flush.Messages = 1000
    config.Producer.Flush.Frequency = 500 * time.Millisecond
    
  2. Compression: enable message compression to reduce network traffic

    config.Producer.Compression = sarama.CompressionSnappy
    
  3. Buffer sizes: raise the maximum message size and the internal channel buffer

    config.Producer.MaxMessageBytes = 1000000
    config.ChannelBufferSize = 256
    
  4. Consumer fetch tuning: adjust how much data each fetch request returns

    config.Consumer.Fetch.Min = 1
    config.Consumer.Fetch.Default = 1024 * 1024
    

Sarama offers a rich feature set and flexible configuration options that cover most Kafka use cases. Choosing configuration values that match your workload lets you balance performance against reliability; a combined sketch follows below.
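
As a combined illustration of the tuning options above, here is a minimal sketch of a producer built from a tuned configuration. The broker address, topic name, and the specific values are illustrative assumptions to adapt to your own workload.

package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Reliability: wait for all in-sync replicas and retry transient failures
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true // required for SyncProducer

	// Throughput: batch messages and compress them on the wire
	config.Producer.Flush.Messages = 1000
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.Producer.Compression = sarama.CompressionSnappy

	// Buffering: a larger internal channel buffer between Sarama's goroutines
	config.ChannelBufferSize = 256

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to start producer:", err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("tuned producer example"),
	})
	if err != nil {
		log.Fatalln("Failed to send message:", err)
	}
	log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}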
