Rust异步条件变量库async-condvar-fair的使用:公平调度与高效线程同步解决方案
Rust异步条件变量库async-condvar-fair的使用:公平调度与高效线程同步解决方案
功能特性
- 公平性:
notify_one
总是唤醒等待时间最长的任务 - 提供新颖的
Baton
机制,避免由于异步任务取消导致的通知丢失问题 - 兼容任何异步运行时
- 可与任何类型的互斥锁(sync或async)配合使用
- 100%安全代码(内部使用
parking_lot
和dlv-list
) - 支持多个不同互斥锁与多个条件变量交互
主要入口点
该库的主要入口点是Condvar
类型,请参阅其文档了解可用的构造函数和方法详情。
示例代码
// async-condvar-fair = { version = "0.2", features = "parking_lot_0_12" }
// parking_lot = { version = "0.12", features = ["send_guard"] }
# #[cfg(feature = "parking_lot_0_12")] mod demo {
use std::collections::VecDeque;
use async_condvar_fair::{Condvar, BatonExt};
use parking_lot::Mutex;
struct Shared {
cv: Condvar,
queue: Mutex<VecDeque<String>>,
}
impl Shared {
pub async fn procssor(&self) {
let mut guard = self.queue.lock();
let mut baton = None;
loop {
while let Some(entry) = guard.pop_front() {
println!("processing {:?}", &entry);
}
baton.dispose();
let got = self.cv.wait_baton(guard).await;
guard = got.0;
baton = got.1;
}
}
}
# }
完整示例
下面是一个更完整的示例,展示了如何使用async-condvar-fair
实现一个生产者-消费者模式:
use std::collections::VecDeque;
use async_condvar_fair::{Condvar, BatonExt};
use parking_lot::Mutex;
use tokio::time::{sleep, Duration};
// 共享数据结构
struct Shared {
cv: Condvar, // 条件变量
queue: Mutex<VecDeque<String>>, // 使用parking_lot的互斥锁保护的消息队列
}
impl Shared {
// 消费者实现
pub async fn consumer(&self, id: usize) {
let mut guard = self.queue.lock();
let mut baton = None; // 初始化Baton
loop {
// 处理队列中的所有消息
while let Some(entry) = guard.pop_front() {
println!("消费者 {} 正在处理: {}", id, &entry);
}
baton.dispose(); // 释放之前的Baton
// 等待新消息
println!("消费者 {} 等待中...", id);
let got = self.cv.wait_baton(guard).await; // 等待条件变量通知
guard = got.0; // 获取新的互斥锁守卫
baton = got.1; // 获取新的Baton
}
}
// 生产者实现
pub async fn producer(&self, id: usize) {
for i in 0..5 {
sleep(Duration::from_millis(500)).await; // 模拟工作延迟
let msg = format!("消息 {} 来自生产者 {}", i, id);
let mut guard = self.queue.lock();
guard.push_back(msg); // 添加消息到队列
// 通知一个消费者
self.cv.notify_one(); // 公平地唤醒等待时间最长的任务
println!("生产者 {} 发送了一条消息", id);
}
}
}
#[tokio::main]
async fn main() {
let shared = Shared {
cv: Condvar::new(), // 创建新的条件变量
queue: Mutex::new(VecDeque::new()), // 创建空的消息队列
};
// 启动两个消费者任务
let consumer1 = tokio::spawn(shared.consumer(1));
let consumer2 = tokio::spawn(shared.consumer(2));
// 启动两个生产者任务
let producer1 = tokio::spawn(shared.producer(1));
let producer2 = tokio::spawn(shared.producer(2));
// 等待所有任务完成
let _ = tokio::join!(consumer1, consumer2, producer1, producer2);
}
互斥锁选择指南
- 如果互斥锁保护的操作很快(特别是不涉及IO操作),使用同步互斥锁是最佳选择
parking_lot::Mutex
是同步互斥锁的好选择,并且async-condvar-fair
对其提供了良好支持- 如果互斥锁保护的操作很慢,应使用异步运行时提供的异步互斥锁
- 在多线程异步运行时中,互斥锁保护需要是
Send
的
版本变更记录
- 1.0.1 2024-02-20:无API或功能变更
- 1.0.0 2022-10-11:更新版本号至1.x
- 0.2.2 2022-03-27:更新
dlv-list
依赖 - 0.2.1 2022-03-27:为不同
parking_lot
版本提供RelockMutexGuard
实现 - 0.1.0 2021-07-05:初始版本
版权与许可
版权所有 Ian Jackson 和 async-condvar-fair 的贡献者。无任何担保。源代码树中的文件GPL-3
和LICENCE
提供了详细信息。
1 回复
Rust异步条件变量库async-condvar-fair使用指南
以下是基于您提供的内容整理的完整示例demo:
内容中提供的示例
公平调度示例
use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let condvar = Arc::new(CondVar::new());
let mut handles = vec![];
for i in 0..5 {
let cv = condvar.clone();
handles.push(tokio::spawn(async move {
println!("Task {} waiting", i);
cv.wait().await;
println!("Task {} awakened", i);
}));
}
// 确保所有任务都已开始等待
sleep(Duration::from_millis(100)).await;
// 按顺序唤醒所有任务
for _ in 0..5 {
condvar.notify_one();
sleep(Duration::from_millis(50)).await;
}
for handle in handles {
handle.await.unwrap();
}
}
与Mutex配合使用示例
use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let pair = Arc::new((Mutex::new(false), CondVar::new()));
let pair2 = pair.clone();
// 等待线程
tokio::spawn(async move {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().await;
while !*started {
started = cvar.wait(started).await;
}
println!("Received notification!");
});
// 通知线程
tokio::spawn(async move {
let (lock, cvar) = &*pair;
let mut started = lock.lock().await;
*started = true;
cvar.notify_one();
}).await.unwrap();
}
完整示例demo
以下是一个更完整的示例,展示了如何使用async-condvar-fair实现生产者-消费者模式:
use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
struct SharedState {
queue: Mutex<Vec<i32>>, // 共享队列
condvar: CondVar, // 条件变量
}
#[tokio::main]
async fn main() {
let shared = Arc::new(SharedState {
queue: Mutex::new(Vec::new()),
condvar: CondVar::new(),
});
// 消费者任务
let consumer = {
let shared = shared.clone();
tokio::spawn(async move {
loop {
let mut queue = shared.queue.lock().await;
// 等待队列不为空
while queue.is_empty() {
queue = shared.condvar.wait(queue).await;
}
// 消费一个项目
let item = queue.remove(0);
println!("Consumed item: {}", item);
// 模拟处理时间
sleep(Duration::from_millis(100)).await;
}
})
};
// 生产者任务
let producer = {
let shared = shared.clone();
tokio::spawn(async move {
for i in 1..=10 {
sleep(Duration::from_millis(200)).await; // 模拟生产间隔
let mut queue = shared.queue.lock().await;
queue.push(i);
println!("Produced item: {}", i);
// 通知消费者
shared.condvar.notify_one();
}
})
};
// 等待生产者完成
producer.await.unwrap();
// 等待消费者处理完剩余项目
sleep(Duration::from_secs(1)).await;
// 取消消费者任务(在实际应用中应该有更优雅的关闭方式)
consumer.abort();
}
这个完整示例展示了:
- 创建一个共享状态,包含一个受Mutex保护的队列和条件变量
- 消费者等待队列不为空的条件
- 生产者在添加项目后通知消费者
- 使用了公平唤醒机制确保消费者能及时响应
注意在实际应用中,应该实现更优雅的关闭机制而不是直接abort消费者任务。