golang高性能内存线程安全延迟队列插件库dqueue的使用
Golang高性能内存线程安全延迟队列插件库dqueue的使用
什么是dqueue?
dqueue是一个Golang实现的延迟任务队列库,它允许你创建在未来某个时间点执行的任务,比如"3分钟后执行这个任务"或"15秒后执行那个任务"。然后可以通过并发goroutine消费这些任务,任务会按照正确的顺序提供给消费者。
基本用法
package main
import (
"fmt"
"time"
"github.com/vodolaz095/dqueue"
)
func main() {
handler := dqueue.New() // 导入"github.com/vodolaz095/dqueue"
// payload可以是任何类型 - 数字、字符串、buffer、结构体...
something := "task"
// 创建未来执行的任务
handler.ExecuteAt(something, time.Now().Add(time.Minute)) // 在指定时间执行
handler.ExecuteAfter(something, time.Minute) // 1分钟后执行
// 获取准备就绪的任务
task, ready := handler.Get()
if ready { // 任务已就绪
fmt.Printf("任务 %s 准备在 %s 执行\n",
task.Payload.(string),
task.ExecuteAt.Format(time.Kitchen),
)
} else {
fmt.Println("没有准备就绪的任务")
}
// 获取队列中剩余任务数量
tasksInQueue := handler.Len()
// 导出所有任务,例如在关闭应用前保存所有待处理队列
tasks := handler.Dump()
// 清空队列
handler.Prune()
}
并发消费者示例
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/vodolaz095/dqueue"
)
func ProcessTask(task dqueue.Task) error {
// 处理任务的逻辑
fmt.Printf("处理任务: %v\n", task.Payload)
return nil
}
func main() {
handler := dqueue.New()
// 发布任务
something := "task" // payload可以是任何类型
handler.ExecuteAt(something, time.Now().Add(time.Minute))
handler.ExecuteAfter(something, time.Minute)
// 创建全局上下文,在应用停止时取消
wg := sync.WaitGroup{}
mainCtx, mainCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer mainCancel()
// 启动并发消费者
for j := 0; j < 10; j++ {
wg.Add(1)
go func(workerNumber int, initialCtx context.Context) {
ctx, cancel := context.WithCancel(initialCtx)
defer cancel()
ticker := time.NewTicker(time.Millisecond)
for {
select {
case <-ticker.C:
task, ready := handler.Get()
if ready { // 任务已就绪
err := ProcessTask(task)
if err != nil { // 例如,将消息重新排队在1分钟后交付
handler.ExecuteAfter(something, time.Minute)
}
}
break
case <-ctx.Done():
fmt.Printf("关闭worker %v, 队列中还有%v个任务\n", workerNumber, handler.Len())
wg.Done()
ticker.Stop()
return
}
}
}(j, mainCtx)
}
wg.Wait()
// 查看剩余任务,以便应用重启时可以恢复
tasks := handler.Dump()
}
主要功能
- ExecuteAt(payload interface{}, when time.Time) - 在指定时间执行任务
- ExecuteAfter(payload interface{}, duration time.Duration) - 在指定时间间隔后执行任务
- Get() (Task, bool) - 获取准备就绪的任务
- Len() int - 获取队列中任务数量
- Dump() []Task - 导出所有任务
- Prune() - 清空队列
这个库是线程安全的,适合在高并发环境下使用。如果你需要更高性能但更不灵活的解决方案,可以考虑基于泛型的替代方案。
更多关于golang高性能内存线程安全延迟队列插件库dqueue的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高性能内存线程安全延迟队列插件库dqueue的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高性能内存线程安全延迟队列dqueue使用指南
dqueue是一个高性能的内存线程安全延迟队列实现,适用于需要延迟任务处理的场景。下面我将详细介绍其使用方法和示例代码。
安装
go get github.com/ouqiang/delayqueue
基本使用
1. 创建队列
package main
import (
"fmt"
"time"
"github.com/ouqiang/delayqueue"
)
func main() {
// 创建延迟队列
queue := delayqueue.New()
// 设置队列容量(可选)
queue.SetCapacity(10000)
// 启动队列消费
go func() {
for {
// 阻塞获取元素
item := queue.Take()
fmt.Printf("处理元素: %v, 当前时间: %v\n", item.Value(), time.Now())
}
}()
// 添加延迟元素
queue.Put(delayqueue.NewItem("task1", time.Now().Add(3*time.Second)))
queue.Put(delayqueue.NewItem("task2", time.Now().Add(5*time.Second)))
// 防止主线程退出
time.Sleep(10 * time.Second)
}
2. 带回调的使用方式
func main() {
queue := delayqueue.New()
// 设置回调函数处理元素
queue.OnTake(func(item delayqueue.Item) {
fmt.Printf("回调处理: %v\n", item.Value())
})
// 添加元素
queue.Put(delayqueue.NewItem("callback-task", time.Now().Add(2*time.Second)))
time.Sleep(3 * time.Second)
}
高级功能
1. 优先级队列
func main() {
queue := delayqueue.New()
// 添加不同优先级的元素
queue.Put(delayqueue.NewItemWithPriority("high-priority", time.Now().Add(5*time.Second), 1)
queue.Put(delayqueue.NewItemWithPriority("low-priority", time.Now().Add(5*time.Second), 10))
go func() {
for {
item := queue.Take()
fmt.Println(item.Value()) // 先输出high-priority
}
}()
time.Sleep(10 * time.Second)
}
2. 取消延迟任务
func main() {
queue := delayqueue.New()
item1 := delayqueue.NewItem("cancelable-task", time.Now().Add(5*time.Second))
queue.Put(item1)
// 取消任务
queue.Remove(item1)
// 确认是否已取消
fmt.Println(queue.Contains(item1)) // 输出false
}
3. 批量操作
func main() {
queue := delayqueue.New()
// 批量添加
items := []delayqueue.Item{
delayqueue.NewItem("batch1", time.Now().Add(1*time.Second)),
delayqueue.NewItem("batch2", time.Now().Add(2*time.Second)),
}
queue.PutBatch(items...)
// 批量获取
for i := 0; i < 2; i++ {
item := queue.Take()
fmt.Println(item.Value())
}
}
性能优化建议
- 合理设置队列容量:根据业务需求设置初始容量,减少扩容带来的性能开销
- 批量操作:尽量使用PutBatch进行批量插入
- 避免频繁取消:Remove操作需要遍历队列,频繁使用会影响性能
- 控制优先级数量:优先级数量过多会影响性能
实际应用场景
- 订单超时取消
- 延迟通知
- 定时任务调度
- 重试机制实现
注意事项
- dqueue是内存队列,程序重启会导致队列数据丢失
- 大量数据时注意内存占用
- 分布式环境需要结合其他技术实现
以上是dqueue的基本使用方法和示例代码,这个库提供了简单高效的延迟队列实现,适合单机高性能延迟任务处理场景。