Rust消息队列库yaque的使用:高性能、轻量级的分布式任务处理与事件驱动框架

Rust消息队列库yaque的使用:高性能、轻量级的分布式任务处理与事件驱动框架

Yaque是一个基于磁盘的持久化队列(和互斥锁)Rust库。它使用操作系统的文件系统实现了一个SPSC(单生产者单消费者)通道。相比简单的VecDeque<T>,它的主要优势包括:

  1. 不受RAM大小限制,只受磁盘大小限制。这意味着你可以存储GB级数据而不会导致OOM(内存溢出)错误。
  2. 即使程序崩溃,数据也是安全的。所有队列状态在队列被丢弃时都会写入磁盘。
  3. 数据可以持久化,即可以存在于程序的多次执行过程中。可以把它看作一种非常初级的数据库。
  4. 可以在两个进程之间传递数据。

Yaque是异步的,直接构建在mionotify之上。因此它完全与你应用程序使用的运行时无关,可以平滑地与tokioasync-std或任何你选择的其他执行器配合使用。

示例用法

创建新队列只需使用channel函数,并传递队列挂载的目录路径。如果目录在创建时不存在,它(及其所有父目录)将被创建。

use yaque::channel;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
})

你也可以根据需要单独使用Sender::openReceiver::open来只打开通道的一半。

使用方法与标准库中的MPSC通道类似,只是接收方法Receiver::recv是异步的。使用发送者写入队列基本上是锁自由和原子性的。

use yaque::{channel, queue::try_clear};

futures::executor::block_on(async {
    // 使用`channel`函数或直接使用构造函数打开
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
    
    // 使用发送者发送数据...
    sender.send(b"some data").await.unwrap();

    // ...并在另一端接收
    let data = receiver.recv().await.unwrap();

    assert_eq!(&*data, b"some data");

    // 调用此方法使对队列的更改永久化
    // 不调用它将恢复队列的状态
    data.commit();
});

// 一切都完成后,你可以删除队列
// 使用`clear`等待队列被释放
try_clear("data/my-queue").unwrap();

返回的值data是一种实现了DerefDerefMut的底层类型守卫。

queue::RecvGuard和事务行为

从队列读取是事务性的。Receiver::recv返回一个queue::RecvGuard,它充当死锁开关。如果被丢弃,它将回滚出队操作,除非显式调用queue::RecvGuard::commit。这确保了操作在panic和从错误中提前返回时回滚。然而,回滚期间需要执行一个额外的文件系统操作。在丢弃期间,这是以"尽力而为"的方式完成的:如果发生错误,它会被记录并忽略。如果你有任何清理行为来处理回滚错误,你可以调用queue::RecvGuard::rollback,它将返回底层错误。

批量处理

你也可以使用yaque队列发送和接收批量数据。保证与单次读写相同,只是你可以在发送项目时节省OS开销,因为只进行一次磁盘操作。

支持超时

如果你需要应用程序在队列上没有放入任何内容时不挂起,你可以使用Receiver::recv_timeoutReceiver::recv_batch_timeout来接收数据,最多等待提供的future完成,比如延迟或通道。

use yaque::channel;
use std::time::Duration;
use futures_timer::Delay;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue-2").unwrap();
    
    // 最多等待一秒接收一些数据
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    // 没有发送任何内容,所以没有数据...
    assert!(data.is_none());
    drop(data);
    
    // ...但如果你发送了某些内容...
    sender.send(b"some data").await.unwrap();
 
    // ...现在你会接收到一些内容:
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    assert_eq!(&*data.unwrap(), b"some data");  
});

意外事件处理

写入队列是一个原子操作。在panic情况下,队列保证保存接收器所有最新的元数据。对于OS信号,保存队列的两端是异步信号安全的。

完整示例代码

use yaque::{channel, queue::try_clear};
use std::time::Duration;
use futures_timer::Delay;
use futures::executor::block_on;

async fn producer_consumer_example() {
    // 创建或打开队列
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
    
    // 生产者任务 - 发送消息
    sender.send(b"message 1").await.unwrap();
    sender.send(b"message 2").await.unwrap();
    sender.send(b"message 3").await.unwrap();

    // 消费者任务 - 接收消息
    while let Ok(data) = receiver.recv_timeout(Delay::new(Duration::from_secs(1))).await {
        match data {
            Some(msg) => {
                println!("Received: {:?}", &*msg);
                msg.commit(); // 确认处理完成
            }
            None => {
                println!("No message received within timeout");
                break;
            }
        }
    }

    // 批量发送示例
    let batch = vec![b"batch1".to_vec(), b"batch2".to_vec(), b"batch3".to_vec()];
    sender.send_batch(batch).await.unwrap();

    // 批量接收示例
    if let Ok(Some(messages)) = receiver.recv_batch_timeout(3, Delay::new(Duration::from_secs(1))).await {
        for msg in messages {
            println!("Batch received: {:?}", &*msg);
            msg.commit();
        }
    }
}

fn main() {
    block_on(producer_consumer_example());
    
    // 清理队列
    try_clear("data/my-queue").unwrap();
}

要运行此示例,请确保在你的Cargo.toml中添加yaque依赖:

[dependencies]
yaque = "0.6.6"
futures = "0.3"
futures-timer = "3.0"

1 回复

Rust消息队列库yaque的使用:高性能、轻量级的分布式任务处理与事件驱动框架

简介

