Rust异步任务队列库Apalis的使用,高效处理后台任务和定时作业

Rust异步任务队列库Apalis的使用,高效处理后台任务和定时作业

安装

在项目目录中运行以下Cargo命令:

cargo add apalis

或在Cargo.toml中添加:

apalis = "0.7.2"

基本使用示例

以下是一个使用Apalis处理异步任务的完整示例:

use apalis::{prelude::*, redis::RedisStorage};
use serde::{Deserialize, Serialize};

// 定义一个任务
#[derive(Debug, Serialize, Deserialize)]
struct Email {
    to: String,
    message: String,
}

impl Job for Email {
    const NAME: &'static str = "apalis::Email";
}

// 任务处理器
async fn send_email(job: Email, _ctx: JobContext) -> Result<(), JobError> {
    println!("Sending email to {}: {}", job.to, job.message);
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建Redis存储
    let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
    
    // 创建工作者
    let worker = Worker::new("email-worker")
        .with_storage(storage.clone())
        .with_job::<Email>(send_email);
    
    // 启动工作者
    let monitor = Monitor::new()
        .register(worker)
        .run()
        .await;
    
    // 添加任务到队列
    storage
        .push(Email {
            to: "user@example.com".to_string(),
            message: "Hello from Apalis!".to_string(),
        })
        .await
        .unwrap();
    
    monitor.await
}

定时任务示例

Apalis还支持定时任务调度:

use apalis::{prelude::*, cron::CronStream, redis::RedisStorage};
use std::time::Duration;
use async_trait::async_trait;

#[derive(Debug, Clone)]
struct ScheduledJob;

#[async_trait]
impl Job for ScheduledJob {
    const NAME: &'static str = "apalis::ScheduledJob";
    
    async fn run(&self, _ctx: JobContext) -> Result<(), JobError> {
        println!("Running scheduled job at {}", chrono::Local::now());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
    
    // 创建定时任务流 (每5秒执行一次)
    let cron = CronStream::new("@every 5s")
        .duration(Duration::from_secs(5))
        .with_job(ScheduledJob);
    
    // 创建工作者
    let worker = Worker::new("cron-worker")
        .with_storage(storage.clone())
        .with_cron(cron);
    
    // 启动监控
    Monitor::new()
        .register(worker)
        .run()
        .await
        .await
}

完整示例:使用SQLite存储的任务队列

下面是一个使用SQLite作为存储后端的完整示例:

use apalis::{prelude::*, sqlite::SqliteStorage};
use serde::{Deserialize, Serialize};
use std::path::Path;

// 定义日志清理任务
#[derive(Debug, Serialize, Deserialize)]
struct CleanLogsJob {
    path: String,
    days_to_keep: u32,
}

impl Job for CleanLogsJob {
    const NAME: &'static str = "apalis::CleanLogsJob";
}

// 任务处理器
async fn clean_logs(job: CleanLogsJob, _ctx: JobContext) -> Result<(), JobError> {
    println!("Cleaning logs at path: {}, keeping last {} days", 
        job.path, job.days_to_keep);
    // 这里添加实际的日志清理逻辑
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建SQLite存储
    let storage = SqliteStorage::new("sqlite::memory:").await.unwrap();
    
    // 初始化数据库表
    storage.setup().await.unwrap();
    
    // 创建工作者
    let worker = Worker::new("log-cleaner-worker")
        .with_storage(storage.clone())
        .with_job::<CleanLogsJob>(clean_logs);
    
    // 启动工作者
    let monitor = Monitor::new()
        .register(worker)
        .run()
        .await;
    
    // 添加多个任务到队列
    for i in 1..=5 {
        storage
            .push(CleanLogsJob {
                path: format!("/var/logs/service-{}", i),
                days_to_keep: 30,
            })
            .await
            .unwrap();
    }
    
    monitor.await
}

特性

