Rust消息总线库bus的使用:高效实现跨线程通信与事件驱动的轻量级解决方案

Rust消息总线库bus的使用:高效实现跨线程通信与事件驱动的轻量级解决方案

Bus 提供了一个无锁、有界、单生产者、多消费者的广播通道。

注意:当前实现中bus有时会忙等待,这可能导致CPU使用率增加

特性

  • 使用环形缓冲区和原子指令实现无锁单生产者、多消费者通道
  • 接口与std::sync::mpsc通道类似,但支持多个消费者
  • 广播模式:每个发送都会传递给每个消费者
  • 线程安全设计

安装

在Cargo.toml中添加依赖:

bus = "2.4.1"

基本用法示例

use bus::Bus;

fn main() {
    // 创建一个容量为10的消息总线
    let mut bus = Bus::new(10);
    
    // 添加两个接收者
    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();
    
    // 发送消息
    bus.broadcast("Hello");
    bus.broadcast("world");
    
    // 接收消息
    assert_eq!(rx1.recv(), Ok("Hello"));
    assert_eq!(rx1.recv(), Ok("world"));
    
    assert_eq!(rx2.recv(), Ok("Hello")); 
    assert_eq!(rx2.recv(), Ok("world"));
}

完整示例:跨线程事件总线

use bus::Bus;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建事件总线,容量为5
    let mut event_bus = Bus::new(5);
    
    // 创建3个消费者线程
    for i in 0..3 {
        let mut rx = event_bus.add_rx();
        thread::spawn(move || {
            loop {
                match rx.recv() {
                    Ok(msg) => println!("消费者{}收到: {}", i, msg),
                    Err(_) => break, // 总线关闭时退出
                }
            }
        });
    }
    
    // 生产者线程
    thread::spawn(move || {
        for i in 1..=5 {
            event_bus.broadcast(format!("消息{}", i));
            thread::sleep(Duration::from_millis(500));
        }
        // 总线会在event_bus离开作用域时自动关闭
    });
    
    // 等待所有消息处理完成
    thread::sleep(Duration::from_secs(3));
}

高级用法

use bus::Bus;

fn main() {
    let mut bus = Bus::new(3);
    
    // 添加接收者
    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();
    
    // 发送不同类型的数据
    bus.broadcast(42);
    bus.broadcast("hello");
    bus.broadcast(vec![1, 2, 3]);
    
    // 使用try_recv非阻塞接收
    assert_eq!(rx1.try_recv(), Ok(42));
    assert_eq!(rx1.try_recv(), Ok("hello"));
    assert_eq!(rx1.try_recv(), Ok(vec![1, 2, 3]));
    
    // 同样适用于rx2
    assert_eq!(rx2.try_recv(), Ok(42));
    assert_eq!(rx2.try_recv(), Ok("hello"));
    assert_eq!(rx2.try_recv(), Ok(vec![1, 2, 3]));
    
    // 总线已满时发送会阻塞
    // 可以使用try_broadcast进行非阻塞发送
    assert!(bus.try_broadcast(99).is_ok());
    assert!(bus.try_broadcast(100).is_ok());
    assert!(bus.try_broadcast(101).is_err()); // 总线已满
}

注意事项

  1. 这是一个单生产者、多消费者的广播通道
  2. 所有消息都会被所有消费者接收
  3. 通道有固定容量,满时会阻塞发送
  4. 当前实现可能存在忙等待问题

许可证

可选择 MIT 或 Apache-2.0 许可证

完整示例demo

下面是一个更完整的示例,展示如何使用bus库实现一个简单的事件驱动系统:

use bus::Bus;
use std::thread;
use std::time::Duration;

// 定义事件枚举
#[derive(Debug, Clone)]
enum Event {
    Start,
    Data(String),
    Stop,
}

fn main() {
    // 创建事件总线,容量为10
    let mut bus = Bus::new(10);
    
    // 创建3个消费者线程
    for i in 0..3 {
        let mut rx = bus.add_rx();
        thread::spawn(move || {
            loop {
                match rx.recv() {
                    Ok(event) => {
                        match event {
                            Event::Start => println!("消费者{}: 系统启动", i),
                            Event::Data(msg) => println!("消费者{}: 处理数据 - {}", i, msg),
                            Event::Stop => {
                                println!("消费者{}: 系统停止", i);
                                break;
                            }
                        }
                    },
                    Err(_) => break, // 总线关闭时退出
                }
            }
        });
    }
    
    // 生产者线程
    thread::spawn(move || {
        // 发送启动事件
        bus.broadcast(Event::Start);
        
        // 发送一些数据事件
        for i in 1..=5 {
            bus.broadcast(Event::Data(format!("消息{}", i)));
            thread::sleep(Duration::from_millis(300));
        }
        
        // 发送停止事件
        bus.broadcast(Event::Stop);
        
        // 总线会在bus离开作用域时自动关闭
    });
    
    // 等待所有消息处理完成
    thread::sleep(Duration::from_secs(3));
}

