Golang延迟队列实现方案

在Golang中实现延迟队列有哪些成熟的方案?目前项目中需要处理定时任务,比如订单超时自动取消,想了解下常见的实现方式。比较关心性能、可靠性和易用性方面,是否有推荐的开源库或最佳实践?另外,如果自己实现的话,需要注意哪些关键点?

2 回复

Golang实现延迟队列常用方案:

  1. 基于time.AfterFunc + 堆结构
  2. 使用Redis有序集合(ZSET)
  3. 使用第三方库如asynq或machinery
  4. 基于channel + goroutine定时扫描

推荐使用Redis ZSET方案,支持持久化和分布式部署,通过score存储执行时间戳,定期轮询获取到期任务。

更多关于Golang延迟队列实现方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中实现延迟队列有多种方案,以下是几种常用且高效的实现方式:

1. 基于时间堆(Timer Heap)

使用container/heap实现最小堆,按执行时间排序:

package main

import (
	"container/heap"
	"sync"
	"time"
)

type DelayItem struct {
	Value    interface{}
	Priority int64 // 执行时间戳
	Index    int
}

type DelayQueue []*DelayItem

func (dq DelayQueue) Len() int { return len(dq) }

func (dq DelayQueue) Less(i, j int) bool {
	return dq[i].Priority < dq[j].Priority
}

func (dq DelayQueue) Swap(i, j int) {
	dq[i], dq[j] = dq[j], dq[i]
	dq[i].Index = i
	dq[j].Index = j
}

func (dq *DelayQueue) Push(x interface{}) {
	n := len(*dq)
	item := x.(*DelayItem)
	item.Index = n
	*dq = append(*dq, item)
}

func (dq *DelayQueue) Pop() interface{} {
	old := *dq
	n := len(old)
	item := old[n-1]
	item.Index = -1
	*dq = old[0 : n-1]
	return item
}

type Queue struct {
	items *DelayQueue
	lock  sync.Mutex
	cond  *sync.Cond
}

func NewDelayQueue() *Queue {
	dq := make(DelayQueue, 0)
	q := &Queue{
		items: &dq,
	}
	q.cond = sync.NewCond(&q.lock)
	heap.Init(q.items)
	return q
}

func (q *Queue) Push(item interface{}, delay time.Duration) {
	q.lock.Lock()
	defer q.lock.Unlock()
	
	heap.Push(q.items, &DelayItem{
		Value:    item,
		Priority: time.Now().Add(delay).UnixNano(),
	})
	q.cond.Signal()
}

func (q *Queue) Pop() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()
	
	for {
		if q.items.Len() == 0 {
			q.cond.Wait()
			continue
		}
		
		item := (*q.items)[0]
		if item.Priority <= time.Now().UnixNano() {
			heap.Pop(q.items)
			return item.Value
		}
		
		// 等待最接近的任务到期
		duration := time.Duration(item.Priority - time.Now().UnixNano())
		time.AfterFunc(duration, func() {
			q.cond.Signal()
		})
		q.cond.Wait()
	}
}

2. 基于时间轮(Time Wheel)

适用于大量定时任务的场景:

package main

import (
	"container/list"
	"sync"
	"time"
)

type Task struct {
	delay time.Duration
	f     func()
}

type TimeWheel struct {
	interval    time.Duration
	ticker      *time.Ticker
	slots       []*list.List
	currentPos  int
	slotNum     int
	taskMapping sync.Map
	stopCh      chan bool
	lock        sync.Mutex
}

func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
	tw := &TimeWheel{
		interval:   interval,
		slots:      make([]*list.List, slotNum),
		currentPos: 0,
		slotNum:    slotNum,
		stopCh:     make(chan bool),
	}
	
	for i := 0; i < slotNum; i++ {
		tw.slots[i] = list.New()
	}
	return tw
}

func (tw *TimeWheel) Start() {
	tw.ticker = time.NewTicker(tw.interval)
	go tw.run()
}

func (tw *TimeWheel) Stop() {
	tw.ticker.Stop()
	tw.stopCh <- true
}

func (tw *TimeWheel) AddTask(delay time.Duration, f func()) {
	tw.lock.Lock()
	defer tw.lock.Unlock()
	
	ticks := int(delay / tw.interval)
	round := ticks / tw.slotNum
	pos := (tw.currentPos + ticks) % tw.slotNum
	
	task := &Task{delay: delay, f: f}
	tw.slots[pos].PushBack(task)
	tw.taskMapping.Store(task, struct{}{})
}

func (tw *TimeWheel) run() {
	for {
		select {
		case <-tw.ticker.C:
			tw.tick()
		case <-tw.stopCh:
			return
		}
	}
}

func (tw *TimeWheel) tick() {
	tw.lock.Lock()
	defer tw.lock.Unlock()
	
	l := tw.slots[tw.currentPos]
	for e := l.Front(); e != nil; e = e.Next() {
		if task, ok := e.Value.(*Task); ok {
			go task.f()
			tw.taskMapping.Delete(task)
		}
	}
	tw.slots[tw.currentPos] = list.New()
	tw.currentPos = (tw.currentPos + 1) % tw.slotNum
}

3. 基于Redis的延迟队列

对于分布式场景,可以使用Redis的有序集合:

package main

import (
	"context"
	"github.com/go-redis/redis/v8"
	"time"
)

type RedisDelayQueue struct {
	client *redis.Client
	key    string
}

func NewRedisDelayQueue(client *redis.Client, key string) *RedisDelayQueue {
	return &RedisDelayQueue{
		client: client,
		key:    key,
	}
}

func (q *RedisDelayQueue) AddTask(taskID string, delay time.Duration) error {
	score := time.Now().Add(delay).Unix()
	return q.client.ZAdd(context.Background(), q.key, &redis.Z{
		Score:  float64(score),
		Member: taskID,
	}).Err()
}

func (q *RedisDelayQueue) Poll() ([]string, error) {
	now := time.Now().Unix()
	
	// 获取到期的任务
	result, err := q.client.ZRangeByScore(context.Background(), q.key, &redis.ZRangeBy{
		Min: "0",
		Max: string(now),
	}).Result()
	
	if err != nil {
		return nil, err
	}
	
	// 删除已获取的任务
	if len(result) > 0 {
		_, err = q.client.ZRem(context.Background(), q.key, result).Result()
		if err != nil {
			return nil, err
		}
	}
	
	return result, nil
}

方案选择建议

  • 单机应用:推荐使用时间堆,实现简单且性能良好
  • 高并发定时任务:时间轮更适合,避免大量timer的开销
  • 分布式系统:使用Redis方案,支持水平扩展和高可用
  • 生产环境:考虑使用成熟的第三方库如github.com/ouqiang/delay-queue

每种方案都有其适用场景,根据具体需求选择合适的实现方式。

回到顶部