  1. 多种存储后端支持:Redis、SQLite、PostgreSQL等
  2. 定时任务:通过Cron表达式调度任务
  3. 任务重试:自动处理失败任务
  4. 优先级队列:支持不同优先级的任务
  5. 监控:提供任务执行监控接口

Apalis是一个功能强大的异步任务队列库,特别适合需要处理大量后台任务或定时作业的Rust应用程序。


1 回复

以下是一个完整的Apalis使用示例demo,结合了基本任务处理、定时任务和监控功能:

use apalis::{prelude::*, redis::RedisStorage, cron::CronStream};
use chrono::Utc;
use serde::{Serialize, Deserialize};

// 1. 定义邮件任务结构体
#[derive(Debug, Serialize, Deserialize)]
struct Email {
    to: String,
    message: String,
}

impl Job for Email {
    const NAME: &'static str = "apalis::Email";
}

// 2. 定义发送邮件处理函数
async fn send_email(job: Email, ctx: JobContext) -> Result<(), JobError> {
    println!("[Email Worker] Sending email to {}: {} (Attempt: {})", 
        job.to, job.message, ctx.attempt());
    
    // 模拟失败情况,前两次尝试会失败
    if ctx.attempt() < 2 {
        println!("[Email Worker] Simulating failure on attempt {}", ctx.attempt());
        return Err(JobError::RetryLater(None));
    }
    
    // 实际发送邮件逻辑
    println!("[Email Worker] Successfully sent email to {}", job.to);
    Ok(())
}

// 3. 定义定时任务处理函数
async fn cron_job(_: chrono::DateTime<Utc>, _: JobContext) -> Result<(), JobError> {
    println!("[Cron Worker] Executing scheduled task at {}", Utc::now());
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 4. 初始化Redis存储
    let redis_url = "redis://127.0.0.1/";
    let storage = RedisStorage::connect(redis_url).await.unwrap();
    
    // 5. 创建监控器并注册工作者
    let monitor = Monitor::new()
        // 注册邮件工作者(2个实例)
        .register_with_count(2, move |_| {
            WorkerBuilder::new("email-worker")
                .with_storage(storage.clone())
                .build_fn(send_email)
        })
        // 注册定时任务工作者(1个实例)
        .register(move |_| {
            WorkerBuilder::new("cron-worker")
                .with_storage(storage.clone())
                .stream(CronStream::new(
                    "*/10 * * * * * *",  // 每10秒执行一次
                    Utc::now(),          // 初始时间
                    storage.clone(),     // 存储后端
                ))
                .build_fn(cron_job)
        });
    
    // 6. 推送几个测试任务
    let test_emails = vec![
        Email {
            to: "user1@example.com".to_string(),
            message: "Welcome to Apalis!".to_string(),
        },
        Email {
            to: "user2@example.com".to_string(),
            message: "This is a priority message".to_string(),
        },
    ];
    
    // 推送普通任务
    storage.push(test_emails[0].clone()).await.unwrap();
    
    // 推送高优先级任务
    storage.push_with_priority(test_emails[1].clone(), 10).await.unwrap();
    
    println!("[Main] Pushed 2 email jobs to the queue");
    
    // 7. 启动监控服务和工作线程
    apalis::serve(monitor, "127.0.0.1:8000".parse().unwrap()).await;
    
    Ok(())
}

这个完整示例包含以下功能:

  1. 定义了一个Email任务结构体,实现了Job trait
  2. 实现了带重试机制的邮件发送处理函数
  3. 实现了定时任务处理函数
  4. 使用Redis作为存储后端
  5. 创建了监控器并注册了两个工作者:
    • 邮件工作者(2个实例)
    • 定时任务工作者(1个实例,每10秒执行一次)
  6. 演示了如何推送普通任务和高优先级任务
  7. 启动了HTTP监控服务(可通过127.0.0.1:8000访问)

要运行这个示例,需要:

  1. 本地运行Redis服务
  2. 在Cargo.toml中添加依赖:
[dependencies]
apalis = { version = "0.4", features = ["redis"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = "0.4"
回到顶部