Golang Machinery任务队列
在使用Golang Machinery任务队列时遇到以下问题:
- 任务执行失败后重试机制不生效,如何正确配置重试次数和间隔?
 - 多个worker同时消费任务时出现竞争条件,有什么最佳实践可以避免?
 - 任务结果保存到Redis后过期时间设置无效,应该如何正确配置?
 - 高并发场景下任务堆积严重,如何优化 Machinery 的性能参数?
 - 有没有推荐的监控方案可以实时查看任务队列状态?
 
环境信息:
- Go 1.18
 - Machinery v1.0.0
 - Redis 6.2 作为broker
 
        
          2 回复
        
      
      
        Golang Machinery是一个基于Redis的分布式任务队列库,支持异步任务调度和定时任务。适用于高并发场景,易于集成到Go项目中。
更多关于Golang Machinery任务队列的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Machinery 是一个基于 Redis 或 AMQP 的分布式任务队列库,适用于异步任务处理、定时任务和分布式工作流。以下是核心功能和使用方法:
核心特性
- 支持多种后端:Redis、AMQP(如 RabbitMQ)
 - 任务重试机制:可配置失败重试策略
 - 工作流支持:支持任务链、组和回调
 - 周期性任务:类似 cron 的定时任务
 - 结果存储:可获取任务执行状态和结果
 
安装
go get github.com/RichardKnop/machinery/v2
基础示例
1. 定义任务
package tasks
import (
	"context"
	"fmt"
)
func Add(ctx context.Context, args []int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}
2. 启动 Worker
package main
import (
	"machinery_demo/tasks"
	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/config"
)
func main() {
	cnf := &config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6379",
	}
	server, err := machinery.NewServer(cnf)
	if err != nil {
		panic(err)
	}
	// 注册任务
	err = server.RegisterTask("add", tasks.Add)
	if err != nil {
		panic(err)
	}
	worker := server.NewWorker("worker_name", 10)
	err = worker.Launch()
	if err != nil {
		panic(err)
	}
}
3. 发送任务
package main
import (
	"context"
	"fmt"
	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/config"
)
func main() {
	cnf := &config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6379",
	}
	server, _ := machinery.NewServer(cnf)
	signature := &machinery.TaskSignature{
		Name: "add",
		Args: []machinery.Arg{
			{Type: "int64", Value: 5},
			{Type: "int64", Value: 3},
		},
	}
	asyncResult, _ := server.SendTaskWithContext(context.Background(), signature)
	result, _ := asyncResult.Get(0) // 阻塞获取结果
	fmt.Printf("Result: %v\n", result.Interface()) // 输出: Result: 8
}
关键配置说明
- Broker:消息代理(Redis/AMQP URL)
 - ResultBackend:结果存储后端
 - DefaultQueue:默认队列名称
 - ResultsExpireIn:结果过期时间
 
工作流示例(任务链)
signatures := []*machinery.TaskSignature{
	{Name: "add", Args: []machinery.Arg{{Type: "int64", Value: 2}}},
	{Name: "multiply", Args: []machinery.Arg{{Type: "int64", Value: 4}}},
}
chain, _ := machinery.NewChain(signatures...)
asyncResult, _ := server.SendChain(chain)
注意事项
- 任务函数必须返回 
error作为最后一个返回值 - 参数和返回值需支持 JSON 序列化
 - 生产环境建议配置 Redis 密码和集群
 
适用于需要异步处理耗时操作(如邮件发送、图片处理、数据计算)的场景,能有效提升系统吞吐量和可靠性。
        
      
                    
                    
                    
