Golang中如何实现周期性任务调度
Golang中如何实现周期性任务调度
Go语言中是否有像Java原生库中的ScheduledExecutorService那样的原生库或第三方支持,可用于生产环境?
以下是Java 1.8的代码片段:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TaskScheduler {
/**
* @param args
*/
public static void main(String[] args) {
Runnable runnable = ()-> {
// 要运行的任务放在这里
System.out.println("Hello !!");
};
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
}
}
希望找到符合Go语言习惯且无内存泄漏的实现方案 🙂
显然不想要这样的解决方案:
go func() {
for true {
fmt.Println("Hello !!")
time.Sleep(1 * time.Second)
}
}()
更多关于Golang中如何实现周期性任务调度的实战教程也可以访问 https://www.itying.com/category-94-b0.html
[!quote] johandalabacka 在主 Go 协程中的
done <- true会阻塞直到正在进行的任务完成
现在明白了 😄
- 在你的情况下,你不需要停止它,因为当你退出程序时它无论如何都会停止。通常,当你想重复使用同一个定时器时,才需要在无限循环中停止它。
- 简洁的解决方案 😄
- 好的,根据官方规范说明,“停止定时器以释放相关资源。”但在我的实现中,“stop”是在信号中断之后才被调用。所以,我认为这可能会导致一些高资源占用问题。感谢您的解释 🙂
case <-done: return
- 但是它通过调用 return 来实现这一点,还是我误解了你的意思?
- 你可以停止它,但正如之前的帖子中所述。只有当你打算稍后再次启动它,并且程序在不久之后就会退出时,才需要这样做。
case <-done: return
如果某个任务必须在退出前完成,您是否可以创建一个通道,让调度器等待并在任务完成时接收信号
-
Select 会一直阻塞执行,直到从任一通道获取值。 我的问题是:如果
done变为 true,for 循环应该通过 return 立即退出。为什么它要等待之前由定时器通道触发的未完成任务完成? -
明白了 🙂
-
你可以立即使用以下方式启动定时器:
ticker := time.NewTicker(time.Second * 5) for ; true; <-ticker.C { … } -
定时器在等待期间会阻塞,因此它并不耗费资源,调度器在此期间会运行其他的 goroutine。除非你将等待时间设置为 1 纳秒

