golang实现RESTful异步任务队列服务的插件库Ratus的使用

Golang实现RESTful异步任务队列服务的插件库Ratus的使用

简介

Ratus是一个RESTful异步任务队列服务器。它将分布式任务队列的概念转化为一组符合REST原则的资源,并为各种后端提供一致的HTTP API。

Ratus的主要特性包括:

  • 自带快速内存存储的自包含二进制文件
  • 支持多种嵌入式或外部存储引擎
  • 保证任务至少执行一次
  • 统一优先级和时间调度的模型
  • 任务级超时控制与自动恢复
  • 内置Swagger UI的语言无关RESTful API
  • 动态消费者数量间的负载均衡
  • 通过复制和分区实现水平扩展
  • 原生支持Prometheus和Kubernetes

终端截图

快速开始

安装

Ratus提供多种安装选项:

  • Docker镜像可在Docker Hub和GitHub Packages获取
  • Kubernetes和Docker Compose示例可在deployments目录找到
  • 预编译二进制文件可在GitHub releases页面下载
  • 从源码构建:go install github.com/hyperonym/ratus/cmd/ratus@latest

从命令行运行Ratus非常简单:

$ ratus

上述命令将使用默认的内存存储引擎memdb启动一个临时的Ratus实例,并监听默认的HTTP端口80。

要使用其他端口并启用磁盘快照持久化:

$ ratus --port 8000 --engine memdb --memdb-snapshot-path ratus.db

基本用法

cURL示例

生产者创建一个新任务并将其推送到example主题:

$ curl -X POST -d '{"payload": "hello world"}' "http://127.0.0.1:8000/v1/topics/example/tasks/1"

消费者可以做出承诺来声明并执行example主题中的下一个任务:

$ curl -X POST "http://127.0.0.1:8000/v1/topics/example/promises?timeout=30s"

执行完任务后,记得通过提交确认任务已完成:

$ curl -X PATCH "http://127.0.0.1:8000/v1/topics/example/tasks/1"

Go客户端示例

Ratus附带了一个Go客户端库,封装了所有API调用,并提供了惯用的轮询-执行-提交工作流。以下是完整示例:

package main

import (
	"context"
	"log"
	"time"

	"github.com/hyperonym/ratus"
)

func main() {
	// 创建客户端实例
	client := ratus.NewClient("http://127.0.0.1:8000")

	// 生产者:创建任务
	task := &ratus.Task{
		ID:      "1",
		Topic:   "example",
		Payload: "hello world",
	}
	if _, err := client.InsertTask(context.Background(), task); err != nil {
		log.Fatal(err)
	}

	// 消费者:轮询并执行任务
	for {
		// 轮询任务,设置30秒超时
		promise, err := client.Poll(context.Background(), "example", &ratus.Promise{
			Timeout: 30 * time.Second,
		})
		if err != nil {
			log.Println("Poll error:", err)
			time.Sleep(5 * time.Second)
			continue
		}

		// 处理任务
		log.Println("Processing task:", promise.ID)
		log.Println("Payload:", promise.Payload)

		// 模拟任务处理
		time.Sleep(2 * time.Second)

		// 提交任务完成状态
		commit := &ratus.Commit{
			State: ratus.TaskStateCompleted,
		}
		if _, err := client.Commit(context.Background(), "example", promise.ID, commit); err != nil {
			log.Println("Commit error:", err)
		}
	}
}

核心概念

数据模型

  • Task:引用一个应异步执行的幂等工作单元
  • Topic:指具有相同主题名称属性的任务的有序子集
  • Promise:表示对活动任务所有权的声明
  • Commit:包含要应用于任务的一组更新

工作流

  • Producer客户端将带有预期执行时间(计划时间)的任务推送到主题
  • Consumer客户端承诺执行从主题轮询的任务,并在完成后确认提交

任务状态

  • pending (0):任务已准备好执行或等待将来执行
  • active (1):任务正在被消费者处理
  • completed (2):任务已完成执行
  • archived (3):任务作为存档存储

存储引擎

Ratus为各种后端提供一致的API,允许用户根据需要选择特定引擎而无需修改客户端代码。

MemDB

MemDB是Ratus的默认存储引擎,基于不可变的基数树实现。适用于开发和对持久性要求不高的生产环境

持久化

MemDB存储引擎默认是临时的,但也提供基于快照的持久化选项。

MongoDB

Ratus最适合与MongoDB版本~4.4一起使用。也支持MongoDB 5.0+,但需要额外考虑。

复制

使用MongoDB存储引擎时,Ratus实例本身是无状态的。为实现高可用性,启动多个Ratus实例并将它们连接到同一个MongoDB副本集

可观测性

指标和标签

Ratus在/metrics端点暴露以下Prometheus指标:

名称 类型 标签
ratus_request_duration_seconds histogram topic, method, endpoint, status_code
ratus_chore_duration_seconds histogram -
ratus_task_schedule_delay_seconds gauge topic, producer, consumer
ratus_task_execution_duration_seconds gauge topic, producer, consumer
ratus_task_produced_count_total counter topic, producer
ratus_task_consumed_count_total counter topic, producer, consumer
ratus_task_committed_count_total counter topic, producer, consumer

存活和就绪检查

