golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用
Golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用
简介
Asynq是一个Go语言库,用于排队任务并通过工作器异步处理它们。它基于Redis构建,设计为既易于入门又具有可扩展性。
工作原理概述:
- 客户端将任务放入队列
- 服务器从队列中拉取任务并为每个任务启动一个工作goroutine
- 任务由多个工作器并发处理
任务队列用作跨多台机器分发工作的机制。系统可以由多个工作服务器和代理组成,从而实现高可用性和水平扩展。
功能特性
- 保证任务至少执行一次
- 任务调度
- 失败任务的重试
- 工作器崩溃时自动恢复任务
- 加权优先级队列
- 严格优先级队列
- 低延迟添加任务
- 使用唯一选项实现任务去重
- 每个任务可设置超时和截止时间
- 支持任务聚合以批量处理多个连续操作
- 灵活的处理程序接口,支持中间件
- 暂停队列功能
- 周期性任务
- 支持Redis Sentinel实现高可用
- 与Prometheus集成收集和可视化队列指标
- Web UI检查和远程控制队列和任务
- CLI工具检查和远程控制队列和任务
快速入门
安装
确保已安装Go(支持最后两个Go版本)。初始化项目后运行:
go get -u github.com/hibiken/asynq
确保本地运行Redis服务器(需要4.0或更高版本)。
示例代码
- 首先创建一个封装任务创建和处理的包:
package tasks
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
// 任务类型常量
const (
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
)
// 电子邮件投递负载
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
// 图片调整大小负载
type ImageResizePayload struct {
SourceURL string
}
// 创建电子邮件投递任务
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, payload), nil
}
// 创建图片调整大小任务
func NewImageResizeTask(src string) (*asynq.Task, error) {
payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
if err != nil {
return nil, err
}
// 可以在NewTask中传递任务选项,这些选项可以在入队时被覆盖
return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}
// 处理电子邮件投递任务
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
// 电子邮件投递代码...
return nil
}
// 图片处理器实现asynq.Handler接口
type ImageProcessor struct {
// 结构体字段
}
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var p ImageResizePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Resizing image: src=%s", p.SourceURL)
// 图片调整大小代码...
return nil
}
func NewImageProcessor() *ImageProcessor {
return &ImageProcessor{}
}
- 在应用代码中,导入上述包并使用Client将任务放入队列:
package main
import (
"log"
"time"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
// 示例1:立即处理任务
task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// 示例2:调度未来处理的任务
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatalf("could not schedule task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// 示例3:设置其他选项来调整任务处理行为
task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
- 启动工作服务器处理后台任务:
package main
import (
"log"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
// 指定使用多少并发工作器
Concurrency: 10,
// 可选地指定具有不同优先级的多个队列
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// 查看godoc了解其他配置选项
},
)
// mux将类型映射到处理程序
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
// ...注册其他处理程序...
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
Web UI
Asynqmon是一个基于Web的工具,用于监控和管理Asynq队列和任务。
命令行工具
Asynq附带一个命令行工具来检查队列和任务的状态。
安装CLI工具:
go install github.com/hibiken/asynq/tools/asynq@latest
许可证
Asynq是免费开源软件,根据MIT许可证授权。
更多关于golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Asynq构建Golang分布式任务队列
Asynq是一个基于Redis的Go语言分布式任务队列库,它简单、可靠且高效。下面我将详细介绍Asynq的核心概念和使用方法。
Asynq核心概念
- Task:表示要执行的工作单元
- Client:用于创建和调度任务
- Server:处理任务的工作进程
- Worker:实际执行任务的处理器
- Queue:任务存储的队列(默认队列名为"default")
安装Asynq
go get -u github.com/hibiken/asynq
基本使用示例
1. 创建任务
package main
import (
"fmt"
"log"
"github.com/hibiken/asynq"
)
// 定义任务类型
const (
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
)
// EmailDeliveryPayload 定义邮件任务负载
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
// ImageResizePayload 定义图片处理任务负载
type ImageResizePayload struct {
SourceURL string
Width int
Height int
}
func main() {
// 创建Redis连接选项
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379", // Redis地址
}
// 创建Asynq客户端
client := asynq.NewClient(redisConnOpt)
defer client.Close()
// 创建邮件任务
emailTask, err := asynq.NewTask(
TypeEmailDelivery,
EmailDeliveryPayload{UserID: 42, TemplateID: "welcome"},
)
if err != nil {
log.Fatalf("could not create task: %v", err)
}
// 将任务加入队列
info, err := client.Enqueue(emailTask)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID, info.Queue)
// 创建图片处理任务(带选项)
imgTask, err := asynq.NewTask(
TypeImageResize,
ImageResizePayload{SourceURL: "http://example.com/image.jpg", Width: 300, Height: 200},
)
if err != nil {
log.Fatalf("could not create task: %v", err)
}
// 带选项的任务入队
info, err = client.Enqueue(
imgTask,
asynq.Queue("critical"), // 指定队列
asynq.MaxRetry(5), // 最大重试次数
asynq.Timeout(20*time.Minute), // 超时时间
)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID, info.Queue)
}
2. 创建任务处理器
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
// NewEmailTaskProcessor 创建邮件任务处理器
func NewEmailTaskProcessor() asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
var p EmailDeliveryPayload
if err := asynq.UnmarshalPayload(task.Payload(), &p); err != nil {
return fmt.Errorf("unmarshal payload failed: %v", err)
}
log.Printf("sending email to user %d using template %s", p.UserID, p.TemplateID)
// 这里实现实际的邮件发送逻辑
return nil
})
}
// NewImageTaskProcessor 创建图片处理任务处理器
func NewImageTaskProcessor() asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
var p ImageResizePayload
if err := asynq.UnmarshalPayload(task.Payload(), &p); err != nil {
return fmt.Errorf("unmarshal payload failed: %v", err)
}
log.Printf("resizing image from %s to %dx%d", p.SourceURL, p.Width, p.Height)
// 这里实现实际的图片处理逻辑
return nil
})
}
func main() {
// 创建Redis连接选项
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
}
// 创建Asynq服务器
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
Concurrency: 10, // 最大并发数
Queues: map[string]int{
"critical": 6, // 关键队列权重
"default": 3, // 默认队列权重
"low": 1, // 低优先级队列权重
},
},
)
// 创建任务多路复用器
mux := asynq.NewServeMux()
mux.HandleFunc(TypeEmailDelivery, NewEmailTaskProcessor())
mux.HandleFunc(TypeImageResize, NewImageTaskProcessor())
// 启动服务器
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
高级功能
1. 定时任务
// 在未来某个时间执行任务
info, err := client.Enqueue(
emailTask,
asynq.ProcessIn(24*time.Hour), // 24小时后执行
)
// 定期任务(需要Asynq Monitor服务)
// 首先创建调度器
scheduler := asynq.NewScheduler(redisConnOpt, &asynq.SchedulerOpts{})
// 注册定期任务
entryID, err := scheduler.Register(
"@every 1h", // 每小时执行一次
emailTask,
)
if err != nil {
log.Fatalf("could not schedule task: %v", err)
}
// 启动调度器
if err := scheduler.Run(); err != nil {
log.Fatalf("could not run scheduler: %v", err)
}
2. 任务重试机制
Asynq内置了自动重试机制,可以通过选项配置:
info, err := client.Enqueue(
task,
asynq.MaxRetry(3), // 最大重试次数
asynq.RetryDelay(2*time.Minute), // 重试间隔
asynq.Timeout(10*time.Minute), // 任务超时时间
)
3. 任务优先级
通过设置不同队列的权重来实现优先级:
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6, // 60%的资源
"default": 3, // 30%的资源
"low": 1, // 10%的资源
},
},
)
监控和管理
Asynq提供了Web UI和CLI工具来监控和管理任务:
- Web UI:
http.Handle("/", asynqmon.New(asynqmon.Options{
RootPath: "/monitoring", // 根路径
RedisConnOpt: redisConnOpt,
}))
log.Fatal(http.ListenAndServe(":8080", nil))
- CLI工具:
# 安装CLI
go install github.com/hibiken/asynq/tools/asynq@latest
# 查看队列统计
asynq stats --redis-addr=localhost:6379
# 查看待处理任务
asynq queue list --redis-addr=localhost:6379
最佳实践
-
任务设计:
- 保持任务小而专注
- 任务应该是幂等的
- 合理设置任务超时时间
-
错误处理:
- 在处理器中妥善处理错误
- 记录失败任务以便排查
- 设置合理的重试策略
-
性能优化:
- 根据服务器资源调整并发数
- 对任务进行合理分类和优先级划分
- 监控队列积压情况
Asynq是一个功能强大但易于使用的分布式任务队列库,非常适合需要可靠后台任务处理的Go应用程序。通过合理配置,它可以处理从简单到复杂的各种任务处理需求。