golang跨技术栈事件驱动工作流框架插件workflow的使用

Golang跨技术栈事件驱动工作流框架插件Workflow的使用

概述

Workflow是一个分布式事件驱动的工作流框架,可以运行健壮、持久和可扩展的顺序业务逻辑。它使用RoleScheduler将工作分布在多个实例上。

Workflow Logo

主要特性

  • 技术栈无关:支持Kafka、Cassandra、Redis、MongoDB、PostgreSQL、MySQL、RabbitMQ或Reflex
  • 基于图(DAG):通过定义称为"Steps"的小工作单元来设计工作流
  • TDD支持:采用测试驱动开发
  • 回调:支持从webhook或手动触发回调
  • 事件融合:添加事件连接器以消费外部事件流
  • 钩子:在工作流运行的核心变化时执行钩子
  • 调度:支持标准cron规范
  • 超时:设置动态或静态等待时间
  • 并行消费者:指定运行多少个步骤消费者
  • 消费者管理:优雅关闭所有进程

安装

go get github.com/luno/workflow

适配器

Workflow通过适配器实现技术栈无关性。核心适配器包括:

事件流适配器(EventStreamer)

# Kafka适配器
go get github.com/luno/workflow/adapters/kafkastreamer

# Reflex适配器
go get github.com/luno/workflow/adapters/reflexstreamer

记录存储(RecordStore)

# SQL存储适配器
go get github.com/luno/workflow/adapters/sqlstore

角色调度器(RoleScheduler)

# Rink角色调度器
go get github.com/luno/workflow/adapters/rinkrolescheduler

完整示例

第一步:定义工作流

package usage

import (
	"context"

	"github.com/luno/workflow"
)

type Step int

func (s Step) String() string {
	switch s {
	case StepOne:
		return "One"
	case StepTwo:
		return "Two"
	case StepThree:
		return "Three"
	default:
		return "Unknown"
	}
}

const (
	StepUnknown Step = 0
	StepOne     Step = 1
	StepTwo     Step = 2
	StepThree   Step = 3
)

type MyType struct {
	Field string
}

func Workflow() *workflow.Workflow[MyType, Step] {
	b := workflow.NewBuilder[MyType, Step]("my workflow name")

	b.AddStep(StepOne, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
		r.Object.Field = "Hello,"
		return StepTwo, nil
	}, StepTwo)

	b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
		r.Object.Field += " world!"
		return StepThree, nil
	}, StepThree)

	return b.Build(...)
}

第二步:运行工作流

wf := usage.Workflow()

ctx := context.Background()
wf.Run(ctx)

停止工作流:

wf.Stop()

第三步:触发工作流

foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID)
if err != nil {
	...
}

等待工作流完成:

foreignID := "82347982374982374"
runID, err := wf.Trigger(ctx, foreignID)
if err != nil {
	...
}

ctx, cancel := context.WithTimeout(ctx, 10 * time.Second)
defer cancel()

record, err := wf.Await(ctx, foreignID, runID, StepThree)
if err != nil {
	...
}

工作流运行状态

一个工作流运行(Run)可以处于以下状态之一:

运行状态 值(int) 描述
Unknown 0 无意义,防止默认零值
Initiated 1 创建时分配,尚未处理
Running 2 正在被处理
Paused 3 可恢复或取消的临时停止
Completed 4 完成所有配置步骤
Cancelled 5 未完成所有步骤
Data Deleted 6 运行对象数据已被删除
Requested Data Deleted 7 请求删除运行对象数据

配置选项

Workflow提供了多种配置选项:

// 设置并行实例数
b.AddStep(StepOne, ..., StepTwo).WithOptions(
    workflow.ParallelCount(5)
)

// 设置轮询频率
b.AddStep(StepOne, ..., StepTwo).WithOptions(
    workflow.PollingFrequency(10 * time.Second)
)

// 设置错误后退时间
b.AddStep(StepOne, ..., StepTwo).WithOptions(
    workflow.ErrBackOff(5 * time.Minute)
)

// 设置延迟警报
b.AddStep(StepOne, ..., StepTwo).WithOptions(
    workflow.LagAlert(15 * time.Minute),
)

// 设置错误计数暂停阈值
b.AddStep(StepOne, ..., StepTwo).WithOptions(
    workflow.PauseAfterErrCount(3),
)

最佳实践

  1. 将复杂业务逻辑分解为小步骤
  2. 可用于产生新数据而不仅仅是执行逻辑
  3. 工作流变更必须向后兼容
  4. 不适合低延迟场景
  5. 使用Workflow提供的Prometheus指标进行监控和告警