yaque是一个用Rust编写的高性能、轻量级消息队列库,专为分布式任务处理和事件驱动架构设计。它提供了简单易用的API,同时保持了出色的性能表现,适合需要可靠消息传递的各种应用场景。

主要特性

  • 高性能:优化的内存管理和零拷贝设计
  • 轻量级:无额外依赖,易于集成
  • 持久化支持:消息可持久化到磁盘
  • 多生产者/消费者模型:支持并发处理
  • 分布式就绪:设计时考虑了分布式场景

安装

在Cargo.toml中添加依赖:

[dependencies]
yaque = "0.3"

基本使用方法

1. 创建队列

use yaque::Queue;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建或打开一个队列
    let mut queue = Queue::open("my_queue")?;
    
    // 发送消息
    queue.push("Hello, world!".as_bytes())?;
    
    Ok(())
}

2. 消费消息

use yaque::{Queue, Consumer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建消费者
    let mut consumer = Consumer::open("my_queue")?;
    
    // 接收消息
    if let Some(message) = consumer.next()? {
        println!("Received: {}", String::from_utf8_lossy(&message));
        consumer.ack()?; // 确认消息处理完成
    }
    
    Ok(())
}

高级用法

批量处理消息

use yaque::{Queue, Producer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut producer = Producer::open("my_queue")?;
    
    // 批量发送
    let messages = vec!["msg1", "msg2", "msg3"];
    for msg in messages {
        producer.push(msg.as_bytes())?;
    }
    
    Ok(())
}

分布式处理示例

use yaque::{Queue, Consumer};
use std::thread;

fn producer() -> Result<(), Box<dyn std::error::Error>> {
    let mut queue = Queue::open("dist_queue")?;
    for i in 0..10 {
        queue.push(format!("Message {}", i).as_bytes())?;
    }
    Ok(())
}

fn consumer(id: usize) -> Result<(), Box<dyn std::error::Error>> {
    let mut consumer = Consumer::open("dist_queue")?;
    while let Some(msg) = consumer.next()? {
        println!("Consumer {}: {}", id, String::from_utf8_lossy(&msg));
        consumer.ack()?;
    }
    Ok(())
}

fn main() {
    // 启动生产者线程
    thread::spawn(|| producer().unwrap());
    
    // 启动多个消费者线程
    let handles: Vec<_> = (0..3).map(|i| {
        thread::spawn(move || consumer(i).unwrap())
    }).collect();
    
    for handle in handles {
        handle.join().unwrap();
    }
}

性能调优

yaque提供了一些配置选项来优化性能:

use yaque::{Queue, QueueConfig};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = QueueConfig {
        max_items: 100_000,    // 最大队列大小
        segment_size: 16_384,  // 段大小(字节)
        ..Default::default()
    };
    
    let mut queue = Queue::open_with_config("tuned_queue", config)?;
    // ...使用队列
    
    Ok(())
}

错误处理

yaque使用Rust的标准错误处理机制:

use yaque::Queue;

fn process_queue() -> Result<(), yaque::Error> {
    let mut queue = Queue::open("error_queue")?;
    queue.push(b"data")?;
    
    let mut consumer = queue.consume()?;
    if let Some(msg) = consumer.next()? {
        // 处理消息
        consumer.ack()?;
    }
    
    Ok(())
}

fn main() {
    if let Err(e) = process_queue() {
        eprintln!("Queue error: {}", e);
    }
}

完整示例demo

下面是一个完整的使用yaque实现生产者-消费者模型的示例:

use yaque::{Queue, Producer, Consumer};
use std::thread;
use std::time::Duration;

// 生产者线程函数
fn producer_thread(queue_name: &str, count: usize) -> Result<(), Box<dyn std::error::Error>> {
    let mut producer = Producer::open(queue_name)?;
    
    for i in 0..count {
        let msg = format!("Message {}", i);
        producer.push(msg.as_bytes())?;
        println!("Produced: {}", msg);
        thread::sleep(Duration::from_millis(100));
    }
    
    Ok(())
}

// 消费者线程函数
fn consumer_thread(queue_name: &str, id: usize) -> Result<(), Box<dyn std::error::Error>> {
    let mut consumer = Consumer::open(queue_name)?;
    
    loop {
        if let Some(msg) = consumer.next()? {
            println!("Consumer {} received: {}", id, String::from_utf8_lossy(&msg));
            consumer.ack()?;
        } else {
            // 队列为空,稍等再试
            thread::sleep(Duration::from_millis(50));
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let queue_name = "demo_queue";
    
    // 创建生产者线程
    let producer_handle = thread::spawn(move || {
        producer_thread(queue_name, 10).unwrap()
    });
    
    // 创建3个消费者线程
    let mut consumer_handles = vec![];
    for i in 0..3 {
        let name = queue_name.to_string();
        let handle = thread::spawn(move || {
            consumer_thread(&name, i).unwrap()
        });
        consumer_handles.push(handle);
    }
    
    // 等待生产者完成
    producer_handle.join().unwrap();
    
    // 等待消费者完成(实际这个简单示例中消费者会一直运行)
    for handle in consumer_handles {
        handle.join().unwrap();
    }
    
    Ok(())
}

总结

yaque为Rust开发者提供了一个简单而强大的消息队列解决方案,特别适合需要高性能和轻量级的分布式任务处理场景。其简洁的API设计使得集成和使用都非常方便,同时保持了Rust语言特有的安全性和并发优势。

回到顶部