golang通用任务并行编程框架插件go-taskflow的使用
go-taskflow 使用指南
go-taskflow 是一个通用的任务并行编程框架,灵感来源于 taskflow-cpp。它充分利用了 Go 原生的并发能力,非常适合管理并发任务中的复杂依赖关系。
主要特性
- 高扩展性:可以轻松扩展以适应各种特定用例
- 原生 Go 并发模型:利用 goroutines 实现高效并发任务执行
- 友好的编程接口:简化了 Go 中复杂任务依赖关系的管理
- 多种任务类型:支持静态任务、子流程、条件节点和循环流程
- 优先级任务调度:可以分配任务优先级确保高优先级任务优先执行
- 内置可视化分析工具:生成任务可视化表示和分析任务执行性能
安装方法
使用以下命令安装最新版本的 go-taskflow:
go get -u github.com/noneback/go-taskflow
使用示例
下面是一个使用 go-taskflow 实现并行归并排序的完整示例:
package main
import (
"fmt"
"log"
"math/rand"
"os"
"slices"
"strconv"
"sync"
gtf "github.com/noneback/go-taskflow"
)
// mergeInto merges a sorted source array into a sorted destination array.
func mergeInto(dest, src []int) []int {
size := len(dest) + len(src)
tmp := make([]int, 0, size)
i, j := 0, 0
for i < len(dest) && j < len(src) {
if dest[i] < src[j] {
tmp = append(tmp, dest[i])
i++
} else {
tmp = append(tmp, src[j])
j++
}
}
if i < len(dest) {
tmp = append(tmp, dest[i:]...)
} else {
tmp = append(tmp, src[j:]...)
}
return tmp
}
func main() {
size := 100
randomArr := make([][]int, 10)
sortedArr := make([]int, 0, 10*size)
mutex := &sync.Mutex{}
// 初始化随机数组
for i := 0; i < 10; i++ {
for j := 0; j < size; j++ {
randomArr[i] = append(randomArr[i], rand.Int())
}
}
// 创建任务流
sortTasks := make([]*gtf.Task, 10)
tf := gtf.NewTaskFlow("merge sort")
// 创建最终完成的任务
done := tf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
log.Fatal("Sorting failed")
}
fmt.Println("Sorted successfully")
fmt.Println(sortedArr[:1000])
})
// 创建10个并行排序任务
for i := 0; i < 10; i++ {
sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := randomArr[i]
slices.Sort(arr)
mutex.Lock()
defer mutex.Unlock()
sortedArr = mergeInto(sortedArr, arr)
})
}
// 设置任务依赖关系
done.Succeed(sortTasks...)
// 创建执行器并运行任务流
executor := gtf.NewExecutor(1000)
executor.Run(tf).Wait()
// 输出任务流结构
if err := tf.Dump(os.Stdout); err != nil {
log.Fatal("Error dumping taskflow:", err)
}
// 输出性能分析数据
if err := executor.Profile(os.Stdout); err != nil {
log.Fatal("Error profiling taskflow:", err)
}
}
错误处理
在 Go 中,错误是值,用户需要自行处理。框架只处理未恢复的 panic 事件。如果发生 panic,整个父图将被取消,剩余任务将不完整。
可以手动处理 panic 防止中断:
tf.NewTask("not interrupt", func() {
defer func() {
if r := recover(); r != nil {
// 处理 panic
}
}()
// 用户定义逻辑
})
任务流可视化
要生成任务流的可视化表示,可以使用 Dump
方法:
if err := tf.Dump(os.Stdout); err != nil {
log.Fatal(err)
}
Dump
方法生成 DOT 格式的原始字符串。可以使用 dot
工具创建 SVG 图。
性能分析
要分析任务流性能,使用 Profile
方法:
if err := executor.Profile(os.Stdout); err != nil {
log.Fatal(err)
}
Profile
方法生成 flamegraph 格式的原始字符串。可以使用 flamegraph
工具创建火焰图。
适用场景
- 数据管道:编排具有复杂依赖关系的数据处理阶段
- AI 代理工作流自动化:定义和执行具有明确序列和依赖结构的 AI 代理工作流
- 并行图任务:并发执行基于图的任务以最大化 CPU 利用率
更多关于golang通用任务并行编程框架插件go-taskflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang通用任务并行编程框架插件go-taskflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-TaskFlow 并行任务框架使用指南
Go-TaskFlow 是一个轻量级的 Golang 通用任务并行编程框架,它提供了简单易用的 API 来管理和执行并行任务。下面我将介绍其核心功能和使用方法。
1. 安装
go get github.com/go-taskflow/taskflow
2. 基本概念
- Task: 表示一个可执行的任务单元
- Flow: 表示一组有依赖关系的任务集合
- Executor: 负责执行任务和流程
3. 基本使用示例
3.1 创建简单任务
package main
import (
"context"
"fmt"
"time"
"github.com/go-taskflow/taskflow"
)
func main() {
// 创建执行器
executor := taskflow.NewExecutor()
// 定义任务
task1 := taskflow.NewTask("task1", func(ctx context.Context) (interface{}, error) {
time.Sleep(1 * time.Second)
return "task1 result", nil
})
task2 := taskflow.NewTask("task2", func(ctx context.Context) (interface{}, error) {
time.Sleep(2 * time.Second)
return 42, nil
})
// 执行任务
result1, err := executor.Execute(context.Background(), task1)
if err != nil {
fmt.Println("Task1 failed:", err)
} else {
fmt.Println("Task1 result:", result1)
}
result2, err := executor.Execute(context.Background(), task2)
if err != nil {
fmt.Println("Task2 failed:", err)
} else {
fmt.Println("Task2 result:", result2)
}
}
3.2 创建并行任务流
func main() {
executor := taskflow.NewExecutor()
// 创建任务
taskA := taskflow.NewTask("A", func(ctx context.Context) (interface{}, error) {
time.Sleep(1 * time.Second)
return "A done", nil
})
taskB := taskflow.NewTask("B", func(ctx context.Context) (interface{}, error) {
time.Sleep(2 * time.Second)
return "B done", nil
})
taskC := taskflow.NewTask("C", func(ctx context.Context) (interface{}, error) {
time.Sleep(500 * time.Millisecond)
return "C done", nil
})
// 创建流程并设置依赖关系
flow := taskflow.NewFlow("example_flow")
flow.Add(taskA)
flow.Add(taskB)
flow.Add(taskC)
// 设置C依赖A和B
flow.DependsOn(taskC, taskA, taskB)
// 执行流程
results, err := executor.ExecuteFlow(context.Background(), flow)
if err != nil {
fmt.Println("Flow execution failed:", err)
return
}
for taskName, result := range results {
fmt.Printf("%s: %v\n", taskName, result)
}
}
4. 高级功能
4.1 带参数的任务
taskWithParams := taskflow.NewTask("with_params", func(ctx context.Context) (interface{}, error) {
// 从上下文中获取参数
params := taskflow.ParamsFromContext(ctx)
name := params.GetString("name")
age := params.GetInt("age")
return fmt.Sprintf("Hello %s, age %d", name, age), nil
})
// 执行时传递参数
result, err := executor.Execute(
taskflow.NewContextWithParams(context.Background(),
taskflow.Params{
"name": "Alice",
"age": 30,
}),
taskWithParams,
)
4.2 错误处理和重试
taskWithRetry := taskflow.NewTask("retry_task", func(ctx context.Context) (interface{}, error) {
// 模拟有时会失败的任务
if time.Now().Unix()%2 == 0 {
return nil, fmt.Errorf("random error")
}
return "success", nil
}).WithRetry(3, 1*time.Second) // 重试3次,每次间隔1秒
4.3 超时控制
taskWithTimeout := taskflow.NewTask("timeout_task", func(ctx context.Context) (interface{}, error) {
time.Sleep(2 * time.Second)
return "done", nil
}).WithTimeout(1 * time.Second) // 设置1秒超时
// 或者对整个流程设置超时
flow.WithTimeout(5 * time.Second)
5. 实际应用示例
// 模拟电商订单处理流程
func main() {
executor := taskflow.NewExecutor()
// 定义任务
validateOrder := taskflow.NewTask("validate", func(ctx context.Context) (interface{}, error) {
// 验证订单逻辑
return "Order validated", nil
})
checkInventory := taskflow.NewTask("check_inventory", func(ctx context.Context) (interface{}, error) {
// 检查库存逻辑
return "Inventory checked", nil
})
processPayment := taskflow.NewTask("process_payment", func(ctx context.Context) (interface{}, error) {
// 处理支付逻辑
return "Payment processed", nil
})
updateInventory := taskflow.NewTask("update_inventory", func(ctx context.Context) (interface{}, error) {
// 更新库存逻辑
return "Inventory updated", nil
})
sendNotification := taskflow.NewTask("send_notification", func(ctx context.Context) (interface{}, error) {
// 发送通知逻辑
return "Notification sent", nil
})
// 创建流程
orderFlow := taskflow.NewFlow("order_processing")
orderFlow.Add(validateOrder, checkInventory, processPayment, updateInventory, sendNotification)
// 设置依赖关系
orderFlow.DependsOn(
checkInventory, // 检查库存依赖订单验证
processPayment, // 处理支付依赖订单验证
updateInventory, // 更新库存依赖库存检查和支付处理
sendNotification, // 发送通知依赖所有前面的步骤
)
// 执行流程
results, err := executor.ExecuteFlow(context.Background(), orderFlow)
if err != nil {
fmt.Println("Order processing failed:", err)
return
}
for taskName, result := range results {
fmt.Printf("%s: %v\n", taskName, result)
}
}
6. 性能调优
// 创建带自定义配置的执行器
executor := taskflow.NewExecutor(
taskflow.WithMaxParallel(10), // 最大并行任务数
taskflow.WithQueueSize(100), // 任务队列大小
taskflow.WithTimeout(30*time.Second), // 默认超时时间
)
总结
Go-TaskFlow 提供了以下优势:
- 简单易用的 API 设计
- 灵活的任务依赖管理
- 内置的错误处理和重试机制
- 超时控制和上下文传递
- 可配置的执行策略
通过合理设计任务和流程,可以轻松构建复杂的并行处理系统,同时保持代码的清晰和可维护性。