golang并发任务执行工具插件库taskctl的使用

Golang 并发任务执行工具插件库 taskctl 的使用

概述

taskctl 是一个现代的并发任务运行器和开发者日常任务自动化工具包,它是 GNU Make 的简单现代替代品。taskctl 允许你以人类可读的格式(YAML、JSON 或 TOML)设计和组织日常任务和开发流水线。

taskctl - developer's routine tasks automation toolkit

主要特性

  • 人类可读的配置格式(YAML、JSON 或 TOML)
  • 并发任务执行
  • 高度可定制的执行计划
  • 跨平台支持
  • 支持导入本地或远程配置
  • 集成文件监视器(实时重载)
  • 可定制的执行上下文
  • 多种输出类型
  • 可嵌入的任务运行器
  • 交互式提示
  • 方便的自动补全

快速开始

安装

MacOS

brew tap taskctl/taskctl
brew install taskctl

Linux

sudo wget https://github.com/taskctl/taskctl/releases/latest/download/taskctl_linux_amd64 -O /usr/local/bin/taskctl
sudo chmod +x /usr/local/bin/taskctl

Windows

scoop bucket add taskctl https://github.com/taskctl/scoop-taskctl.git
scoop install taskctl

从源码安装

git clone https://github.com/taskctl/taskctl
cd taskctl
go build -o taskctl .

基本使用

  • taskctl - 运行交互式任务提示
  • taskctl pipeline1 - 运行单个流水线
  • taskctl task1 - 运行单个任务
  • taskctl pipeline1 task1 - 运行一个或多个流水线和/或任务
  • taskctl watch watcher1 watcher2 - 启动一个或多个监视器

配置示例

taskctl 使用配置文件(tasks.yamltaskctl.yaml)来存储你的任务和流水线。配置文件包含以下部分:

  • tasks
  • pipelines
  • watchers
  • contexts
  • variables

示例配置

tasks:
  lint:
    command:
      - golint $(go list ./... | grep -v /vendor/)
      - go vet $(go list ./... | grep -v /vendor/)
  
  test:
    allow_failure: true
    command: go test ./....
        
  build:
    command: go build -o bin/app ./...
    env: 
      GOOS: linux
      GOARCH: amd64
    before: rm -rf bin/*

pipelines:
  release:
    - task: lint
    - task: test
    - task: build
      depends_on: [lint, test]

根据这个计划,linttest 将并发运行,build 只有在 linttest 都完成后才会开始。

任务定义

任务是 taskctl 的基础。它描述了一个或多个要运行的命令、它们的环境、执行器和属性,如工作目录、执行超时、允许失败等。

tasks:
  lint:
    allow_failure: true
    command:
      - golint $(go list ./... | grep -v /vendor/)
      - go vet $(go list ./... | grep -v /vendor/)
      
  build:
    command: go build ./...
    env: 
      GOOS: linux
      GOARCH: amd64
    env_file: /data/.env
    after: rm -rf tmp/*
    variations:
      - GOARCH: amd64
      - GOARCH: arm
        GOARM: 7

任务变量

每个任务、阶段和上下文都有变量,可用于渲染任务的字段 - commanddir。除了全局预定义的变量外,变量可以在任务定义中设置。

预定义变量包括:

  • .Root - 根配置文件目录
  • .Dir - 配置文件目录
  • .TempDir - 系统的临时目录
  • .Args - 提供的参数作为字符串
  • .ArgsList - 提供的参数数组
  • .Task.Name - 当前任务的名称
  • .Context.Name - 当前任务的执行上下文名称
  • .Stage.Name - 当前阶段的名称
  • .Output - 上一个命令的输出
  • .Tasks.Task1.Output - task1 最后一个命令输出

传递 CLI 参数到任务

任何跟在 -- 后面的命令行参数都会通过 .Args.ArgsList 变量或 ARGS 环境变量传递给每个任务。

lint1:
  command: go lint {{.Args}}

lint2:
  command: go lint {{index .ArgsList 1}}

任务变体

任务可以以一个或多个变体运行。变体允许使用不同的环境变量重用任务:

tasks:
  build:
    command:
      - GOOS=${GOOS} GOARCH=amd64 go build -o bin/taskctl_${GOOS} ./cmd/taskctl
    env:
      GOFLAGS: -ldflags=-s -ldflags=-w
    variations:
      - GOOS: linux
      - GOOS: darwin
      - GOOS: windows

流水线

流水线是一组阶段(任务或其他流水线),按特定顺序执行。阶段可以并行或依次执行。阶段可以覆盖任务的环境、变量等。

pipelines:
  pipeline1:
    - task: start task
    - task: task A
      depends_on: "start task"
    - task: task B
      depends_on: "start task"
    - task: task C
      depends_on: "start task"
    - task: task D
      depends_on: "task C"
    - task: task E
      depends_on: ["task A", "task B", "task D"]
    - task: finish
      depends_on: ["task E"]

文件系统监视器

监视器监视由提供的模式选择的文件中的更改,并在发生任何事件时触发任务。

watchers:
  watcher1:
    watch: ["README.*", "pkg/**/*.go"] # 要监视的文件
    exclude: ["pkg/excluded.go", "pkg/excluded-dir/*"] # 排除模式
    events: [create, write, remove, rename, chmod] # 要监听的文件系统事件
    task: task1 # 事件发生时运行的任务

