Golang Machinery任务队列
在使用Golang Machinery任务队列时遇到以下问题:
- 任务执行失败后重试机制不生效,如何正确配置重试次数和间隔?
- 多个worker同时消费任务时出现竞争条件,有什么最佳实践可以避免?
- 任务结果保存到Redis后过期时间设置无效,应该如何正确配置?
- 高并发场景下任务堆积严重,如何优化 Machinery 的性能参数?
- 有没有推荐的监控方案可以实时查看任务队列状态?
环境信息:
- Go 1.18
- Machinery v1.0.0
- Redis 6.2 作为broker
2 回复
Golang Machinery是一个基于Redis的分布式任务队列库,支持异步任务调度和定时任务。适用于高并发场景,易于集成到Go项目中。
更多关于Golang Machinery任务队列的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Machinery 是一个基于 Redis 或 AMQP 的分布式任务队列库,适用于异步任务处理、定时任务和分布式工作流。以下是核心功能和使用方法:
核心特性
- 支持多种后端:Redis、AMQP(如 RabbitMQ)
- 任务重试机制:可配置失败重试策略
- 工作流支持:支持任务链、组和回调
- 周期性任务:类似 cron 的定时任务
- 结果存储:可获取任务执行状态和结果
安装
go get github.com/RichardKnop/machinery/v2
基础示例
1. 定义任务
package tasks
import (
"context"
"fmt"
)
func Add(ctx context.Context, args []int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}
2. 启动 Worker
package main
import (
"machinery_demo/tasks"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
)
func main() {
cnf := &config.Config{
Broker: "redis://localhost:6379",
ResultBackend: "redis://localhost:6379",
}
server, err := machinery.NewServer(cnf)
if err != nil {
panic(err)
}
// 注册任务
err = server.RegisterTask("add", tasks.Add)
if err != nil {
panic(err)
}
worker := server.NewWorker("worker_name", 10)
err = worker.Launch()
if err != nil {
panic(err)
}
}
3. 发送任务
package main
import (
"context"
"fmt"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/config"
)
func main() {
cnf := &config.Config{
Broker: "redis://localhost:6379",
ResultBackend: "redis://localhost:6379",
}
server, _ := machinery.NewServer(cnf)
signature := &machinery.TaskSignature{
Name: "add",
Args: []machinery.Arg{
{Type: "int64", Value: 5},
{Type: "int64", Value: 3},
},
}
asyncResult, _ := server.SendTaskWithContext(context.Background(), signature)
result, _ := asyncResult.Get(0) // 阻塞获取结果
fmt.Printf("Result: %v\n", result.Interface()) // 输出: Result: 8
}
关键配置说明
- Broker:消息代理(Redis/AMQP URL)
- ResultBackend:结果存储后端
- DefaultQueue:默认队列名称
- ResultsExpireIn:结果过期时间
工作流示例(任务链)
signatures := []*machinery.TaskSignature{
{Name: "add", Args: []machinery.Arg{{Type: "int64", Value: 2}}},
{Name: "multiply", Args: []machinery.Arg{{Type: "int64", Value: 4}}},
}
chain, _ := machinery.NewChain(signatures...)
asyncResult, _ := server.SendChain(chain)
注意事项
- 任务函数必须返回
error作为最后一个返回值 - 参数和返回值需支持 JSON 序列化
- 生产环境建议配置 Redis 密码和集群
适用于需要异步处理耗时操作(如邮件发送、图片处理、数据计算)的场景,能有效提升系统吞吐量和可靠性。

