Golang Machinery任务队列

在使用Golang Machinery任务队列时遇到以下问题:

  1. 任务执行失败后重试机制不生效,如何正确配置重试次数和间隔?
  2. 多个worker同时消费任务时出现竞争条件,有什么最佳实践可以避免?
  3. 任务结果保存到Redis后过期时间设置无效,应该如何正确配置?
  4. 高并发场景下任务堆积严重,如何优化 Machinery 的性能参数?
  5. 有没有推荐的监控方案可以实时查看任务队列状态?

环境信息:

  • Go 1.18
  • Machinery v1.0.0
  • Redis 6.2 作为broker
2 回复

Golang Machinery是一个基于Redis的分布式任务队列库,支持异步任务调度和定时任务。适用于高并发场景,易于集成到Go项目中。

更多关于Golang Machinery任务队列的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang Machinery 是一个基于 Redis 或 AMQP 的分布式任务队列库,适用于异步任务处理、定时任务和分布式工作流。以下是核心功能和使用方法:

核心特性

  1. 支持多种后端:Redis、AMQP(如 RabbitMQ)
  2. 任务重试机制:可配置失败重试策略
  3. 工作流支持:支持任务链、组和回调
  4. 周期性任务:类似 cron 的定时任务
  5. 结果存储:可获取任务执行状态和结果

安装

go get github.com/RichardKnop/machinery/v2

基础示例

1. 定义任务

package tasks

import (
	"context"
	"fmt"
)

func Add(ctx context.Context, args []int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

2. 启动 Worker

package main

import (
	"machinery_demo/tasks"
	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/config"
)

func main() {
	cnf := &config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6379",
	}

	server, err := machinery.NewServer(cnf)
	if err != nil {
		panic(err)
	}

	// 注册任务
	err = server.RegisterTask("add", tasks.Add)
	if err != nil {
		panic(err)
	}

	worker := server.NewWorker("worker_name", 10)
	err = worker.Launch()
	if err != nil {
		panic(err)
	}
}

3. 发送任务

package main

import (
	"context"
	"fmt"
	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/config"
)

func main() {
	cnf := &config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6379",
	}

	server, _ := machinery.NewServer(cnf)

	signature := &machinery.TaskSignature{
		Name: "add",
		Args: []machinery.Arg{
			{Type: "int64", Value: 5},
			{Type: "int64", Value: 3},
		},
	}

	asyncResult, _ := server.SendTaskWithContext(context.Background(), signature)
	result, _ := asyncResult.Get(0) // 阻塞获取结果
	fmt.Printf("Result: %v\n", result.Interface()) // 输出: Result: 8
}

关键配置说明

  • Broker:消息代理(Redis/AMQP URL)
  • ResultBackend:结果存储后端
  • DefaultQueue:默认队列名称
  • ResultsExpireIn:结果过期时间

工作流示例(任务链)

signatures := []*machinery.TaskSignature{
	{Name: "add", Args: []machinery.Arg{{Type: "int64", Value: 2}}},
	{Name: "multiply", Args: []machinery.Arg{{Type: "int64", Value: 4}}},
}

chain, _ := machinery.NewChain(signatures...)
asyncResult, _ := server.SendChain(chain)

注意事项

  1. 任务函数必须返回 error 作为最后一个返回值
  2. 参数和返回值需支持 JSON 序列化
  3. 生产环境建议配置 Redis 密码和集群

适用于需要异步处理耗时操作(如邮件发送、图片处理、数据计算)的场景,能有效提升系统吞吐量和可靠性。

回到顶部