Golang延时队列实现

在Golang中如何实现一个高效的延时队列?目前项目中需要处理大量带有延迟时间的任务,希望能支持精确的定时触发。请问有哪些成熟的实现方案?比如基于堆、时间轮或者Redis的方案,各自有什么优缺点?另外,如何保证高并发下的性能和可靠性?有没有开源库推荐或者最佳实践可以参考?

2 回复

Golang实现延时队列常用方法:

  1. 使用time.After + channel
  2. 使用最小堆(container/heap)管理到期时间
  3. 使用time.Ticker轮询检查
  4. 结合Redis的zset实现分布式延时队列

推荐使用最小堆方案,时间复杂度O(logN),适合单机高并发场景。

更多关于Golang延时队列实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go语言中,实现延时队列通常使用time.Timertime.Ticker或第三方库(如github.com/ouqiang/delayqueue)。以下是几种常见实现方式:


1. 使用time.AfterFunc实现单次延时任务

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("开始时间:", time.Now())
    
    // 3秒后执行任务
    timer := time.AfterFunc(3*time.Second, func() {
        fmt.Println("执行延时任务:", time.Now())
    })
    
    defer timer.Stop() // 必要时停止任务
    time.Sleep(5 * time.Second) // 等待任务执行
}

2. 使用channel实现简易延时队列

package main

import (
    "fmt"
    "time"
)

type DelayTask struct {
    ExecuteAt time.Time
    Task      func()
}

func main() {
    queue := make(chan DelayTask, 100)
    
    // 消费者协程
    go func() {
        for task := range queue {
            now := time.Now()
            if now.Before(task.ExecuteAt) {
                time.Sleep(task.ExecuteAt.Sub(now))
            }
            task.Task()
        }
    }()
    
    // 添加延时任务(2秒后执行)
    queue <- DelayTask{
        ExecuteAt: time.Now().Add(2 * time.Second),
        Task: func() {
            fmt.Println("执行任务:", time.Now())
        },
    }
    
    time.Sleep(3 * time.Second)
    close(queue)
}

3. 使用最小堆实现多任务延时队列

package main

import (
    "container/heap"
    "fmt"
    "time"
)

type Task struct {
    ExecuteAt time.Time
    Task      func()
}

type TaskQueue []*Task

func (h TaskQueue) Len() int           { return len(h) }
func (h TaskQueue) Less(i, j int) bool { return h[i].ExecuteAt.Before(h[j].ExecuteAt) }
func (h TaskQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *TaskQueue) Push(x interface{}) {
    *h = append(*h, x.(*Task))
}

func (h *TaskQueue) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

func main() {
    pq := make(TaskQueue, 0)
    heap.Init(&pq)
    
    // 添加任务
    heap.Push(&pq, &Task{
        ExecuteAt: time.Now().Add(3 * time.Second),
        Task: func() { fmt.Println("执行任务3秒") },
    })
    
    heap.Push(&pq, &Task{
        ExecuteAt: time.Now().Add(1 * time.Second),
        Task: func() { fmt.Println("执行任务1秒") },
    })
    
    // 执行任务
    for pq.Len() > 0 {
        task := heap.Pop(&pq).(*Task)
        if time.Now().Before(task.ExecuteAt) {
            time.Sleep(task.ExecuteAt.Sub(time.Now()))
        }
        task.Task()
    }
}

4. 推荐使用现成库

对于生产环境,建议使用成熟的开源库:

go get github.com/ouqiang/delayqueue
import "github.com/ouqiang/delayqueue"

dq := delayqueue.New()
dq.Push("task1", 5*time.Second) // 5秒后执行

选择建议:

  • 简单单次任务:使用time.AfterFunc
  • 轻量级需求:channel方案
  • 复杂调度场景:最小堆或现成库
  • 生产环境:推荐使用delayqueue等经过验证的库

所有方案都需注意协程安全和内存管理,根据具体场景选择合适的实现方式。

回到顶部