Golang中如何实现周期性任务调度

Golang中如何实现周期性任务调度 Go语言中是否有像Java原生库中的ScheduledExecutorService那样的原生库或第三方支持,可用于生产环境?

以下是Java 1.8的代码片段:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class TaskScheduler {

    /**
     * @param args
     */
    public static void main(String[] args) {
        Runnable runnable = ()-> {
                // 要运行的任务放在这里
                System.out.println("Hello !!");
        };
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);

    }

}

希望找到符合Go语言习惯且无内存泄漏的实现方案 🙂

显然不想要这样的解决方案:

go func() {
    for true {
        fmt.Println("Hello !!")
        time.Sleep(1 * time.Second)
    }
}()

更多关于Golang中如何实现周期性任务调度的实战教程也可以访问 https://www.itying.com/category-94-b0.html

15 回复

更多关于Golang中如何实现周期性任务调度的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


[!quote] johandalabacka 在主 Go 协程中的 done <- true 会阻塞直到正在进行的任务完成

现在明白了 😄

  1. 在你的情况下,你不需要停止它,因为当你退出程序时它无论如何都会停止。通常,当你想重复使用同一个定时器时,才需要在无限循环中停止它。

还有一个类似 cron 的库

Package cron

包 cron

包 cron 实现了 cron 规范解析器和作业运行器。

  1. 简洁的解决方案 😄
  2. 好的,根据官方规范说明,“停止定时器以释放相关资源。”但在我的实现中,“stop”是在信号中断之后才被调用。所以,我认为这可能会导致一些高资源占用问题。感谢您的解释 🙂

case <-done: return

  1. 但是它通过调用 return 来实现这一点,还是我误解了你的意思?
  2. 你可以停止它,但正如之前的帖子中所述。只有当你打算稍后再次启动它,并且程序在不久之后就会退出时,才需要这样做。
case <-done: return

如果某个任务必须在退出前完成,您是否可以创建一个通道,让调度器等待并在任务完成时接收信号

  1. Select 会一直阻塞执行,直到从任一通道获取值。 我的问题是:如果 done 变为 true,for 循环应该通过 return 立即退出。为什么它要等待之前由定时器通道触发的未完成任务完成?

  2. 明白了 🙂

  1. 你可以立即使用以下方式启动定时器:

    ticker := time.NewTicker(time.Second * 5)
    for ; true; <-ticker.C {
    …
    }
    
  2. 定时器在等待期间会阻塞,因此它并不耗费资源,调度器在此期间会运行其他的 goroutine。除非你将等待时间设置为 1 纳秒 微笑

  1. 在主 Go 协程中,done <- true 会一直阻塞,直到正在进行的任务运行完毕并且 select 语句从 done 通道中读取到值。如果任务不能中途被中断(例如,它必须完成某些数据库操作或写入文件),你应该使用这种模式。如果任务可以随着程序一同中止,那么你就不需要这种使用 done 通道的构造。
func main() {
    fmt.Println("hello world")
}

太棒了,我采用了第二种解决方案。它总是会等待任务完成。

不过还有一些疑问:

1:在调度器函数中从 done chan 收到 true 后,难道不应该立即从函数返回并退出 for 循环吗?为什么它要等待未完成的任务? 2:ticker 还没有被停止,这是有意为之的吗?

time package - time - Go Packages

在上面的示例中,我认为他们并没有重用定时器。尽管如此,他们还是通过 defer 停止了定时器。我需要知道的是,始终停止定时器是否是最佳实践?因为正如你所说,即使不停止定时器也不会造成任何危害,毕竟主程序退出后它自然会停止。

如果这是行业惯例,那么我必须遵守,否则代码将无法通过评审 🙁

嗯,我同意这一点,你应该正确地编写它 😊

同时也要注意这一点(针对计时器的一般情况):

github.com

// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
// A Timer must be created with NewTimer or AfterFunc.
type Timer struct {
	C <-chan Time
	r runtimeTimer
}

// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
// Stop does not close the channel, to prevent a read from the channel succeeding
// incorrectly.
//
// To prevent a timer created with NewTimer from firing after a call to Stop,
// check the return value and drain the channel.
// For example, assuming the program has not received from t.C already:
//
// 	if !t.Stop() {

可以将第一个任务移到调度器函数中

func scheduler(tick *time.Ticker) {
	task(time.Now())
	for t := range tick.C {
		task(t)
	}
}

这样它就能与其他任务调用保持在一起

如果任务必须在退出前完成,您还可以创建一个通道,调度器会等待该通道,并在任务完成时接收信号

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	tick := time.NewTicker(time.Second * 5)
	done := make(chan bool)
	go scheduler(tick, done)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	done <- true
	//os.Exit(1)
}

func scheduler(tick *time.Ticker, done chan bool) {
	task(time.Now())
	for {
		select {
		case t := <-tick.C:
			task(t)
		case <-done:
			return
		}
	}
}

func task(t time.Time) {
	fmt.Println("work started at", t)
	time.Sleep(2 * time.Second)
	fmt.Println("work finished")
}