- 在主 Go 协程中,
done <- true会一直阻塞,直到正在进行的任务运行完毕并且 select 语句从 done 通道中读取到值。如果任务不能中途被中断(例如,它必须完成某些数据库操作或写入文件),你应该使用这种模式。如果任务可以随着程序一同中止,那么你就不需要这种使用 done 通道的构造。
func main() {
fmt.Println("hello world")
}
太棒了,我采用了第二种解决方案。它总是会等待任务完成。
不过还有一些疑问:
1:在调度器函数中从 done chan 收到 true 后,难道不应该立即从函数返回并退出 for 循环吗?为什么它要等待未完成的任务?
2:ticker 还没有被停止,这是有意为之的吗?
在上面的示例中,我认为他们并没有重用定时器。尽管如此,他们还是通过 defer 停止了定时器。我需要知道的是,始终停止定时器是否是最佳实践?因为正如你所说,即使不停止定时器也不会造成任何危害,毕竟主程序退出后它自然会停止。
如果这是行业惯例,那么我必须遵守,否则代码将无法通过评审 🙁
嗯,我同意这一点,你应该正确地编写它 😊
同时也要注意这一点(针对计时器的一般情况):
// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
// A Timer must be created with NewTimer or AfterFunc.
type Timer struct {
C <-chan Time
r runtimeTimer
}
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
// Stop does not close the channel, to prevent a read from the channel succeeding
// incorrectly.
//
// To prevent a timer created with NewTimer from firing after a call to Stop,
// check the return value and drain the channel.
// For example, assuming the program has not received from t.C already:
//
// if !t.Stop() {
可以将第一个任务移到调度器函数中
func scheduler(tick *time.Ticker) {
task(time.Now())
for t := range tick.C {
task(t)
}
}
这样它就能与其他任务调用保持在一起
如果任务必须在退出前完成,您还可以创建一个通道,调度器会等待该通道,并在任务完成时接收信号
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
tick := time.NewTicker(time.Second * 5)
done := make(chan bool)
go scheduler(tick, done)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
done <- true
//os.Exit(1)
}
func scheduler(tick *time.Ticker, done chan bool) {
task(time.Now())
for {
select {
case t := <-tick.C:
task(t)
case <-done:
return
}
}
}
func task(t time.Time) {
fmt.Println("work started at", t)
time.Sleep(2 * time.Second)
fmt.Println("work finished")
}
如果您尝试停止此程序,您会注意到直到最后一条"work finished"被写入后程序才会结束。
感谢 😊
Ticker 完美解决了我的问题。
有两个小问题:
1 - Ticker 不会在应用程序启动时立即触发,它总是等待创建时设定的间隔时间。
假设我想每隔 15 分钟执行某个任务,首次执行时 Ticker 会在应用启动后等待 15 分钟才触发。
我找到了一种变通方法:先在 main 函数中手动调用一次任务,然后依靠 Ticker 每 15 分钟触发。在 Java 的 ScheduledExecutorService 中,只需将延迟设为 ‘0’ 即可实现此行为。
Ticker 有办法实现吗?
2 - 我仅在收到某些系统中断信号后才停止 Ticker 并退出主程序,这会导致高 CPU 和资源占用吗?
代码片段:
func main() {
task(time.Now())
tick := time.NewTicker(time.Second * 5)
go scheduler(tick)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
tick.Stop()
//os.Exit(1)
}
func scheduler(tick *time.Ticker) {
for t := range tick.C {
task(t)
}
}
func task(t time.Time) {
fmt.Println("hello! printed at ", t)
}
在Go语言中,虽然没有完全等同于Java ScheduledExecutorService的原生库,但可以通过标准库的time包和第三方库来实现生产环境可用的周期性任务调度。
使用标准库 time.Ticker
这是最符合Go语言习惯的解决方案:
package main
import (
"fmt"
"time"
"context"
)
func scheduleTask(ctx context.Context, interval time.Duration, task func()) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
task()
case <-ctx.Done():
return
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
task := func() {
fmt.Println("Hello !!", time.Now().Format("15:04:05"))
}
// 启动周期性任务,每秒执行一次
go scheduleTask(ctx, 1*time.Second, task)
// 运行10秒后停止
time.Sleep(10 * time.Second)
cancel()
fmt.Println("任务调度已停止")
}
使用第三方库 cron
对于更复杂的调度需求,推荐使用robfig/cron库:
package main
import (
"fmt"
"log"
"time"
"github.com/robfig/cron/v3"
)
func main() {
c := cron.New()
// 每秒执行一次
_, err := c.AddFunc("@every 1s", func() {
fmt.Println("Hello !!", time.Now().Format("15:04:05"))
})
if err != nil {
log.Fatal(err)
}
// 启动调度器
c.Start()
// 运行10秒
time.Sleep(10 * time.Second)
// 优雅停止
ctx := c.Stop()
<-ctx.Done()
fmt.Println("cron调度器已停止")
}
带错误处理的增强版本
package main
import (
"fmt"
"log"
"time"
"context"
)
type ScheduledTask struct {
interval time.Duration
task func() error
}
func NewScheduledTask(interval time.Duration, task func() error) *ScheduledTask {
return &ScheduledTask{
interval: interval,
task: task,
}
}
func (st *ScheduledTask) Run(ctx context.Context) {
ticker := time.NewTicker(st.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := st.task(); err != nil {
log.Printf("任务执行失败: %v", err)
}
case <-ctx.Done():
log.Println("任务调度已取消")
return
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
task := func() error {
fmt.Println("Hello !!", time.Now().Format("15:04:05"))
return nil
}
scheduledTask := NewScheduledTask(1*time.Second, task)
go scheduledTask.Run(ctx)
// 运行一段时间后停止
time.Sleep(10 * time.Second)
cancel()
time.Sleep(1 * time.Second) // 等待清理完成
}
多个任务调度管理器
package main
import (
"fmt"
"sync"
"time"
"context"
)
type TaskScheduler struct {
wg sync.WaitGroup
cancel context.CancelFunc
}
func NewTaskScheduler() *TaskScheduler {
return &TaskScheduler{}
}
func (ts *TaskScheduler) ScheduleTask(ctx context.Context, interval time.Duration, task func()) {
ts.wg.Add(1)
go func() {
defer ts.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
task()
case <-ctx.Done():
return
}
}
}()
}
func (ts *TaskScheduler) Stop() {
if ts.cancel != nil {
ts.cancel()
}
ts.wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
scheduler := NewTaskScheduler()
scheduler.cancel = cancel
// 调度多个任务
scheduler.ScheduleTask(ctx, 1*time.Second, func() {
fmt.Println("任务1:", time.Now().Format("15:04:05"))
})
scheduler.ScheduleTask(ctx, 2*time.Second, func() {
fmt.Println("任务2:", time.Now().Format("15:04:05"))
})
// 运行一段时间后停止所有任务
time.Sleep(10 * time.Second)
scheduler.Stop()
fmt.Println("所有任务已停止")
}
这些方案都避免了简单的time.Sleep循环,提供了更好的控制性和错误处理能力。对于生产环境,推荐使用time.Ticker方案或robfig/cron库,它们都经过了充分测试且无内存泄漏问题。


