Rust异步条件变量库async-condvar-fair的使用:公平调度与高效线程同步解决方案

Rust异步条件变量库async-condvar-fair的使用:公平调度与高效线程同步解决方案

功能特性

  • 公平性:notify_one总是唤醒等待时间最长的任务
  • 提供新颖的Baton机制,避免由于异步任务取消导致的通知丢失问题
  • 兼容任何异步运行时
  • 可与任何类型的互斥锁(sync或async)配合使用
  • 100%安全代码(内部使用parking_lotdlv-list)
  • 支持多个不同互斥锁与多个条件变量交互

主要入口点

该库的主要入口点是Condvar类型,请参阅其文档了解可用的构造函数和方法详情。

示例代码

// async-condvar-fair = { version = "0.2", features = "parking_lot_0_12" }
// parking_lot = { version = "0.12", features = ["send_guard"] }
# #[cfg(feature = "parking_lot_0_12")] mod demo {
use std::collections::VecDeque;
use async_condvar_fair::{Condvar, BatonExt};
use parking_lot::Mutex;

struct Shared {
    cv: Condvar,
    queue: Mutex<VecDeque<String>>,
}

impl Shared {
    pub async fn procssor(&self) {
        let mut guard = self.queue.lock();
        let mut baton = None;
        loop {
            while let Some(entry) = guard.pop_front() {
                println!("processing {:?}", &entry);
            }
            baton.dispose();

            let got = self.cv.wait_baton(guard).await;
            guard = got.0;
            baton = got.1;
        }
    }
}
# }

完整示例

下面是一个更完整的示例,展示了如何使用async-condvar-fair实现一个生产者-消费者模式:

use std::collections::VecDeque;
use async_condvar_fair::{Condvar, BatonExt};
use parking_lot::Mutex;
use tokio::time::{sleep, Duration};

// 共享数据结构
struct Shared {
    cv: Condvar,  // 条件变量
    queue: Mutex<VecDeque<String>>,  // 使用parking_lot的互斥锁保护的消息队列
}

impl Shared {
    // 消费者实现
    pub async fn consumer(&self, id: usize) {
        let mut guard = self.queue.lock();
        let mut baton = None;  // 初始化Baton
        
        loop {
            // 处理队列中的所有消息
            while let Some(entry) = guard.pop_front() {
                println!("消费者 {} 正在处理: {}", id, &entry);
            }
            baton.dispose();  // 释放之前的Baton

            // 等待新消息
            println!("消费者 {} 等待中...", id);
            let got = self.cv.wait_baton(guard).await;  // 等待条件变量通知
            guard = got.0;  // 获取新的互斥锁守卫
            baton = got.1;  // 获取新的Baton
        }
    }