Ratus通过HTTP GET请求支持存活和就绪检查:

  • /livez端点返回状态码200表示实例正在运行
  • /readyz端点返回状态码200表示实例已准备好接收流量

注意事项

  • 🚨 主题名称和任务ID不得包含加号(’+’)
  • 不建议将Ratus作为任务的主要存储
  • Ratus是Celery等任务队列的简单高效替代品

更多关于golang实现RESTful异步任务队列服务的插件库Ratus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现RESTful异步任务队列服务的插件库Ratus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Ratus实现Golang RESTful异步任务队列服务

Ratus是一个轻量级的Golang异步任务队列服务框架,它提供了RESTful API接口,非常适合构建分布式任务处理系统。下面我将详细介绍如何使用Ratus实现异步任务队列服务。

Ratus核心概念

  1. 任务(Task): 需要异步执行的工作单元
  2. 承诺(Promise): 任务被消费者获取后的状态
  3. 主题(Topic): 任务的分类/队列名称

安装Ratus

go get github.com/hyperonym/ratus

基本使用示例

1. 创建Ratus服务

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/hyperonym/ratus"
	"github.com/hyperonym/ratus/engine/memory"
)

func main() {
	// 创建内存引擎实例(生产环境可用Redis或MongoDB引擎)
	engine := memory.New()

	// 创建Ratus服务实例
	service := ratus.New(engine)

	// 配置HTTP服务器
	server := &http.Server{
		Addr:    ":8080",
		Handler: service,
	}

	// 启动服务
	log.Println("Ratus service starting on :8080")
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("Failed to start server: %v", err)
	}
}

2. 生产者代码示例

package main

import (
	"context"
	"log"
	"time"

	"github.com/hyperonym/ratus"
)

func produceTasks() {
	// 创建Ratus客户端
	client := ratus.NewClient("http://localhost:8080")

	// 创建任务
	task := &ratus.Task{
		ID:      "task-1",
		Topic:   "email-queue",
		Payload: "{\"to\":\"user@example.com\",\"subject\":\"Welcome\"}",
	}

	// 提交任务
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	created, err := client.CreateTask(ctx, task)
	if err != nil {
		log.Printf("Failed to create task: %v", err)
		return
	}

	log.Printf("Task created: %s", created.ID)
}

3. 消费者代码示例

package main

import (
	"context"
	"log"
	"time"

	"github.com/hyperonym/ratus"
)

func consumeTasks() {
	// 创建Ratus客户端
	client := ratus.NewClient("http://localhost:8080")

	for {
		// 获取任务
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// 从"email-queue"主题获取任务
		promise, err := client.Poll(ctx, "email-queue", &ratus.PollRequest{
			Limit: 1,
		})
		if err != nil {
			log.Printf("Failed to poll task: %v", err)
			time.Sleep(5 * time.Second)
			continue
		}

		if len(promise.Tasks) == 0 {
			log.Println("No tasks available, waiting...")
			time.Sleep(5 * time.Second)
			continue
		}

		task := promise.Tasks[0]

		// 处理任务
		log.Printf("Processing task %s: %s", task.ID, task.Payload)
		// TODO: 实际业务逻辑处理

		// 标记任务完成
		_, err = client.Commit(ctx, &ratus.CommitRequest{
			ID:     task.ID,
			State:  ratus.TaskStateCompleted,
			Result: "{\"status\":\"success\"}",
		})
		if err != nil {
			log.Printf("Failed to commit task: %v", err)
		}
	}
}

高级功能

1. 延迟任务

// 创建延迟5分钟执行的任务
task := &ratus.Task{
	ID:      "delayed-task",
	Topic:   "delayed-queue",
	Payload: "{\"action\":\"reminder\"}",
	Scheduled: time.Now().Add(5 * time.Minute).Format(time.RFC3339),
}

2. 任务重试

// 创建带重试策略的任务
task := &ratus.Task{
	ID:      "retry-task",
	Topic:   "retry-queue",
	Payload: "{\"action\":\"process\"}",
	Policy: &ratus.Policy{
		RetryLimit: 3,  // 最大重试次数
		RetryDelay: "10s", // 重试间隔
	},
}

3. 使用Redis引擎

import "github.com/hyperonym/ratus/engine/redis"

func main() {
	// 创建Redis引擎
	engine, err := redis.New(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 无密码
		DB:       0,  // 默认DB
	})
	if err != nil {
		log.Fatal(err)
	}

	// 创建Ratus服务
	service := ratus.New(engine)
	// ... 其余代码相同
}

最佳实践

  1. 任务ID生成: 使用UUID或其他分布式唯一ID生成算法
  2. 错误处理: 消费者应妥善处理错误并适当重试
  3. 监控: 实现任务处理监控和告警机制
  4. 幂等性: 确保任务处理逻辑是幂等的
  5. 资源限制: 控制并发任务数量防止资源耗尽

RESTful API示例

Ratus提供了标准的RESTful接口:

  • POST /tasks - 创建任务
  • GET /topics/{topic}/tasks - 获取任务
  • DELETE /tasks/{id} - 删除任务
  • PATCH /tasks/{id} - 更新任务状态

通过以上代码和说明,您可以快速构建一个基于Ratus的异步任务队列服务。根据实际需求,您可以选择内存引擎(适合开发和测试)或Redis/MongoDB引擎(适合生产环境)。

回到顶部