上下文

上下文允许你设置执行环境、变量、运行任务的二进制文件、上下命令等。

contexts:
  local:
    executable:
      bin: /bin/zsh
      args:
        - -c
    env:
      VAR_NAME: VAR_VALUE
    variables:
      sleep: 10
    quote: "'" # 将用提供的符号引用命令:"/bin/zsh -c 'echo 1'"
    before: echo "I'm local context!"
    after: echo "Have a nice day!"

嵌入 taskctl

taskctl 可以嵌入到任何 Go 程序中。

Runner 示例

t := task.FromCommands("go fmt ./...", "go build ./..")
r, err := NewTaskRunner()
if err != nil {
    return
}
err  = r.Run(t)
if err != nil {
    fmt.Println(err, t.ExitCode, t.ErrorMessage())
}
fmt.Println(t.Output())

Scheduler 示例

format := task.FromCommands("go fmt ./...")
build := task.FromCommands("go build ./..")
r, _ := runner.NewTaskRunner()
s := NewScheduler(r)

graph, err := NewExecutionGraph(
    &Stage{Name: "format", Task: format},
    &Stage{Name: "build", Task: build, DependsOn: []string{"format"}},
)
if err != nil {
    return
}

err = s.Schedule(graph)
if err != nil {
    fmt.Println(err)
}

自动补全

Bash

添加到 ~/.bashrc 或 ~/.profile

. <(taskctl completion bash)

ZSH

添加到 ~/.zshrc

. <(taskctl completion zsh)

许可证

本项目采用 GNU GPLv3 许可证 - 详见 LICENSE.md 文件。


更多关于golang并发任务执行工具插件库taskctl的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang并发任务执行工具插件库taskctl的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


taskctl - Go并发任务执行工具库

taskctl是一个轻量级的Go语言并发任务执行工具库,它提供了一种简单的方式来并发执行任务并收集结果。下面我将详细介绍taskctl的使用方法和示例代码。

安装

go get github.com/taskctl/taskctl

基本用法

1. 创建任务池

package main

import (
	"fmt"
	"time"

	"github.com/taskctl/taskctl/pkg/executor"
	"github.com/taskctl/taskctl/pkg/scheduler"
)

func main() {
	// 创建执行器
	ex := executor.NewExecutor(executor.DefaultConfig())

	// 创建调度器
	s := scheduler.NewScheduler(ex)

	// 添加任务
	s.AddTask("task1", func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "Task 1 completed", nil
	})

	s.AddTask("task2", func() (interface{}, error) {
		time.Sleep(2 * time.Second)
		return "Task 2 completed", nil
	})

	// 执行任务并获取结果
	results, err := s.Run()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	for task, result := range results {
		fmt.Printf("%s: %v\n", task, result.Value)
	}
}

2. 控制并发度

