Golang任务流框架Go-taskflow@0.1.6发布:基于原生特性打造的通用型并发任务编排方案

Golang任务流框架Go-taskflow@0.1.6发布:基于原生特性打造的通用型并发任务编排方案

Go-Taskflow

GitHub仓库图片

GitHub - noneback/go-taskflow: 一个纯Go的通用任务并行编程框架,集成了可视化工具和性能分析器

一个受 taskflow-cpp 启发的 Go 通用任务并行编程框架,结合了 Go 的原生能力和简洁性,适用于并发任务中的复杂依赖管理。

特性

  • 高可扩展性:轻松扩展框架以适应各种特定用例。
  • 原生 Go 并发模型:利用 Go 的 goroutine 有效管理并发任务执行。
  • 用户友好的编程接口:使用 Go 简化复杂的任务依赖管理。
  • 静态/子流/条件/循环任务:定义静态任务、条件节点、嵌套子流和循环流,以增强模块化和可编程性。
  • 优先级任务调度:定义任务的优先级,优先级更高的任务将优先被调度。
  • 内置可视化与分析工具:使用集成工具生成任务的可视化表示并分析任务执行性能,使调试和优化更加容易。

使用场景

  • 数据流水线:编排具有复杂依赖关系的数据处理阶段。
  • 工作流自动化:定义和运行具有明确顺序和依赖结构的自动化工作流。
  • 并行任务处理:并发执行独立任务以充分利用 CPU 资源。

示例

导入最新版本:go get -u github.com/noneback/go-taskflow

代码示例

正确理解条件任务

条件节点在 taskflow-cpp 中很特殊。它不仅参与条件控制,还参与循环。

我们的仓库保持了几乎相同的行为。您应该阅读 ConditionTasking 以避免常见的陷阱。

go-taskflow 中的错误处理

Go 语言中的 errors 是值。正确处理它们是用户的责任。

只有未恢复的 panic 需要由框架处理。目前,如果发生这种情况,整个父图将被取消,剩余的任务将不会执行。这种行为将来可能会演变。如果您有任何好的想法,请随时告诉我。

如果您希望在发生 panic 时不中断整个任务流,也可以在注册任务时手动处理 panic。 例如:

tf.NewTask("not interrupt", func() {
	defer func() {
		if r := recover(); r != nil {
			// deal with it.
		}
	}()
	// user functions.
)

如何使用可视化任务流

if err := tf.Dump(os.Stdout); err != nil {
		log.Fatal(err)
}

tf.Dump 生成点格式的原始字符串,使用 dot 工具可以绘制出 Graph 的 svg 图像。

如何使用性能分析任务流

if err :=exector.Profile(os.Stdout);err != nil {
		log.Fatal(err)
}

Profile 生成火焰图格式的原始字符串,使用 flamegraph 工具可以绘制出火焰图的 svg 图像。

更多

欢迎提出任何功能请求或进行讨论。


更多关于Golang任务流框架Go-taskflow@0.1.6发布:基于原生特性打造的通用型并发任务编排方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang任务流框架Go-taskflow@0.1.6发布:基于原生特性打造的通用型并发任务编排方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-taskflow 0.1.6的发布确实为Go生态带来了一个基于原生并发特性的强大任务编排方案。这个框架充分利用了goroutine和channel的优势,为复杂依赖任务管理提供了优雅的解决方案。

从架构设计来看,go-taskflow的核心价值在于它提供了声明式的任务依赖管理,同时保持了Go语言的简洁性。以下是一个典型的使用示例:

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/noneback/go-taskflow"
)

func main() {
    // 创建任务流执行器
    executor := taskflow.NewExecutor()
    
    // 创建任务流
    tf := taskflow.NewTaskFlow("example-flow")
    
    // 定义任务
    taskA := tf.NewTask("task-a", func() {
        fmt.Println("执行任务A")
        time.Sleep(100 * time.Millisecond)
    })
    
    taskB := tf.NewTask("task-b", func() {
        fmt.Println("执行任务B")
        time.Sleep(200 * time.Millisecond)
    })
    
    taskC := tf.NewTask("task-c", func() {
        fmt.Println("执行任务C")
        time.Sleep(150 * time.Millisecond)
    })
    
    // 设置任务依赖关系:taskC 依赖 taskA 和 taskB
    taskC.DependsOn(taskA, taskB)
    
    // 执行任务流
    ctx := context.Background()
    if err := executor.Run(ctx, tf); err != nil {
        fmt.Printf("任务执行失败: %v\n", err)
    }
    
    // 生成可视化图表
    if err := tf.Dump(os.Stdout); err != nil {
        fmt.Printf("生成可视化失败: %v\n", err)
    }
}

条件任务和循环任务的实现方式也很有特色:

// 条件任务示例
tf := taskflow.NewTaskFlow("conditional-flow")

conditionTask := tf.NewConditionTask("condition", func() taskflow.ConditionResult {
    // 基于某些条件返回不同的分支
    if someCondition {
        return taskflow.Then
    }
    return taskflow.Else
})

thenTask := tf.NewTask("then-task", func() {
    fmt.Println("执行then分支")
})

