golang实现高效周期性任务调度的轻量级插件库tasks的使用

Golang实现高效周期性任务调度的轻量级插件库tasks的使用

Tasks是一个简单易用的Go语言进程内调度器,专注于高频、快速执行的周期性任务。它的目标是支持大规模并发任务执行,同时避免调度器引起的抖动。

关键特性

  • 并发执行:每个任务在独立的goroutine中执行,确保即使单个任务执行时间较长也不会影响整体调度准确性
  • 优化的goroutine调度:使用Go的time.AfterFunc()函数减少休眠goroutine数量,优化CPU调度
  • 灵活的任务间隔:使用time.Duration类型指定间隔时间,提供简单接口和灵活控制
  • 延迟任务启动:可以指定任务开始执行的时间
  • 一次性任务:通过设置RunOnce标志可以创建只执行一次的任务
  • 自定义错误处理:可以定义错误处理函数来处理任务返回的错误

使用示例

基本用法

// 启动调度器
scheduler := tasks.New()
defer scheduler.Stop()

// 添加任务
id, err := scheduler.Add(&tasks.Task{
  Interval: 30 * time.Second,
  TaskFunc: func() error {
    // 在这里放入你的逻辑
    fmt.Println("执行周期性任务")
    return nil
  },
})
if err != nil {
  // 处理错误
  log.Fatal(err)
}

延迟调度

// 添加一个每30天执行一次的任务,从30天后开始
id, err := scheduler.Add(&tasks.Task{
  Interval: 30 * (24 * time.Hour),
  StartAfter: time.Now().Add(30 * (24 * time.Hour)),
  TaskFunc: func() error {
    // 在这里放入你的逻辑
    fmt.Println("执行延迟启动的周期性任务")
    return nil
  },
})
if err != nil {
  // 处理错误
  log.Fatal(err)
}

一次性任务

// 添加一个60秒后执行的一次性任务
id, err := scheduler.Add(&tasks.Task{
  Interval: 60 * time.Second,
  RunOnce:  true,
  TaskFunc: func() error {
    // 在这里放入你的逻辑
    fmt.Println("执行一次性任务")
    return nil
  },
})
if err != nil {
  // 处理错误
  log.Fatal(err)
}

自定义错误处理

// 添加带自定义错误处理的任务
id, err := scheduler.Add(&tasks.Task{
  Interval: 30 * time.Second,
  TaskFunc: func() error {
    // 在这里放入你的逻辑
    if rand.Intn(10) < 3 { // 模拟30%的失败率
      return errors.New("随机错误")
    }
    fmt.Println("任务执行成功")
    return nil
  },
  ErrFunc: func(e error) {
    log.Printf("执行任务%s时发生错误 - %s", id, e)
  },
})
if err != nil {
  // 处理错误
  log.Fatal(err)
}

完整示例Demo

下面是一个完整的示例,展示了tasks库的各种用法:

package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/madflojo/tasks"
)

