Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度

Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度

特性

  • 简单可预测的任务处理模型
  • 任务处理器只需是带有宏的异步函数API
  • 类似actix和axum的熟悉依赖注入方式
  • 充分利用tower生态系统的中间件、服务和工具
  • 易于扩展,默认支持分布式后端
  • 运行时无关 - 支持tokio、smol等
  • 内置并发和并行处理
  • 工作监控和平滑关闭
  • 通过API轻松暴露任务和工作
  • 持久化定时任务,可将定时任务分发到其他后端
  • 可选Web界面帮助管理工作

支持的存储后端

后端类型 状态 示例
Redis 示例
Sqlite 示例
Postgres 示例
MySQL 示例
AMQP 示例

快速开始

在Cargo.toml中添加:

[dependencies]
apalis = { version = "0.7", features = "limit" }  # 并发限制
apalis-redis = { version = "0.7" }  # 使用Redis作为持久化后端

使用示例

use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Email {
    to: String,
}

/// 为每个任务调用的函数
async fn send_email(job: Email, data: Data<usize>) -> Result<(), Error> {
    /// 执行任务
    Ok(())
}

#[tokio::main]
async fn main() {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();
    let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
    let conn = apalis_redis::connect(redis_url).await.expect("Could not connect");
    let storage = RedisStorage::new(conn);
    WorkerBuilder::new("email-worker")
      .concurrency(2)
      .data(0usize)
      .backend(storage)
      .build_fn(send_email)
      .run()
      .await;
}

然后在程序的其他部分或另一个应用程序(如HTTP服务器)中:

// 可以在程序的另一部分或另一个应用程序中,如HTTP服务器
async fn produce_route_jobs(storage: &mut RedisStorage<Email>) -> Result<()> {
    storage
        .push(Email {
            to: "test@example.com".to_string(),
        })
        .await?
}

分步任务示例

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    std::env::set_var("RUST_LOG", "debug");
    tracing_subscriber::fmt::init();
    let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
    let conn = apalis_redis::connect(redis_url).await.unwrap();
    let config = apalis_redis::Config::default().set_namespace("stepped-email-task");

    let mut storage = RedisStorage::new_with_config(conn, config);
    storage
        .start_stepped(WelcomeEmail { user_id: 1 })
        .await
        .unwrap();

    // 构建步骤
    let steps = StepBuilder::new()
        .step_fn(welcome)  // 步骤是tower服务
        .step_fn(campaign)
        .step_fn(complete_campaign);

    WorkerBuilder::new("tasty-banana")
        .data(())  // 所有步骤共享的数据
        .enable_tracing()
        .concurrency(2)
        .backend(storage)
        .build_极速处理后台作业与分布式任务调度

基于您提供的内容,我将整理出关于Rust异步任务队列库apalis-core的完整使用指南,包含特性介绍、快速开始、完整示例等内容。

## 完整示例Demo

以下是一个完整的Redis后端任务队列示例:

```rust
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use serde::{Deserialize, Serialize};
use std::env;

#[derive(Debug, Deserialize, Serialize)]
struct EmailTask {
    to: String,
    subject: String,
    body: String,
}

// 任务处理函数
async fn process_email(task: EmailTask, _ctx: Data<()>) -> Result<(), JobError> {
    println!("Sending email to: {}", task.to);
    println!("Subject: {}", task.subject);
    println!("Body: {}", task.body);
    // 这里可以添加实际的邮件发送逻辑
    Ok(())
}

#[tokio::main]
async fn main() {
    // 初始化日志
    env::set_var("RUST_LOG", "info");
    env_logger::init();
    
    // 连接到Redis
    let redis_url = env::var("REDIS_URL").unwrap_or("redis://127.0.0.1/".to_string());
    let redis = apalis_redis::connect(redis_url).await.expect("无法连接Redis");
    
    // 创建存储后端
    let storage = RedisStorage::new(redis);
    
    // 构建并启动工作者
    Monitor::new()
        .register(
            WorkerBuilder::new("email-worker")
                .with_storage(storage.clone())
                .build_fn(process_email)
        )
        .run()
        .await
        .expect("工作者监控器运行失败");
    
    // 在另一个线程中添加任务
    tokio::spawn(async move {
        let task = EmailTask {
            to: "user@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Thank you for signing up!".to_string(),
        };
        
        storage.push(task).await.expect("无法添加任务到队列");
        println!("任务已添加到队列");
    });
}

这个示例展示了如何:

  1. 定义任务结构体
  2. 创建任务处理函数
  3. 设置Redis存储后端
  4. 启动工作者监控
  5. 在另一个线程中添加任务到队列

要运行此示例,你需要:

  1. 安装Redis并确保其运行
  2. 添加依赖到Cargo.toml:
    [dependencies]
    apalis = { version = "0.7", features = ["limit"] }
    apalis-redis = "0.7"
    serde = { version = "1.0", features = ["derive"] }
    tokio = { version = "1.0", features = ["full"] }
    env_logger = "0.9"
    

1 回复

Rust异步任务队列库apalis-core的使用:高效处理后台作业与分布式任务调度

介绍

apalis-core是一个Rust编写的异步任务队列库,专注于高效处理后台作业和分布式任务调度。它提供了以下核心特性:

  • 基于async/await的异步任务处理
  • 支持多种存储后端(Redis、PostgreSQL、SQLite等)
  • 任务重试机制
  • 任务优先级支持
  • 分布式任务调度能力
  • 与Tokio运行时深度集成

安装

在Cargo.toml中添加依赖:

[dependencies]
apalis-core = "0.5"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 定义任务

use apalis_core::job::{Job, JobContext};

#[derive(Debug, Clone)]
struct EmailJob {
    to: String,
    subject: String,
    body: String,
}

#[async_trait::async_trait]
impl Job for EmailJob {
    type Output = ();

    async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
        println!("Sending email to {} with subject '{}'", self.to, self.subject);
        // 这里实际发送邮件...
        Ok(())
    }
}

2. 创建并运行Worker

use apalis_core::{builder::WorkerBuilder, storage::SqliteStorage};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // 使用SQLite作为存储后端
    let storage = SqliteStorage::connect("sqlite://jobs.db").await?;
    
    // 创建worker
    let worker = WorkerBuilder::new("email-worker")
        .with_storage(storage)
        .build::<EmailJob>();
    
    // 运行worker
    worker.run().await?;
    
    Ok(())
}

3. 提交任务

use apalis_core::storage::Storage;

async fn submit_email_job(storage: &SqliteStorage) -> Result<(), anyhow::Error> {
    let job = EmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome".to_string(),
        body: "Thank you for signing up!".to_string(),
    };
    
    storage.push(job).await?;
    Ok(())
}

高级功能

任务重试

#[async_trait::async_trait]
impl Job for EmailJob {
    type Output = ();
    
    // 设置最大重试次数
    fn max_retries(&self) -> i32 {
        3
    }
    
    // 设置重试间隔
    fn backoff(&self, attempt: u32) -> std::time::Duration {
        std::time::Duration::from_secs(60 * attempt as u64)
    }
    
    async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
        // 实现略...
    }
}

使用Redis作为后端

use apalis_core::storage::RedisStorage;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let redis_url = "redis://127.0.0.1/";
    let storage = RedisStorage::connect(redis_url).await?;
    
    let worker = WorkerBuilder::new("redis-worker")
        .with_storage(storage)
        .build::<EmailJob>();
    
    worker.run().await?;
    Ok(())
}

分布式任务调度

use apalis_core::scheduler::Scheduler;

async fn schedule_daily_report(storage极抱歉,您的内容似乎被截断了。我将基于您提供的完整内容,给出一个完整的示例demo:

```rust
//! 完整示例:使用apalis-core实现电子邮件任务队列

