Rust跨进程通信库deno_broadcast_channel的使用:实现高效多线程与进程间消息广播
Rust跨进程通信库deno_broadcast_channel的使用:实现高效多线程与进程间消息广播
deno_broadcast_channel是Deno实现的BroadcastChannel功能的Rust crate。它实现了HTML标准中定义的BroadcastChannel API,用于实现高效的多线程和进程间消息广播。
安装
在Cargo.toml中添加依赖:
deno_broadcast_channel = "0.206.0"
或运行命令:
cargo add deno_broadcast_channel
完整示例
以下是一个使用deno_broadcast_channel实现跨进程通信的完整示例:
use deno_bbroadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;
fn main() {
// 创建广播频道
let channel_name = "example_channel";
// 主进程/线程发送消息
let sender = BroadcastChannel::new(channel_name).unwrap();
// 创建接收线程1
let receiver1 = BroadcastChannel::new(channel_name).unwrap();
thread::spawn(move || {
receiver1.on_message(|msg| {
println!("接收线程1收到消息: {:?}", msg);
});
// 保持接收线程运行
loop {
thread::sleep(Duration::from_secs(1));
}
});
// 创建接收线程2
let receiver2 = BroadcastChannel::new(channel_name).unwrap();
thread::spawn(move || {
receiver2.on_message(|msg| {
println!("接收线程2收到消息: {:?}", msg);
});
// 保持接收线程运行
loop {
thread::sleep(Duration::from_secs(1));
}
});
// 主线程发送消息
for i in 1..=5 {
let message = format!("消息 {}", i);
println!("主线程发送: {}", message);
sender.post_message(message).unwrap();
thread::sleep(Duration::from_secs(2));
}
// 关闭频道
sender.close();
}
跨进程通信完整示例
下面是一个完整的跨进程通信示例,包含发送进程和接收进程:
发送进程代码 (sender.rs):
use deno_broadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;
fn main() {
// 创建广播频道
let channel_name = "cross_process_channel";
let sender = BroadcastChannel::new(channel_name).unwrap();
println!("发送进程启动...");
// 发送5条消息
for i in 1..=5 {
let msg = format!("跨进程消息 {}", i);
println!("发送: {}", msg);
sender.post_message(msg).unwrap();
thread::sleep(Duration::from_secs(2));
}
sender.close();
println!("发送进程结束");
}
接收进程代码 (receiver.rs):
use deno_broadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;
fn main() {
// 创建相同名称的广播频道
let channel_name = "cross_process_channel";
let receiver = BroadcastChannel::new(channel_name).unwrap();
println!("接收进程启动...");
// 注册消息处理回调
receiver.on_message(|msg| {
println!("接收到消息: {:?}", msg);
});
// 保持进程运行
loop {
thread::sleep(Duration::from_secs(1));
}
}
代码说明
- 两个进程使用相同的频道名称(
cross_process_channel
)创建BroadcastChannel实例 - 发送进程使用
post_message
方法发送消息 - 接收进程使用
on_message
方法注册消息处理回调 - 消息会自动在进程间传输
- 使用
close
方法关闭频道
运行方式
- 先启动接收进程:
cargo run --bin receiver
- 在另一个终端启动发送进程:
cargo run --bin sender
注意事项
- 确保两个进程在同一台机器上运行
- 消息会自动序列化和反序列化
- 适用于需要高效广播消息的跨进程场景
- 遵循HTML标准中BroadcastChannel的规范
1 回复
以下是基于提供的内容整理的完整示例demo:
基本使用示例
use deno_broadcast_channel::BroadcastChannel;
use std::thread;
fn main() {
// 创建广播通道
let channel = BroadcastChannel::new("my_channel");
// 创建发送者线程
let sender = channel.sender();
thread::spawn(move || {
for i in 0..5 {
sender.send(format!("Message {}", i)).unwrap();
thread::sleep(std::time::Duration::from_secs(1));
}
});
// 在主线程接收消息
let receiver = channel.receiver();
for _ in 0..5 {
let msg = receiver.recv().unwrap();
println!("Received: {}", msg);
}
}
完整跨进程通信示例
进程1(发送者)完整代码
// 发送者进程
use deno_broadcast_channel::BroadcastChannel;
use std::thread;
fn main() {
// 创建IPC通道
let channel = BroadcastChannel::new("ipc_demo");
let sender = channel.sender();
// 发送5条消息
for i in 1..=5 {
let msg = format!("Data {} from Process 1", i);
sender.send(msg).unwrap();
println!("[Sender] Sent: {}", i);
thread::sleep(std::time::Duration::from_millis(500));
}
// 发送结束信号
sender.send("EOF").unwrap();
}
进程2(接收者)完整代码
// 接收者进程
use deno_broadcast_channel::BroadcastChannel;
fn main() {
// 连接相同的IPC通道
let channel = BroadcastChannel::new("ipc_demo");
let receiver = channel.receiver();
println!("[Receiver] Waiting for messages...");
// 持续接收消息
loop {
match receiver.recv() {
Ok(msg) => {
if msg == "EOF" {
println!("[Receiver] End of transmission");
break;
}
println!("[Receiver] Got: {}", msg);
}
Err(e) => {
eprintln!("Error receiving message: {}", e);
break;
}
}
}
}
完整高级使用示例(自定义消息类型+多线程)
use deno_broadcast_channel::BroadcastChannel;
use serde::{Serialize, Deserialize};
use std::thread;
use std::time::Duration;
#[derive(Serialize, Deserialize, Debug)]
struct SensorData {
sensor_id: String,
value: f64,
timestamp: u64,
}
fn main() {
// 创建通道
let channel = BroadcastChannel::new("sensor_data_channel");
// 创建3个消费者线程
for i in 0..3 {
let receiver = channel.receiver();
thread::spawn(move || {
loop {
let data: SensorData = receiver.recv().unwrap();
println!("Consumer {} - {:?}", i, data);
}
});
}
// 生产者线程
let sender = channel.sender();
thread::spawn(move || {
let mut counter = 0;
loop {
let data = SensorData {
sensor_id: "temp_sensor".to_string(),
value: 20.0 + (counter as f64 * 0.1),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
sender.send(data).unwrap();
counter += 1;
thread::sleep(Duration::from_secs(1));
}
});
// 主线程等待
thread::sleep(Duration::from_secs(10));
println!("Demo completed");
}
这些示例完整展示了deno_broadcast_channel
的主要功能:
- 基本单进程多线程通信
- 跨进程通信模式
- 使用自定义消息类型
- 多消费者广播模式
注意使用时需要确保:
- 所有通信进程使用相同的通道名称
- 自定义类型需要实现Serialize和Deserialize
- 跨进程通信需要适当的系统权限