高负载下REST端点如何正确选择Golang Kafka生产者

高负载下REST端点如何正确选择Golang Kafka生产者 大家好!感谢各位愿意提供帮助,我会尽量简洁。

为了学习Kafka,我正在尝试以下方案:

  • 我有一个REST端点,用于将JSON请求数据推送到Kafka。
  • 我希望发布到Kafka的过程是异步的
  • 这样HTTP处理程序可以立即返回HTTP响应202

我已经阅读了关于Kafka的资料,并观看了Confluent在YouTube上的视频。 我仍然无法在使用同步还是异步Kafka生产者之间做出有把握的决定。 这正是我需要大家帮助的地方。在我继续之前,请允许我提供一些代码:

func SomeGinHandler(c *gin.Context) {
    // 假设我们从请求的JSON中提取了数据
    // 数据已填入someValue字节数组

	/* 接下来是一个非常敏感的部分,
       因为Publish()可能会将数据放入队列,
       但仍然可能以某种方式失败;现在我们队列中有了“脏”数据
       而且我目前看不到从队列中移除它的方法
    */

	err = kafkaProducer.Publish(c.Request.Context(), []byte(someKey), someValue))
	if err != nil {
		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

    // 关键在于,上面的Publish()调用不应该阻塞,但也要可靠
    // 我不能从这个HTTP处理程序中丢失消息
    // 通过目前对Kafka的阅读,我担心这是不可能的
    // 我想我必须做出权衡,但什么是正确的选择??
    // 坚持使用同步生产者,并设置ack = waitALL??
    // 或者异步生产者在这里可能以某种方式工作??怎么实现??

	c.JSON(http.StatusAccepted, gin.H{"message" : "request accepted"})
}

正如我在代码注释中描述的那样,我希望Kafka发布代码能立即返回,这样REST处理程序就不会被阻塞。

我将在另一个微服务中使用Kafka消费者组来处理数据。

我可能无法承受消息丢失,在阅读了关于异步生产者的资料后,我发现它只是“发射后不管”,所以消息丢失是可能的。

显然,我必须做出权衡,那么考虑到REST端点将承受高负载,在上述场景中您建议采用什么方法?

关于库的选择,我倾向于franz-go,但sarama似乎也不错。我想避免CGO依赖,但如果需要,confluent的库也可以。如果您想到其他库,我也会考虑。我面临的问题本质上是设计问题,所以我不期望库能神奇地解决它。


更多关于高负载下REST端点如何正确选择Golang Kafka生产者的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

我认为同步生产者是正确选择,因为在我的场景中可靠性比异步性更重要。这始终是一种权衡,因此我决定采纳您的建议,优先考虑可靠性,因为高负载问题可以通过水平扩展来解决,这样Kafka同步生产者就能达到足够快的速度,不会成为瓶颈。

更多关于高负载下REST端点如何正确选择Golang Kafka生产者的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这听起来与两端是什么无关。基本逻辑是这样的:

new req -> handle some time -> to db -> make resp

一般来说,为了强数据一致性,会采用严格处理。或许你应该尝试一些方法,比如使用预写日志(WAL)来记录写入失败时的处理方案。

在高负载REST端点中使用Kafka生产者时,正确的选择是异步生产者配合可靠配置。以下是具体实现方案:

// 使用franz-go库的完整示例
package main

import (
	"context"
	"encoding/json"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/twmb/franz-go/pkg/kgo"
)

type KafkaProducer struct {
	client *kgo.Client
}

func NewKafkaProducer(brokers []string, topic string) (*KafkaProducer, error) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(brokers...),
		kgo.DefaultProduceTopic(topic),
		// 关键配置:确保消息不丢失
		kgo.RequiredAcks(kgo.AllISRAcks()), // 等待所有副本确认
		kgo.MaxBufferedRecords(10000),      // 缓冲区大小
		kgo.ProducerBatchMaxBytes(10485760), // 10MB批次
		kgo.ProducerLinger(100 * time.Millisecond), // 批次等待时间
		kgo.ProducerRetryMax(10),                   // 重试次数
		kgo.DisableIdempotentWrite(),               // 根据需求选择
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	return &KafkaProducer{client: client}, nil
}

