golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用

Golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用

Asynq logo

简介

Asynq是一个Go语言库,用于排队任务并通过工作器异步处理它们。它基于Redis构建,设计为既易于入门又具有可扩展性。

工作原理概述:

  • 客户端将任务放入队列
  • 服务器从队列中拉取任务并为每个任务启动一个工作goroutine
  • 任务由多个工作器并发处理

任务队列用作跨多台机器分发工作的机制。系统可以由多个工作服务器和代理组成,从而实现高可用性和水平扩展。

功能特性

  • 保证任务至少执行一次
  • 任务调度
  • 失败任务的重试
  • 工作器崩溃时自动恢复任务
  • 加权优先级队列
  • 严格优先级队列
  • 低延迟添加任务
  • 使用唯一选项实现任务去重
  • 每个任务可设置超时和截止时间
  • 支持任务聚合以批量处理多个连续操作
  • 灵活的处理程序接口,支持中间件
  • 暂停队列功能
  • 周期性任务
  • 支持Redis Sentinel实现高可用
  • 与Prometheus集成收集和可视化队列指标
  • Web UI检查和远程控制队列和任务
  • CLI工具检查和远程控制队列和任务

快速入门

安装

确保已安装Go(支持最后两个Go版本)。初始化项目后运行:

go get -u github.com/hibiken/asynq

确保本地运行Redis服务器(需要4.0或更高版本)。

示例代码

  1. 首先创建一个封装任务创建和处理的包:
package tasks

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
    "github.com/hibiken/asynq"
)

// 任务类型常量
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

// 电子邮件投递负载
type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}

// 图片调整大小负载
type ImageResizePayload struct {
    SourceURL string
}

// 创建电子邮件投递任务
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}

// 创建图片调整大小任务
func NewImageResizeTask(src string) (*asynq.Task, error) {
    payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
    if err != nil {
        return nil, err
    }
    // 可以在NewTask中传递任务选项,这些选项可以在入队时被覆盖
    return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}

// 处理电子邮件投递任务
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // 电子邮件投递代码...
    return nil
}

// 图片处理器实现asynq.Handler接口
type ImageProcessor struct {
    // 结构体字段
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // 图片调整大小代码...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    return &ImageProcessor{}
}
  1. 在应用代码中,导入上述包并使用Client将任务放入队列:
package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // 示例1:立即处理任务
    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

    // 示例2:调度未来处理的任务
    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

    // 示例3:设置其他选项来调整任务处理行为
    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
  1. 启动工作服务器处理后台任务:
package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // 指定使用多少并发工作器
            Concurrency: 10,
            // 可选地指定具有不同优先级的多个队列
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // 查看godoc了解其他配置选项
        },
    )

    // mux将类型映射到处理程序
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...注册其他处理程序...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

Web UI

Asynqmon是一个基于Web的工具,用于监控和管理Asynq队列和任务。

Web UI Queues View

Web UI TasksView

命令行工具

Asynq附带一个命令行工具来检查队列和任务的状态。

安装CLI工具:

go install github.com/hibiken/asynq/tools/asynq@latest

许可证

Asynq是免费开源软件,根据MIT许可证授权。


更多关于golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于Redis的简单可靠高效分布式任务队列插件库Asynq的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Asynq构建Golang分布式任务队列

Asynq是一个基于Redis的Go语言分布式任务队列库,它简单、可靠且高效。下面我将详细介绍Asynq的核心概念和使用方法。

Asynq核心概念

  1. Task:表示要执行的工作单元
  2. Client:用于创建和调度任务
  3. Server:处理任务的工作进程
  4. Worker:实际执行任务的处理器
  5. Queue:任务存储的队列(默认队列名为"default")

安装Asynq

go get -u github.com/hibiken/asynq

基本使用示例

1. 创建任务

package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

// 定义任务类型
const (
	TypeEmailDelivery = "email:deliver"
	TypeImageResize   = "image:resize"
)

// EmailDeliveryPayload 定义邮件任务负载
type EmailDeliveryPayload struct {
	UserID     int
	TemplateID string
}

// ImageResizePayload 定义图片处理任务负载
type ImageResizePayload struct {
	SourceURL string
	Width     int
	Height    int
}

func main() {
	// 创建Redis连接选项
	redisConnOpt := asynq.RedisClientOpt{
		Addr: "localhost:6379", // Redis地址
	}

	// 创建Asynq客户端
	client := asynq.NewClient(redisConnOpt)
	defer client.Close()

	// 创建邮件任务
	emailTask, err := asynq.NewTask(
		TypeEmailDelivery,
		EmailDeliveryPayload{UserID: 42, TemplateID: "welcome"},
	)
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}

	// 将任务加入队列
	info, err := client.Enqueue(emailTask)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID, info.Queue)

	// 创建图片处理任务(带选项)
	imgTask, err := asynq.NewTask(
		TypeImageResize,
		ImageResizePayload{SourceURL: "http://example.com/image.jpg", Width: 300, Height: 200},
	)
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}

	// 带选项的任务入队
	info, err = client.Enqueue(
		imgTask,
		asynq.Queue("critical"), // 指定队列
		asynq.MaxRetry(5),       // 最大重试次数
		asynq.Timeout(20*time.Minute), // 超时时间
	)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID, info.Queue)
}

