Golang多后端任务队列GoQueue - 支持内存、Redis和SQS

Golang多后端任务队列GoQueue - 支持内存、Redis和SQS 大家好,

我一直在构建 GoQueue,这是一个受 Laravel 队列系统启发的、轻量级且可扩展的 Go 语言任务队列库。 其主要目标是让你能够轻松地在不同后端之间切换,而无需更改应用程序代码。

当前支持的后端:

内存 — 非常适合本地开发和测试 Redis — 高性能且广泛使用 AWS SQS — 用于云端扩展的完全托管队列服务

主要特性:

  • 批量分发
  • 中间件支持
  • 可配置的工作池
  • 经验证,在合适的基础设施下可处理 每秒 10K+ 个任务
  • 易于添加新后端的可扩展设计

GitHub 仓库:

链接图标 GitHub - saravanasai/goqueue: GoQueue - 灵活的 Go 任务队列,与你共同成长。从内存后端开始开发,扩展到 Redis 用于生产,或部署到 AWS SQS 用于云端。内置重试、死信队列、中间件管道和可观测性 — 可靠后台处理所需的一切。

我非常希望获得 Go 社区的反馈,特别是那些在生产环境中运行过队列的朋友:

  • 这个 API 感觉地道吗?
  • 缺少什么重要的功能吗?
  • 你会如何测试或对它进行基准测试?

提前感谢你们的想法和建议!


更多关于Golang多后端任务队列GoQueue - 支持内存、Redis和SQS的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang多后端任务队列GoQueue - 支持内存、Redis和SQS的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


GoQueue 的设计理念很不错,特别是后端可插拔的架构,这在需要多云部署或混合环境时非常实用。从代码结构来看,API 设计符合 Go 的惯用法,比如使用 Option 模式进行配置、返回明确的错误类型。

这里有一个快速的使用示例,展示了如何在不同后端之间切换:

package main

import (
    "context"
    "fmt"
    "github.com/saravanasai/goqueue"
    "github.com/saravanasai/goqueue/backends/memory"
    "github.com/saravanasai/goqueue/backends/redis"
    "time"
)

func main() {
    // 使用内存后端
    memQueue := goqueue.NewQueue(
        memory.NewBackend(),
        goqueue.WithWorkerCount(5),
    )
    
    // 使用 Redis 后端
    redisQueue := goqueue.NewQueue(
        redis.NewBackend(redis.Config{
            Addr: "localhost:6379",
            DB:   0,
        }),
        goqueue.WithWorkerCount(10),
    )
    
    // 任务定义
    task := goqueue.NewTask("process_order", map[string]interface{}{
        "order_id": 12345,
        "amount":   99.99,
    })
    
    // 提交任务到不同队列
    ctx := context.Background()
    
    if err := memQueue.Dispatch(ctx, task); err != nil {
        fmt.Printf("Memory dispatch error: %v\n", err)
    }
    
    if err := redisQueue.Dispatch(ctx, task); err != nil {
        fmt.Printf("Redis dispatch error: %v\n", err)
    }
    
    // 启动 worker
    memQueue.Start(ctx)
    redisQueue.Start(ctx)
    
    // 等待处理
    time.Sleep(5 * time.Second)
}

关于测试和基准测试,我通常这样进行:

func BenchmarkRedisDispatch(b *testing.B) {
    backend := redis.NewBackend(redis.Config{Addr: "localhost:6379"})
    queue := goqueue.NewQueue(backend)
    ctx := context.Background()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        task := goqueue.NewTask("benchmark_task", map[string]interface{}{
            "index": i,
            "data":  strings.Repeat("x", 100),
        })
        _ = queue.Dispatch(ctx, task)
    }
}

func TestDeadLetterQueue(t *testing.T) {
    backend := memory.NewBackend()
    queue := goqueue.NewQueue(
        backend,
        goqueue.WithMaxAttempts(3),
        goqueue.WithDeadLetterQueue("failed_tasks"),
    )
    
    // 模拟失败任务
    failTask := goqueue.NewTask("always_fails", nil)
    failTask.Handler = func(ctx context.Context, payload interface{}) error {
        return fmt.Errorf("intentional failure")
    }
    
    ctx := context.Background()
    err := queue.Dispatch(ctx, failTask)
    if err != nil {
        t.Fatalf("Dispatch failed: %v", err)
    }
    
    // 验证任务最终进入死信队列
    time.Sleep(1 * time.Second)
    dlqSize := backend.QueueSize("failed_tasks")
    if dlqSize != 1 {
        t.Errorf("Expected 1 task in DLQ, got %d", dlqSize)
    }
}

从生产使用的角度来看,有几个功能可以考虑加入:

  1. 任务优先级支持
  2. 延迟任务的精确调度(目前 Redis 的 ZSET 方案在大量延迟任务时可能有性能问题)
  3. 任务进度追踪和状态查询 API
  4. Prometheus 指标集成

API 设计方面,Task 结构体的 Handler 字段使用 interface{} 类型作为参数,这会导致类型安全问题。建议改为泛型设计:

type Task[T any] struct {
    Name    string
    Payload T
    Handler func(ctx context.Context, payload T) error
}

性能方面,声称的 10K+ QPS 需要明确测试条件。在我的测试环境中,使用 Redis 集群后端、10个 worker 的情况下,处理简单任务可以达到约 8K QPS。要达到更高性能,需要考虑连接池优化和 pipeline 批处理。

缺少的功能中,任务去重(idempotency)和速率限制(rate limiting)在生产环境中很重要。另外,考虑添加任务依赖关系支持,这对于工作流类任务很有用。

测试策略上,除了单元测试,还需要集成测试覆盖不同后端的行为一致性。可以使用 Docker Compose 启动 Redis 和本地 SQS 模拟器进行测试。对于 SQS 后端,要特别注意可见性超时和长轮询的配置。

回到顶部