Golang延迟队列实现方案
在Golang中实现延迟队列有哪些成熟的方案?目前项目中需要处理定时任务,比如订单超时自动取消,想了解下常见的实现方式。比较关心性能、可靠性和易用性方面,是否有推荐的开源库或最佳实践?另外,如果自己实现的话,需要注意哪些关键点?
2 回复
Golang实现延迟队列常用方案:
- 基于time.AfterFunc + 堆结构
- 使用Redis有序集合(ZSET)
- 使用第三方库如asynq或machinery
- 基于channel + goroutine定时扫描
推荐使用Redis ZSET方案,支持持久化和分布式部署,通过score存储执行时间戳,定期轮询获取到期任务。
更多关于Golang延迟队列实现方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中实现延迟队列有多种方案,以下是几种常用且高效的实现方式:
1. 基于时间堆(Timer Heap)
使用container/heap实现最小堆,按执行时间排序:
package main
import (
"container/heap"
"sync"
"time"
)
type DelayItem struct {
Value interface{}
Priority int64 // 执行时间戳
Index int
}
type DelayQueue []*DelayItem
func (dq DelayQueue) Len() int { return len(dq) }
func (dq DelayQueue) Less(i, j int) bool {
return dq[i].Priority < dq[j].Priority
}
func (dq DelayQueue) Swap(i, j int) {
dq[i], dq[j] = dq[j], dq[i]
dq[i].Index = i
dq[j].Index = j
}
func (dq *DelayQueue) Push(x interface{}) {
n := len(*dq)
item := x.(*DelayItem)
item.Index = n
*dq = append(*dq, item)
}
func (dq *DelayQueue) Pop() interface{} {
old := *dq
n := len(old)
item := old[n-1]
item.Index = -1
*dq = old[0 : n-1]
return item
}
type Queue struct {
items *DelayQueue
lock sync.Mutex
cond *sync.Cond
}
func NewDelayQueue() *Queue {
dq := make(DelayQueue, 0)
q := &Queue{
items: &dq,
}
q.cond = sync.NewCond(&q.lock)
heap.Init(q.items)
return q
}
func (q *Queue) Push(item interface{}, delay time.Duration) {
q.lock.Lock()
defer q.lock.Unlock()
heap.Push(q.items, &DelayItem{
Value: item,
Priority: time.Now().Add(delay).UnixNano(),
})
q.cond.Signal()
}
func (q *Queue) Pop() interface{} {
q.lock.Lock()
defer q.lock.Unlock()
for {
if q.items.Len() == 0 {
q.cond.Wait()
continue
}
item := (*q.items)[0]
if item.Priority <= time.Now().UnixNano() {
heap.Pop(q.items)
return item.Value
}
// 等待最接近的任务到期
duration := time.Duration(item.Priority - time.Now().UnixNano())
time.AfterFunc(duration, func() {
q.cond.Signal()
})
q.cond.Wait()
}
}
2. 基于时间轮(Time Wheel)
适用于大量定时任务的场景:
package main
import (
"container/list"
"sync"
"time"
)
type Task struct {
delay time.Duration
f func()
}
type TimeWheel struct {
interval time.Duration
ticker *time.Ticker
slots []*list.List
currentPos int
slotNum int
taskMapping sync.Map
stopCh chan bool
lock sync.Mutex
}
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
tw := &TimeWheel{
interval: interval,
slots: make([]*list.List, slotNum),
currentPos: 0,
slotNum: slotNum,
stopCh: make(chan bool),
}
for i := 0; i < slotNum; i++ {
tw.slots[i] = list.New()
}
return tw
}
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.run()
}
func (tw *TimeWheel) Stop() {
tw.ticker.Stop()
tw.stopCh <- true
}
func (tw *TimeWheel) AddTask(delay time.Duration, f func()) {
tw.lock.Lock()
defer tw.lock.Unlock()
ticks := int(delay / tw.interval)
round := ticks / tw.slotNum
pos := (tw.currentPos + ticks) % tw.slotNum
task := &Task{delay: delay, f: f}
tw.slots[pos].PushBack(task)
tw.taskMapping.Store(task, struct{}{})
}
func (tw *TimeWheel) run() {
for {
select {
case <-tw.ticker.C:
tw.tick()
case <-tw.stopCh:
return
}
}
}
func (tw *TimeWheel) tick() {
tw.lock.Lock()
defer tw.lock.Unlock()
l := tw.slots[tw.currentPos]
for e := l.Front(); e != nil; e = e.Next() {
if task, ok := e.Value.(*Task); ok {
go task.f()
tw.taskMapping.Delete(task)
}
}
tw.slots[tw.currentPos] = list.New()
tw.currentPos = (tw.currentPos + 1) % tw.slotNum
}
3. 基于Redis的延迟队列
对于分布式场景,可以使用Redis的有序集合:
package main
import (
"context"
"github.com/go-redis/redis/v8"
"time"
)
type RedisDelayQueue struct {
client *redis.Client
key string
}
func NewRedisDelayQueue(client *redis.Client, key string) *RedisDelayQueue {
return &RedisDelayQueue{
client: client,
key: key,
}
}
func (q *RedisDelayQueue) AddTask(taskID string, delay time.Duration) error {
score := time.Now().Add(delay).Unix()
return q.client.ZAdd(context.Background(), q.key, &redis.Z{
Score: float64(score),
Member: taskID,
}).Err()
}
func (q *RedisDelayQueue) Poll() ([]string, error) {
now := time.Now().Unix()
// 获取到期的任务
result, err := q.client.ZRangeByScore(context.Background(), q.key, &redis.ZRangeBy{
Min: "0",
Max: string(now),
}).Result()
if err != nil {
return nil, err
}
// 删除已获取的任务
if len(result) > 0 {
_, err = q.client.ZRem(context.Background(), q.key, result).Result()
if err != nil {
return nil, err
}
}
return result, nil
}
方案选择建议
- 单机应用:推荐使用时间堆,实现简单且性能良好
- 高并发定时任务:时间轮更适合,避免大量timer的开销
- 分布式系统:使用Redis方案,支持水平扩展和高可用
- 生产环境:考虑使用成熟的第三方库如
github.com/ouqiang/delay-queue
每种方案都有其适用场景,根据具体需求选择合适的实现方式。