2. 创建任务处理器

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

// NewEmailTaskProcessor 创建邮件任务处理器
func NewEmailTaskProcessor() asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
		var p EmailDeliveryPayload
		if err := asynq.UnmarshalPayload(task.Payload(), &p); err != nil {
			return fmt.Errorf("unmarshal payload failed: %v", err)
		}
		log.Printf("sending email to user %d using template %s", p.UserID, p.TemplateID)
		// 这里实现实际的邮件发送逻辑
		return nil
	})
}

// NewImageTaskProcessor 创建图片处理任务处理器
func NewImageTaskProcessor() asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
		var p ImageResizePayload
		if err := asynq.UnmarshalPayload(task.Payload(), &p); err != nil {
			return fmt.Errorf("unmarshal payload failed: %v", err)
		}
		log.Printf("resizing image from %s to %dx%d", p.SourceURL, p.Width, p.Height)
		// 这里实现实际的图片处理逻辑
		return nil
	})
}

func main() {
	// 创建Redis连接选项
	redisConnOpt := asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}

	// 创建Asynq服务器
	srv := asynq.NewServer(
		redisConnOpt,
		asynq.Config{
			Concurrency: 10, // 最大并发数
			Queues: map[string]int{
				"critical": 6, // 关键队列权重
				"default":  3,  // 默认队列权重
				"low":      1,  // 低优先级队列权重
			},
		},
	)

	// 创建任务多路复用器
	mux := asynq.NewServeMux()
	mux.HandleFunc(TypeEmailDelivery, NewEmailTaskProcessor())
	mux.HandleFunc(TypeImageResize, NewImageTaskProcessor())

	// 启动服务器
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

高级功能

1. 定时任务

// 在未来某个时间执行任务
info, err := client.Enqueue(
    emailTask,
    asynq.ProcessIn(24*time.Hour), // 24小时后执行
)

// 定期任务(需要Asynq Monitor服务)
// 首先创建调度器
scheduler := asynq.NewScheduler(redisConnOpt, &asynq.SchedulerOpts{})

// 注册定期任务
entryID, err := scheduler.Register(
    "@every 1h", // 每小时执行一次
    emailTask,
)
if err != nil {
    log.Fatalf("could not schedule task: %v", err)
}

// 启动调度器
if err := scheduler.Run(); err != nil {
    log.Fatalf("could not run scheduler: %v", err)
}

2. 任务重试机制

Asynq内置了自动重试机制,可以通过选项配置:

info, err := client.Enqueue(
    task,
    asynq.MaxRetry(3),       // 最大重试次数
    asynq.RetryDelay(2*time.Minute), // 重试间隔
    asynq.Timeout(10*time.Minute), // 任务超时时间
)

3. 任务优先级

通过设置不同队列的权重来实现优先级:

srv := asynq.NewServer(
    redisConnOpt,
    asynq.Config{
        Concurrency: 10,
        Queues: map[string]int{
            "critical": 6, // 60%的资源
            "default":  3,  // 30%的资源
            "low":      1,  // 10%的资源
        },
    },
)

监控和管理

Asynq提供了Web UI和CLI工具来监控和管理任务:

  1. Web UI
http.Handle("/", asynqmon.New(asynqmon.Options{
    RootPath:     "/monitoring", // 根路径
    RedisConnOpt: redisConnOpt,
}))
log.Fatal(http.ListenAndServe(":8080", nil))
  1. CLI工具
# 安装CLI
go install github.com/hibiken/asynq/tools/asynq@latest

# 查看队列统计
asynq stats --redis-addr=localhost:6379

# 查看待处理任务
asynq queue list --redis-addr=localhost:6379

最佳实践

  1. 任务设计

    • 保持任务小而专注
    • 任务应该是幂等的
    • 合理设置任务超时时间
  2. 错误处理

    • 在处理器中妥善处理错误
    • 记录失败任务以便排查
    • 设置合理的重试策略
  3. 性能优化

    • 根据服务器资源调整并发数
    • 对任务进行合理分类和优先级划分
    • 监控队列积压情况

Asynq是一个功能强大但易于使用的分布式任务队列库,非常适合需要可靠后台任务处理的Go应用程序。通过合理配置,它可以处理从简单到复杂的各种任务处理需求。

回到顶部