golang简单而强大的DAG调度器和仪表板插件库goflow的使用
Golang简单而强大的DAG调度器和仪表板插件库goflow的使用
什么是Goflow
Goflow是一个用Go编写的简单但功能强大的DAG调度器和仪表板。它可以帮助你:
- 像Apache Airflow一样调度有向无环图(DAG)任务,但更简单
- 协调各种集群或服务的计算任务
- 提供监控仪表板
- 通过单个二进制或容器轻松部署
- 在小型虚拟机上运行,节省云成本
- 支持多种存储技术
- 使用代码而非配置文件定义DAG
快速开始
使用Docker
docker run -p 8181:8181 ghcr.io/fieldryand/goflow-example:latest
然后在浏览器中访问localhost:8181
查看仪表板。
不使用Docker
- 创建新项目并安装依赖:
go mod init # 创建新模块
go get github.com/fieldryand/goflow/v2 # 安装依赖
- 创建
main.go
文件:
package main
import "github.com/fieldryand/goflow/v2"
func main() {
options := goflow.Options{
UIPath: "ui/",
ShowExamples: true,
WithSeconds: true,
}
gf := goflow.New(options)
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
- 下载并解压仪表板:
wget https://github.com/fieldryand/goflow/releases/latest/download/goflow-ui.tar.gz
tar -xvzf goflow-ui.tar.gz
rm goflow-ui.tar.gz
- 运行应用:
go run main.go
然后在浏览器中访问localhost:8181
。
开发概述
基本概念
- Job: Goflow工作流称为Job,可以使用cron语法调度
- Task: 每个Job由一个或多个Task组成,构成依赖图
- Operator: 定义Task执行的工作,Goflow提供了一些基本Operator,也可以自定义
- Retries: 可以为Task设置重试策略
- Streaming: 使用服务器发送事件实时将状态推送到仪表板
创建Job示例
func myJob() *goflow.Job {
j := &goflow.Job{Name: "my-job", Schedule: "* * * * *", Active: true}
j.Add(&goflow.Task{
Name: "sleep-for-one-second",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
})
return j
}
自定义Operator
type PositiveAddition struct{ a, b int }
func (o PositiveAddition) Run() (interface{}, error) {
if o.a < 0 || o.b < 0 {
return 0, errors.New("Can't add negative numbers")
}
result := o.a + o.b
return result, nil
}
重试机制
func myJob() *goflow.Job {
j := &goflow.Job{Name: "my-job", Schedule: "* * * * *"}
j.Add(&goflow.Task{
Name: "sleep-for-one-second",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
})
return j
}
任务依赖
func myJob() *goflow.Job {
j := &goflow.Job{Name: "my-job", Schedule: "* * * * *"}
j.Add(&goflow.Task{
Name: "sleep-for-one-second",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
})
j.Add(&goflow.Task{
Name: "get-google",
Operator: goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
})
j.Add(&goflow.Task{
Name: "add-two-plus-three",
Operator: PositiveAddition{a: 2, b: 3},
})
j.SetDownstream(j.Task("sleep-for-one-second"), j.Task("get-google"))
j.SetDownstream(j.Task("sleep-for-one-second"), j.Task("add-two-plus-three"))
return j
}
触发器规则
默认情况下,Task的触发器规则是allSuccessful
,即所有上游任务成功后才执行。可以设置为allDone
让任务在所有上游完成后执行:
j.Add(&goflow.Task{
Name: "sleep-for-one-second",
Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
Retries: 5,
RetryDelay: goflow.ConstantDelay{Period: 1},
TriggerRule: "allDone",
})
运行Goflow引擎
func main() {
gf := goflow.New(goflow.Options{Streaming: true})
gf.AddJob(myJob)
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
存储
Goflow支持多种键值存储,默认使用内存数据库,也可以使用Redis等:
package main
import "github.com/fieldryand/goflow/v2"
import "github.com/philippgille/gokv/redis"
func main() {
// 创建存储客户端
client, err := redis.NewClient(redis.DefaultOptions)
if err != nil {
panic(err)
}
defer client.Close()
// 作为Goflow选项传入
options := goflow.Options{
Store: client,
UIPath: "ui/",
Streaming: true,
ShowExamples: true,
}
gf := goflow.New(options)
gf.Use(goflow.DefaultLogger())
gf.Run(":8181")
}
API和集成
Goflow提供API端点用于集成:
GET /api/health
: 检查服务健康状态GET /api/jobs
: 列出注册的JobGET /api/jobs/{jobname}
: 获取特定Job详情GET /api/executions
: 查询和列出Job执行POST /api/jobs/{jobname}/submit
: 提交Job执行POST /api/jobs/{jobname}/toggle
: 切换Job调度开关/stream
: 返回服务器发送事件,用于实时更新
可以使用Swagger查看完整的API文档:
docker run -p 8080:8080 -e SWAGGER_JSON=/app/swagger.json -v $(pwd):/app swaggerapi/swagger-ui
Goflow是一个功能强大但简单的任务调度系统,特别适合需要轻量级解决方案的场景。通过代码定义DAG使得复杂工作流更易于管理,而内置的仪表板提供了良好的可视化和监控能力。
更多关于golang简单而强大的DAG调度器和仪表板插件库goflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang简单而强大的DAG调度器和仪表板插件库goflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
GoFlow: 一个简单而强大的DAG调度器和仪表板插件库
GoFlow是一个用Go语言编写的轻量级DAG(Directed Acyclic Graph)调度器库,它允许你定义和执行任务依赖图,并提供了仪表板插件功能。
主要特性
- 简单的DAG定义和执行
- 任务依赖管理
- 并发执行
- 可插拔的仪表板界面
- 轻量级且易于集成
基本使用示例
安装
go get github.com/faasflow/goflow
定义简单DAG
package main
import (
"fmt"
"github.com/faasflow/goflow"
)
// 定义任务函数
func task1(data []byte, option map[string][]string) ([]byte, error) {
fmt.Println("执行任务1")
return []byte("任务1完成"), nil
}
func task2(data []byte, option map[string][]string) ([]byte, error) {
fmt.Println("执行任务2")
return []byte("任务2完成"), nil
}
func task3(data []byte, option map[string][]string) ([]byte, error) {
fmt.Println("执行任务3")
return []byte("任务3完成"), nil
}
func main() {
// 创建流程
flow := goflow.New()
// 添加节点
flow.AddNode("task1", task1)
flow.AddNode("task2", task2)
flow.AddNode("task3", task3)
// 定义依赖关系
flow.AddEdge("task1", "task2") // task1 -> task2
flow.AddEdge("task1", "task3") // task1 -> task3
// 执行流程
result, err := flow.Execute(nil)
if err != nil {
fmt.Printf("流程执行失败: %v\n", err)
return
}
fmt.Printf("流程执行结果: %v\n", result)
}
仪表板集成
GoFlow提供了简单的仪表板插件功能,可以可视化DAG和执行状态。
package main
import (
"github.com/faasflow/goflow"
"github.com/faasflow/goflow/dashboard"
"net/http"
)
func main() {
// 创建流程
flow := goflow.New()
// ... 添加节点和边 (同上例)
// 创建仪表板
dash := dashboard.New(flow)
// 启动HTTP服务器
http.Handle("/", dash.GetHandler())
fmt.Println("仪表板运行在 http://localhost:8080")
go http.ListenAndServe(":8080", nil)
// 执行流程
flow.Execute(nil)
}
高级功能
条件分支
flow.AddConditionalEdge("task1", "task2", func(data []byte) bool {
// 根据data内容决定是否执行task2
return string(data) == "需要执行task2"
})
并行执行
// task2和task3将并行执行
flow.AddEdge("task1", "task2")
flow.AddEdge("task1", "task3")
错误处理
flow.SetErrorHandler(func(node string, err error) {
fmt.Printf("节点 %s 执行失败: %v\n", node, err)
})
实际应用场景
- 数据处理流水线
- 微服务编排
- CI/CD流程
- 批处理作业
- 复杂业务逻辑编排
总结
GoFlow提供了一个简单而强大的方式来定义和执行DAG工作流。它的轻量级设计使得它易于集成到现有系统中,而仪表板功能则为监控和管理流程提供了可视化界面。
对于更复杂的需求,你还可以考虑结合其他Go库如Workflow或Cadence,但对于大多数简单的DAG需求,GoFlow提供了完美的平衡点。