func (kp *KafkaProducer) PublishAsync(ctx context.Context, key, value []byte) error {
	record := &kgo.Record{
		Key:   key,
		Value: value,
		Timestamp: time.Now(),
	}

	// 异步发送,立即返回
	err := kp.client.Produce(ctx, record, func(r *kgo.Record, err error) {
		// 回调处理发送结果
		if err != nil {
			// 记录日志或处理错误
			logError(err, r)
			return
		}
		// 发送成功,可记录指标
		recordSuccess(r)
	})

	return err // 这里只返回入队错误,不等待发送完成
}

func (kp *KafkaProducer) Close() {
	kp.client.Close()
}

// REST处理器实现
func SomeGinHandler(c *gin.Context, producer *KafkaProducer) {
	var requestData map[string]interface{}
	if err := c.BindJSON(&requestData); err != nil {
		c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	value, err := json.Marshal(requestData)
	if err != nil {
		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	key := []byte(c.ClientIP()) // 使用客户端IP作为key

	// 异步发布,立即返回
	err = producer.PublishAsync(c.Request.Context(), key, value)
	if err != nil {
		// 这里捕获的是入队错误(如缓冲区满),不是发送错误
		c.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{"error": "system busy"})
		return
	}

	c.JSON(http.StatusAccepted, gin.H{"message": "request accepted"})
}

// 监控和错误处理
func logError(err error, r *kgo.Record) {
	// 实现错误日志记录
	// 可集成到监控系统
}

func recordSuccess(r *kgo.Record) {
	// 实现成功指标记录
}

// 主函数示例
func main() {
	producer, err := NewKafkaProducer(
		[]string{"localhost:9092"},
		"your-topic",
	)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	r := gin.Default()
	r.POST("/publish", func(c *gin.Context) {
		SomeGinHandler(c, producer)
	})

	r.Run(":8080")
}

关键配置说明:

  1. RequiredAcks(kgo.AllISRAcks()):确保所有副本都确认写入,这是防止消息丢失的核心配置
  2. MaxBufferedRecordsProducerBatchMaxBytes:控制内存缓冲区大小,防止内存溢出
  3. ProducerLinger:平衡延迟和吞吐量
  4. ProducerRetryMax:网络故障时自动重试

错误处理策略:

// 增强的错误处理回调
func (kp *KafkaProducer) PublishAsyncWithCallback(ctx context.Context, key, value []byte, 
	onSuccess func(*kgo.Record), onError func(*kgo.Record, error)) error {
	
	record := &kgo.Record{
		Key:   key,
		Value: value,
	}

	err := kp.client.Produce(ctx, record, func(r *kgo.Record, err error) {
		if err != nil {
			// 可重试错误处理
			if isRetriableError(err) {
				// 重新入队或记录到死信队列
				handleRetriableError(r, err)
			} else {
				// 不可恢复错误
				onError(r, err)
			}
			return
		}
		onSuccess(r)
	})

	return err
}

func isRetriableError(err error) bool {
	// 判断错误是否可重试
	return false
}

性能优化建议:

// 批量发送配置优化
func NewHighThroughputProducer(brokers []string) (*KafkaProducer, error) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(brokers...),
		kgo.RequiredAcks(kgo.AllISRAcks()),
		kgo.MaxBufferedRecords(50000),           // 增大缓冲区
		kgo.ProducerBatchMaxBytes(50 * 1024 * 1024), // 50MB批次
		kgo.ProducerLinger(50 * time.Millisecond),   // 更短的等待
		kgo.ProducerRetryMax(5),
		kgo.ProducerCompression(kgo.SnappyCompression()), // 压缩减少网络传输
		kgo.BlockMaxBytes(100 * 1024 * 1024),            // 增大网络缓冲区
	}

	client, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	return &KafkaProducer{client: client}, nil
}

这种异步方案在高负载下能提供:

  1. 非阻塞的HTTP响应(立即返回202)
  2. 可靠的消息传递(通过ack确认机制)
  3. 高吞吐量(批量发送和缓冲区)
  4. 自动错误恢复(重试机制)

消息丢失只会在极端情况下发生(如Kafka集群完全不可用),此时应返回503错误给客户端,建议其重试。

回到顶部