golang无代码工作流执行器插件Dagu的使用
Golang无代码工作流执行器插件Dagu的使用
Dagu是一个轻量级且功能强大的工作流引擎,专为企业和小型团队设计。它支持在本地、云或物联网设备上部署,使用低代码声明式工作流定义,易于理解和使用。
Dagu Logo
概述
Dagu是一个强大的工作流引擎,可以在无法安装Airflow的环境中部署,如小型设备、本地服务器和遗留系统。它允许您以简单的YAML格式声明性地定义任何批处理作业作为单个DAG(有向无环图)。
steps:
- name: step1
command: sleep 1 && echo "Hello, Dagu!"
- name: step2
command: sleep 1 && echo "This is a second step"
通过声明性地定义作业中的流程,复杂的流程变得可视化,使故障排除和恢复更加容易。可以从Web UI查看日志和重试,无需手动通过SSH登录服务器。
主要特性
- DAG定义 - 以可读的YAML表达复杂依赖关系
- 调度 - 支持时区的Cron表达式
- 队列 - 使用命名队列控制并发
- 错误处理 - 重试、失败处理程序、清理钩子
- 条件执行 - 基于条件运行步骤
- 并行执行 - 控制并发步骤执行
- 变量和参数 - 在步骤之间传递数据,参数化工作流
- Docker支持 - 在容器中运行步骤
- SSH执行器 - 在远程主机上执行命令
- HTTP请求 - 与API集成
- 电子邮件通知 - SMTP集成
- 分层工作流 - 任意深度的嵌套DAG
- 分布式执行 - 跨多台机器扩展
- 认证 - 基本认证、API令牌、TLS
- Web UI - 实时监控和控制
- REST API - 完全编程访问
快速开始
安装
# 安装
curl -L https://raw.githubusercontent.com/dagu-org/dagu/main/scripts/installer.sh | bash
# 创建dagu配置目录
mkdir -p ~/.config/dagu/dags
# 创建第一个工作流
cat > ~/.config/dagu/dags/hello.yaml << 'EOF'
steps:
- name: hello
command: echo "Hello from Dagu!"
- name: world
command: echo "Running step 2"
EOF
# 执行它
dagu start hello
# 检查状态
dagu status hello
# 启动Web UI
dagu start-all
# 访问 http://localhost:8080
示例Demo
CLI演示
Web UI演示
完整示例
ETL管道示例
name: daily-etl
schedule: "0 2 * * *"
steps:
- name: extract
command: python extract.py
output: DATA_FILE
- name: validate
command: python validate.py ${DATA_FILE}
- name: transform
command: python transform.py ${DATA_FILE}
retryPolicy:
limit: 3
- name: load
command: python load.py ${DATA_FILE}
分层工作流示例
steps:
- name: data-pipeline
run: etl
params: "ENV=prod REGION=us-west-2"
- name: parallel-jobs
run: batch
parallel:
items: ["job1", "job2", "job3"]
maxConcurrency: 2
params: "JOB=${ITEM}"
---
name: etl
params:
- ENV
- REGION
steps:
- name: process
command: python etl.py --env ${ENV} --region ${REGION}
---
name: batch
params:
- JOB
steps:
- name: process
command: python process.py --job ${JOB}
基于容器的管道示例
name: ml-pipeline
steps:
- name: prepare-data
executor:
type: docker
config:
image: python:3.11
autoRemove: true
volumes:
- /data:/data
command: python prepare.py
- name: train-model
executor:
type: docker
config:
image: tensorflow/tensorflow:latest-gpu
command: python train.py
- name: deploy
command: kubectl apply -f model-deployment.yaml
preconditions:
- condition: "`date +%u`"
expected: "re:[1-5]" # 仅工作日
分布式执行示例
name: distributed-workflow
steps:
- name: gpu-training
run: gpu-training
params: "REGION=${ITEM}"
parallel: ["us-east-1", "eu-west-1"]
---
name: gpu-training
params: "REGION=region"
workerSelector:
gpu: "true"
steps:
- name: gpu-training
command: python train_model.py ${REGION}
Dagu是为1-3人的小型团队设计的,使他们能够轻松管理复杂的工作流。对于认为像Airflow这样的大规模、高成本基础设施过于庞大并寻求更简单解决方案的团队来说,它是一个理想的选择。
更多关于golang无代码工作流执行器插件Dagu的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang无代码工作流执行器插件Dagu的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 无代码工作流执行器插件 Dagu 使用指南
Dagu 是一个基于 Golang 开发的无代码工作流执行器,允许用户通过简单的 YAML 配置文件定义和执行工作流,无需编写代码。
Dagu 核心概念
- 工作流(Workflow): 由多个步骤(Step)组成的有向无环图(DAG)
- 步骤(Step): 工作流中的单个任务单元
- 依赖关系: 定义步骤之间的执行顺序
安装 Dagu
# 使用 Go 安装
go install github.com/yohamta/dagu@latest
# 或者下载预编译二进制
# 从 GitHub 发布页面下载对应平台的二进制文件
基本使用
1. 创建简单工作流
创建一个 hello.yaml
文件:
name: Hello Workflow
steps:
- name: Step 1
command: echo
args: ["Hello, World!"]
- name: Step 2
command: echo
args: ["This is Dagu"]
depends: ["Step 1"]
2. 执行工作流
dagu run hello.yaml
进阶功能
变量和环境
name: Variable Example
env:
- GREETING: "Hello"
- TARGET: "Dagu User"
steps:
- name: Print Message
command: echo
args: ["${GREETING}, ${TARGET}!"]
条件执行
name: Conditional Execution
steps:
- name: Check Time
command: date
stdout: /tmp/current_time.txt
- name: Morning Greeting
command: echo
args: ["Good morning!"]
preconditions:
- condition: "`date +%H` < 12"
expected: "true"
- name: Afternoon Greeting
command: echo
args: ["Good afternoon!"]
preconditions:
- condition: "`date +%H` >= 12"
expected: "true"
HTTP 请求
name: HTTP Request Example
steps:
- name: Fetch Data
command: curl
args: ["-s", "https://api.example.com/data"]
stdout: /tmp/api_response.json
错误处理和重试
name: Error Handling Example
steps:
- name: Unstable Service
command: curl
args: ["-s", "https://unstable-api.example.com"]
retryPolicy:
limit: 3
intervalSec: 5
与 Golang 集成
虽然 Dagu 本身是无代码的,但你可以通过 Golang 代码来控制和监控工作流:
package main
import (
"fmt"
"os/exec"
)
func main() {
// 执行 Dagu 工作流
cmd := exec.Command("dagu", "run", "hello.yaml")
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Printf("Error running Dagu: %v\n", err)
return
}
fmt.Printf("Dagu output:\n%s\n", output)
// 检查状态
statusCmd := exec.Command("dagu", "status", "hello.yaml")
statusOutput, err := statusCmd.CombinedOutput()
if err != nil {
fmt.Printf("Error checking status: %v\n", err)
return
}
fmt.Printf("Workflow status:\n%s\n", statusOutput)
}
监控和管理
Dagu 提供了 Web 界面用于监控工作流:
dagu server
然后在浏览器中访问 http://localhost:8080
最佳实践
- 模块化设计: 将复杂工作流拆分为多个小的工作流
- 日志记录: 使用
stdout
和stderr
参数记录输出 - 参数化: 使用环境变量使工作流更灵活
- 错误处理: 为关键步骤配置重试策略
- 文档化: 在工作流 YAML 中添加
description
字段
Dagu 的无代码方法使得非开发人员也能轻松创建和管理自动化工作流,同时保持了足够的灵活性来满足复杂场景的需求。