golang通用任务并行编程框架插件go-taskflow的使用

go-taskflow 使用指南

go-taskflow 是一个通用的任务并行编程框架,灵感来源于 taskflow-cpp。它充分利用了 Go 原生的并发能力,非常适合管理并发任务中的复杂依赖关系。

主要特性

  • 高扩展性:可以轻松扩展以适应各种特定用例
  • 原生 Go 并发模型:利用 goroutines 实现高效并发任务执行
  • 友好的编程接口:简化了 Go 中复杂任务依赖关系的管理
  • 多种任务类型:支持静态任务、子流程、条件节点和循环流程
  • 优先级任务调度:可以分配任务优先级确保高优先级任务优先执行
  • 内置可视化分析工具:生成任务可视化表示和分析任务执行性能

任务类型示例

安装方法

使用以下命令安装最新版本的 go-taskflow:

go get -u github.com/noneback/go-taskflow

使用示例

下面是一个使用 go-taskflow 实现并行归并排序的完整示例:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "slices"
    "strconv"
    "sync"

    gtf "github.com/noneback/go-taskflow"
)

// mergeInto merges a sorted source array into a sorted destination array.
func mergeInto(dest, src []int) []int {
    size := len(dest) + len(src)
    tmp := make([]int, 0, size)
    i, j := 0, 0
    for i < len(dest) && j < len(src) {
        if dest[i] < src[j] {
            tmp = append(tmp, dest[i])
            i++
        } else {
            tmp = append(tmp, src[j])
            j++
        }
    }

    if i < len(dest) {
        tmp = append(tmp, dest[i:]...)
    } else {
        tmp = append(tmp, src[j:]...)
    }

    return tmp
}

func main() {
    size := 100
    randomArr := make([][]int, 10)
    sortedArr := make([]int, 0, 10*size)
    mutex := &sync.Mutex{}

    // 初始化随机数组
    for i := 0; i < 10; i++ {
        for j := 0; j < size; j++ {
            randomArr[i] = append(randomArr[i], rand.Int())
        }
    }

    // 创建任务流
    sortTasks := make([]*gtf.Task, 10)
    tf := gtf.NewTaskFlow("merge sort")
    
    // 创建最终完成的任务
    done := tf.NewTask("Done", func() {
        if !slices.IsSorted(sortedArr) {
            log.Fatal("Sorting failed")
        }
        fmt.Println("Sorted successfully")
        fmt.Println(sortedArr[:1000])
    })

    // 创建10个并行排序任务
    for i := 0; i < 10; i++ {
        sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
            arr := randomArr[i]
            slices.Sort(arr)
            mutex.Lock()
            defer mutex.Unlock()
            sortedArr = mergeInto(sortedArr, arr)
        })
    }
    
    // 设置任务依赖关系
    done.Succeed(sortTasks...)

    // 创建执行器并运行任务流
    executor := gtf.NewExecutor(1000)
    executor.Run(tf).Wait()

    // 输出任务流结构
    if err := tf.Dump(os.Stdout); err != nil {
        log.Fatal("Error dumping taskflow:", err)
    }

    // 输出性能分析数据
    if err := executor.Profile(os.Stdout); err != nil {
        log.Fatal("Error profiling taskflow:", err)
    }
}

错误处理

在 Go 中,错误是值,用户需要自行处理。框架只处理未恢复的 panic 事件。如果发生 panic,整个父图将被取消,剩余任务将不完整。

可以手动处理 panic 防止中断:

tf.NewTask("not interrupt", func() {
    defer func() {
        if r := recover(); r != nil {
            // 处理 panic
        }
    }()
    // 用户定义逻辑
})

任务流可视化

要生成任务流的可视化表示,可以使用 Dump 方法:

if err := tf.Dump(os.Stdout); err != nil {
    log.Fatal(err)
}

Dump 方法生成 DOT 格式的原始字符串。可以使用 dot 工具创建 SVG 图。

性能分析

要分析任务流性能,使用 Profile 方法:

if err := executor.Profile(os.Stdout); err != nil {
    log.Fatal(err)
}

Profile 方法生成 flamegraph 格式的原始字符串。可以使用 flamegraph 工具创建火焰图。

适用场景

  • 数据管道:编排具有复杂依赖关系的数据处理阶段
  • AI 代理工作流自动化:定义和执行具有明确序列和依赖结构的 AI 代理工作流
  • 并行图任务:并发执行基于图的任务以最大化 CPU 利用率

更多关于golang通用任务并行编程框架插件go-taskflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang通用任务并行编程框架插件go-taskflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-TaskFlow 并行任务框架使用指南

Go-TaskFlow 是一个轻量级的 Golang 通用任务并行编程框架,它提供了简单易用的 API 来管理和执行并行任务。下面我将介绍其核心功能和使用方法。

1. 安装

go get github.com/go-taskflow/taskflow

2. 基本概念

  • Task: 表示一个可执行的任务单元
  • Flow: 表示一组有依赖关系的任务集合
  • Executor: 负责执行任务和流程

3. 基本使用示例

3.1 创建简单任务

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-taskflow/taskflow"
)