这个完整示例展示了:

  1. 定义自定义事件类型
  2. 多个消费者线程处理不同类型的事件
  3. 生产者线程发送不同类型的事件
  4. 优雅地处理系统启动和停止
  5. 使用枚举来区分不同的事件类型

1 回复

Rust消息总线库bus的使用指南

bus是Rust中一个轻量级、高效的消息总线实现,专门为跨线程通信和事件驱动架构设计。它提供了简单的API来实现发布-订阅模式,非常适合需要解耦组件通信的场景。

核心特性

  • 线程安全:支持多生产者和多消费者模式
  • 轻量级:无额外依赖,性能高效
  • 广播机制:消息会被所有当前活动的接收者获取
  • 同步/异步支持:可根据需要选择阻塞或非阻塞操作

基本用法

首先在Cargo.toml中添加依赖:

[dependencies]
bus = "0.2"

创建消息总线

use bus::Bus;

fn main() {
    // 创建一个新的总线,指定消息类型
    let mut bus = Bus::new(10); // 10是内部缓冲区的容量
    
    // 添加接收者
    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();
    
    // 发送消息
    bus.broadcast("Hello from the bus!".to_string());
    
    // 接收消息
    println!("Rx1: {}", rx1.recv().unwrap());
    println!("Rx2: {}", rx2.recv().unwrap());
}

多线程示例

use bus::Bus;
use std::thread;

fn main() {
    let mut bus = Bus::new(100);
    
    // 创建工作线程接收者
    let mut rx = bus.add_rx();
    thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            println!("Worker received: {}", msg);
        }
    });
    
    // 主线程发送消息
    for i in 0..5 {
        bus.broadcast(format!("Message {}", i));
    }
    
    // 等待足够时间让工作线程处理
    thread::sleep(std::time::Duration::from_millis(100));
}

异步支持

bus也可以与异步运行时配合使用:

use bus::Bus;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let mut bus = Bus::new(10);
    let mut rx = bus.add_rx();
    
    tokio::spawn(async move {
        while let Ok(msg) = rx.recv() {
            println!("Async received: {}", msg);
        }
    });
    
    bus.broadcast("Async message".to_string());
    
    // 等待异步任务完成
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

高级用法

动态添加接收者

let mut bus = Bus::new(5);

// 先发送一些消息
bus.broadcast(1);
bus.broadcast(2);

// 然后添加接收者 - 它不会收到之前发送的消息
let mut late_rx = bus.add_rx();

bus.broadcast(3);

assert_eq!(late_rx.recv(), Ok(3)); // 只能收到之后的消息

处理错误

当所有发送端都断开时,接收操作会返回错误:

let mut bus = Bus::new(1);
let mut rx = bus.add_rx();

drop(bus); // 丢弃总线

match rx.recv() {
    Ok(_) => println!("Got message"),
    Err(e) => println!("Error: {}", e), // 会打印错误
}

性能考虑

  1. bus使用内部缓冲,大小在创建时指定
  2. 广播操作是O(n)复杂度,n是当前活跃接收者数量
  3. 对于高频消息场景,适当增大缓冲区可以提高性能

适用场景

  • 事件通知系统
  • 跨线程通信
  • 解耦组件间的直接依赖
  • 需要广播消息的应用程序

bus库提供了一种简单有效的方式来实现这些模式,而无需引入复杂的框架或重量级的消息队列系统。

完整示例代码

下面是一个结合了多线程和错误处理的完整示例:

use bus::Bus;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建消息总线,缓冲区容量为5
    let mut bus = Bus::new(5);
    
    // 创建第一个接收者
    let mut rx1 = bus.add_rx();
    let handle1 = thread::spawn(move || {
        println!("线程1启动");
        while let Ok(msg) = rx1.recv() {
            println!("线程1收到: {}", msg);
        }
        println!("线程1结束: 总线已关闭");
    });
    
    // 创建第二个接收者
    let mut rx2 = bus.add_rx();
    let handle2 = thread::spawn(move || {
        println!("线程2启动");
        for i in 1..=3 {
            match rx2.recv() {
                Ok(msg) => println!("线程2收到{}: {}", i, msg),
                Err(e) => println!("线程2错误: {}", e),
            }
        }
        println!("线程2结束");
    });
    
    // 主线程发送消息
    for i in 0..3 {
        bus.broadcast(format!("消息{}", i));
        thread::sleep(Duration::from_millis(50));
    }
    
    // 关闭总线
    drop(bus);
    
    // 等待线程结束
    handle1.join().unwrap();
    handle2.join().unwrap();
    
    println!("主线程结束");
}

这个完整示例展示了:

  1. 创建消息总线
  2. 多个线程同时接收消息
  3. 主线程广播消息
  4. 总线关闭后的错误处理
  5. 线程同步等待

输出示例可能如下:

线程1启动
线程2启动
线程1收到: 消息0
线程2收到1: 消息0
线程1收到: 消息1
线程2收到2: 消息1
线程1收到: 消息2
线程2收到3: 消息2
线程1结束: 总线已关闭
线程2结束
主线程结束
回到顶部