golang基于有向无环图的工作流管理框架插件库go-dag的使用

golang基于有向无环图的工作流管理框架插件库go-dag的使用

go-dag是一个用Go语言开发的框架,用于管理由有向无环图(DAG)描述的工作流执行。与其他DAG框架不同,go-dag不强调边和顶点的概念,而是让用户专注于定义transit(worker及其输入和输出)。go-dag会根据用户定义的transit自动构建工作流并执行。

特点

  • 支持Go 1.20、1.21、1.22及最新版本
  • 零外部依赖,仅使用内部代码包(测试除外)
  • 支持简单工作流描述,可处理任意复杂度的DAG
  • 提供日志功能,支持自定义transit和日志事件

安装

go get github.com/rhosocial/go-dag

使用示例

下面是一个使用go-dag的完整示例demo:

package main

import (
	"fmt"
	"log"

	"github.com/rhosocial/go-dag/workflow/simple"
)

// 定义一个简单的worker
type SimpleWorker struct {
	name string
}

func (w *SimpleWorker) Work(input []interface{}, output chan<- interface{}) error {
	log.Printf("[%s] 开始处理输入: %v\n", w.name, input)
	
	// 模拟处理逻辑
	result := fmt.Sprintf("%s处理后的结果", w.name)
	
	// 将结果发送到输出通道
	output <- result
	
	log.Printf("[%s] 处理完成\n", w.name)
	return nil
}

func main() {
	// 创建工作流管理器
	manager := simple.NewWorkflowManager()

	// 创建三个worker
	workerA := &SimpleWorker{name: "WorkerA"}
	workerB := &SimpleWorker{name: "WorkerB"}
	workerC := &SimpleWorker{name: "WorkerC"}

	// 定义工作流:
	// WorkerA -> WorkerB
	//       \-> WorkerC
	
	// 添加transit(任务)
	transitA := manager.AddTransit("A", workerA)
	transitB := manager.AddTransit("B", workerB)
	transitC := manager.AddTransit("C", workerC)

	// 设置依赖关系
	transitB.DependsOn(transitA)
	transitC.DependsOn(transitA)

	// 启动工作流
	results, err := manager.Run()
	if err != nil {
		log.Fatalf("工作流执行失败: %v", err)
	}

	// 打印结果
	for name, result := range results {
		fmt.Printf("%s的结果: %v\n", name, result)
	}
}

代码说明

  1. 首先定义了一个SimpleWorker结构体,实现了Work方法作为工作单元
  2. 创建WorkflowManager来管理工作流
  3. 使用AddTransit方法添加三个任务(transit)
  4. 使用DependsOn方法设置任务间的依赖关系
    • WorkerB和WorkerC都依赖于WorkerA
  5. 调用Run方法执行工作流
  6. 最后打印每个任务的结果

输出示例

执行上述代码后,输出可能类似于:

[WorkerA] 开始处理输入: []
[WorkerA] 处理完成
[WorkerB] 开始处理输入: [WorkerA处理后的结果]
[WorkerB] 处理完成
[WorkerC] 开始处理输入: [WorkerA处理后的结果] 
[WorkerC] 处理完成
A的结果: WorkerA处理后的结果
B的结果: WorkerB处理后的结果
C的结果: WorkerC处理后的结果

更多功能

go-dag的workflow/simple包还提供以下功能:

  1. 自定义日志记录
  2. 错误处理
  3. 并发控制
  4. 工作流可视化(通过日志)

如果需要更复杂的工作流功能,可以关注项目未来的更新。


更多关于golang基于有向无环图的工作流管理框架插件库go-dag的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于有向无环图的工作流管理框架插件库go-dag的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-DAG: 基于有向无环图的工作流管理框架

Go-DAG 是一个用于管理有向无环图(DAG)工作流的 Go 语言库,它可以帮助你构建和执行复杂的任务依赖关系。下面我将详细介绍如何使用这个库。

安装

go get github.com/heimdalr/dag

基本概念

  • 顶点(Vertex): 表示工作流中的一个任务或步骤
  • 边(Edge): 表示任务之间的依赖关系
  • DAG: 有向无环图,表示整个工作流

基本用法

1. 创建DAG

package main

import (
	"fmt"
	"github.com/heimdalr/dag"
)

func main() {
	// 创建一个新的DAG实例
	d := dag.NewDAG()
	
	// 添加顶点(任务)
	id1, _ := d.AddVertex("1", "Task 1")
	id2, _ := d.AddVertex("2", "Task 2")
	id3, _ := d.AddVertex("3", "Task 3")
	
	// 添加边(依赖关系)
	_ = d.AddEdge(id1, id2) // Task 1 -> Task 2
	_ = d.AddEdge(id2, id3) // Task 2 -> Task 3
}

2. 执行工作流

// 定义一个任务执行函数
func executeTask(vertexID string, value interface{}) error {
	taskName := value.(string)
	fmt.Printf("Executing task %s (ID: %s)\n", taskName, vertexID)
	// 这里执行实际的任务逻辑
	return nil
}

