Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度
Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度
特性
- 简单可预测的任务处理模型
- 任务处理器只需是带有宏的异步函数API
- 类似actix和axum的熟悉依赖注入方式
- 充分利用tower生态系统的中间件、服务和工具
- 易于扩展,默认支持分布式后端
- 运行时无关 - 支持tokio、smol等
- 内置并发和并行处理
- 工作监控和平滑关闭
- 通过API轻松暴露任务和工作
- 持久化定时任务,可将定时任务分发到其他后端
- 可选Web界面帮助管理工作
支持的存储后端
后端类型 | 状态 | 示例 |
---|---|---|
Redis | ✓ | 示例 |
Sqlite | ✓ | 示例 |
Postgres | ✓ | 示例 |
MySQL | ✓ | 示例 |
AMQP | ✓ | 示例 |
快速开始
在Cargo.toml中添加:
[dependencies]
apalis = { version = "0.7", features = "limit" } # 并发限制
apalis-redis = { version = "0.7" } # 使用Redis作为持久化后端
使用示例
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct Email {
to: String,
}
/// 为每个任务调用的函数
async fn send_email(job: Email, data: Data<usize>) -> Result<(), Error> {
/// 执行任务
Ok(())
}
#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
let storage = RedisStorage::new(conn);
WorkerBuilder::new("email-worker")
.concurrency(2)
.data(0usize)
.backend(storage)
.build_fn(send_email)
.run()
.await;
}
然后在程序的其他部分或另一个应用程序(如HTTP服务器)中:
// 可以在程序的另一部分或另一个应用程序中,如HTTP服务器
async fn produce_route_jobs(storage: &mut RedisStorage<Email>) -> Result<()> {
storage
.push(Email {
to: "test@example.com".to_string(),
})
.await?
}
分步任务示例
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
std::env::set_var("RUST_LOG", "debug");
tracing_subscriber::fmt::init();
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let conn = apalis_redis::connect(redis_url).await.unwrap();
let config = apalis_redis::Config::default().set_namespace("stepped-email-task");
let mut storage = RedisStorage::new_with_config(conn, config);
storage
.start_stepped(WelcomeEmail { user_id: 1 })
.await
.unwrap();
// 构建步骤
let steps = StepBuilder::new()
.step_fn(welcome) // 步骤是tower服务
.step_fn(campaign)
.step_fn(complete_campaign);
WorkerBuilder::new("tasty-banana")
.data(()) // 所有步骤共享的数据
.enable_tracing()
.concurrency(2)
.backend(storage)
.build_极速处理后台作业与分布式任务调度
基于您提供的内容,我将整理出关于Rust异步任务队列库apalis-core的完整使用指南,包含特性介绍、快速开始、完整示例等内容。
## 完整示例Demo
以下是一个完整的Redis后端任务队列示例:
```rust
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};
use std::env;
#[derive(Debug, Deserialize, Serialize)]
struct EmailTask {
to: String,
subject: String,
body: String,
}
// 任务处理函数
async fn process_email(task: EmailTask, _ctx: Data<()>) -> Result<(), JobError> {
println!("Sending email to: {}", task.to);
println!("Subject: {}", task.subject);
println!("Body: {}", task.body);
// 这里可以添加实际的邮件发送逻辑
Ok(())
}
#[tokio::main]
async fn main() {
// 初始化日志
env::set_var("RUST_LOG", "info");
env_logger::init();
// 连接到Redis
let redis_url = env::var("REDIS_URL").unwrap_or("redis://127.0.0.1/".to_string());
let redis = apalis_redis::connect(redis_url).await.expect("无法连接Redis");
// 创建存储后端
let storage = RedisStorage::new(redis);
// 构建并启动工作者
Monitor::new()
.register(
WorkerBuilder::new("email-worker")
.with_storage(storage.clone())
.build_fn(process_email)
)
.run()
.await
.expect("工作者监控器运行失败");
// 在另一个线程中添加任务
tokio::spawn(async move {
let task = EmailTask {
to: "user@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thank you for signing up!".to_string(),
};
storage.push(task).await.expect("无法添加任务到队列");
println!("任务已添加到队列");
});
}
这个示例展示了如何:
- 定义任务结构体
- 创建任务处理函数
- 设置Redis存储后端
- 启动工作者监控
- 在另一个线程中添加任务到队列
要运行此示例,你需要:
- 安装Redis并确保其运行
- 添加依赖到Cargo.toml:
[dependencies] apalis = { version = "0.7", features = ["limit"] } apalis-redis = "0.7" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.0", features = ["full"] } env_logger = "0.9"
1 回复
Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度
介绍
apalis-core是一个Rust编写的异步任务队列库,专注于高效处理后台作业和分布式任务调度。它提供了以下核心特性:
- 基于async/await的异步任务处理
- 支持多种存储后端(Redis、PostgreSQL、SQLite等)
- 任务重试机制
- 任务优先级支持
- 分布式任务调度能力
- 与Tokio运行时深度集成
安装
在Cargo.toml中添加依赖:
[dependencies]
apalis-core = "0.5"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 定义任务
use apalis_core::job::{Job, JobContext};
#[derive(Debug, Clone)]
struct EmailJob {
to: String,
subject: String,
body: String,
}
#[async_trait::async_trait]
impl Job for EmailJob {
type Output = ();
async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
println!("Sending email to {} with subject '{}'", self.to, self.subject);
// 这里实际发送邮件...
Ok(())
}
}
2. 创建并运行Worker
use apalis_core::{builder::WorkerBuilder, storage::SqliteStorage};
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// 使用SQLite作为存储后端
let storage = SqliteStorage::connect("sqlite://jobs.db").await?;
// 创建worker
let worker = WorkerBuilder::new("email-worker")
.with_storage(storage)
.build::<EmailJob>();
// 运行worker
worker.run().await?;
Ok(())
}
3. 提交任务
use apalis_core::storage::Storage;
async fn submit_email_job(storage: &SqliteStorage) -> Result<(), anyhow::Error> {
let job = EmailJob {
to: "user@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thank you for signing up!".to_string(),
};
storage.push(job).await?;
Ok(())
}
高级功能
任务重试
#[async_trait::async_trait]
impl Job for EmailJob {
type Output = ();
// 设置最大重试次数
fn max_retries(&self) -> i32 {
3
}
// 设置重试间隔
fn backoff(&self, attempt: u32) -> std::time::Duration {
std::time::Duration::from_secs(60 * attempt as u64)
}
async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
// 实现略...
}
}
使用Redis作为后端
use apalis_core::storage::RedisStorage;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let redis_url = "redis://127.0.0.1/";
let storage = RedisStorage::connect(redis_url).await?;
let worker = WorkerBuilder::new("redis-worker")
.with_storage(storage)
.build::<EmailJob>();
worker.run().await?;
Ok(())
}
分布式任务调度
use apalis_core::scheduler::Scheduler;
async fn schedule_daily_report(storage极抱歉,您的内容似乎被截断了。我将基于您提供的完整内容,给出一个完整的示例demo:
```rust
//! 完整示例:使用apalis-core实现电子邮件任务队列
use apalis_core::{
builder::WorkerBuilder,
job::{Job, JobContext},
scheduler::Scheduler,
storage::{RedisStorage, SqliteStorage, Storage},
};
use async_trait::async_trait;
use chrono::Utc;
use std::time::Duration;
use tokio::time::sleep;
// 1. 定义任务类型
#[derive(Debug, Clone)]
struct EmailJob {
to: String,
subject: String,
body: String,
}
// 2. 实现Job trait
#[async_trait]
impl Job for EmailJob {
type Output = ();
// 最大重试次数
fn max_retries(&self) -> i32 {
3
}
// 重试间隔
fn backoff(&self, attempt: u32) -> Duration {
Duration::from_secs(60 * attempt as u64)
}
async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
println!(
"[{}] Processing email to: {}, subject: {}",
Utc::now(),
self.to,
self.subject
);
// 模拟邮件发送失败
if self.to.contains("fail") {
return Err(anyhow::anyhow!("Failed to send email to {}", self.to));
}
// 模拟邮件发送成功
sleep(Duration::from_secs(1)).await;
println!("Email sent successfully to {}", self.to);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// 3. 初始化存储后端 (这里使用Redis)
let redis_url = "redis://127.0.0.1/";
let storage = RedisStorage::connect(redis_url).await?;
// 4. 创建并启动worker
let worker = WorkerBuilder::new("email-worker")
.with_storage(storage.clone())
.build::<EmailJob>();
tokio::spawn(async move {
worker.run().await.expect("Worker failed");
});
// 5. 添加定时任务
let scheduler = Scheduler::new(storage.clone());
scheduler
.cron("0 * * * * *", || {
// 每分钟执行一次
EmailJob {
to: "cron@example.com".to_string(),
subject: "Scheduled Email".to_string(),
body: "This is a scheduled email".to_string(),
}
})
.await?;
// 6. 提交即时任务
storage
.push(EmailJob {
to: "user@example.com".to_string(),
subject: "Welcome".to_string(),
body: "Thank you for signing up!".to_string(),
})
.await?;
// 7. 提交会失败的任务(测试重试机制)
storage
.push(EmailJob {
to: "fail@example.com".to_string(),
subject: "Test Failure".to_string(),
body: "This should fail".to_string(),
})
.await?;
// 保持程序运行
loop {
sleep(Duration::from_secs(10)).await;
}
}
这个完整示例演示了:
- 定义EmailJob任务类型
- 实现Job trait并配置重试策略
- 使用Redis作为存储后端
- 创建并运行worker处理任务
- 设置定时任务(每分钟执行)
- 提交即时任务
- 测试失败重试机制
要运行此示例,需要:
- 安装并运行Redis服务器
- 在Cargo.toml中添加依赖:
[dependencies] apalis-core = "0.5" tokio = { version = "1.0", features = ["full"] } async-trait = "0.1" anyhow = "1.0" chrono = "0.4"