golang全功能业务流程开发平台插件iWF的使用

Golang全功能业务流程开发平台插件iWF的使用

什么是iWF

Indeed Workflow Framework(iWF)是一个编码框架和服务,用于简化涉及等待外部事件、处理超时和持久化长期状态的业务流程。通过iWF,开发者可以构建可扩展、可维护的工作流,使其能适应实时事件并与外部系统无缝集成。

iWF的独特优势

  • 工作流即代码:使用原生代码定义所有内容:分支、循环、并行线程、变量、模式等
  • 结构化编程:提供良好组织的结构,使工作流自然且易于阅读
  • 持久计时器:提供持久、对系统故障具有弹性的计时器
  • 自动重试:后台执行单元(WorkflowState)天生具有故障恢复能力,使用持久计时器内置分布式退避重试
  • 简化架构:iWF应用程序都是基于REST的微服务,易于部署、监控、扩展、维护(版本)和行业标准操作
  • API简单明确:使用尽可能少的概念来建模复杂逻辑
  • 动态交互:允许外部应用程序通过RPC、信号和内部通道与运行中的工作流交互
  • 丰富工具:提供工具来查找运行状态定义、跳过计时器、增强重置等

如何使用iWF Golang SDK

iWF提供了Golang SDK供开发者使用:

package main

import (
	"context"
	"fmt"
	"github.com/indeedeng/iwf-golang-sdk/iwf"
)

// 定义工作流状态
type MyWorkflowState struct {
	iwf.DefaultWorkflowState
}

// 实现状态行为
func (b MyWorkflowState) WaitUntil(ctx context.Context, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	// 在此处实现等待逻辑
	return iwf.EmptyCommandRequest(), nil
}

func (b MyWorkflowState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 在此处实现状态执行逻辑
	fmt.Println("Executing workflow state")
	
	// 决定下一步
	return iwf.GracefulCompleteWorkflow(1), nil
}

// 注册工作流
func init() {
	iwf.RegisterWorkflow(&MyWorkflow{})
	iwf.RegisterWorkflowState(&MyWorkflowState{})
}

// 定义工作流
type MyWorkflow struct {
	iwf.DefaultWorkflow
}

// 启动工作流
func (b MyWorkflow) GetWorkflowStates() []iwf.StateDef {
	return []iwf.StateDef{
		iwf.StateDef{
			State: &MyWorkflowState{},
		},
	}
}

