Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口
Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口
whitaker-io/machine
Machine 是一个用于处理数据的工作流/管道库 - whitaker-io/machine
我构建这个库是为了能够快速构建具有追踪、序列化支持的流处理器,并且提供了一个命令来生成项目。
现在 API 已经稳定,并且开箱即用地提供了针对 Kafka、SQS、Redis Streams 和 Google Pub/Sub 的接口实现。
请告诉我您的想法,如果您喜欢它,请留下一个星标!
更多关于Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang流处理库:支持OpenTelemetry、集成Yaegi动态处理及简洁接口的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Machine库的设计确实体现了现代流处理系统的关键需求,特别是对可观测性和灵活性的支持。以下从技术实现角度分析其核心特性:
1. OpenTelemetry集成示例
import (
"context"
"go.opentelemetry.io/otel"
"github.com/whitaker-io/machine"
)
func main() {
// 创建带追踪的工作流
workflow := machine.NewWorkflow("data-pipeline",
machine.WithTracer(otel.Tracer("machine")),
)
// 添加处理节点时会自动创建span
workflow.AddProcessor("transform", func(ctx context.Context, data interface{}) (interface{}, error) {
// 从context获取当前span
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.String("processor", "transform"))
// 处理逻辑
return processData(data), nil
})
}
2. Yaegi动态处理实现
import (
"github.com/traefik/yaegi/interp"
"github.com/whitaker-io/machine/processor"
)
// 动态加载处理函数
func createDynamicProcessor(script string) (machine.Processor, error) {
i := interp.New(interp.Options{})
// 执行用户提供的脚本
_, err := i.Eval(`
package main
func Process(data interface{}) (interface{}, error) {
// 动态逻辑:` + script + `
return data.(string) + "-processed", nil
}
`)
if err != nil {
return nil, err
}
// 获取编译后的函数
v, _ := i.Eval("Process")
fn := v.Interface().(func(interface{}) (interface{}, error))
return processor.Func(fn), nil
}
// 在工作流中使用
workflow.AddDynamicProcessor("custom", scriptContent)
3. 简洁接口设计
// 声明式管道构建
pipeline := machine.New("event-processor").
FromKafka(machine.KafkaConfig{
Brokers: []string{"localhost:9092"},
Topic: "input-topic",
}).
Process("enrich", func(ctx context.Context, msg *machine.Message) error {
msg.Value = enrichData(msg.Value)
return nil
}).
Filter("important-only", func(msg *machine.Message) bool {
return msg.Headers.Get("priority") == "high"
}).
ToRedisStream(machine.RedisConfig{
Addr: "localhost:6379",
Stream: "output-stream",
}).
WithRetry(machine.ExponentialBackoff(3, 1*time.Second)).
WithMetrics(prometheus.DefaultRegisterer)
// 运行管道
if err := pipeline.Run(context.Background()); err != nil {
log.Fatal(err)
}
4. 多消息队列统一抽象
// 相同的接口处理不同消息源
sources := []machine.Source{
machine.NewKafkaSource(config.Kafka),
machine.NewSQSSource(config.SQS),
machine.NewPubSubSource(config.PubSub),
}
// 统一处理逻辑
for _, source := range sources {
workflow.From(source).
Process("unified-handler", handleMessage).
To(machine.NewRedisSink(outputConfig))
}
5. 序列化支持
// 自动序列化/反序列化
workflow.AddCodec("json", machine.JSONCodec{}).
AddCodec("protobuf", machine.ProtoCodec{}).
AddCodec("avro", machine.AvroCodec{
Schema: userSchema,
})
// 处理器接收已解码的数据
workflow.Process("business-logic", func(ctx context.Context, user *User) error {
// user已经是反序列化的结构体
return processUser(user)
})
6. 错误处理与重试
workflow.Process("critical-step", func(ctx context.Context, data interface{}) (interface{}, error) {
result, err := apiCall(data)
if err != nil {
// 自动重试机制
return nil, machine.NewRetryableError(err, 30*time.Second)
}
return result, nil
}).
WithDeadLetterQueue(machine.DeadLetterConfig{
Sink: machine.NewKafkaSink(dlqConfig),
OnError: func(err error, msg *machine.Message) {
// 自定义DLQ处理逻辑
metrics.DLQCounter.Inc()
},
})
这个库的架构选择很实用:通过接口统一不同消息系统,用context传递追踪信息,Yaegi提供运行时灵活性。OpenTelemetry集成让分布式调试更直观,而简洁的链式API降低了使用门槛。对于需要快速构建可观测流处理系统的团队,这个设计平衡了功能性和易用性。

