Golang Kafka实战教程

最近在学习Golang操作Kafka,想请教几个问题:

  1. 如何用golang创建Kafka生产者和消费者?
  2. 在Golang中如何处理Kafka消息的序列化和反序列化?
  3. 有没有推荐的Golang Kafka客户端库?Sarama和confluent-kafka-go哪个更适合生产环境?
  4. 在Golang中实现Kafka消息重试机制的最佳实践是什么?
  5. 如何用Golang监控Kafka的生产消费延迟等指标?
2 回复

推荐使用Sarama库实现Golang与Kafka交互。步骤:1. 安装Sarama;2. 配置生产者/消费者;3. 实现消息发送与接收。示例代码简洁,支持异步发送和分区消费。注意处理错误和连接关闭。

更多关于Golang Kafka实战教程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang Kafka实战教程

1. 环境准备

首先安装必要的依赖:

go get github.com/segmentio/kafka-go

2. 生产者示例

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/segmentio/kafka-go"
)

func main() {
    // 配置Kafka连接
    writer := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"),
        Topic:    "test-topic",
        Balancer: &kafka.LeastBytes{},
    }

    defer writer.Close()

    // 发送消息
    err := writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("Key1"),
            Value: []byte("Hello Kafka!"),
        },
        kafka.Message{
            Key:   []byte("Key2"),
            Value: []byte("Second message"),
        },
    )

    if err != nil {
        fmt.Printf("发送失败: %v\n", err)
    } else {
        fmt.Println("消息发送成功!")
    }
}

3. 消费者示例

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
)

func main() {
    // 创建消费者
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     "test-topic",
        Partition: 0,
        MinBytes:  10e3, // 10KB
        MaxBytes:  10e6, // 10MB
    })

    defer reader.Close()

    fmt.Println("开始消费消息...")

    for {
        // 读取消息
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            fmt.Printf("消费错误: %v\n", err)
            break
        }

        fmt.Printf("收到消息: key=%s, value=%s, offset=%d\n",
            string(msg.Key), string(msg.Value), msg.Offset)
    }
}

4. 高级配置

异步生产者

func asyncProducer() {
    writer := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "async-topic",
        BatchSize:    100,      // 批量大小
        BatchTimeout: time.Second, // 批量超时
        Async:        true,     // 异步模式
    }
}

消费者组

func consumerGroup() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        GroupID:  "consumer-group-1", // 消费者组ID
        Topic:    "test-topic",
        MinBytes: 10e3,
        MaxBytes: 10e6,
    })
}

5. 错误处理

func handleErrors() {
    // 配置重试机制
    writer := &kafka.Writer{
        Addr:        kafka.TCP("localhost:9092"),
        Topic:       "test-topic",
        MaxAttempts: 3, // 最大重试次数
    }
}

6. 运行测试

  1. 启动Zookeeper和Kafka服务
  2. 创建topic:
    kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
    
  3. 运行生产者和消费者程序

这个教程涵盖了Kafka在Golang中的基本使用,包括消息生产、消费、错误处理等核心功能。

回到顶部