golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用

Golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用

简介

confluent-kafka-go是Confluent为Apache Kafka和Confluent Platform提供的Golang客户端。

主要特点:

  • 高性能 - 基于精心调优的C客户端librdkafka的轻量级封装
  • 可靠性 - 正确处理了Apache Kafka客户端的所有细节
  • 支持 - 由Confluent提供商业支持
  • 面向未来 - 与Apache Kafka核心和Confluent平台组件保持同步

示例代码

高级消费者示例

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// 创建消费者配置
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	// 订阅主题
	err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	if err != nil {
		panic(err)
	}

	// 持续消费消息
	run := true
	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if !err.(kafka.Error).IsTimeout() {
			// 客户端会自动尝试从所有错误中恢复
			// 超时不视为错误,因为它是ReadMessage在没有消息时引发的
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

生产者示例

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// 创建生产者
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// 消息投递报告处理
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// 异步生产消息到主题
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// 关闭前等待消息投递完成
	p.Flush(15 * 1000)
}

安装与使用

使用Go Modules

  1. 在代码中导入包:
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  1. 构建项目:
go build ./...

对于Alpine Linux (musl)构建,需要指定-tags musl

go build -tags musl ./...

手动安装

go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka

librdkafka依赖

预构建的librdkafka二进制文件已包含在Go客户端中,通常不需要单独安装。但如果需要GSSAPI/Kerberos认证支持,则需要单独安装librdkafka。

支持的平台:

  • Mac OSX x64和arm64
  • 基于glibc的Linux x64和arm64(如RedHat、Debian、CentOS、Ubuntu等)- 不支持GSSAPI/Kerberos
  • 基于musl的Linux amd64和arm64(Alpine)- 不支持GSSAPI/Kerberos
  • Windows amd64 - 不支持GSSAPI/Kerberos

静态构建

对于基于glibc的系统,如果需要静态构建,可以使用musl工具链:

CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl

许可证

Apache License v2.0

KAFKA是The Apache Software Foundation的注册商标,已授权confluent-kafka-go使用。confluent-kafka-go与The Apache Software Foundation无关,也不受其认可。


更多关于golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能Apache Kafka和Confluent平台客户端插件库confluent-kafka-go的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


confluent-kafka-go: Golang高性能Kafka客户端库使用指南

confluent-kafka-go是Confluent官方提供的Golang Kafka客户端库,基于librdkafka C库构建,提供了高性能、可靠的Kafka生产者和消费者实现。

安装

首先需要安装librdkafka库,然后安装Go客户端:

# Ubuntu/Debian
sudo apt-get install librdkafka-dev

# CentOS/RHEL
sudo yum install librdkafka-devel

# macOS
brew install librdkafka

# 然后安装Go客户端
go get -u github.com/confluentinc/confluent-kafka-go/kafka

生产者示例

package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// 生产者配置
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // Kafka集群地址
		"client.id":        "go-producer",    // 客户端ID
		"acks":             "all",            // 确保消息被所有副本确认
	}

	// 创建生产者
	producer, err := kafka.NewProducer(config)
	if err != nil {
		log.Fatalf("Failed to create producer: %s", err)
	}
	defer producer.Close()

	// 消息投递报告通道
	deliveryChan := make(chan kafka.Event)

	// 异步发送消息
	topic := "test-topic"
	for i := 0; i < 10; i++ {
		value := fmt.Sprintf("Message %d", i)
		err = producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &topic,
				Partition: kafka.PartitionAny,
			},
			Value: []byte(value),
		}, deliveryChan)
		if err != nil {
			log.Printf("Failed to produce message: %s", err)
			continue
		}

		// 等待投递报告
		e := <-deliveryChan
		m := e.(*kafka.Message)

		if m.TopicPartition.Error != nil {
			log.Printf("Delivery failed: %v", m.TopicPartition.Error)
		} else {
			log.Printf("Delivered message to %v", m.TopicPartition)
		}
	}

	// 确保所有消息都发送完成
	producer.Flush(15 * 1000)
}

消费者示例

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// 消费者配置
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":         "go-consumer-group",
		"auto.offset.reset": "earliest", // 从最早的消息开始消费
	}

	// 创建消费者
	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}
	defer consumer.Close()

	// 订阅主题
	topic := "test-topic"
	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		log.Fatalf("Failed to subscribe to topic: %s", err)
	}

	// 处理中断信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// 消费消息
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// 轮询消息,超时时间100ms
			ev := consumer.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				// 处理消息
				fmt.Printf("Received message on %s: %s\n", 
					e.TopicPartition, string(e.Value))
				// 手动提交偏移量
				_, err := consumer.CommitMessage(e)
				if err != nil {
					log.Printf("Failed to commit offset: %v", err)
				}
			case kafka.Error:
				// 处理错误
				fmt.Printf("Error: %v\n", e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored event: %v\n", e)
			}
		}
	}
}

高级配置

生产者高级配置

config := &kafka.ConfigMap{
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "compression.type":  "snappy",        // 压缩类型
    "queue.buffering.max.messages": 100000, // 队列中最大消息数
    "queue.buffering.max.ms": 500,         // 批量发送等待时间(ms)
    "batch.num.messages": 10000,           // 每批最大消息数
    "linger.ms": 10,                       // 发送前等待时间(ms)
    "retries": 5,                          // 重试次数
    "retry.backoff.ms": 1000,              // 重试间隔(ms)
}

消费者高级配置

config := &kafka.ConfigMap{
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false,           // 禁用自动提交
    "max.poll.interval.ms": 300000,        // 最大轮询间隔(ms)
    "session.timeout.ms": 10000,           // 会话超时(ms)
    "heartbeat.interval.ms": 3000,         // 心跳间隔(ms)
    "fetch.max.bytes": 1048576,            // 每次fetch最大字节数
    "fetch.min.bytes": 1,                  // 每次fetch最小字节数
    "fetch.wait.max.ms": 500,              // fetch等待时间(ms)
}

性能优化建议

  1. 批量发送:适当调整linger.msbatch.num.messages参数可以提高吞吐量
  2. 压缩:启用压缩(compression.type)可以减少网络传输量
  3. 异步发送:使用异步发送模式可以提高性能
  4. 消费者并行度:增加消费者实例数量可以提高消费速度
  5. 合理设置消费者参数:调整fetch.min.bytesfetch.wait.max.ms可以减少网络请求

错误处理

// 生产者错误处理
go func() {
    for e := range producer.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                log.Printf("Delivery failed: %v", ev.TopicPartition.Error)
            }
        case kafka.Error:
            log.Printf("Producer error: %v", ev)
        }
    }
}()

confluent-kafka-go提供了丰富的功能和配置选项,可以满足大多数Kafka使用场景的需求。通过合理配置,可以实现高性能的消息生产和消费。

回到顶部