golang简单而强大的DAG调度器和仪表板插件库goflow的使用

Golang简单而强大的DAG调度器和仪表板插件库goflow的使用

什么是Goflow

Goflow是一个用Go编写的简单但功能强大的DAG调度器和仪表板。它可以帮助你:

  • 像Apache Airflow一样调度有向无环图(DAG)任务,但更简单
  • 协调各种集群或服务的计算任务
  • 提供监控仪表板
  • 通过单个二进制或容器轻松部署
  • 在小型虚拟机上运行,节省云成本
  • 支持多种存储技术
  • 使用代码而非配置文件定义DAG

快速开始

使用Docker

docker run -p 8181:8181 ghcr.io/fieldryand/goflow-example:latest

然后在浏览器中访问localhost:8181查看仪表板。

不使用Docker

  1. 创建新项目并安装依赖:
go mod init # 创建新模块
go get github.com/fieldryand/goflow/v2 # 安装依赖
  1. 创建main.go文件:
package main

import "github.com/fieldryand/goflow/v2"

func main() {
    options := goflow.Options{
        UIPath: "ui/",
        ShowExamples:  true,
        WithSeconds:  true,
    }
    gf := goflow.New(options)
    gf.Use(goflow.DefaultLogger())
    gf.Run(":8181")
}
  1. 下载并解压仪表板:
wget https://github.com/fieldryand/goflow/releases/latest/download/goflow-ui.tar.gz
tar -xvzf goflow-ui.tar.gz
rm goflow-ui.tar.gz
  1. 运行应用:
go run main.go

然后在浏览器中访问localhost:8181

开发概述

基本概念

  • Job: Goflow工作流称为Job,可以使用cron语法调度
  • Task: 每个Job由一个或多个Task组成,构成依赖图
  • Operator: 定义Task执行的工作,Goflow提供了一些基本Operator,也可以自定义
  • Retries: 可以为Task设置重试策略
  • Streaming: 使用服务器发送事件实时将状态推送到仪表板

创建Job示例

func myJob() *goflow.Job {
    j := &goflow.Job{Name: "my-job", Schedule: "* * * * *", Active: true}
    j.Add(&goflow.Task{
        Name:     "sleep-for-one-second",
        Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
    })
    return j
}

自定义Operator

type PositiveAddition struct{ a, b int }

func (o PositiveAddition) Run() (interface{}, error) {
    if o.a < 0 || o.b < 0 {
        return 0, errors.New("Can't add negative numbers")
    }
    result := o.a + o.b
    return result, nil
}

重试机制

func myJob() *goflow.Job {
    j := &goflow.Job{Name: "my-job", Schedule: "* * * * *"}
    j.Add(&goflow.Task{
        Name:       "sleep-for-one-second",
        Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
        Retries:    5,
        RetryDelay: goflow.ConstantDelay{Period: 1},
    })
    return j
}

任务依赖

func myJob() *goflow.Job {
    j := &goflow.Job{Name: "my-job", Schedule: "* * * * *"}
    j.Add(&goflow.Task{
        Name:       "sleep-for-one-second",
        Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
        Retries:    5,
        RetryDelay: goflow.ConstantDelay{Period: 1},
    })
    j.Add(&goflow.Task{
        Name:       "get-google",
        Operator:   goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
    })
    j.Add(&goflow.Task{
        Name:       "add-two-plus-three",
        Operator:   PositiveAddition{a: 2, b: 3},
    })
    j.SetDownstream(j.Task("sleep-for-one-second"), j.Task("get-google"))
    j.SetDownstream(j.Task("sleep-for-one-second"), j.Task("add-two-plus-three"))
    return j
}

触发器规则

默认情况下,Task的触发器规则是allSuccessful,即所有上游任务成功后才执行。可以设置为allDone让任务在所有上游完成后执行:

j.Add(&goflow.Task{
    Name:        "sleep-for-one-second",
    Operator:    goflow.Command{Cmd: "sleep", Args: []string{"1"}},
    Retries:     5,
    RetryDelay:  goflow.ConstantDelay{Period: 1},
    TriggerRule: "allDone",
})

运行Goflow引擎

func main() {
    gf := goflow.New(goflow.Options{Streaming: true})
    gf.AddJob(myJob)
    gf.Use(goflow.DefaultLogger())
    gf.Run(":8181")
}

存储

Goflow支持多种键值存储,默认使用内存数据库,也可以使用Redis等:

package main

import "github.com/fieldryand/goflow/v2"
import "github.com/philippgille/gokv/redis"