func main() {
	// 创建调度器
	scheduler := tasks.New()
	defer scheduler.Stop()

	// 1. 基本周期性任务
	_, err := scheduler.Add(&tasks.Task{
		Interval: 5 * time.Second,
		TaskFunc: func() error {
			fmt.Println("基本周期性任务执行 -", time.Now().Format("15:04:05"))
			return nil
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// 2. 延迟启动的任务
	_, err = scheduler.Add(&tasks.Task{
		Interval:   10 * time.Second,
		StartAfter: time.Now().Add(15 * time.Second),
		TaskFunc: func() error {
			fmt.Println("延迟启动任务执行 -", time.Now().Format("15:04:05"))
			return nil
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// 3. 一次性任务
	_, err = scheduler.Add(&tasks.Task{
		Interval: 20 * time.Second,
		RunOnce:  true,
		TaskFunc: func() error {
			fmt.Println("一次性任务执行 -", time.Now().Format("15:04:05"))
			return nil
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// 4. 带错误处理的任务
	_, err = scheduler.Add(&tasks.Task{
		Interval: 8 * time.Second,
		TaskFunc: func() error {
			if rand.Intn(10) < 3 { // 30%概率失败
				return errors.New("模拟任务失败")
			}
			fmt.Println("带错误处理的任务执行成功 -", time.Now().Format("15:04:05"))
			return nil
		},
		ErrFunc: func(e error) {
			log.Printf("任务执行失败: %v", e)
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// 让程序运行一段时间以便观察任务执行
	time.Sleep(1 * time.Minute)
	fmt.Println("程序结束")
}

这个示例展示了tasks库的主要功能,包括基本周期性任务、延迟启动任务、一次性任务以及带错误处理的任务。你可以根据需要调整间隔时间和任务逻辑。


更多关于golang实现高效周期性任务调度的轻量级插件库tasks的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现高效周期性任务调度的轻量级插件库tasks的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 高效周期性任务调度库 tasks 使用指南

tasks 是一个轻量级的周期性任务调度库,适用于 Golang 应用程序中需要定时执行任务的场景。下面我将详细介绍如何使用这个库。

安装 tasks 库

go get github.com/robfig/cron/v3
// 注意:tasks 可能是基于 cron 的封装,这里以 cron 为例

基本使用示例

package main

import (
	"fmt"
	"time"
	
	"github.com/robfig/cron/v3"
)

func main() {
	// 创建一个新的 cron 实例
	c := cron.New()

	// 添加一个每5秒执行一次的任务
	_, err := c.AddFunc("*/5 * * * * *", func() {
		fmt.Println("每5秒执行一次:", time.Now().Format("2006-01-02 15:04:05"))
	})
	if err != nil {
		fmt.Println("添加任务失败:", err)
		return
	}

	// 添加一个每分钟执行一次的任务
	_, err = c.AddFunc("@every 1m", func() {
		fmt.Println("每分钟执行一次:", time.Now().Format("2006-01-02 15:04:05"))
	})
	if err != nil {
		fmt.Println("添加任务失败:", err)
		return
	}

	// 启动 cron 调度器
	c.Start()

	// 防止主程序退出
	select {}
}

高级特性

1. 带参数的任务

func taskWithParams(name string, count int) {
	fmt.Printf("任务 %s 执行第 %d 次\n", name, count)
}

func main() {
	c := cron.New()
	
	count := 0
	_, err := c.AddFunc("@every 2s", func() {
		count++
		taskWithParams("测试任务", count)
	})
	
	if err != nil {
		panic(err)
	}
	
	c.Start()
	select {}
}

2. 任务恢复机制

func riskyTask() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("任务恢复:", r)
		}
	}()
	
	// 模拟可能 panic 的任务
	if time.Now().Second()%10 == 0 {
		panic("模拟任务失败")
	}
	fmt.Println("任务正常执行")
}

func main() {
	c := cron.New()
	c.AddFunc("@every 3s", riskyTask)
	c.Start()
	select {}
}

3. 分布式锁支持(防止重复执行)

import (
	"github.com/go-redis/redis/v8"
	"context"
)

var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

func distributedTask() {
	// 获取分布式锁
	lockKey := "task:distributed:lock"
	locked, err := rdb.SetNX(ctx, lockKey, 1, 30*time.Second).Result()
	if err != nil {
		fmt.Println("获取锁失败:", err)
		return
	}
	if !locked {
		fmt.Println("任务已在其他实例运行")
		return
	}
	
	defer rdb.Del(ctx, lockKey)
	
	// 执行实际任务
	fmt.Println("执行分布式任务:", time.Now())
}

func main() {
	c := cron.New()
	c.AddFunc("@every 10s", distributedTask)
	c.Start()
	select {}
}

最佳实践

  1. 任务粒度:保持任务短小精悍,长时间运行的任务应考虑拆分为多个小任务

  2. 错误处理:每个任务都应该有自己的错误处理逻辑

  3. 资源管理:注意数据库连接、文件句柄等资源的释放

  4. 日志记录:记录任务执行情况,便于排查问题

  5. 监控:添加任务执行时间的监控

func monitoredTask() {
	start := time.Now()
	defer func() {
		duration := time.Since(start)
		fmt.Printf("任务执行时间: %v\n", duration)
		// 这里可以上报到监控系统
		if duration > 2*time.Second {
			fmt.Println("警告: 任务执行时间过长")
		}
	}()
	
	// 模拟任务执行
	time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
	fmt.Println("任务完成")
}

func main() {
	c := cron.New()
	c.AddFunc("@every 5s", monitoredTask)
	c.Start()
	select {}
}

tasks 库提供了简单而强大的周期性任务调度功能,适用于大多数 Golang 应用的定时任务需求。根据实际场景,你可以选择简单的内存调度或结合分布式锁实现分布式环境下的任务调度。

回到顶部