func main() {
	// 创建iWF客户端
	client := iwf.NewClient(iwf.NewWorkflowWorker())

	// 启动工作流
	runId, err := client.StartWorkflow(
		context.Background(),
		"MyWorkflow",
		"1",
		10,
		nil,
		nil,
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Workflow started with runId: %s\n", runId)
}

示例:转账业务流程(SAGA模式)

package moneytransfer

import (
	"context"
	"github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"
	"github.com/indeedeng/iwf-golang-sdk/iwf"
)

// 定义转账状态
type TransferWorkflowState struct {
	iwf.DefaultWorkflowState
}

func (b TransferWorkflowState) WaitUntil(ctx context.Context, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	// 在此处实现转账前的等待逻辑
	return iwf.EmptyCommandRequest(), nil
}

func (b TransferWorkflowState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 实现转账逻辑
	var transferRequest TransferRequest
	err := input.Get(&transferRequest)
	if err != nil {
		return nil, err
	}
	
	// 执行转账
	err = transferMoney(transferRequest.FromAccount, transferRequest.ToAccount, transferRequest.Amount)
	if err != nil {
		// 转账失败,进入补偿状态
		return iwf.SingleNextState(&CompensationState{}, nil), nil
	}
	
	// 转账成功
	return iwf.GracefulCompleteWorkflow(1), nil
}

// 补偿状态
type CompensationState struct {
	iwf.DefaultWorkflowState
}

func (b CompensationState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 实现补偿逻辑
	var transferRequest TransferRequest
	err := input.Get(&transferRequest)
	if err != nil {
		return nil, err
	}
	
	// 执行补偿
	compensateTransfer(transferRequest.FromAccount, transferRequest.ToAccount, transferRequest.Amount)
	
	return iwf.ForceCompleteWorkflow(1), nil
}

// 转账请求结构
type TransferRequest struct {
	FromAccount string
	ToAccount   string
	Amount      float64
}

// 注册工作流和状态
func init() {
	iwf.RegisterWorkflow(&MoneyTransferWorkflow{})
	iwf.RegisterWorkflowState(&TransferWorkflowState{})
	iwf.RegisterWorkflowState(&CompensationState{})
}

// 定义转账工作流
type MoneyTransferWorkflow struct {
	iwf.DefaultWorkflow
}

func (b MoneyTransferWorkflow) GetWorkflowStates() []iwf.StateDef {
	return []iwf.StateDef{
		iwf.StateDef{
			State: &TransferWorkflowState{},
		},
		iwf.StateDef{
			State: &CompensationState{},
		},
	}
}

运行iWF服务

最简单的方式是使用Docker运行iWF服务:

docker pull iworkflowio/iwf-server-lite:latest && docker run -p 8801:8801 -p 7233:7233 -p 8233:8233 -e AUTO_FIX_WORKER_URL=host.docker.internal --add-host host.docker.internal:host-gateway -it iworkflowio/iwf-server-lite:latest

这将启动以下服务:

支持与贡献

可以加入Slack频道获取支持,或者查看GitHub Discussions和Issues。贡献代码请参考CONTRIBUTING指南。


更多关于golang全功能业务流程开发平台插件iWF的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang全功能业务流程开发平台插件iWF的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


iWF: Golang全功能业务流程开发平台插件

iWF(Industrial Workflow)是一个基于Golang的全功能业务流程编排引擎,它提供了一种简单高效的方式来构建复杂的业务流程。下面我将详细介绍iWF的使用方法,并提供相关示例代码。

iWF核心概念

iWF基于工作流模式,主要包含以下核心组件:

  1. 工作流定义(Workflow Definition):描述业务流程的结构
  2. 活动(Activity):业务流程中的具体执行单元
  3. 决策器(Decider):控制工作流流转逻辑
  4. 持久化(Persistence):工作流状态存储

安装iWF

go get github.com/indeedeng/iwf-golang-sdk

基本使用示例

1. 定义工作流

package main

import (
	"context"
	"fmt"
	"github.com/indeedeng/iwf-golang-sdk/iwf"
)

type MyWorkflow struct {
	iwf.DefaultWorkflowType
}

// 定义工作流ID
func (b MyWorkflow) GetWorkflowId() string {
	return "my-workflow"
}

// 定义工作流状态
func (b MyWorkflow) GetWorkflowStates() []iwf.StateDef {
	return []iwf.StateDef{
		{State: &StartState{}},
		{State: &ProcessingState{}},
		{State: &CompleteState{}},
	}
}

2. 定义状态

type StartState struct {
	iwf.DefaultState
}

func (s StartState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	// 初始化工作流数据
	var workflowInput string
	input.Get(&workflowInput)
	
	persistence.SetDataAttribute("input", workflowInput)
	
	// 进入下一个状态
	return iwf.EmptyCommandRequest(), nil
}

func (s StartState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 决定下一个状态是ProcessingState
	return iwf.ForceCompleteWorkflow("processing"), nil
}

3. 处理状态

type ProcessingState struct {
	iwf.DefaultState
}

func (s ProcessingState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	// 执行一些异步操作
	return iwf.AllCommandCompletedRequest(
		iwf.NewTimerCommand("timer1", time.Now().Add(10*time.Second)),
	), nil
}

func (s ProcessingState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 检查定时器是否完成
	if commandResults.TimerCommands[0].Status != iwf.COMPLETED {
		return iwf.DeadEnd, nil
	}
	
	// 处理业务逻辑
	var inputData string
	persistence.GetDataAttribute("input", &inputData)
	
	processedData := "processed_" + inputData
	persistence.SetDataAttribute("result", processedData)
	
	// 进入完成状态
	return iwf.ForceCompleteWorkflow("complete"), nil
}

4. 完成状态

type CompleteState struct {
	iwf.DefaultState
}

func (s CompleteState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	return iwf.EmptyCommandRequest(), nil
}

func (s CompleteState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 工作流完成
	return iwf.ForceCompleteWorkflow(""), nil
}

5. 启动工作流

func main() {
	// 创建iWF客户端
	client := iwf.NewClient(iwf.NewWorkflowClient(), iwf.NewWorkerService())
	
	// 注册工作流
	registry := iwf.NewRegistry()
	registry.AddWorkflow(&MyWorkflow{})
	
	// 启动工作流
	workflowId := "example-workflow-1"
	input := "test-input"
	runId, err := client.StartWorkflow(
		context.Background(),
		&MyWorkflow{},
		workflowId,
		10*time.Second, // 工作流超时
		input,
	)
	if err != nil {
		panic(err)
	}
	
	fmt.Printf("Workflow started: %s, runId: %s\n", workflowId, runId)
}

高级功能

1. 信号处理

// 在状态中等待信号
func (s SomeState) WaitUntil(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
	return iwf.AllCommandCompletedRequest(
		iwf.NewSignalCommand("signal1"),
	), nil
}

// 发送信号
err := client.SignalWorkflow(
	context.Background(),
	workflowId,
	runId,
	"signal1",
	"signal-data",
)

2. 查询工作流状态

result, err := client.QueryWorkflow(
	context.Background(),
	workflowId,
	runId,
	"get-result", // 查询类型
	nil,          // 查询参数
)

3. 错误处理

func (s SomeState) Execute(ctx context.Context, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
	// 业务逻辑
	if err := someBusinessLogic(); err != nil {
		// 重试逻辑
		return iwf.GracefulCompleteWorkflow(
			iwf.NewStateDef(&ErrorHandlingState{}),
			iwf.NewStateInput(err.Error()),
		), nil
	}
	
	return iwf.ForceCompleteWorkflow("next-state"), nil
}

最佳实践

  1. 状态设计:将业务流程分解为离散的状态,每个状态职责单一
  2. 幂等性:确保状态处理逻辑是幂等的,可以安全重试
  3. 超时设置:为长时间运行的操作设置合理的超时
  4. 错误处理:设计专门的错误处理状态
  5. 测试:为每个状态和工作流编写单元测试

iWF提供了强大的业务流程编排能力,通过合理设计工作流状态和流转逻辑,可以构建复杂的业务系统。以上示例展示了iWF的基本用法,实际使用时可以根据业务需求进行扩展。

回到顶部