func main() {
	// 创建执行器
	executor := taskflow.NewExecutor()

	// 定义任务
	task1 := taskflow.NewTask("task1", func(ctx context.Context) (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "task1 result", nil
	})

	task2 := taskflow.NewTask("task2", func(ctx context.Context) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 42, nil
	})

	// 执行任务
	result1, err := executor.Execute(context.Background(), task1)
	if err != nil {
		fmt.Println("Task1 failed:", err)
	} else {
		fmt.Println("Task1 result:", result1)
	}

	result2, err := executor.Execute(context.Background(), task2)
	if err != nil {
		fmt.Println("Task2 failed:", err)
	} else {
		fmt.Println("Task2 result:", result2)
	}
}

3.2 创建并行任务流

func main() {
	executor := taskflow.NewExecutor()

	// 创建任务
	taskA := taskflow.NewTask("A", func(ctx context.Context) (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "A done", nil
	})

	taskB := taskflow.NewTask("B", func(ctx context.Context) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return "B done", nil
	})

	taskC := taskflow.NewTask("C", func(ctx context.Context) (interface{}, error) {
		time.Sleep(500 * time.Millisecond)
		return "C done", nil
	})

	// 创建流程并设置依赖关系
	flow := taskflow.NewFlow("example_flow")
	flow.Add(taskA)
	flow.Add(taskB)
	flow.Add(taskC)
	
	// 设置C依赖A和B
	flow.DependsOn(taskC, taskA, taskB)

	// 执行流程
	results, err := executor.ExecuteFlow(context.Background(), flow)
	if err != nil {
		fmt.Println("Flow execution failed:", err)
		return
	}

	for taskName, result := range results {
		fmt.Printf("%s: %v\n", taskName, result)
	}
}

4. 高级功能

4.1 带参数的任务

taskWithParams := taskflow.NewTask("with_params", func(ctx context.Context) (interface{}, error) {
	// 从上下文中获取参数
	params := taskflow.ParamsFromContext(ctx)
	name := params.GetString("name")
	age := params.GetInt("age")
	
	return fmt.Sprintf("Hello %s, age %d", name, age), nil
})

// 执行时传递参数
result, err := executor.Execute(
	taskflow.NewContextWithParams(context.Background(), 
		taskflow.Params{
			"name": "Alice",
			"age":  30,
		}),
	taskWithParams,
)

4.2 错误处理和重试

taskWithRetry := taskflow.NewTask("retry_task", func(ctx context.Context) (interface{}, error) {
	// 模拟有时会失败的任务
	if time.Now().Unix()%2 == 0 {
		return nil, fmt.Errorf("random error")
	}
	return "success", nil
}).WithRetry(3, 1*time.Second) // 重试3次,每次间隔1秒

4.3 超时控制

taskWithTimeout := taskflow.NewTask("timeout_task", func(ctx context.Context) (interface{}, error) {
	time.Sleep(2 * time.Second)
	return "done", nil
}).WithTimeout(1 * time.Second) // 设置1秒超时

// 或者对整个流程设置超时
flow.WithTimeout(5 * time.Second)

5. 实际应用示例

// 模拟电商订单处理流程
func main() {
	executor := taskflow.NewExecutor()

	// 定义任务
	validateOrder := taskflow.NewTask("validate", func(ctx context.Context) (interface{}, error) {
		// 验证订单逻辑
		return "Order validated", nil
	})

	checkInventory := taskflow.NewTask("check_inventory", func(ctx context.Context) (interface{}, error) {
		// 检查库存逻辑
		return "Inventory checked", nil
	})

	processPayment := taskflow.NewTask("process_payment", func(ctx context.Context) (interface{}, error) {
		// 处理支付逻辑
		return "Payment processed", nil
	})

	updateInventory := taskflow.NewTask("update_inventory", func(ctx context.Context) (interface{}, error) {
		// 更新库存逻辑
		return "Inventory updated", nil
	})

	sendNotification := taskflow.NewTask("send_notification", func(ctx context.Context) (interface{}, error) {
		// 发送通知逻辑
		return "Notification sent", nil
	})

	// 创建流程
	orderFlow := taskflow.NewFlow("order_processing")
	orderFlow.Add(validateOrder, checkInventory, processPayment, updateInventory, sendNotification)

	// 设置依赖关系
	orderFlow.DependsOn(
		checkInventory,    // 检查库存依赖订单验证
		processPayment,    // 处理支付依赖订单验证
		updateInventory,   // 更新库存依赖库存检查和支付处理
		sendNotification,  // 发送通知依赖所有前面的步骤
	)

	// 执行流程
	results, err := executor.ExecuteFlow(context.Background(), orderFlow)
	if err != nil {
		fmt.Println("Order processing failed:", err)
		return
	}

	for taskName, result := range results {
		fmt.Printf("%s: %v\n", taskName, result)
	}
}

6. 性能调优

// 创建带自定义配置的执行器
executor := taskflow.NewExecutor(
	taskflow.WithMaxParallel(10),  // 最大并行任务数
	taskflow.WithQueueSize(100),   // 任务队列大小
	taskflow.WithTimeout(30*time.Second), // 默认超时时间
)

总结

Go-TaskFlow 提供了以下优势:

  1. 简单易用的 API 设计
  2. 灵活的任务依赖管理
  3. 内置的错误处理和重试机制
  4. 超时控制和上下文传递
  5. 可配置的执行策略

通过合理设计任务和流程,可以轻松构建复杂的并行处理系统,同时保持代码的清晰和可维护性。

回到顶部