如果您尝试停止此程序,您会注意到直到最后一条"work finished"被写入后程序才会结束。

感谢 😊
Ticker 完美解决了我的问题。
有两个小问题:
1 - Ticker 不会在应用程序启动时立即触发,它总是等待创建时设定的间隔时间。
假设我想每隔 15 分钟执行某个任务,首次执行时 Ticker 会在应用启动后等待 15 分钟才触发。
我找到了一种变通方法:先在 main 函数中手动调用一次任务,然后依靠 Ticker 每 15 分钟触发。在 Java 的 ScheduledExecutorService 中,只需将延迟设为 ‘0’ 即可实现此行为。
Ticker 有办法实现吗?
2 - 我仅在收到某些系统中断信号后才停止 Ticker 并退出主程序,这会导致高 CPU 和资源占用吗?
代码片段:

func main() {
	task(time.Now())
	tick := time.NewTicker(time.Second * 5)
	go scheduler(tick)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	 <-sigs
	tick.Stop()
	//os.Exit(1)
}
func scheduler(tick *time.Ticker) {
	for t := range tick.C {
		task(t)
	}
}

func task(t time.Time) {
	fmt.Println("hello! printed at ", t)
}

在Go语言中,虽然没有完全等同于Java ScheduledExecutorService的原生库,但可以通过标准库的time包和第三方库来实现生产环境可用的周期性任务调度。

使用标准库 time.Ticker

这是最符合Go语言习惯的解决方案:

package main

import (
    "fmt"
    "time"
    "context"
)

func scheduleTask(ctx context.Context, interval time.Duration, task func()) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            task()
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    task := func() {
        fmt.Println("Hello !!", time.Now().Format("15:04:05"))
    }
    
    // 启动周期性任务,每秒执行一次
    go scheduleTask(ctx, 1*time.Second, task)
    
    // 运行10秒后停止
    time.Sleep(10 * time.Second)
    cancel()
    fmt.Println("任务调度已停止")
}

使用第三方库 cron

对于更复杂的调度需求,推荐使用robfig/cron库:

package main

import (
    "fmt"
    "log"
    "time"
    
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()
    
    // 每秒执行一次
    _, err := c.AddFunc("@every 1s", func() {
        fmt.Println("Hello !!", time.Now().Format("15:04:05"))
    })
    
    if err != nil {
        log.Fatal(err)
    }
    
    // 启动调度器
    c.Start()
    
    // 运行10秒
    time.Sleep(10 * time.Second)
    
    // 优雅停止
    ctx := c.Stop()
    <-ctx.Done()
    fmt.Println("cron调度器已停止")
}

带错误处理的增强版本

package main

import (
    "fmt"
    "log"
    "time"
    "context"
)

type ScheduledTask struct {
    interval time.Duration
    task     func() error
}

func NewScheduledTask(interval time.Duration, task func() error) *ScheduledTask {
    return &ScheduledTask{
        interval: interval,
        task:     task,
    }
}

func (st *ScheduledTask) Run(ctx context.Context) {
    ticker := time.NewTicker(st.interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if err := st.task(); err != nil {
                log.Printf("任务执行失败: %v", err)
            }
        case <-ctx.Done():
            log.Println("任务调度已取消")
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    task := func() error {
        fmt.Println("Hello !!", time.Now().Format("15:04:05"))
        return nil
    }
    
    scheduledTask := NewScheduledTask(1*time.Second, task)
    go scheduledTask.Run(ctx)
    
    // 运行一段时间后停止
    time.Sleep(10 * time.Second)
    cancel()
    time.Sleep(1 * time.Second) // 等待清理完成
}

多个任务调度管理器

package main

import (
    "fmt"
    "sync"
    "time"
    "context"
)

type TaskScheduler struct {
    wg     sync.WaitGroup
    cancel context.CancelFunc
}

func NewTaskScheduler() *TaskScheduler {
    return &TaskScheduler{}
}

func (ts *TaskScheduler) ScheduleTask(ctx context.Context, interval time.Duration, task func()) {
    ts.wg.Add(1)
    
    go func() {
        defer ts.wg.Done()
        
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                task()
            case <-ctx.Done():
                return
            }
        }
    }()
}

func (ts *TaskScheduler) Stop() {
    if ts.cancel != nil {
        ts.cancel()
    }
    ts.wg.Wait()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    scheduler := NewTaskScheduler()
    scheduler.cancel = cancel
    
    // 调度多个任务
    scheduler.ScheduleTask(ctx, 1*time.Second, func() {
        fmt.Println("任务1:", time.Now().Format("15:04:05"))
    })
    
    scheduler.ScheduleTask(ctx, 2*time.Second, func() {
        fmt.Println("任务2:", time.Now().Format("15:04:05"))
    })
    
    // 运行一段时间后停止所有任务
    time.Sleep(10 * time.Second)
    scheduler.Stop()
    fmt.Println("所有任务已停止")
}

这些方案都避免了简单的time.Sleep循环,提供了更好的控制性和错误处理能力。对于生产环境,推荐使用time.Ticker方案或robfig/cron库,它们都经过了充分测试且无内存泄漏问题。

回到顶部