golang基于Cadence引擎的工作流与活动编排框架插件库cadence-client的使用
Golang基于Cadence引擎的工作流与活动编排框架插件库cadence-client的使用
Cadence介绍
Cadence是Uber Engineering开发的分布式、可扩展、持久且高可用的编排引擎,用于以可扩展和弹性的方式执行异步长时间运行的业务逻辑。cadence-client
是用于编写工作流和活动的框架。
安装
确保将代码库克隆到正确位置:
git clone git@github.com:uber-go/cadence-client.git $GOPATH/src/go.uber.org/cadence
或者使用go get:
go get go.uber.org/cadence
完整示例Demo
以下是一个完整的Golang使用Cadence的示例,包含工作流和活动的定义与执行:
package main
import (
"context"
"log"
"time"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
// 定义活动
func sampleActivity(ctx context.Context, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity started", zap.String("name", name))
time.Sleep(5 * time.Second) // 模拟长时间运行的任务
return "Hello " + name + "!", nil
}
// 定义工作流
func sampleWorkflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, sampleActivity, name).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
func main() {
// 设置worker选项
var h worker.Host
serviceClient, err := h.NewServiceClient("127.0.0.1:7933")
if err != nil {
log.Fatalf("Failed to create service client: %v", err)
}
// 创建worker
workerOptions := worker.Options{
Logger: zap.NewNop(),
}
worker := worker.New(serviceClient, "test-domain", "test-task-list", workerOptions)
// 注册工作流和活动
worker.RegisterWorkflow(sampleWorkflow)
worker.RegisterActivity(sampleActivity)
// 启动worker
err = worker.Start()
if err != nil {
log.Fatalf("Failed to start worker: %v", err)
}
defer worker.Stop()
// 创建工作流客户端
workflowClient, err := client.NewClient(serviceClient, "test-domain", &client.Options{})
if err != nil {
log.Fatalf("Failed to create workflow client: %v", err)
}
// 启动工作流
workflowOptions := client.StartWorkflowOptions{
TaskList: "test-task-list",
ExecutionStartToCloseTimeout: time.Minute,
DecisionTaskStartToCloseTimeout: time.Minute,
}
execution, err := workflowClient.StartWorkflow(context.Background(), workflowOptions, sampleWorkflow, "Cadence")
if err != nil {
log.Fatalf("Failed to start workflow: %v", err)
}
log.Printf("Started workflow: WorkflowID=%v, RunID=%v", execution.ID, execution.RunID)
// 等待工作流完成
var result string
err = workflowClient.GetWorkflow(context.Background(), execution.ID, execution.RunID).Get(context.Background(), &result)
if err != nil {
log.Fatalf("Failed to get workflow result: %v", err)
}
log.Printf("Workflow result: %v", result)
}
代码说明
-
活动(Activity)定义:
func sampleActivity(ctx context.Context, name string) (string, error) { // 活动实现 }
- 活动是工作流中执行具体任务的单位
- 可以包含长时间运行的操作
-
工作流(Workflow)定义:
func sampleWorkflow(ctx workflow.Context, name string) (string, error) { // 工作流实现 // 调用活动 err := workflow.ExecuteActivity(ctx, sampleActivity, name).Get(ctx, &result) }
- 工作流协调活动的执行
- 可以包含业务逻辑和控制流
-
Worker配置:
worker := worker.New(serviceClient, "test-domain", "test-task-list", workerOptions) worker.RegisterWorkflow(sampleWorkflow) worker.RegisterActivity(sampleActivity)
- Worker负责执行工作流和活动
- 需要注册工作流和活动定义
-
工作流启动:
execution, err := workflowClient.StartWorkflow(context.Background(), workflowOptions, sampleWorkflow, "Cadence")
- 创建工作流执行实例
- 可以传递参数给工作流
-
结果获取:
err = workflowClient.GetWorkflow(context.Background(), execution.ID, execution.RunID).Get(context.Background(), &result)
- 可以同步或异步获取工作流执行结果
更多资源
- 查看示例代码库了解更多使用场景
- 参考官方文档获取详细API说明
更多关于golang基于Cadence引擎的工作流与活动编排框架插件库cadence-client的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于Cadence引擎的工作流与活动编排框架插件库cadence-client的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang基于Cadence引擎的工作流与活动编排框架
Cadence是一个由Uber开源的分布式工作流编排引擎,而cadence-client
是Golang语言与Cadence服务交互的客户端库。下面我将详细介绍如何使用这个库进行工作流和活动的编排。
1. 基本概念
- Workflow(工作流): 定义业务流程的执行逻辑
- Activity(活动): 工作流中执行的具体任务
- Worker(工作者): 执行工作流和活动代码的进程
2. 环境准备
首先需要安装Cadence服务并启动,然后安装Go客户端库:
go get go.uber.org/cadence
go get go.uber.org/yarpc
go get go.uber.org/thriftrw
3. 基本使用示例
3.1 定义活动(Activity)
package activities
import (
"context"
"time"
)
// GreetingActivity 定义一个简单的活动
func GreetingActivity(ctx context.Context, name string) (string, error) {
// 模拟耗时操作
time.Sleep(2 * time.Second)
return "Hello " + name + "!", nil
}
3.2 定义工作流(Workflow)
package workflows
import (
"time"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
"yourproject/activities"
)
// GreetingWorkflow 定义一个简单的工作流
func GreetingWorkflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, activities.GreetingActivity, name).Get(ctx, &result)
if err != nil {
workflow.GetLogger(ctx).Error("Activity failed", zap.Error(err))
return "", err
}
return result, nil
}
3.3 注册工作流和活动
package main
import (
"log"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"yourproject/activities"
"yourproject/workflows"
)
func init() {
// 注册工作流
workflow.Register(workflows.GreetingWorkflow)
// 注册活动
activity.Register(activities.GreetingActivity)
}
func main() {
// 配置worker
workerOptions := worker.Options{
MetricsScope: tally.NoopScope,
Logger: zap.NewNop(),
DisableWorkflowWorker: false,
}
// 创建worker
worker := worker.New(
service.NewClient(serviceClient),
"your-domain",
"your-task-list",
workerOptions,
)
// 启动worker
err := worker.Start()
if err != nil {
log.Fatal("Failed to start worker", err)
}
// 保持运行
select {}
}
3.4 启动工作流
package main
import (
"context"
"time"
"go.uber.org/cadence/client"
"go.uber.org/zap"
"yourproject/workflows"
)
func startWorkflow() {
// 创建客户端
cadenceClient, err := client.NewClient(serviceClient, "your-domain", nil)
if err != nil {
panic(err)
}
// 工作流选项
workflowOptions := client.StartWorkflowOptions{
ID: "greeting-workflow",
TaskList: "your-task-list",
ExecutionStartToCloseTimeout: time.Minute * 10,
}
// 启动工作流
we, err := cadenceClient.StartWorkflow(
context.Background(),
workflowOptions,
workflows.GreetingWorkflow,
"World",
)
if err != nil {
panic(err)
}
// 获取结果
var result string
err = cadenceClient.GetWorkflow(context.Background(), we.ID, we.RunID).Get(context.Background(), &result)
if err != nil {
panic(err)
}
zap.L().Info("Workflow completed", zap.String("Result", result))
}
4. 高级特性
4.1 信号(Signal)
// 工作流中接收信号
func SignalWorkflow(ctx workflow.Context) error {
ch := workflow.GetSignalChannel(ctx, "signal-name")
var signalData string
ch.Receive(ctx, &signalData)
// 处理信号数据
workflow.GetLogger(ctx).Info("Received signal", zap.String("data", signalData))
return nil
}
// 客户端发送信号
func sendSignal() {
err := cadenceClient.SignalWorkflow(
context.Background(),
"workflow-id",
"run-id",
"signal-name",
"signal-data",
)
if err != nil {
panic(err)
}
}
4.2 查询(Query)
// 工作流中定义查询处理器
func QueryWorkflow(ctx workflow.Context) error {
queryType := "state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return "current-state", nil
})
if err != nil {
return err
}
// 工作流逻辑...
return nil
}
// 客户端查询
func queryWorkflow() {
var result string
err := cadenceClient.QueryWorkflow(
context.Background(),
"workflow-id",
"run-id",
"state",
&result,
)
if err != nil {
panic(err)
}
}
5. 最佳实践
- 幂等性: 活动和信号处理应该是幂等的
- 超时设置: 合理设置活动和工作流的超时时间
- 错误处理: 工作流中应妥善处理活动错误
- 日志记录: 使用workflow.GetLogger记录工作流日志
- 版本控制: 工作流变更时考虑版本兼容性
6. 总结
Cadence提供了强大的工作流编排能力,通过cadence-client
库可以方便地在Golang中实现复杂的业务流程。本文介绍了基本的工作流和活动定义、注册、执行以及信号和查询等高级功能。实际使用时,还需要考虑部署架构、监控和错误处理等生产环境因素。