golang实现RESTful异步任务队列服务的插件库Ratus的使用
Golang实现RESTful异步任务队列服务的插件库Ratus的使用
简介
Ratus是一个RESTful异步任务队列服务器。它将分布式任务队列的概念转化为一组符合REST原则的资源,并为各种后端提供一致的HTTP API。
Ratus的主要特性包括:
- 自带快速内存存储的自包含二进制文件
- 支持多种嵌入式或外部存储引擎
- 保证任务至少执行一次
- 统一优先级和时间调度的模型
- 任务级超时控制与自动恢复
- 内置Swagger UI的语言无关RESTful API
- 动态消费者数量间的负载均衡
- 通过复制和分区实现水平扩展
- 原生支持Prometheus和Kubernetes
快速开始
安装
Ratus提供多种安装选项:
- Docker镜像可在Docker Hub和GitHub Packages获取
- Kubernetes和Docker Compose示例可在deployments目录找到
- 预编译二进制文件可在GitHub releases页面下载
- 从源码构建:
go install github.com/hyperonym/ratus/cmd/ratus@latest
从命令行运行Ratus非常简单:
$ ratus
上述命令将使用默认的内存存储引擎memdb
启动一个临时的Ratus实例,并监听默认的HTTP端口80。
要使用其他端口并启用磁盘快照持久化:
$ ratus --port 8000 --engine memdb --memdb-snapshot-path ratus.db
基本用法
cURL示例
生产者创建一个新任务并将其推送到example
主题:
$ curl -X POST -d '{"payload": "hello world"}' "http://127.0.0.1:8000/v1/topics/example/tasks/1"
消费者可以做出承诺来声明并执行example
主题中的下一个任务:
$ curl -X POST "http://127.0.0.1:8000/v1/topics/example/promises?timeout=30s"
执行完任务后,记得通过提交确认任务已完成:
$ curl -X PATCH "http://127.0.0.1:8000/v1/topics/example/tasks/1"
Go客户端示例
Ratus附带了一个Go客户端库,封装了所有API调用,并提供了惯用的轮询-执行-提交工作流。以下是完整示例:
package main
import (
"context"
"log"
"time"
"github.com/hyperonym/ratus"
)
func main() {
// 创建客户端实例
client := ratus.NewClient("http://127.0.0.1:8000")
// 生产者:创建任务
task := &ratus.Task{
ID: "1",
Topic: "example",
Payload: "hello world",
}
if _, err := client.InsertTask(context.Background(), task); err != nil {
log.Fatal(err)
}
// 消费者:轮询并执行任务
for {
// 轮询任务,设置30秒超时
promise, err := client.Poll(context.Background(), "example", &ratus.Promise{
Timeout: 30 * time.Second,
})
if err != nil {
log.Println("Poll error:", err)
time.Sleep(5 * time.Second)
continue
}
// 处理任务
log.Println("Processing task:", promise.ID)
log.Println("Payload:", promise.Payload)
// 模拟任务处理
time.Sleep(2 * time.Second)
// 提交任务完成状态
commit := &ratus.Commit{
State: ratus.TaskStateCompleted,
}
if _, err := client.Commit(context.Background(), "example", promise.ID, commit); err != nil {
log.Println("Commit error:", err)
}
}
}
核心概念
数据模型
- Task:引用一个应异步执行的幂等工作单元
- Topic:指具有相同主题名称属性的任务的有序子集
- Promise:表示对活动任务所有权的声明
- Commit:包含要应用于任务的一组更新
工作流
- Producer客户端将带有预期执行时间(计划时间)的任务推送到主题
- Consumer客户端承诺执行从主题轮询的任务,并在完成后确认提交
任务状态
- pending (0):任务已准备好执行或等待将来执行
- active (1):任务正在被消费者处理
- completed (2):任务已完成执行
- archived (3):任务作为存档存储
存储引擎
Ratus为各种后端提供一致的API,允许用户根据需要选择特定引擎而无需修改客户端代码。
MemDB
MemDB是Ratus的默认存储引擎,基于不可变的基数树实现。适用于开发和对持久性要求不高的生产环境。
持久化
MemDB存储引擎默认是临时的,但也提供基于快照的持久化选项。
MongoDB
Ratus最适合与MongoDB版本~4.4一起使用。也支持MongoDB 5.0+,但需要额外考虑。
复制
使用MongoDB存储引擎时,Ratus实例本身是无状态的。为实现高可用性,启动多个Ratus实例并将它们连接到同一个MongoDB副本集。
可观测性
指标和标签
Ratus在/metrics
端点暴露以下Prometheus指标:
名称 | 类型 | 标签 |
---|---|---|
ratus_request_duration_seconds | histogram | topic, method, endpoint, status_code |
ratus_chore_duration_seconds | histogram | - |
ratus_task_schedule_delay_seconds | gauge | topic, producer, consumer |
ratus_task_execution_duration_seconds | gauge | topic, producer, consumer |
ratus_task_produced_count_total | counter | topic, producer |
ratus_task_consumed_count_total | counter | topic, producer, consumer |
ratus_task_committed_count_total | counter | topic, producer, consumer |
存活和就绪检查
Ratus通过HTTP GET请求支持存活和就绪检查:
/livez
端点返回状态码200表示实例正在运行/readyz
端点返回状态码200表示实例已准备好接收流量
注意事项
- 🚨 主题名称和任务ID不得包含加号(’+’)
- 不建议将Ratus作为任务的主要存储
- Ratus是Celery等任务队列的简单高效替代品
更多关于golang实现RESTful异步任务队列服务的插件库Ratus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现RESTful异步任务队列服务的插件库Ratus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Ratus实现Golang RESTful异步任务队列服务
Ratus是一个轻量级的Golang异步任务队列服务框架,它提供了RESTful API接口,非常适合构建分布式任务处理系统。下面我将详细介绍如何使用Ratus实现异步任务队列服务。
Ratus核心概念
- 任务(Task): 需要异步执行的工作单元
- 承诺(Promise): 任务被消费者获取后的状态
- 主题(Topic): 任务的分类/队列名称
安装Ratus
go get github.com/hyperonym/ratus
基本使用示例
1. 创建Ratus服务
package main
import (
"context"
"log"
"net/http"
"github.com/hyperonym/ratus"
"github.com/hyperonym/ratus/engine/memory"
)
func main() {
// 创建内存引擎实例(生产环境可用Redis或MongoDB引擎)
engine := memory.New()
// 创建Ratus服务实例
service := ratus.New(engine)
// 配置HTTP服务器
server := &http.Server{
Addr: ":8080",
Handler: service,
}
// 启动服务
log.Println("Ratus service starting on :8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start server: %v", err)
}
}
2. 生产者代码示例
package main
import (
"context"
"log"
"time"
"github.com/hyperonym/ratus"
)
func produceTasks() {
// 创建Ratus客户端
client := ratus.NewClient("http://localhost:8080")
// 创建任务
task := &ratus.Task{
ID: "task-1",
Topic: "email-queue",
Payload: "{\"to\":\"user@example.com\",\"subject\":\"Welcome\"}",
}
// 提交任务
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
created, err := client.CreateTask(ctx, task)
if err != nil {
log.Printf("Failed to create task: %v", err)
return
}
log.Printf("Task created: %s", created.ID)
}
3. 消费者代码示例
package main
import (
"context"
"log"
"time"
"github.com/hyperonym/ratus"
)
func consumeTasks() {
// 创建Ratus客户端
client := ratus.NewClient("http://localhost:8080")
for {
// 获取任务
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 从"email-queue"主题获取任务
promise, err := client.Poll(ctx, "email-queue", &ratus.PollRequest{
Limit: 1,
})
if err != nil {
log.Printf("Failed to poll task: %v", err)
time.Sleep(5 * time.Second)
continue
}
if len(promise.Tasks) == 0 {
log.Println("No tasks available, waiting...")
time.Sleep(5 * time.Second)
continue
}
task := promise.Tasks[0]
// 处理任务
log.Printf("Processing task %s: %s", task.ID, task.Payload)
// TODO: 实际业务逻辑处理
// 标记任务完成
_, err = client.Commit(ctx, &ratus.CommitRequest{
ID: task.ID,
State: ratus.TaskStateCompleted,
Result: "{\"status\":\"success\"}",
})
if err != nil {
log.Printf("Failed to commit task: %v", err)
}
}
}
高级功能
1. 延迟任务
// 创建延迟5分钟执行的任务
task := &ratus.Task{
ID: "delayed-task",
Topic: "delayed-queue",
Payload: "{\"action\":\"reminder\"}",
Scheduled: time.Now().Add(5 * time.Minute).Format(time.RFC3339),
}
2. 任务重试
// 创建带重试策略的任务
task := &ratus.Task{
ID: "retry-task",
Topic: "retry-queue",
Payload: "{\"action\":\"process\"}",
Policy: &ratus.Policy{
RetryLimit: 3, // 最大重试次数
RetryDelay: "10s", // 重试间隔
},
}
3. 使用Redis引擎
import "github.com/hyperonym/ratus/engine/redis"
func main() {
// 创建Redis引擎
engine, err := redis.New(&redis.Options{
Addr: "localhost:6379",
Password: "", // 无密码
DB: 0, // 默认DB
})
if err != nil {
log.Fatal(err)
}
// 创建Ratus服务
service := ratus.New(engine)
// ... 其余代码相同
}
最佳实践
- 任务ID生成: 使用UUID或其他分布式唯一ID生成算法
- 错误处理: 消费者应妥善处理错误并适当重试
- 监控: 实现任务处理监控和告警机制
- 幂等性: 确保任务处理逻辑是幂等的
- 资源限制: 控制并发任务数量防止资源耗尽
RESTful API示例
Ratus提供了标准的RESTful接口:
POST /tasks
- 创建任务GET /topics/{topic}/tasks
- 获取任务DELETE /tasks/{id}
- 删除任务PATCH /tasks/{id}
- 更新任务状态
通过以上代码和说明,您可以快速构建一个基于Ratus的异步任务队列服务。根据实际需求,您可以选择内存引擎(适合开发和测试)或Redis/MongoDB引擎(适合生产环境)。