func main() {
    // 创建存储客户端
    client, err := redis.NewClient(redis.DefaultOptions)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // 作为Goflow选项传入
    options := goflow.Options{
        Store: client,
        UIPath: "ui/",
        Streaming: true,
        ShowExamples:  true,
    }
    gf := goflow.New(options)
    gf.Use(goflow.DefaultLogger())
    gf.Run(":8181")
}

API和集成

Goflow提供API端点用于集成:

  • GET /api/health: 检查服务健康状态
  • GET /api/jobs: 列出注册的Job
  • GET /api/jobs/{jobname}: 获取特定Job详情
  • GET /api/executions: 查询和列出Job执行
  • POST /api/jobs/{jobname}/submit: 提交Job执行
  • POST /api/jobs/{jobname}/toggle: 切换Job调度开关
  • /stream: 返回服务器发送事件,用于实时更新

可以使用Swagger查看完整的API文档:

docker run -p 8080:8080 -e SWAGGER_JSON=/app/swagger.json -v $(pwd):/app swaggerapi/swagger-ui

Goflow是一个功能强大但简单的任务调度系统,特别适合需要轻量级解决方案的场景。通过代码定义DAG使得复杂工作流更易于管理,而内置的仪表板提供了良好的可视化和监控能力。


更多关于golang简单而强大的DAG调度器和仪表板插件库goflow的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang简单而强大的DAG调度器和仪表板插件库goflow的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


GoFlow: 一个简单而强大的DAG调度器和仪表板插件库

GoFlow是一个用Go语言编写的轻量级DAG(Directed Acyclic Graph)调度器库,它允许你定义和执行任务依赖图,并提供了仪表板插件功能。

主要特性

  • 简单的DAG定义和执行
  • 任务依赖管理
  • 并发执行
  • 可插拔的仪表板界面
  • 轻量级且易于集成

基本使用示例

安装

go get github.com/faasflow/goflow

定义简单DAG

package main

import (
	"fmt"
	"github.com/faasflow/goflow"
)

// 定义任务函数
func task1(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("执行任务1")
	return []byte("任务1完成"), nil
}

func task2(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("执行任务2")
	return []byte("任务2完成"), nil
}

func task3(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("执行任务3")
	return []byte("任务3完成"), nil
}

func main() {
	// 创建流程
	flow := goflow.New()
	
	// 添加节点
	flow.AddNode("task1", task1)
	flow.AddNode("task2", task2)
	flow.AddNode("task3", task3)
	
	// 定义依赖关系
	flow.AddEdge("task1", "task2") // task1 -> task2
	flow.AddEdge("task1", "task3") // task1 -> task3
	
	// 执行流程
	result, err := flow.Execute(nil)
	if err != nil {
		fmt.Printf("流程执行失败: %v\n", err)
		return
	}
	
	fmt.Printf("流程执行结果: %v\n", result)
}

仪表板集成

GoFlow提供了简单的仪表板插件功能,可以可视化DAG和执行状态。

package main

import (
	"github.com/faasflow/goflow"
	"github.com/faasflow/goflow/dashboard"
	"net/http"
)

func main() {
	// 创建流程
	flow := goflow.New()
	
	// ... 添加节点和边 (同上例)
	
	// 创建仪表板
	dash := dashboard.New(flow)
	
	// 启动HTTP服务器
	http.Handle("/", dash.GetHandler())
	fmt.Println("仪表板运行在 http://localhost:8080")
	go http.ListenAndServe(":8080", nil)
	
	// 执行流程
	flow.Execute(nil)
}

高级功能

条件分支

flow.AddConditionalEdge("task1", "task2", func(data []byte) bool {
	// 根据data内容决定是否执行task2
	return string(data) == "需要执行task2"
})

并行执行

// task2和task3将并行执行
flow.AddEdge("task1", "task2")
flow.AddEdge("task1", "task3")

错误处理

flow.SetErrorHandler(func(node string, err error) {
	fmt.Printf("节点 %s 执行失败: %v\n", node, err)
})

实际应用场景

  1. 数据处理流水线
  2. 微服务编排
  3. CI/CD流程
  4. 批处理作业
  5. 复杂业务逻辑编排

总结

GoFlow提供了一个简单而强大的方式来定义和执行DAG工作流。它的轻量级设计使得它易于集成到现有系统中,而仪表板功能则为监控和管理流程提供了可视化界面。

对于更复杂的需求,你还可以考虑结合其他Go库如Workflow或Cadence,但对于大多数简单的DAG需求,GoFlow提供了完美的平衡点。

回到顶部