golang无代码工作流执行器插件Dagu的使用

Golang无代码工作流执行器插件Dagu的使用

Dagu是一个轻量级且功能强大的工作流引擎,专为企业和小型团队设计。它支持在本地、云或物联网设备上部署,使用低代码声明式工作流定义,易于理解和使用。

Dagu Logo

Dagu Logo

概述

Dagu是一个强大的工作流引擎,可以在无法安装Airflow的环境中部署,如小型设备、本地服务器和遗留系统。它允许您以简单的YAML格式声明性地定义任何批处理作业作为单个DAG(有向无环图)。

steps:
  - name: step1
    command: sleep 1 && echo "Hello, Dagu!"
    
  - name: step2
    command: sleep 1 && echo "This is a second step"

通过声明性地定义作业中的流程,复杂的流程变得可视化,使故障排除和恢复更加容易。可以从Web UI查看日志和重试,无需手动通过SSH登录服务器。

主要特性

  • DAG定义 - 以可读的YAML表达复杂依赖关系
  • 调度 - 支持时区的Cron表达式
  • 队列 - 使用命名队列控制并发
  • 错误处理 - 重试、失败处理程序、清理钩子
  • 条件执行 - 基于条件运行步骤
  • 并行执行 - 控制并发步骤执行
  • 变量和参数 - 在步骤之间传递数据,参数化工作流
  • Docker支持 - 在容器中运行步骤
  • SSH执行器 - 在远程主机上执行命令
  • HTTP请求 - 与API集成
  • 电子邮件通知 - SMTP集成
  • 分层工作流 - 任意深度的嵌套DAG
  • 分布式执行 - 跨多台机器扩展
  • 认证 - 基本认证、API令牌、TLS
  • Web UI - 实时监控和控制
  • REST API - 完全编程访问

快速开始

安装

# 安装
curl -L https://raw.githubusercontent.com/dagu-org/dagu/main/scripts/installer.sh | bash

# 创建dagu配置目录
mkdir -p ~/.config/dagu/dags

# 创建第一个工作流
cat > ~/.config/dagu/dags/hello.yaml << 'EOF'
steps:
  - name: hello
    command: echo "Hello from Dagu!"
    
  - name: world  
    command: echo "Running step 2"
EOF

# 执行它
dagu start hello

# 检查状态
dagu status hello

# 启动Web UI
dagu start-all
# 访问 http://localhost:8080

示例Demo

CLI演示

Demo CLI

Web UI演示

Demo Web UI

完整示例

ETL管道示例

name: daily-etl
schedule: "0 2 * * *"
steps:
  - name: extract
    command: python extract.py
    output: DATA_FILE
    
  - name: validate
    command: python validate.py ${DATA_FILE}
    
  - name: transform
    command: python transform.py ${DATA_FILE}
    retryPolicy:
      limit: 3
      
  - name: load
    command: python load.py ${DATA_FILE}

分层工作流示例

steps:
  - name: data-pipeline
    run: etl
    params: "ENV=prod REGION=us-west-2"
    
  - name: parallel-jobs
    run: batch
    parallel:
      items: ["job1", "job2", "job3"]
      maxConcurrency: 2
    params: "JOB=${ITEM}"
---
name: etl
params:
  - ENV
  - REGION
steps:
  - name: process
    command: python etl.py --env ${ENV} --region ${REGION}
---
name: batch
params:
  - JOB
steps:
  - name: process
    command: python process.py --job ${JOB}

基于容器的管道示例

name: ml-pipeline
steps:
  - name: prepare-data
    executor:
      type: docker
      config:
        image: python:3.11
        autoRemove: true
        volumes:
          - /data:/data
    command: python prepare.py
    
  - name: train-model
    executor:
      type: docker
      config:
        image: tensorflow/tensorflow:latest-gpu
    command: python train.py
    
  - name: deploy
    command: kubectl apply -f model-deployment.yaml
    preconditions:
      - condition: "`date +%u`"
        expected: "re:[1-5]"  # 仅工作日

分布式执行示例

name: distributed-workflow
steps:
  - name: gpu-training
    run: gpu-training
    params: "REGION=${ITEM}"
    parallel: ["us-east-1", "eu-west-1"]
    
