golang基于有向无环图的工作流管理框架插件库go-dag的使用
golang基于有向无环图的工作流管理框架插件库go-dag的使用
go-dag是一个用Go语言开发的框架,用于管理由有向无环图(DAG)描述的工作流执行。与其他DAG框架不同,go-dag不强调边和顶点的概念,而是让用户专注于定义transit
(worker及其输入和输出)。go-dag会根据用户定义的transit自动构建工作流并执行。
特点
- 支持Go 1.20、1.21、1.22及最新版本
- 零外部依赖,仅使用内部代码包(测试除外)
- 支持简单工作流描述,可处理任意复杂度的DAG
- 提供日志功能,支持自定义transit和日志事件
安装
go get github.com/rhosocial/go-dag
使用示例
下面是一个使用go-dag的完整示例demo:
package main
import (
"fmt"
"log"
"github.com/rhosocial/go-dag/workflow/simple"
)
// 定义一个简单的worker
type SimpleWorker struct {
name string
}
func (w *SimpleWorker) Work(input []interface{}, output chan<- interface{}) error {
log.Printf("[%s] 开始处理输入: %v\n", w.name, input)
// 模拟处理逻辑
result := fmt.Sprintf("%s处理后的结果", w.name)
// 将结果发送到输出通道
output <- result
log.Printf("[%s] 处理完成\n", w.name)
return nil
}
func main() {
// 创建工作流管理器
manager := simple.NewWorkflowManager()
// 创建三个worker
workerA := &SimpleWorker{name: "WorkerA"}
workerB := &SimpleWorker{name: "WorkerB"}
workerC := &SimpleWorker{name: "WorkerC"}
// 定义工作流:
// WorkerA -> WorkerB
// \-> WorkerC
// 添加transit(任务)
transitA := manager.AddTransit("A", workerA)
transitB := manager.AddTransit("B", workerB)
transitC := manager.AddTransit("C", workerC)
// 设置依赖关系
transitB.DependsOn(transitA)
transitC.DependsOn(transitA)
// 启动工作流
results, err := manager.Run()
if err != nil {
log.Fatalf("工作流执行失败: %v", err)
}
// 打印结果
for name, result := range results {
fmt.Printf("%s的结果: %v\n", name, result)
}
}
代码说明
- 首先定义了一个
SimpleWorker
结构体,实现了Work
方法作为工作单元 - 创建
WorkflowManager
来管理工作流 - 使用
AddTransit
方法添加三个任务(transit) - 使用
DependsOn
方法设置任务间的依赖关系- WorkerB和WorkerC都依赖于WorkerA
- 调用
Run
方法执行工作流 - 最后打印每个任务的结果
输出示例
执行上述代码后,输出可能类似于:
[WorkerA] 开始处理输入: []
[WorkerA] 处理完成
[WorkerB] 开始处理输入: [WorkerA处理后的结果]
[WorkerB] 处理完成
[WorkerC] 开始处理输入: [WorkerA处理后的结果]
[WorkerC] 处理完成
A的结果: WorkerA处理后的结果
B的结果: WorkerB处理后的结果
C的结果: WorkerC处理后的结果
更多功能
go-dag的workflow/simple
包还提供以下功能:
- 自定义日志记录
- 错误处理
- 并发控制
- 工作流可视化(通过日志)
如果需要更复杂的工作流功能,可以关注项目未来的更新。
更多关于golang基于有向无环图的工作流管理框架插件库go-dag的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于有向无环图的工作流管理框架插件库go-dag的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-DAG: 基于有向无环图的工作流管理框架
Go-DAG 是一个用于管理有向无环图(DAG)工作流的 Go 语言库,它可以帮助你构建和执行复杂的任务依赖关系。下面我将详细介绍如何使用这个库。
安装
go get github.com/heimdalr/dag
基本概念
- 顶点(Vertex): 表示工作流中的一个任务或步骤
- 边(Edge): 表示任务之间的依赖关系
- DAG: 有向无环图,表示整个工作流
基本用法
1. 创建DAG
package main
import (
"fmt"
"github.com/heimdalr/dag"
)
func main() {
// 创建一个新的DAG实例
d := dag.NewDAG()
// 添加顶点(任务)
id1, _ := d.AddVertex("1", "Task 1")
id2, _ := d.AddVertex("2", "Task 2")
id3, _ := d.AddVertex("3", "Task 3")
// 添加边(依赖关系)
_ = d.AddEdge(id1, id2) // Task 1 -> Task 2
_ = d.AddEdge(id2, id3) // Task 2 -> Task 3
}
2. 执行工作流
// 定义一个任务执行函数
func executeTask(vertexID string, value interface{}) error {
taskName := value.(string)
fmt.Printf("Executing task %s (ID: %s)\n", taskName, vertexID)
// 这里执行实际的任务逻辑
return nil
}
// 执行整个DAG
func executeDAG(d *dag.DAG) error {
// 获取拓扑排序后的顶点列表
order, _ := d.GetOrderedDescendants("1") // 从根节点开始
// 按顺序执行任务
for _, id := range order {
value, _ := d.GetVertex(id)
if err := executeTask(id, value); err != nil {
return fmt.Errorf("task %s failed: %v", id, err)
}
}
return nil
}
func main() {
d := dag.NewDAG()
// ... 添加顶点和边 ...
if err := executeDAG(d); err != nil {
fmt.Println("Workflow execution failed:", err)
}
}
高级功能
1. 并行执行
func executeParallel(d *dag.DAG) error {
// 获取可以并行执行的层级
layers, _ := d.GetLayers()
for _, layer := range layers {
// 为每个层创建一个等待组
var wg sync.WaitGroup
errChan := make(chan error, len(layer))
for _, id := range layer {
wg.Add(1)
go func(vertexID string) {
defer wg.Done()
value, _ := d.GetVertex(vertexID)
if err := executeTask(vertexID, value); err != nil {
errChan <- err
}
}(id)
}
wg.Wait()
close(errChan)
// 检查是否有错误
for err := range errChan {
return err
}
}
return nil
}
2. 动态DAG构建
func buildDynamicDAG() *dag.DAG {
d := dag.NewDAG()
// 添加基础任务
baseTasks := []string{"init", "process", "finalize"}
for i, task := range baseTasks {
id := fmt.Sprintf("%d", i+1)
_, _ = d.AddVertex(id, task)
}
// 添加依赖
_ = d.AddEdge("1", "2")
_ = d.AddEdge("2", "3")
// 动态添加分支任务
branchTasks := []string{"branch1", "branch2"}
for i, task := range branchTasks {
id := fmt.Sprintf("b%d", i+1)
_, _ = d.AddVertex(id, task)
// 从process任务分支出来
_ = d.AddEdge("2", id)
// 分支任务完成后回到主流程
_ = d.AddEdge(id, "3")
}
return d
}
3. 可视化DAG
func visualizeDAG(d *dag.DAG) string {
var builder strings.Builder
builder.WriteString("digraph {\n")
// 添加所有顶点
vertices := d.GetVertices()
for id, value := range vertices {
builder.WriteString(fmt.Sprintf(" %s [label=\"%s\"];\n", id, value))
}
// 添加所有边
edges := d.GetEdges()
for from, toList := range edges {
for _, to := range toList {
builder.WriteString(fmt.Sprintf(" %s -> %s;\n", from, to))
}
}
builder.WriteString("}\n")
return builder.String()
}
// 可以将输出保存为.dot文件,用Graphviz工具渲染
错误处理
func safeAddEdge(d *dag.DAG, from, to string) error {
// 检查是否会形成环
if d.WillCreateCycle(from, to) {
return fmt.Errorf("adding edge from %s to %s would create a cycle", from, to)
}
// 检查顶点是否存在
if !d.VertexExists(from) {
return fmt.Errorf("source vertex %s does not exist", from)
}
if !d.VertexExists(to) {
return fmt.Errorf("destination vertex %s does not exist", to)
}
return d.AddEdge(from, to)
}
实际应用示例
// 定义一个更复杂的任务结构
type Task struct {
Name string
Duration time.Duration
Handler func() error
}
func main() {
d := dag.NewDAG()
// 添加复杂任务
tasks := map[string]*Task{
"download": {"Download data", 2 * time.Second, func() error { /* ... */ return nil }},
"parse": {"Parse data", 1 * time.Second, func() error { /* ... */ return nil }},
"process": {"Process data", 3 * time.Second, func() error { /* ... */ return nil }},
"store": {"Store results", 1 * time.Second, func() error { /* ... */ return nil }},
"notify": {"Send notification", 500 * time.Millisecond, func() error { /* ... */ return nil }},
}
// 添加顶点
for id, task := range tasks {
_, _ = d.AddVertex(id, task)
}
// 设置依赖关系
_ = d.AddEdge("download", "parse")
_ = d.AddEdge("parse", "process")
_ = d.AddEdge("process", "store")
_ = d.AddEdge("store", "notify")
// 执行工作流
start := time.Now()
if err := executeParallel(d); err != nil {
fmt.Println("Workflow failed:", err)
} else {
fmt.Printf("Workflow completed in %v\n", time.Since(start))
}
}
总结
Go-DAG 提供了一个简单而强大的方式来管理和执行基于 DAG 的工作流。通过它,你可以:
- 清晰地定义任务及其依赖关系
- 自动检测循环依赖
- 支持顺序或并行执行
- 动态构建和修改工作流
- 可视化工作流结构
对于更复杂的场景,你还可以考虑结合其他库如 github.com/hashicorp/go-multierror
来处理并行执行中的多个错误,或者使用 context
包来实现超时和取消功能。
希望这个指南能帮助你开始使用 Go-DAG 来管理你的工作流!