// 执行整个DAG
func executeDAG(d *dag.DAG) error {
	// 获取拓扑排序后的顶点列表
	order, _ := d.GetOrderedDescendants("1") // 从根节点开始
	
	// 按顺序执行任务
	for _, id := range order {
		value, _ := d.GetVertex(id)
		if err := executeTask(id, value); err != nil {
			return fmt.Errorf("task %s failed: %v", id, err)
		}
	}
	return nil
}

func main() {
	d := dag.NewDAG()
	// ... 添加顶点和边 ...
	
	if err := executeDAG(d); err != nil {
		fmt.Println("Workflow execution failed:", err)
	}
}

高级功能

1. 并行执行

func executeParallel(d *dag.DAG) error {
	// 获取可以并行执行的层级
	layers, _ := d.GetLayers()
	
	for _, layer := range layers {
		// 为每个层创建一个等待组
		var wg sync.WaitGroup
		errChan := make(chan error, len(layer))
		
		for _, id := range layer {
			wg.Add(1)
			go func(vertexID string) {
				defer wg.Done()
				value, _ := d.GetVertex(vertexID)
				if err := executeTask(vertexID, value); err != nil {
					errChan <- err
				}
			}(id)
		}
		
		wg.Wait()
		close(errChan)
		
		// 检查是否有错误
		for err := range errChan {
			return err
		}
	}
	return nil
}

2. 动态DAG构建

func buildDynamicDAG() *dag.DAG {
	d := dag.NewDAG()
	
	// 添加基础任务
	baseTasks := []string{"init", "process", "finalize"}
	for i, task := range baseTasks {
		id := fmt.Sprintf("%d", i+1)
		_, _ = d.AddVertex(id, task)
	}
	
	// 添加依赖
	_ = d.AddEdge("1", "2")
	_ = d.AddEdge("2", "3")
	
	// 动态添加分支任务
	branchTasks := []string{"branch1", "branch2"}
	for i, task := range branchTasks {
		id := fmt.Sprintf("b%d", i+1)
		_, _ = d.AddVertex(id, task)
		// 从process任务分支出来
		_ = d.AddEdge("2", id)
		// 分支任务完成后回到主流程
		_ = d.AddEdge(id, "3")
	}
	
	return d
}

3. 可视化DAG

func visualizeDAG(d *dag.DAG) string {
	var builder strings.Builder
	builder.WriteString("digraph {\n")
	
	// 添加所有顶点
	vertices := d.GetVertices()
	for id, value := range vertices {
		builder.WriteString(fmt.Sprintf("  %s [label=\"%s\"];\n", id, value))
	}
	
	// 添加所有边
	edges := d.GetEdges()
	for from, toList := range edges {
		for _, to := range toList {
			builder.WriteString(fmt.Sprintf("  %s -> %s;\n", from, to))
		}
	}
	
	builder.WriteString("}\n")
	return builder.String()
}

// 可以将输出保存为.dot文件,用Graphviz工具渲染

错误处理

func safeAddEdge(d *dag.DAG, from, to string) error {
	// 检查是否会形成环
	if d.WillCreateCycle(from, to) {
		return fmt.Errorf("adding edge from %s to %s would create a cycle", from, to)
	}
	
	// 检查顶点是否存在
	if !d.VertexExists(from) {
		return fmt.Errorf("source vertex %s does not exist", from)
	}
	
	if !d.VertexExists(to) {
		return fmt.Errorf("destination vertex %s does not exist", to)
	}
	
	return d.AddEdge(from, to)
}

实际应用示例

// 定义一个更复杂的任务结构
type Task struct {
	Name     string
	Duration time.Duration
	Handler  func() error
}

func main() {
	d := dag.NewDAG()
	
	// 添加复杂任务
	tasks := map[string]*Task{
		"download": {"Download data", 2 * time.Second, func() error { /* ... */ return nil }},
		"parse":    {"Parse data", 1 * time.Second, func() error { /* ... */ return nil }},
		"process":  {"Process data", 3 * time.Second, func() error { /* ... */ return nil }},
		"store":    {"Store results", 1 * time.Second, func() error { /* ... */ return nil }},
		"notify":   {"Send notification", 500 * time.Millisecond, func() error { /* ... */ return nil }},
	}
	
	// 添加顶点
	for id, task := range tasks {
		_, _ = d.AddVertex(id, task)
	}
	
	// 设置依赖关系
	_ = d.AddEdge("download", "parse")
	_ = d.AddEdge("parse", "process")
	_ = d.AddEdge("process", "store")
	_ = d.AddEdge("store", "notify")
	
	// 执行工作流
	start := time.Now()
	if err := executeParallel(d); err != nil {
		fmt.Println("Workflow failed:", err)
	} else {
		fmt.Printf("Workflow completed in %v\n", time.Since(start))
	}
}

总结

Go-DAG 提供了一个简单而强大的方式来管理和执行基于 DAG 的工作流。通过它,你可以:

  1. 清晰地定义任务及其依赖关系
  2. 自动检测循环依赖
  3. 支持顺序或并行执行
  4. 动态构建和修改工作流
  5. 可视化工作流结构

对于更复杂的场景,你还可以考虑结合其他库如 github.com/hashicorp/go-multierror 来处理并行执行中的多个错误,或者使用 context 包来实现超时和取消功能。

希望这个指南能帮助你开始使用 Go-DAG 来管理你的工作流!

回到顶部