elseTask := tf.NewTask("else-task", func() {
    fmt.Println("执行else分支")
})

// 设置条件分支
conditionTask.SetThen(thenTask)
conditionTask.SetElse(elseTask)

// 循环任务示例
loopFlow := taskflow.NewTaskFlow("loop-subflow")
loopTask := tf.NewTask("loop-body", func() {
    fmt.Println("循环体执行")
})

// 创建循环任务
loop := tf.NewLoopTask("loop", loopFlow, 3) // 循环3次
loop.SetBody(loopTask)

对于需要优先级调度的场景:

// 优先级任务示例
highPriorityTask := tf.NewTask("high-priority", func() {
    fmt.Println("高优先级任务")
}).WithPriority(taskflow.PriorityHigh)

mediumPriorityTask := tf.NewTask("medium-priority", func() {
    fmt.Println("中优先级任务")
}).WithPriority(taskflow.PriorityMedium)

lowPriorityTask := tf.NewTask("low-priority", func() {
    fmt.Println("低优先级任务")
}).WithPriority(taskflow.PriorityLow)

错误处理机制的设计符合Go语言的惯用法:

// 安全的错误处理任务
safeTask := tf.NewTask("safe-task", func() {
    defer func() {
        if r := recover(); r != nil {
            // 局部恢复,不影响其他任务
            fmt.Printf("任务内部错误恢复: %v\n", r)
        }
    }()
    
    // 可能panic的业务逻辑
    riskyOperation()
})

// 任务间错误传递
taskWithError := tf.NewTask("error-task", func() error {
    if err := someOperation(); err != nil {
        return fmt.Errorf("操作失败: %w", err)
    }
    return nil
})

性能分析功能对于优化任务流执行很有帮助:

// 性能分析示例
executor := taskflow.NewExecutor()

// 执行任务流
ctx := context.Background()
if err := executor.Run(ctx, tf); err != nil {
    fmt.Printf("执行失败: %v\n", err)
}

// 生成性能分析数据
var profileBuf bytes.Buffer
if err := executor.Profile(&profileBuf); err != nil {
    fmt.Printf("性能分析失败: %v\n", err)
}

// 可以保存为火焰图数据文件
os.WriteFile("profile.svg", profileBuf.Bytes(), 0644)

子流嵌套支持复杂的模块化设计:

// 子流示例
subFlow := taskflow.NewTaskFlow("sub-flow")
subTask1 := subFlow.NewTask("sub-task-1", func() {
    fmt.Println("子任务1")
})
subTask2 := subFlow.NewTask("sub-task-2", func() {
    fmt.Println("子任务2")
})
subTask2.DependsOn(subTask1)

// 将子流作为主流的任务
mainFlow := taskflow.NewTaskFlow("main-flow")
mainTask := mainFlow.NewTask("main-task", func() {
    fmt.Println("主任务")
})

subFlowTask := mainFlow.NewSubFlowTask("sub-flow-task", subFlow)
subFlowTask.DependsOn(mainTask)

这个框架在数据流水线处理场景中表现尤为出色:

// 数据流水线示例
pipeline := taskflow.NewTaskFlow("data-pipeline")

// 数据提取阶段
extractTask := pipeline.NewTask("extract", func() interface{} {
    data := fetchDataFromSource()
    return data
})

// 数据转换阶段
transformTask := pipeline.NewTask("transform", func(input interface{}) interface{} {
    data := input.([]byte)
    transformed := processData(data)
    return transformed
})

// 数据加载阶段
loadTask := pipeline.NewTask("load", func(input interface{}) {
    data := input.(ProcessedData)
    saveToDestination(data)
})

// 设置流水线依赖
transformTask.DependsOn(extractTask)
loadTask.DependsOn(transformTask)

// 传递数据
extractTask.Pipe(transformTask)
transformTask.Pipe(loadTask)

框架对并发控制也提供了细粒度的管理:

// 并发控制示例
executor := taskflow.NewExecutor(
    taskflow.WithMaxConcurrency(10), // 最大并发数
    taskflow.WithTimeout(30*time.Second), // 超时设置
    taskflow.WithRetryPolicy(3, 1*time.Second), // 重试策略
)

// 带上下文的任务执行
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

if err := executor.Run(ctx, tf); err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("任务执行超时")
    }
}

可视化输出可以直接集成到CI/CD流程中:

// 集成可视化到构建流程
func generateFlowDiagram(tf *taskflow.TaskFlow, outputPath string) error {
    var buf bytes.Buffer
    if err := tf.Dump(&buf); err != nil {
        return err
    }
    
    // 使用graphviz生成图像
    cmd := exec.Command("dot", "-Tsvg", "-o", outputPath)
    cmd.Stdin = &buf
    
    if err := cmd.Run(); err != nil {
        return fmt.Errorf("生成图像失败: %w", err)
    }
    
    return nil
}

这个版本在任务调度算法、内存管理和错误恢复机制方面都有显著改进,特别是在大规模并发场景下的稳定性表现值得关注。框架的API设计保持了良好的向后兼容性,同时引入了更多符合Go语言习惯的用法。

回到顶部