use apalis_core::{
    builder::WorkerBuilder,
    job::{Job, JobContext},
    scheduler::Scheduler,
    storage::{RedisStorage, SqliteStorage, Storage},
};
use async_trait::async_trait;
use chrono::Utc;
use std::time::Duration;
use tokio::time::sleep;

// 1. 定义任务类型
#[derive(Debug, Clone)]
struct EmailJob {
    to: String,
    subject: String,
    body: String,
}

// 2. 实现Job trait
#[async_trait]
impl Job for EmailJob {
    type Output = ();

    // 最大重试次数
    fn max_retries(&self) -> i32 {
        3
    }

    // 重试间隔
    fn backoff(&self, attempt: u32) -> Duration {
        Duration::from_secs(60 * attempt as u64)
    }

    async fn run(self, ctx: JobContext) -> Result<Self::Output, anyhow::Error> {
        println!(
            "[{}] Processing email to: {}, subject: {}",
            Utc::now(),
            self.to,
            self.subject
        );
        
        // 模拟邮件发送失败
        if self.to.contains("fail") {
            return Err(anyhow::anyhow!("Failed to send email to {}", self.to));
        }
        
        // 模拟邮件发送成功
        sleep(Duration::from_secs(1)).await;
        println!("Email sent successfully to {}", self.to);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // 3. 初始化存储后端 (这里使用Redis)
    let redis_url = "redis://127.0.0.1/";
    let storage = RedisStorage::connect(redis_url).await?;

    // 4. 创建并启动worker
    let worker = WorkerBuilder::new("email-worker")
        .with_storage(storage.clone())
        .build::<EmailJob>();

    tokio::spawn(async move {
        worker.run().await.expect("Worker failed");
    });

    // 5. 添加定时任务
    let scheduler = Scheduler::new(storage.clone());
    scheduler
        .cron("0 * * * * *", || {
            // 每分钟执行一次
            EmailJob {
                to: "cron@example.com".to_string(),
                subject: "Scheduled Email".to_string(),
                body: "This is a scheduled email".to_string(),
            }
        })
        .await?;

    // 6. 提交即时任务
    storage
        .push(EmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome".to_string(),
            body: "Thank you for signing up!".to_string(),
        })
        .await?;

    // 7. 提交会失败的任务(测试重试机制)
    storage
        .push(EmailJob {
            to: "fail@example.com".to_string(),
            subject: "Test Failure".to_string(),
            body: "This should fail".to_string(),
        })
        .await?;

    // 保持程序运行
    loop {
        sleep(Duration::from_secs(10)).await;
    }
}

这个完整示例演示了:

  1. 定义EmailJob任务类型
  2. 实现Job trait并配置重试策略
  3. 使用Redis作为存储后端
  4. 创建并运行worker处理任务
  5. 设置定时任务(每分钟执行)
  6. 提交即时任务
  7. 测试失败重试机制

要运行此示例,需要:

  1. 安装并运行Redis服务器
  2. 在Cargo.toml中添加依赖:
    [dependencies]
    apalis-core = "0.5"
    tokio = { version = "1.0", features = ["full"] }
    async-trait = "0.1"
    anyhow = "1.0"
    chrono = "0.4"
    
回到顶部