更多关于golang跨技术栈事件驱动工作流框架插件workflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang跨技术栈事件驱动工作流框架插件workflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang跨技术栈事件驱动工作流框架插件Workflow使用指南

什么是Workflow框架

Workflow是一个基于事件驱动的跨技术栈工作流框架,它允许开发者构建复杂的业务流程,同时支持与不同技术栈的集成。该框架特别适合微服务架构中需要协调多个服务的场景。

核心特性

  1. 事件驱动架构:基于事件的生产-消费模型
  2. 跨技术栈支持:可与Java、Python等其他语言服务集成
  3. 可视化流程编排:支持通过配置定义工作流
  4. 状态持久化:自动保存工作流状态
  5. 错误处理与重试:内置完善的错误处理机制

基本使用示例

安装

go get github.com/workflow-engine/workflow

定义工作流

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/workflow-engine/workflow"
)

// 定义工作流步骤
func step1(ctx workflow.Context) (interface{}, error) {
	fmt.Println("执行步骤1")
	return "step1 result", nil
}

func step2(ctx workflow.Context) (interface{}, error) {
	data := ctx.GetInput().(string) // 获取上一步的输出
	fmt.Printf("执行步骤2,收到输入: %s\n", data)
	return "step2 result", nil
}

func step3(ctx workflow.Context) (interface{}, error) {
	data := ctx.GetInput().(string)
	fmt.Printf("执行步骤3,收到输入: %s\n", data)
	return "final result", nil
}

func main() {
	// 创建工作流引擎
	engine := workflow.NewEngine()

	// 定义工作流
	wf := workflow.NewWorkflow("demo-workflow").
		AddStep("step1", step1).
		AddStep("step2", step2).
		AddStep("step3", step3)

	// 注册工作流
	if err := engine.RegisterWorkflow(wf); err != nil {
		log.Fatal(err)
	}

	// 启动工作流
	execution, err := engine.Execute(context.Background(), "demo-workflow", nil)
	if err != nil {
		log.Fatal(err)
	}

	// 等待工作流完成
	result, err := execution.Await(30 * time.Second)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("工作流执行结果: %v\n", result)
}

高级功能

事件监听与跨服务通信

// 定义事件处理器
func orderCreatedHandler(ctx workflow.Context, event *workflow.Event) (interface{}, error) {
	orderData := event.Payload.(map[string]interface{})
	fmt.Printf("收到订单创建事件: %v\n", orderData)
	
	// 调用支付服务
	paymentResult, err := ctx.CallService("payment-service", "processPayment", orderData)
	if err != nil {
		return nil, err
	}
	
	return paymentResult, nil
}

// 注册事件处理器
engine.RegisterEventHandler("order.created", orderCreatedHandler)

错误处理与重试

func unreliableStep(ctx workflow.Context) (interface{}, error) {
	// 模拟可能失败的操作
	if time.Now().Unix()%2 == 0 {
		return nil, fmt.Errorf("随机失败")
	}
	return "success", nil
}

// 使用带重试的步骤
wf := workflow.NewWorkflow("retry-demo").
	AddStepWithRetry("unreliable-step", unreliableStep, 
		workflow.RetryPolicy{
			MaxAttempts: 3,
			Delay:       1 * time.Second,
		})

持久化存储

// 使用Redis作为持久化存储
store := workflow.NewRedisStore(&workflow.RedisConfig{
	Addr:     "localhost:6379",
	Password: "",
	DB:       0,
})

engine := workflow.NewEngine(
	workflow.WithStore(store),
)

最佳实践

  1. 保持步骤幂等性:确保每个步骤可以安全重试
  2. 合理设置超时:根据业务特点配置适当的超时时间
  3. 监控工作流执行:集成监控系统跟踪工作流状态
  4. 版本控制:对工作流定义进行版本管理
  5. 测试策略:单元测试每个步骤,集成测试完整工作流

常见问题解决

  1. 工作流卡住:检查是否有步骤未正确完成或超时设置过短
  2. 事件丢失:确保事件总线配置正确并启用持久化
  3. 性能问题:考虑对长时间运行的工作流使用异步步骤
  4. 调试困难:启用详细日志并利用工作流可视化工具

通过Workflow框架,开发者可以构建灵活、可靠且易于维护的跨服务业务流程,显著降低分布式系统开发的复杂度。

回到顶部