Rust消息总线库bus的使用:高效实现跨线程通信与事件驱动的轻量级解决方案
Rust消息总线库bus的使用:高效实现跨线程通信与事件驱动的轻量级解决方案
Bus 提供了一个无锁、有界、单生产者、多消费者的广播通道。
注意:当前实现中bus有时会忙等待,这可能导致CPU使用率增加
特性
- 使用环形缓冲区和原子指令实现无锁单生产者、多消费者通道
- 接口与
std::sync::mpsc
通道类似,但支持多个消费者 - 广播模式:每个发送都会传递给每个消费者
- 线程安全设计
安装
在Cargo.toml中添加依赖:
bus = "2.4.1"
基本用法示例
use bus::Bus;
fn main() {
// 创建一个容量为10的消息总线
let mut bus = Bus::new(10);
// 添加两个接收者
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// 发送消息
bus.broadcast("Hello");
bus.broadcast("world");
// 接收消息
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx1.recv(), Ok("world"));
assert_eq!(rx2.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("world"));
}
完整示例:跨线程事件总线
use bus::Bus;
use std::thread;
use std::time::Duration;
fn main() {
// 创建事件总线,容量为5
let mut event_bus = Bus::new(5);
// 创建3个消费者线程
for i in 0..3 {
let mut rx = event_bus.add_rx();
thread::spawn(move || {
loop {
match rx.recv() {
Ok(msg) => println!("消费者{}收到: {}", i, msg),
Err(_) => break, // 总线关闭时退出
}
}
});
}
// 生产者线程
thread::spawn(move || {
for i in 1..=5 {
event_bus.broadcast(format!("消息{}", i));
thread::sleep(Duration::from_millis(500));
}
// 总线会在event_bus离开作用域时自动关闭
});
// 等待所有消息处理完成
thread::sleep(Duration::from_secs(3));
}
高级用法
use bus::Bus;
fn main() {
let mut bus = Bus::new(3);
// 添加接收者
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// 发送不同类型的数据
bus.broadcast(42);
bus.broadcast("hello");
bus.broadcast(vec![1, 2, 3]);
// 使用try_recv非阻塞接收
assert_eq!(rx1.try_recv(), Ok(42));
assert_eq!(rx1.try_recv(), Ok("hello"));
assert_eq!(rx1.try_recv(), Ok(vec![1, 2, 3]));
// 同样适用于rx2
assert_eq!(rx2.try_recv(), Ok(42));
assert_eq!(rx2.try_recv(), Ok("hello"));
assert_eq!(rx2.try_recv(), Ok(vec![1, 2, 3]));
// 总线已满时发送会阻塞
// 可以使用try_broadcast进行非阻塞发送
assert!(bus.try_broadcast(99).is_ok());
assert!(bus.try_broadcast(100).is_ok());
assert!(bus.try_broadcast(101).is_err()); // 总线已满
}
注意事项
- 这是一个单生产者、多消费者的广播通道
- 所有消息都会被所有消费者接收
- 通道有固定容量,满时会阻塞发送
- 当前实现可能存在忙等待问题
许可证
可选择 MIT 或 Apache-2.0 许可证
完整示例demo
下面是一个更完整的示例,展示如何使用bus库实现一个简单的事件驱动系统:
use bus::Bus;
use std::thread;
use std::time::Duration;
// 定义事件枚举
#[derive(Debug, Clone)]
enum Event {
Start,
Data(String),
Stop,
}
fn main() {
// 创建事件总线,容量为10
let mut bus = Bus::new(10);
// 创建3个消费者线程
for i in 0..3 {
let mut rx = bus.add_rx();
thread::spawn(move || {
loop {
match rx.recv() {
Ok(event) => {
match event {
Event::Start => println!("消费者{}: 系统启动", i),
Event::Data(msg) => println!("消费者{}: 处理数据 - {}", i, msg),
Event::Stop => {
println!("消费者{}: 系统停止", i);
break;
}
}
},
Err(_) => break, // 总线关闭时退出
}
}
});
}
// 生产者线程
thread::spawn(move || {
// 发送启动事件
bus.broadcast(Event::Start);
// 发送一些数据事件
for i in 1..=5 {
bus.broadcast(Event::Data(format!("消息{}", i)));
thread::sleep(Duration::from_millis(300));
}
// 发送停止事件
bus.broadcast(Event::Stop);
// 总线会在bus离开作用域时自动关闭
});
// 等待所有消息处理完成
thread::sleep(Duration::from_secs(3));
}
这个完整示例展示了:
- 定义自定义事件类型
- 多个消费者线程处理不同类型的事件
- 生产者线程发送不同类型的事件
- 优雅地处理系统启动和停止
- 使用枚举来区分不同的事件类型
1 回复
Rust消息总线库bus
的使用指南
bus
是Rust中一个轻量级、高效的消息总线实现,专门为跨线程通信和事件驱动架构设计。它提供了简单的API来实现发布-订阅模式,非常适合需要解耦组件通信的场景。
核心特性
- 线程安全:支持多生产者和多消费者模式
- 轻量级:无额外依赖,性能高效
- 广播机制:消息会被所有当前活动的接收者获取
- 同步/异步支持:可根据需要选择阻塞或非阻塞操作
基本用法
首先在Cargo.toml
中添加依赖:
[dependencies]
bus = "0.2"
创建消息总线
use bus::Bus;
fn main() {
// 创建一个新的总线,指定消息类型
let mut bus = Bus::new(10); // 10是内部缓冲区的容量
// 添加接收者
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// 发送消息
bus.broadcast("Hello from the bus!".to_string());
// 接收消息
println!("Rx1: {}", rx1.recv().unwrap());
println!("Rx2: {}", rx2.recv().unwrap());
}
多线程示例
use bus::Bus;
use std::thread;
fn main() {
let mut bus = Bus::new(100);
// 创建工作线程接收者
let mut rx = bus.add_rx();
thread::spawn(move || {
while let Ok(msg) = rx.recv() {
println!("Worker received: {}", msg);
}
});
// 主线程发送消息
for i in 0..5 {
bus.broadcast(format!("Message {}", i));
}
// 等待足够时间让工作线程处理
thread::sleep(std::time::Duration::from_millis(100));
}
异步支持
bus
也可以与异步运行时配合使用:
use bus::Bus;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let mut bus = Bus::new(10);
let mut rx = bus.add_rx();
tokio::spawn(async move {
while let Ok(msg) = rx.recv() {
println!("Async received: {}", msg);
}
});
bus.broadcast("Async message".to_string());
// 等待异步任务完成
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
高级用法
动态添加接收者
let mut bus = Bus::new(5);
// 先发送一些消息
bus.broadcast(1);
bus.broadcast(2);
// 然后添加接收者 - 它不会收到之前发送的消息
let mut late_rx = bus.add_rx();
bus.broadcast(3);
assert_eq!(late_rx.recv(), Ok(3)); // 只能收到之后的消息
处理错误
当所有发送端都断开时,接收操作会返回错误:
let mut bus = Bus::new(1);
let mut rx = bus.add_rx();
drop(bus); // 丢弃总线
match rx.recv() {
Ok(_) => println!("Got message"),
Err(e) => println!("Error: {}", e), // 会打印错误
}
性能考虑
bus
使用内部缓冲,大小在创建时指定- 广播操作是O(n)复杂度,n是当前活跃接收者数量
- 对于高频消息场景,适当增大缓冲区可以提高性能
适用场景
- 事件通知系统
- 跨线程通信
- 解耦组件间的直接依赖
- 需要广播消息的应用程序
bus
库提供了一种简单有效的方式来实现这些模式,而无需引入复杂的框架或重量级的消息队列系统。
完整示例代码
下面是一个结合了多线程和错误处理的完整示例:
use bus::Bus;
use std::thread;
use std::time::Duration;
fn main() {
// 创建消息总线,缓冲区容量为5
let mut bus = Bus::new(5);
// 创建第一个接收者
let mut rx1 = bus.add_rx();
let handle1 = thread::spawn(move || {
println!("线程1启动");
while let Ok(msg) = rx1.recv() {
println!("线程1收到: {}", msg);
}
println!("线程1结束: 总线已关闭");
});
// 创建第二个接收者
let mut rx2 = bus.add_rx();
let handle2 = thread::spawn(move || {
println!("线程2启动");
for i in 1..=3 {
match rx2.recv() {
Ok(msg) => println!("线程2收到{}: {}", i, msg),
Err(e) => println!("线程2错误: {}", e),
}
}
println!("线程2结束");
});
// 主线程发送消息
for i in 0..3 {
bus.broadcast(format!("消息{}", i));
thread::sleep(Duration::from_millis(50));
}
// 关闭总线
drop(bus);
// 等待线程结束
handle1.join().unwrap();
handle2.join().unwrap();
println!("主线程结束");
}
这个完整示例展示了:
- 创建消息总线
- 多个线程同时接收消息
- 主线程广播消息
- 总线关闭后的错误处理
- 线程同步等待
输出示例可能如下:
线程1启动
线程2启动
线程1收到: 消息0
线程2收到1: 消息0
线程1收到: 消息1
线程2收到2: 消息1
线程1收到: 消息2
线程2收到3: 消息2
线程1结束: 总线已关闭
线程2结束
主线程结束