golang跨技术栈事件驱动工作流框架插件workflow的使用
Golang跨技术栈事件驱动工作流框架插件Workflow的使用
概述
Workflow是一个分布式事件驱动的工作流框架,可以运行健壮、持久和可扩展的顺序业务逻辑。它使用RoleScheduler将工作分布在多个实例上。
主要特性
- 技术栈无关:支持Kafka、Cassandra、Redis、MongoDB、PostgreSQL、MySQL、RabbitMQ或Reflex
- 基于图(DAG):通过定义称为"Steps"的小工作单元来设计工作流
- TDD支持:采用测试驱动开发
- 回调:支持从webhook或手动触发回调
- 事件融合:添加事件连接器以消费外部事件流
- 钩子:在工作流运行的核心变化时执行钩子
- 调度:支持标准cron规范
- 超时:设置动态或静态等待时间
- 并行消费者:指定运行多少个步骤消费者
- 消费者管理:优雅关闭所有进程
安装
go get github.com/luno/workflow
适配器
Workflow通过适配器实现技术栈无关性。核心适配器包括:
事件流适配器(EventStreamer)
# Kafka适配器
go get github.com/luno/workflow/adapters/kafkastreamer
# Reflex适配器
go get github.com/luno/workflow/adapters/reflexstreamer
记录存储(RecordStore)
# SQL存储适配器
go get github.com/luno/workflow/adapters/sqlstore
角色调度器(RoleScheduler)
# Rink角色调度器
go get github.com/luno/workflow/adapters/rinkrolescheduler
完整示例
第一步:定义工作流
package usage
import (
"context"
"github.com/luno/workflow"
)
type Step int
func (s Step) String() string {
switch s {
case StepOne:
return "One"
case StepTwo:
return "Two"
case StepThree:
return "Three"
default:
return "Unknown"
}
}
const (
StepUnknown Step = 0
StepOne Step = 1
StepTwo Step = 2
StepThree Step = 3
)
type MyType struct {
Field string
}
func Workflow() *workflow.Workflow[MyType, Step] {
b := workflow.NewBuilder[MyType, Step]("my workflow name")
b.AddStep(StepOne, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field = "Hello,"
return StepTwo, nil
}, StepTwo)
b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field += " world!"
return StepThree, nil
}, StepThree)
return b.Build(...)
}
第二步:运行工作流
wf := usage.Workflow()
ctx := context.Background()
wf.Run(ctx)
停止工作流:
wf.Stop()
第三步:触发工作流
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID)
if err != nil {
...
}
等待工作流完成:
foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID)
if err != nil {
...
}
ctx, cancel := context.WithTimeout(ctx, 10 * time.Second)
defer cancel()
record, err := wf.Await(ctx, foreignID, runID, StepThree)
if err != nil {
...
}
工作流运行状态
一个工作流运行(Run)可以处于以下状态之一:
运行状态 | 值(int) | 描述 |
---|---|---|
Unknown | 0 | 无意义,防止默认零值 |
Initiated | 1 | 创建时分配,尚未处理 |
Running | 2 | 正在被处理 |
Paused | 3 | 可恢复或取消的临时停止 |
Completed | 4 | 完成所有配置步骤 |
Cancelled | 5 | 未完成所有步骤 |
Data Deleted | 6 | 运行对象数据已被删除 |
Requested Data Deleted | 7 | 请求删除运行对象数据 |
配置选项
Workflow提供了多种配置选项:
// 设置并行实例数
b.AddStep(StepOne, ..., StepTwo).WithOptions(
workflow.ParallelCount(5)
)
// 设置轮询频率
b.AddStep(StepOne, ..., StepTwo).WithOptions(
workflow.PollingFrequency(10 * time.Second)
)
// 设置错误后退时间
b.AddStep(StepOne, ..., StepTwo).WithOptions(
workflow.ErrBackOff(5 * time.Minute)
)
// 设置延迟警报
b.AddStep(StepOne, ..., StepTwo).WithOptions(
workflow.LagAlert(15 * time.Minute),
)
// 设置错误计数暂停阈值
b.AddStep(StepOne, ..., StepTwo).WithOptions(
workflow.PauseAfterErrCount(3),
)
最佳实践
- 将复杂业务逻辑分解为小步骤
- 可用于产生新数据而不仅仅是执行逻辑
- 工作流变更必须向后兼容
- 不适合低延迟场景
- 使用Workflow提供的Prometheus指标进行监控和告警
更多关于golang跨技术栈事件驱动工作流框架插件workflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang跨技术栈事件驱动工作流框架插件workflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang跨技术栈事件驱动工作流框架插件Workflow使用指南
什么是Workflow框架
Workflow是一个基于事件驱动的跨技术栈工作流框架,它允许开发者构建复杂的业务流程,同时支持与不同技术栈的集成。该框架特别适合微服务架构中需要协调多个服务的场景。
核心特性
- 事件驱动架构:基于事件的生产-消费模型
- 跨技术栈支持:可与Java、Python等其他语言服务集成
- 可视化流程编排:支持通过配置定义工作流
- 状态持久化:自动保存工作流状态
- 错误处理与重试:内置完善的错误处理机制
基本使用示例
安装
go get github.com/workflow-engine/workflow
定义工作流
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/workflow-engine/workflow"
)
// 定义工作流步骤
func step1(ctx workflow.Context) (interface{}, error) {
fmt.Println("执行步骤1")
return "step1 result", nil
}
func step2(ctx workflow.Context) (interface{}, error) {
data := ctx.GetInput().(string) // 获取上一步的输出
fmt.Printf("执行步骤2,收到输入: %s\n", data)
return "step2 result", nil
}
func step3(ctx workflow.Context) (interface{}, error) {
data := ctx.GetInput().(string)
fmt.Printf("执行步骤3,收到输入: %s\n", data)
return "final result", nil
}
func main() {
// 创建工作流引擎
engine := workflow.NewEngine()
// 定义工作流
wf := workflow.NewWorkflow("demo-workflow").
AddStep("step1", step1).
AddStep("step2", step2).
AddStep("step3", step3)
// 注册工作流
if err := engine.RegisterWorkflow(wf); err != nil {
log.Fatal(err)
}
// 启动工作流
execution, err := engine.Execute(context.Background(), "demo-workflow", nil)
if err != nil {
log.Fatal(err)
}
// 等待工作流完成
result, err := execution.Await(30 * time.Second)
if err != nil {
log.Fatal(err)
}
fmt.Printf("工作流执行结果: %v\n", result)
}
高级功能
事件监听与跨服务通信
// 定义事件处理器
func orderCreatedHandler(ctx workflow.Context, event *workflow.Event) (interface{}, error) {
orderData := event.Payload.(map[string]interface{})
fmt.Printf("收到订单创建事件: %v\n", orderData)
// 调用支付服务
paymentResult, err := ctx.CallService("payment-service", "processPayment", orderData)
if err != nil {
return nil, err
}
return paymentResult, nil
}
// 注册事件处理器
engine.RegisterEventHandler("order.created", orderCreatedHandler)
错误处理与重试
func unreliableStep(ctx workflow.Context) (interface{}, error) {
// 模拟可能失败的操作
if time.Now().Unix()%2 == 0 {
return nil, fmt.Errorf("随机失败")
}
return "success", nil
}
// 使用带重试的步骤
wf := workflow.NewWorkflow("retry-demo").
AddStepWithRetry("unreliable-step", unreliableStep,
workflow.RetryPolicy{
MaxAttempts: 3,
Delay: 1 * time.Second,
})
持久化存储
// 使用Redis作为持久化存储
store := workflow.NewRedisStore(&workflow.RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
engine := workflow.NewEngine(
workflow.WithStore(store),
)
最佳实践
- 保持步骤幂等性:确保每个步骤可以安全重试
- 合理设置超时:根据业务特点配置适当的超时时间
- 监控工作流执行:集成监控系统跟踪工作流状态
- 版本控制:对工作流定义进行版本管理
- 测试策略:单元测试每个步骤,集成测试完整工作流
常见问题解决
- 工作流卡住:检查是否有步骤未正确完成或超时设置过短
- 事件丢失:确保事件总线配置正确并启用持久化
- 性能问题:考虑对长时间运行的工作流使用异步步骤
- 调试困难:启用详细日志并利用工作流可视化工具
通过Workflow框架,开发者可以构建灵活、可靠且易于维护的跨服务业务流程,显著降低分布式系统开发的复杂度。