func main() {
	// 创建带有限制的执行器配置
	config := executor.DefaultConfig()
	config.Concurrency = 2 // 限制同时运行的任务数为2

	ex := executor.NewExecutor(config)
	s := scheduler.NewScheduler(ex)

	// 添加5个任务
	for i := 0; i < 5; i++ {
		i := i
		s.AddTask(fmt.Sprintf("task%d", i), func() (interface{}, error) {
			time.Sleep(1 * time.Second)
			return fmt.Sprintf("Task %d completed", i), nil
		})
	}

	results, _ := s.Run()
	for task, result := range results {
		fmt.Println(task, ":", result.Value)
	}
}

3. 任务依赖

func main() {
	ex := executor.NewExecutor(executor.DefaultConfig())
	s := scheduler.NewScheduler(ex)

	// 添加任务
	s.AddTask("task1", func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "Task 1 completed", nil
	})

	s.AddTask("task2", func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "Task 2 completed", nil
	})

	// task3依赖task1和task2
	s.AddTask("task3", func() (interface{}, error) {
		return "Task 3 completed after 1 and 2", nil
	}, "task1", "task2")

	results, _ := s.Run()
	for task, result := range results {
		fmt.Println(task, ":", result.Value)
	}
}

4. 错误处理

func main() {
	ex := executor.NewExecutor(executor.DefaultConfig())
	s := scheduler.NewScheduler(ex)

	s.AddTask("success", func() (interface{}, error) {
		return "This will succeed", nil
	})

	s.AddTask("fail", func() (interface{}, error) {
		return nil, fmt.Errorf("this will fail")
	})

	results, err := s.Run()
	if err != nil {
		fmt.Println("Overall error:", err)
	}

	for task, result := range results {
		if result.Err != nil {
			fmt.Printf("%s failed: %v\n", task, result.Err)
		} else {
			fmt.Printf("%s succeeded: %v\n", task, result.Value)
		}
	}
}

5. 上下文取消

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	ex := executor.NewExecutor(executor.DefaultConfig())
	s := scheduler.NewScheduler(ex)

	for i := 0; i < 5; i++ {
		i := i
		s.AddTask(fmt.Sprintf("task%d", i), func() (interface{}, error) {
			select {
			case <-time.After(5 * time.Second):
				return fmt.Sprintf("Task %d completed", i), nil
			case <-ctx.Done():
				return nil, fmt.Errorf("Task %d canceled", i)
			}
		})
	}

	results, _ := s.RunWithContext(ctx)
	for task, result := range results {
		if result.Err != nil {
			fmt.Printf("%s: %v\n", task, result.Err)
		} else {
			fmt.Printf("%s: %v\n", task, result.Value)
		}
	}
}

高级特性

1. 自定义任务执行器

type CustomExecutor struct {
	executor.Executor
}

func (e *CustomExecutor) Execute(task *executor.Task) *executor.TaskResult {
	fmt.Printf("Custom execution for task: %s\n", task.Name)
	return e.Executor.Execute(task)
}

func main() {
	ex := &CustomExecutor{Executor: *executor.NewExecutor(executor.DefaultConfig())}
	s := scheduler.NewScheduler(ex)

	s.AddTask("custom", func() (interface{}, error) {
		return "Custom execution", nil
	})

	results, _ := s.Run()
	fmt.Println(results["custom"].Value)
}

2. 任务重试

func main() {
	config := executor.DefaultConfig()
	config.Retry = 3 // 设置重试次数
	config.RetryDelay = 1 * time.Second // 设置重试间隔

	ex := executor.NewExecutor(config)
	s := scheduler.NewScheduler(ex)

	attempt := 0
	s.AddTask("retry", func() (interface{}, error) {
		attempt++
		if attempt < 3 {
			return nil, fmt.Errorf("attempt %d failed", attempt)
		}
		return "Succeeded after retries", nil
	})

	results, _ := s.Run()
	fmt.Println(results["retry"].Value)
}

总结

taskctl提供了以下主要功能:

  1. 简单的并发任务执行
  2. 并发度控制
  3. 任务依赖管理
  4. 错误处理和重试机制
  5. 上下文取消支持
  6. 可扩展的执行器接口

这个库非常适合需要并行执行多个独立任务但又不想手动管理goroutine和channel的场景。它的API设计简洁,学习曲线平缓,是Go并发编程的一个实用工具。

回到顶部