支持Kafka、Redis、SQS、Google Pub/Sub和HTTP调用的Golang流数据处理库

支持Kafka、Redis、SQS、Google Pub/Sub和HTTP调用的Golang流数据处理库 大家好,我已经完成了文档的整理工作,现在正在寻求反馈和建议。请审阅并告诉我您的想法。谢谢!

GitHub

GitHub

头像

whitaker-io/machine

Machine 是一个用于处理数据的工作流/管道库 - whitaker-io/machine


更多关于支持Kafka、Redis、SQS、Google Pub/Sub和HTTP调用的Golang流数据处理库的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于支持Kafka、Redis、SQS、Google Pub/Sub和HTTP调用的Golang流数据处理库的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


看了你的项目,这是一个很有价值的Golang流处理库。从架构上看,machine 通过统一的接口抽象了多种消息源,这种设计模式在实际工程中非常实用。

核心优势在于你使用了 Handler 接口来统一不同数据源的处理逻辑:

// 这是你的核心接口设计
type Handler interface {
    Handle(context.Context, Item) error
}

让我用一个具体的示例来展示如何在实际项目中使用。假设我们需要同时消费Kafka消息和HTTP请求,然后统一处理:

package main

import (
    "context"
    "fmt"
    "github.com/whitaker-io/machine"
    "github.com/whitaker-io/machine/driver/kafka"
    "github.com/whitaker-io/machine/driver/http"
)

// 自定义处理器
type DataProcessor struct{}

func (p *DataProcessor) Handle(ctx context.Context, item machine.Item) error {
    // 统一处理逻辑
    data := item.Payload()
    fmt.Printf("Processing: %s\n", string(data))
    
    // 这里可以添加业务逻辑:数据转换、验证、存储等
    processedData := transform(data)
    
    // 设置处理后的数据
    item.SetPayload(processedData)
    return nil
}

func transform(data []byte) []byte {
    // 示例转换逻辑
    return []byte("processed: " + string(data))
}

func main() {
    ctx := context.Background()
    processor := &DataProcessor{}
    
    // 创建Kafka工作者
    kafkaWorker := kafka.New(
        kafka.WithTopic("user-events"),
        kafka.WithBrokers("localhost:9092"),
        kafka.WithGroupID("machine-group"),
    )
    
    // 创建HTTP工作者
    httpWorker := http.New(
        http.WithPort(8080),
        http.WithPath("/webhook"),
    )
    
    // 创建工作流
    workflow := machine.New(
        machine.WithContext(ctx),
        machine.WithHandler(processor),
    )
    
    // 注册工作者
    workflow.Register(kafkaWorker)
    workflow.Register(httpWorker)
    
    // 启动工作流
    if err := workflow.Run(); err != nil {
        panic(err)
    }
}

这个示例展示了如何同时处理Kafka消息和HTTP请求。你的库在以下方面处理得很好:

  1. 并发控制:通过 machine.New() 的选项模式可以配置并发数
  2. 错误处理Handle 方法返回error,便于错误传播和重试
  3. 上下文传递:支持context,可以优雅处理超时和取消

对于需要从多个数据源聚合数据的场景,比如微服务架构下的数据采集,这种设计特别有用。每个数据源作为独立的工作者,通过统一的处理器接口,简化了业务逻辑的复杂度。

项目文档中提到的背压处理、优雅关闭这些特性,在实际生产环境中都是关键需求。特别是对Kafka和Redis这类外部系统的集成,正确处理连接管理和重试机制很重要。

代码结构清晰,driver分离的设计让扩展新的数据源变得简单。如果有人需要添加RabbitMQ或AWS Kinesis支持,只需要实现对应的driver接口即可。

回到顶部