Golang异步任务队列

在Golang中实现异步任务队列有哪些推荐的方式或库?目前项目需要处理大量后台任务,希望找到一个高性能、易维护的解决方案。听说有Asynq、Machinery等库,但不太清楚它们的优缺点和适用场景。有没有实际使用过的朋友能分享一下经验?比如如何保证任务可靠性、如何处理任务重试、以及如何监控任务执行状态等?

2 回复

推荐使用Asynq或Machinery。Asynq基于Redis,支持定时任务和重试机制,API简洁。Machinery支持多种后端(Redis、AMQP等),功能丰富。两者都适合构建可靠的异步任务系统。

更多关于Golang异步任务队列的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go语言中,实现异步任务队列通常使用goroutine和channel,或者借助第三方库如asynqmachinery等。以下是两种常见方法:

1. 使用Goroutine和Channel(轻量级方案)

适用于简单异步任务,无需持久化:

package main

import (
	"fmt"
	"time"
)

type Task struct {
	ID   int
	Data string
}

func worker(id int, tasks <-chan Task) {
	for task := range tasks {
		fmt.Printf("Worker %d processing task %d: %s\n", id, task.ID, task.Data)
		time.Sleep(2 * time.Second) // 模拟任务处理
		fmt.Printf("Worker %d completed task %d\n", id, task.ID)
	}
}

func main() {
	taskQueue := make(chan Task, 100)

	// 启动3个worker
	for i := 1; i <= 3; i++ {
		go worker(i, taskQueue)
	}

	// 发送任务
	for i := 1; i <= 5; i++ {
		taskQueue <- Task{ID: i, Data: fmt.Sprintf("data-%d", i)}
	}

	close(taskQueue)
	time.Sleep(10 * time.Second) // 等待任务完成
}

2. 使用Asynq库(生产级方案)

支持Redis持久化、定时任务、重试机制:

package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:6379"

// 定义任务类型
const TypeEmailDelivery = "email:deliver"

type EmailTaskPayload struct {
	UserID int
	Email  string
}

func main() {
	// 1. 创建客户端
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	defer client.Close()

	// 2. 创建任务
	task, err := asynq.NewTask(TypeEmailDelivery, 
		EmailTaskPayload{UserID: 42, Email: "user@example.com"})
	if err != nil {
		log.Fatal(err)
	}

	// 3. 提交任务
	info, err := client.Enqueue(task)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Enqueued task: %s\n", info.ID)

	// 4. 启动服务端处理任务
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(TypeEmailDelivery, handleEmailTask)
	
	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	fmt.Printf("Sending email to %s (userID:%d)\n", p.Email, p.UserID)
	return nil
}

选择建议:

  • 简单场景:使用goroutine+channel
  • 生产环境:推荐asynq(支持任务持久化、重试、监控)
  • 分布式系统:考虑machinery(基于AMQP)

安装asynq:

go get -u github.com/hibiken/asynq

记得根据需求配置Redis服务器,并处理错误和优雅关闭。

回到顶部