Rust异步任务队列库Apalis的使用,高效处理后台任务和定时作业
Rust异步任务队列库Apalis的使用,高效处理后台任务和定时作业
安装
在项目目录中运行以下Cargo命令:
cargo add apalis
或在Cargo.toml中添加:
apalis = "0.7.2"
基本使用示例
以下是一个使用Apalis处理异步任务的完整示例:
use apalis::{prelude::*, redis::RedisStorage};
use serde::{Deserialize, Serialize};
// 定义一个任务
#[derive(Debug, Serialize, Deserialize)]
struct Email {
to: String,
message: String,
}
impl Job for Email {
const NAME: &'static str = "apalis::Email";
}
// 任务处理器
async fn send_email(job: Email, _ctx: JobContext) -> Result<(), JobError> {
println!("Sending email to {}: {}", job.to, job.message);
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 创建Redis存储
let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
// 创建工作者
let worker = Worker::new("email-worker")
.with_storage(storage.clone())
.with_job::<Email>(send_email);
// 启动工作者
let monitor = Monitor::new()
.register(worker)
.run()
.await;
// 添加任务到队列
storage
.push(Email {
to: "user@example.com".to_string(),
message: "Hello from Apalis!".to_string(),
})
.await
.unwrap();
monitor.await
}
定时任务示例
Apalis还支持定时任务调度:
use apalis::{prelude::*, cron::CronStream, redis::RedisStorage};
use std::time::Duration;
use async_trait::async_trait;
#[derive(Debug, Clone)]
struct ScheduledJob;
#[async_trait]
impl Job for ScheduledJob {
const NAME: &'static str = "apalis::ScheduledJob";
async fn run(&self, _ctx: JobContext) -> Result<(), JobError> {
println!("Running scheduled job at {}", chrono::Local::now());
Ok(())
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
// 创建定时任务流 (每5秒执行一次)
let cron = CronStream::new("@every 5s")
.duration(Duration::from_secs(5))
.with_job(ScheduledJob);
// 创建工作者
let worker = Worker::new("cron-worker")
.with_storage(storage.clone())
.with_cron(cron);
// 启动监控
Monitor::new()
.register(worker)
.run()
.await
.await
}
完整示例:使用SQLite存储的任务队列
下面是一个使用SQLite作为存储后端的完整示例:
use apalis::{prelude::*, sqlite::SqliteStorage};
use serde::{Deserialize, Serialize};
use std::path::Path;
// 定义日志清理任务
#[derive(Debug, Serialize, Deserialize)]
struct CleanLogsJob {
path: String,
days_to_keep: u32,
}
impl Job for CleanLogsJob {
const NAME: &'static str = "apalis::CleanLogsJob";
}
// 任务处理器
async fn clean_logs(job: CleanLogsJob, _ctx: JobContext) -> Result<(), JobError> {
println!("Cleaning logs at path: {}, keeping last {} days",
job.path, job.days_to_keep);
// 这里添加实际的日志清理逻辑
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 创建SQLite存储
let storage = SqliteStorage::new("sqlite::memory:").await.unwrap();
// 初始化数据库表
storage.setup().await.unwrap();
// 创建工作者
let worker = Worker::new("log-cleaner-worker")
.with_storage(storage.clone())
.with_job::<CleanLogsJob>(clean_logs);
// 启动工作者
let monitor = Monitor::new()
.register(worker)
.run()
.await;
// 添加多个任务到队列
for i in 1..=5 {
storage
.push(CleanLogsJob {
path: format!("/var/logs/service-{}", i),
days_to_keep: 30,
})
.await
.unwrap();
}
monitor.await
}
特性
- 多种存储后端支持:Redis、SQLite、PostgreSQL等
- 定时任务:通过Cron表达式调度任务
- 任务重试:自动处理失败任务
- 优先级队列:支持不同优先级的任务
- 监控:提供任务执行监控接口
Apalis是一个功能强大的异步任务队列库,特别适合需要处理大量后台任务或定时作业的Rust应用程序。
1 回复
以下是一个完整的Apalis使用示例demo,结合了基本任务处理、定时任务和监控功能:
use apalis::{prelude::*, redis::RedisStorage, cron::CronStream};
use chrono::Utc;
use serde::{Serialize, Deserialize};
// 1. 定义邮件任务结构体
#[derive(Debug, Serialize, Deserialize)]
struct Email {
to: String,
message: String,
}
impl Job for Email {
const NAME: &'static str = "apalis::Email";
}
// 2. 定义发送邮件处理函数
async fn send_email(job: Email, ctx: JobContext) -> Result<(), JobError> {
println!("[Email Worker] Sending email to {}: {} (Attempt: {})",
job.to, job.message, ctx.attempt());
// 模拟失败情况,前两次尝试会失败
if ctx.attempt() < 2 {
println!("[Email Worker] Simulating failure on attempt {}", ctx.attempt());
return Err(JobError::RetryLater(None));
}
// 实际发送邮件逻辑
println!("[Email Worker] Successfully sent email to {}", job.to);
Ok(())
}
// 3. 定义定时任务处理函数
async fn cron_job(_: chrono::DateTime<Utc>, _: JobContext) -> Result<(), JobError> {
println!("[Cron Worker] Executing scheduled task at {}", Utc::now());
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 4. 初始化Redis存储
let redis_url = "redis://127.0.0.1/";
let storage = RedisStorage::connect(redis_url).await.unwrap();
// 5. 创建监控器并注册工作者
let monitor = Monitor::new()
// 注册邮件工作者(2个实例)
.register_with_count(2, move |_| {
WorkerBuilder::new("email-worker")
.with_storage(storage.clone())
.build_fn(send_email)
})
// 注册定时任务工作者(1个实例)
.register(move |_| {
WorkerBuilder::new("cron-worker")
.with_storage(storage.clone())
.stream(CronStream::new(
"*/10 * * * * * *", // 每10秒执行一次
Utc::now(), // 初始时间
storage.clone(), // 存储后端
))
.build_fn(cron_job)
});
// 6. 推送几个测试任务
let test_emails = vec![
Email {
to: "user1@example.com".to_string(),
message: "Welcome to Apalis!".to_string(),
},
Email {
to: "user2@example.com".to_string(),
message: "This is a priority message".to_string(),
},
];
// 推送普通任务
storage.push(test_emails[0].clone()).await.unwrap();
// 推送高优先级任务
storage.push_with_priority(test_emails[1].clone(), 10).await.unwrap();
println!("[Main] Pushed 2 email jobs to the queue");
// 7. 启动监控服务和工作线程
apalis::serve(monitor, "127.0.0.1:8000".parse().unwrap()).await;
Ok(())
}
这个完整示例包含以下功能:
- 定义了一个Email任务结构体,实现了Job trait
- 实现了带重试机制的邮件发送处理函数
- 实现了定时任务处理函数
- 使用Redis作为存储后端
- 创建了监控器并注册了两个工作者:
- 邮件工作者(2个实例)
- 定时任务工作者(1个实例,每10秒执行一次)
- 演示了如何推送普通任务和高优先级任务
- 启动了HTTP监控服务(可通过127.0.0.1:8000访问)
要运行这个示例,需要:
- 本地运行Redis服务
- 在Cargo.toml中添加依赖:
[dependencies]
apalis = { version = "0.4", features = ["redis"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = "0.4"