Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口

Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口 GitHub

whitaker-io/machine

Machine 是一个用于处理数据的工作流/管道库 - whitaker-io/machine

我构建这个库是为了能够快速构建具有追踪、序列化支持的流处理器,并且提供了一个命令来生成项目。

现在 API 已经稳定,并且开箱即用地提供了针对 Kafka、SQS、Redis Streams 和 Google Pub/Sub 的接口实现。

请告诉我您的想法,如果您喜欢它,请留下一个星标!


更多关于Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Machine库的设计确实体现了现代流处理系统的关键需求,特别是对可观测性和灵活性的支持。以下从技术实现角度分析其核心特性:

1. OpenTelemetry集成示例

import (
    "context"
    "go.opentelemetry.io/otel"
    "github.com/whitaker-io/machine"
)

func main() {
    // 创建带追踪的工作流
    workflow := machine.NewWorkflow("data-pipeline", 
        machine.WithTracer(otel.Tracer("machine")),
    )
    
    // 添加处理节点时会自动创建span
    workflow.AddProcessor("transform", func(ctx context.Context, data interface{}) (interface{}, error) {
        // 从context获取当前span
        span := trace.SpanFromContext(ctx)
        span.SetAttributes(attribute.String("processor", "transform"))
        
        // 处理逻辑
        return processData(data), nil
    })
}

2. Yaegi动态处理实现

import (
    "github.com/traefik/yaegi/interp"
    "github.com/whitaker-io/machine/processor"
)

// 动态加载处理函数
func createDynamicProcessor(script string) (machine.Processor, error) {
    i := interp.New(interp.Options{})
    
    // 执行用户提供的脚本
    _, err := i.Eval(`
        package main
        
        func Process(data interface{}) (interface{}, error) {
            // 动态逻辑:` + script + `
            return data.(string) + "-processed", nil
        }
    `)
    
    if err != nil {
        return nil, err
    }
    
    // 获取编译后的函数
    v, _ := i.Eval("Process")
    fn := v.Interface().(func(interface{}) (interface{}, error))
    
    return processor.Func(fn), nil
}

// 在工作流中使用
workflow.AddDynamicProcessor("custom", scriptContent)

3. 简洁接口设计

// 声明式管道构建
pipeline := machine.New("event-processor").
    FromKafka(machine.KafkaConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "input-topic",
    }).
    Process("enrich", func(ctx context.Context, msg *machine.Message) error {
        msg.Value = enrichData(msg.Value)
        return nil
    }).
    Filter("important-only", func(msg *machine.Message) bool {
        return msg.Headers.Get("priority") == "high"
    }).
    ToRedisStream(machine.RedisConfig{
        Addr: "localhost:6379",
        Stream: "output-stream",
    }).
    WithRetry(machine.ExponentialBackoff(3, 1*time.Second)).
    WithMetrics(prometheus.DefaultRegisterer)

// 运行管道
if err := pipeline.Run(context.Background()); err != nil {
    log.Fatal(err)
}

4. 多消息队列统一抽象

// 相同的接口处理不同消息源
sources := []machine.Source{
    machine.NewKafkaSource(config.Kafka),
    machine.NewSQSSource(config.SQS),
    machine.NewPubSubSource(config.PubSub),
}

// 统一处理逻辑
for _, source := range sources {
    workflow.From(source).
        Process("unified-handler", handleMessage).
        To(machine.NewRedisSink(outputConfig))
}

5. 序列化支持

// 自动序列化/反序列化
workflow.AddCodec("json", machine.JSONCodec{}).
    AddCodec("protobuf", machine.ProtoCodec{}).
    AddCodec("avro", machine.AvroCodec{
        Schema: userSchema,
    })

// 处理器接收已解码的数据
workflow.Process("business-logic", func(ctx context.Context, user *User) error {
    // user已经是反序列化的结构体
    return processUser(user)
})

6. 错误处理与重试

workflow.Process("critical-step", func(ctx context.Context, data interface{}) (interface{}, error) {
    result, err := apiCall(data)
    if err != nil {
        // 自动重试机制
        return nil, machine.NewRetryableError(err, 30*time.Second)
    }
    return result, nil
}).
WithDeadLetterQueue(machine.DeadLetterConfig{
    Sink:    machine.NewKafkaSink(dlqConfig),
    OnError: func(err error, msg *machine.Message) {
        // 自定义DLQ处理逻辑
        metrics.DLQCounter.Inc()
    },
})

这个库的架构选择很实用:通过接口统一不同消息系统,用context传递追踪信息,Yaegi提供运行时灵活性。OpenTelemetry集成让分布式调试更直观,而简洁的链式API降低了使用门槛。对于需要快速构建可观测流处理系统的团队,这个设计平衡了功能性和易用性。

回到顶部