---
name: gpu-training
params: "REGION=region"
workerSelector:
  gpu: "true"
steps:
  - name: gpu-training
    command: python train_model.py ${REGION}

Dagu是为1-3人的小型团队设计的,使他们能够轻松管理复杂的工作流。对于认为像Airflow这样的大规模、高成本基础设施过于庞大并寻求更简单解决方案的团队来说,它是一个理想的选择。


更多关于golang无代码工作流执行器插件Dagu的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang无代码工作流执行器插件Dagu的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 无代码工作流执行器插件 Dagu 使用指南

Dagu 是一个基于 Golang 开发的无代码工作流执行器,允许用户通过简单的 YAML 配置文件定义和执行工作流,无需编写代码。

Dagu 核心概念

  1. 工作流(Workflow): 由多个步骤(Step)组成的有向无环图(DAG)
  2. 步骤(Step): 工作流中的单个任务单元
  3. 依赖关系: 定义步骤之间的执行顺序

安装 Dagu

# 使用 Go 安装
go install github.com/yohamta/dagu@latest

# 或者下载预编译二进制
# 从 GitHub 发布页面下载对应平台的二进制文件

基本使用

1. 创建简单工作流

创建一个 hello.yaml 文件:

name: Hello Workflow
steps:
  - name: Step 1
    command: echo
    args: ["Hello, World!"]
  - name: Step 2
    command: echo
    args: ["This is Dagu"]
    depends: ["Step 1"]

2. 执行工作流

dagu run hello.yaml

进阶功能

变量和环境

name: Variable Example
env:
  - GREETING: "Hello"
  - TARGET: "Dagu User"
steps:
  - name: Print Message
    command: echo
    args: ["${GREETING}, ${TARGET}!"]

条件执行

name: Conditional Execution
steps:
  - name: Check Time
    command: date
    stdout: /tmp/current_time.txt
  - name: Morning Greeting
    command: echo
    args: ["Good morning!"]
    preconditions:
      - condition: "`date +%H` < 12"
        expected: "true"
  - name: Afternoon Greeting
    command: echo
    args: ["Good afternoon!"]
    preconditions:
      - condition: "`date +%H` >= 12"
        expected: "true"

HTTP 请求

name: HTTP Request Example
steps:
  - name: Fetch Data
    command: curl
    args: ["-s", "https://api.example.com/data"]
    stdout: /tmp/api_response.json

错误处理和重试

name: Error Handling Example
steps:
  - name: Unstable Service
    command: curl
    args: ["-s", "https://unstable-api.example.com"]
    retryPolicy:
      limit: 3
      intervalSec: 5

与 Golang 集成

虽然 Dagu 本身是无代码的,但你可以通过 Golang 代码来控制和监控工作流:

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// 执行 Dagu 工作流
	cmd := exec.Command("dagu", "run", "hello.yaml")
	output, err := cmd.CombinedOutput()
	if err != nil {
		fmt.Printf("Error running Dagu: %v\n", err)
		return
	}
	fmt.Printf("Dagu output:\n%s\n", output)
	
	// 检查状态
	statusCmd := exec.Command("dagu", "status", "hello.yaml")
	statusOutput, err := statusCmd.CombinedOutput()
	if err != nil {
		fmt.Printf("Error checking status: %v\n", err)
		return
	}
	fmt.Printf("Workflow status:\n%s\n", statusOutput)
}

监控和管理

Dagu 提供了 Web 界面用于监控工作流:

dagu server

然后在浏览器中访问 http://localhost:8080

最佳实践

  1. 模块化设计: 将复杂工作流拆分为多个小的工作流
  2. 日志记录: 使用 stdoutstderr 参数记录输出
  3. 参数化: 使用环境变量使工作流更灵活
  4. 错误处理: 为关键步骤配置重试策略
  5. 文档化: 在工作流 YAML 中添加 description 字段

Dagu 的无代码方法使得非开发人员也能轻松创建和管理自动化工作流,同时保持了足够的灵活性来满足复杂场景的需求。

回到顶部