golang全功能业务流程开发平台插件iWF的使用
Golang全功能业务流程开发平台插件iWF的使用
什么是iWF
Indeed Workflow Framework(iWF)是一个编码框架和服务,用于简化涉及等待外部事件、处理超时和持久化长期状态的业务流程。通过iWF,开发者可以构建可扩展、可维护的工作流,使其能适应实时事件并与外部系统无缝集成。
iWF的独特优势
- 工作流即代码:使用原生代码定义所有内容:分支、循环、并行线程、变量、模式等
- 结构化编程:提供良好组织的结构,使工作流自然且易于阅读
- 持久计时器:提供持久、对系统故障具有弹性的计时器
- 自动重试:后台执行单元(WorkflowState)天生具有故障恢复能力,使用持久计时器内置分布式退避重试
- 简化架构:iWF应用程序都是基于REST的微服务,易于部署、监控、扩展、维护(版本)和行业标准操作
- API简单明确:使用尽可能少的概念来建模复杂逻辑
- 动态交互:允许外部应用程序通过RPC、信号和内部通道与运行中的工作流交互
- 丰富工具:提供工具来查找运行状态定义、跳过计时器、增强重置等
如何使用iWF Golang SDK
iWF提供了Golang SDK供开发者使用:
package main
import (
"context"
"fmt"
"github.com/indeedeng/iwf-golang-sdk/iwf"
)
// 定义工作流状态
type MyWorkflowState struct {
iwf.DefaultWorkflowState
}
// 实现状态行为
func (b MyWorkflowState) WaitUntil(ctx context.Context, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
// 在此处实现等待逻辑
return iwf.EmptyCommandRequest(), nil
}
func (b MyWorkflowState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 在此处实现状态执行逻辑
fmt.Println("Executing workflow state")
// 决定下一步
return iwf.GracefulCompleteWorkflow(1), nil
}
// 注册工作流
func init() {
iwf.RegisterWorkflow(&MyWorkflow{})
iwf.RegisterWorkflowState(&MyWorkflowState{})
}
// 定义工作流
type MyWorkflow struct {
iwf.DefaultWorkflow
}
// 启动工作流
func (b MyWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
iwf.StateDef{
State: &MyWorkflowState{},
},
}
}
func main() {
// 创建iWF客户端
client := iwf.NewClient(iwf.NewWorkflowWorker())
// 启动工作流
runId, err := client.StartWorkflow(
context.Background(),
"MyWorkflow",
"1",
10,
nil,
nil,
)
if err != nil {
panic(err)
}
fmt.Printf("Workflow started with runId: %s\n", runId)
}
示例:转账业务流程(SAGA模式)
package moneytransfer
import (
"context"
"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
"github.com/indeedeng/iwf-golang-sdk/iwf"
)
// 定义转账状态
type TransferWorkflowState struct {
iwf.DefaultWorkflowState
}
func (b TransferWorkflowState) WaitUntil(ctx context.Context, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
// 在此处实现转账前的等待逻辑
return iwf.EmptyCommandRequest(), nil
}
func (b TransferWorkflowState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 实现转账逻辑
var transferRequest TransferRequest
err := input.Get(&transferRequest)
if err != nil {
return nil, err
}
// 执行转账
err = transferMoney(transferRequest.FromAccount, transferRequest.ToAccount, transferRequest.Amount)
if err != nil {
// 转账失败,进入补偿状态
return iwf.SingleNextState(&CompensationState{}, nil), nil
}
// 转账成功
return iwf.GracefulCompleteWorkflow(1), nil
}
// 补偿状态
type CompensationState struct {
iwf.DefaultWorkflowState
}
func (b CompensationState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 实现补偿逻辑
var transferRequest TransferRequest
err := input.Get(&transferRequest)
if err != nil {
return nil, err
}
// 执行补偿
compensateTransfer(transferRequest.FromAccount, transferRequest.ToAccount, transferRequest.Amount)
return iwf.ForceCompleteWorkflow(1), nil
}
// 转账请求结构
type TransferRequest struct {
FromAccount string
ToAccount string
Amount float64
}
// 注册工作流和状态
func init() {
iwf.RegisterWorkflow(&MoneyTransferWorkflow{})
iwf.RegisterWorkflowState(&TransferWorkflowState{})
iwf.RegisterWorkflowState(&CompensationState{})
}
// 定义转账工作流
type MoneyTransferWorkflow struct {
iwf.DefaultWorkflow
}
func (b MoneyTransferWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
iwf.StateDef{
State: &TransferWorkflowState{},
},
iwf.StateDef{
State: &CompensationState{},
},
}
}
运行iWF服务
最简单的方式是使用Docker运行iWF服务:
docker pull iworkflowio/iwf-server-lite:latest && docker run -p 8801:8801 -p 7233:7233 -p 8233:8233 -e AUTO_FIX_WORKER_URL=host.docker.internal --add-host host.docker.internal:host-gateway -it iworkflowio/iwf-server-lite:latest
这将启动以下服务:
- IWF服务: http://localhost:8801/
- Temporal WebUI: http://localhost:8233/
- Temporal服务: localhost:7233
支持与贡献
可以加入Slack频道获取支持,或者查看GitHub Discussions和Issues。贡献代码请参考CONTRIBUTING指南。
更多关于golang全功能业务流程开发平台插件iWF的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang全功能业务流程开发平台插件iWF的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
iWF: Golang全功能业务流程开发平台插件
iWF(Industrial Workflow)是一个基于Golang的全功能业务流程编排引擎,它提供了一种简单高效的方式来构建复杂的业务流程。下面我将详细介绍iWF的使用方法,并提供相关示例代码。
iWF核心概念
iWF基于工作流模式,主要包含以下核心组件:
- 工作流定义(Workflow Definition):描述业务流程的结构
- 活动(Activity):业务流程中的具体执行单元
- 决策器(Decider):控制工作流流转逻辑
- 持久化(Persistence):工作流状态存储
安装iWF
go get github.com/indeedeng/iwf-golang-sdk
基本使用示例
1. 定义工作流
package main
import (
"context"
"fmt"
"github.com/indeedeng/iwf-golang-sdk/iwf"
)
type MyWorkflow struct {
iwf.DefaultWorkflowType
}
// 定义工作流ID
func (b MyWorkflow) GetWorkflowId() string {
return "my-workflow"
}
// 定义工作流状态
func (b MyWorkflow) GetWorkflowStates() []iwf.StateDef {
return []iwf.StateDef{
{State: &StartState{}},
{State: &ProcessingState{}},
{State: &CompleteState{}},
}
}
2. 定义状态
type StartState struct {
iwf.DefaultState
}
func (s StartState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
// 初始化工作流数据
var workflowInput string
input.Get(&workflowInput)
persistence.SetDataAttribute("input", workflowInput)
// 进入下一个状态
return iwf.EmptyCommandRequest(), nil
}
func (s StartState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 决定下一个状态是ProcessingState
return iwf.ForceCompleteWorkflow("processing"), nil
}
3. 处理状态
type ProcessingState struct {
iwf.DefaultState
}
func (s ProcessingState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
// 执行一些异步操作
return iwf.AllCommandCompletedRequest(
iwf.NewTimerCommand("timer1", time.Now().Add(10*time.Second)),
), nil
}
func (s ProcessingState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 检查定时器是否完成
if commandResults.TimerCommands[0].Status != iwf.COMPLETED {
return iwf.DeadEnd, nil
}
// 处理业务逻辑
var inputData string
persistence.GetDataAttribute("input", &inputData)
processedData := "processed_" + inputData
persistence.SetDataAttribute("result", processedData)
// 进入完成状态
return iwf.ForceCompleteWorkflow("complete"), nil
}
4. 完成状态
type CompleteState struct {
iwf.DefaultState
}
func (s CompleteState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.EmptyCommandRequest(), nil
}
func (s CompleteState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 工作流完成
return iwf.ForceCompleteWorkflow(""), nil
}
5. 启动工作流
func main() {
// 创建iWF客户端
client := iwf.NewClient(iwf.NewWorkflowClient(), iwf.NewWorkerService())
// 注册工作流
registry := iwf.NewRegistry()
registry.AddWorkflow(&MyWorkflow{})
// 启动工作流
workflowId := "example-workflow-1"
input := "test-input"
runId, err := client.StartWorkflow(
context.Background(),
&MyWorkflow{},
workflowId,
10*time.Second, // 工作流超时
input,
)
if err != nil {
panic(err)
}
fmt.Printf("Workflow started: %s, runId: %s\n", workflowId, runId)
}
高级功能
1. 信号处理
// 在状态中等待信号
func (s SomeState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
return iwf.AllCommandCompletedRequest(
iwf.NewSignalCommand("signal1"),
), nil
}
// 发送信号
err := client.SignalWorkflow(
context.Background(),
workflowId,
runId,
"signal1",
"signal-data",
)
2. 查询工作流状态
result, err := client.QueryWorkflow(
context.Background(),
workflowId,
runId,
"get-result", // 查询类型
nil, // 查询参数
)
3. 错误处理
func (s SomeState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
// 业务逻辑
if err := someBusinessLogic(); err != nil {
// 重试逻辑
return iwf.GracefulCompleteWorkflow(
iwf.NewStateDef(&ErrorHandlingState{}),
iwf.NewStateInput(err.Error()),
), nil
}
return iwf.ForceCompleteWorkflow("next-state"), nil
}
最佳实践
- 状态设计:将业务流程分解为离散的状态,每个状态职责单一
- 幂等性:确保状态处理逻辑是幂等的,可以安全重试
- 超时设置:为长时间运行的操作设置合理的超时
- 错误处理:设计专门的错误处理状态
- 测试:为每个状态和工作流编写单元测试
iWF提供了强大的业务流程编排能力,通过合理设计工作流状态和流转逻辑,可以构建复杂的业务系统。以上示例展示了iWF的基本用法,实际使用时可以根据业务需求进行扩展。