    // 生产者实现
    pub async fn producer(&self, id: usize) {
        for i in 0..5 {
            sleep(Duration::from_millis(500)).await;  // 模拟工作延迟
            let msg = format!("消息 {} 来自生产者 {}", i, id);
            
            let mut guard = self.queue.lock();
            guard.push_back(msg);  // 添加消息到队列
            
            // 通知一个消费者
            self.cv.notify_one();  // 公平地唤醒等待时间最长的任务
            println!("生产者 {} 发送了一条消息", id);
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Shared {
        cv: Condvar::new(),  // 创建新的条件变量
        queue: Mutex::new(VecDeque::new()),  // 创建空的消息队列
    };

    // 启动两个消费者任务
    let consumer1 = tokio::spawn(shared.consumer(1));
    let consumer2 = tokio::spawn(shared.consumer(2));
    
    // 启动两个生产者任务
    let producer1 = tokio::spawn(shared.producer(1));
    let producer2 = tokio::spawn(shared.producer(2));
    
    // 等待所有任务完成
    let _ = tokio::join!(consumer1, consumer2, producer1, producer2);
}

互斥锁选择指南

  • 如果互斥锁保护的操作很快(特别是不涉及IO操作),使用同步互斥锁是最佳选择
  • parking_lot::Mutex是同步互斥锁的好选择,并且async-condvar-fair对其提供了良好支持
  • 如果互斥锁保护的操作很慢,应使用异步运行时提供的异步互斥锁
  • 在多线程异步运行时中,互斥锁保护需要是Send

版本变更记录

  • 1.0.1 2024-02-20:无API或功能变更
  • 1.0.0 2022-10-11:更新版本号至1.x
  • 0.2.2 2022-03-27:更新dlv-list依赖
  • 0.2.1 2022-03-27:为不同parking_lot版本提供RelockMutexGuard实现
  • 0.1.0 2021-07-05:初始版本

版权与许可

版权所有 Ian Jackson 和 async-condvar-fair 的贡献者。无任何担保。源代码树中的文件GPL-3LICENCE提供了详细信息。


1 回复

Rust异步条件变量库async-condvar-fair使用指南

以下是基于您提供的内容整理的完整示例demo:

内容中提供的示例

公平调度示例

use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let condvar = Arc::new(CondVar::new());
    let mut handles = vec![];

    for i in 0..5 {
        let cv = condvar.clone();
        handles.push(tokio::spawn(async move {
            println!("Task {} waiting", i);
            cv.wait().await;
            println!("Task {} awakened", i);
        }));
    }

    // 确保所有任务都已开始等待
    sleep(Duration::from_millis(100)).await;

    // 按顺序唤醒所有任务
    for _ in 0..5 {
        condvar.notify_one();
        sleep(Duration::from_millis(50)).await;
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

与Mutex配合使用示例

use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let pair = Arc::new((Mutex::new(false), CondVar::new()));
    let pair2 = pair.clone();

    // 等待线程
    tokio::spawn(async move {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().await;
        while !*started {
            started = cvar.wait(started).await;
        }
        println!("Received notification!");
    });

    // 通知线程
    tokio::spawn(async move {
        let (lock, cvar) = &*pair;
        let mut started = lock.lock().await;
        *started = true;
        cvar.notify_one();
    }).await.unwrap();
}

完整示例demo

以下是一个更完整的示例,展示了如何使用async-condvar-fair实现生产者-消费者模式:

use async_condvar_fair::CondVar;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

struct SharedState {
    queue: Mutex<Vec<i32>>,  // 共享队列
    condvar: CondVar,        // 条件变量
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(SharedState {
        queue: Mutex::new(Vec::new()),
        condvar: CondVar::new(),
    });

    // 消费者任务
    let consumer = {
        let shared = shared.clone();
        tokio::spawn(async move {
            loop {
                let mut queue = shared.queue.lock().await;
                
                // 等待队列不为空
                while queue.is_empty() {
                    queue = shared.condvar.wait(queue).await;
                }
                
                // 消费一个项目
                let item = queue.remove(0);
                println!("Consumed item: {}", item);
                
                // 模拟处理时间
                sleep(Duration::from_millis(100)).await;
            }
        })
    };

    // 生产者任务
    let producer = {
        let shared = shared.clone();
        tokio::spawn(async move {
            for i in 1..=10 {
                sleep(Duration::from_millis(200)).await;  // 模拟生产间隔
                
                let mut queue = shared.queue.lock().await;
                queue.push(i);
                println!("Produced item: {}", i);
                
                // 通知消费者
                shared.condvar.notify_one();
            }
        })
    };

    // 等待生产者完成
    producer.await.unwrap();
    
    // 等待消费者处理完剩余项目
    sleep(Duration::from_secs(1)).await;
    
    // 取消消费者任务(在实际应用中应该有更优雅的关闭方式)
    consumer.abort();
}

这个完整示例展示了:

  1. 创建一个共享状态,包含一个受Mutex保护的队列和条件变量
  2. 消费者等待队列不为空的条件
  3. 生产者在添加项目后通知消费者
  4. 使用了公平唤醒机制确保消费者能及时响应

注意在实际应用中,应该实现更优雅的关闭机制而不是直接abort消费者任务。

回到顶部