Rust跨进程通信库deno_broadcast_channel的使用:实现高效多线程与进程间消息广播

Rust跨进程通信库deno_broadcast_channel的使用:实现高效多线程与进程间消息广播

deno_broadcast_channel是Deno实现的BroadcastChannel功能的Rust crate。它实现了HTML标准中定义的BroadcastChannel API,用于实现高效的多线程和进程间消息广播。

安装

在Cargo.toml中添加依赖:

deno_broadcast_channel = "0.206.0"

或运行命令:

cargo add deno_broadcast_channel

完整示例

以下是一个使用deno_broadcast_channel实现跨进程通信的完整示例:

use deno_bbroadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建广播频道
    let channel_name = "example_channel";
    
    // 主进程/线程发送消息
    let sender = BroadcastChannel::new(channel_name).unwrap();
    
    // 创建接收线程1
    let receiver1 = BroadcastChannel::new(channel_name).unwrap();
    thread::spawn(move || {
        receiver1.on_message(|msg| {
            println!("接收线程1收到消息: {:?}", msg);
        });
        
        // 保持接收线程运行
        loop {
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 创建接收线程2
    let receiver2 = BroadcastChannel::new(channel_name).unwrap();
    thread::spawn(move || {
        receiver2.on_message(|msg| {
            println!("接收线程2收到消息: {:?}", msg);
        });
        
        // 保持接收线程运行
        loop {
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 主线程发送消息
    for i in 1..=5 {
        let message = format!("消息 {}", i);
        println!("主线程发送: {}", message);
        sender.post_message(message).unwrap();
        thread::sleep(Duration::from_secs(2));
    }

    // 关闭频道
    sender.close();
}

跨进程通信完整示例

下面是一个完整的跨进程通信示例,包含发送进程和接收进程:

发送进程代码 (sender.rs):

use deno_broadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建广播频道
    let channel_name = "cross_process_channel";
    let sender = BroadcastChannel::new(channel_name).unwrap();
    
    println!("发送进程启动...");
    
    // 发送5条消息
    for i in 1..=5 {
        let msg = format!("跨进程消息 {}", i);
        println!("发送: {}", msg);
        sender.post_message(msg).unwrap();
        thread::sleep(Duration::from_secs(2));
    }
    
    sender.close();
    println!("发送进程结束");
}

接收进程代码 (receiver.rs):

use deno_broadcast_channel::BroadcastChannel;
use std::thread;
use std::time::Duration;

fn main() {
    // 创建相同名称的广播频道
    let channel_name = "cross_process_channel";
    let receiver = BroadcastChannel::new(channel_name).unwrap();
    
    println!("接收进程启动...");
    
    // 注册消息处理回调
    receiver.on_message(|msg| {
        println!("接收到消息: {:?}", msg);
    });
    
    // 保持进程运行
    loop {
        thread::sleep(Duration::from_secs(1));
    }
}

代码说明

  1. 两个进程使用相同的频道名称(cross_process_channel)创建BroadcastChannel实例
  2. 发送进程使用post_message方法发送消息
  3. 接收进程使用on_message方法注册消息处理回调
  4. 消息会自动在进程间传输
  5. 使用close方法关闭频道

运行方式

  1. 先启动接收进程:
cargo run --bin receiver
  1. 在另一个终端启动发送进程:
cargo run --bin sender

注意事项

  • 确保两个进程在同一台机器上运行
  • 消息会自动序列化和反序列化
  • 适用于需要高效广播消息的跨进程场景
  • 遵循HTML标准中BroadcastChannel的规范

1 回复

以下是基于提供的内容整理的完整示例demo:

基本使用示例

use deno_broadcast_channel::BroadcastChannel;
use std::thread;

fn main() {
    // 创建广播通道
    let channel = BroadcastChannel::new("my_channel");
    
    // 创建发送者线程
    let sender = channel.sender();
    thread::spawn(move || {
        for i in 0..5 {
            sender.send(format!("Message {}", i)).unwrap();
            thread::sleep(std::time::Duration::from_secs(1));
        }
    });
    
    // 在主线程接收消息
    let receiver = channel.receiver();
    for _ in 0..5 {
        let msg = receiver.recv().unwrap();
        println!("Received: {}", msg);
    }
}

完整跨进程通信示例

进程1(发送者)完整代码

// 发送者进程
use deno_broadcast_channel::BroadcastChannel;
use std::thread;

fn main() {
    // 创建IPC通道
    let channel = BroadcastChannel::new("ipc_demo");
    let sender = channel.sender();
    
    // 发送5条消息
    for i in 1..=5 {
        let msg = format!("Data {} from Process 1", i);
        sender.send(msg).unwrap();
        println!("[Sender] Sent: {}", i);
        thread::sleep(std::time::Duration::from_millis(500));
    }
    
    // 发送结束信号
    sender.send("EOF").unwrap();
}

进程2(接收者)完整代码

// 接收者进程
use deno_broadcast_channel::BroadcastChannel;

fn main() {
    // 连接相同的IPC通道
    let channel = BroadcastChannel::new("ipc_demo");
    let receiver = channel.receiver();
    
    println!("[Receiver] Waiting for messages...");
    
    // 持续接收消息
    loop {
        match receiver.recv() {
            Ok(msg) => {
                if msg == "EOF" {
                    println!("[Receiver] End of transmission");
                    break;
                }
                println!("[Receiver] Got: {}", msg);
            }
            Err(e) => {
                eprintln!("Error receiving message: {}", e);
                break;
            }
        }
    }
}

完整高级使用示例(自定义消息类型+多线程)

use deno_broadcast_channel::BroadcastChannel;
use serde::{Serialize, Deserialize};
use std::thread;
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug)]
struct SensorData {
    sensor_id: String,
    value: f64,
    timestamp: u64,
}

fn main() {
    // 创建通道
    let channel = BroadcastChannel::new("sensor_data_channel");
    
    // 创建3个消费者线程
    for i in 0..3 {
        let receiver = channel.receiver();
        thread::spawn(move || {
            loop {
                let data: SensorData = receiver.recv().unwrap();
                println!("Consumer {} - {:?}", i, data);
            }
        });
    }
    
    // 生产者线程
    let sender = channel.sender();
    thread::spawn(move || {
        let mut counter = 0;
        loop {
            let data = SensorData {
                sensor_id: "temp_sensor".to_string(),
                value: 20.0 + (counter as f64 * 0.1),
                timestamp: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
            };
            
            sender.send(data).unwrap();
            counter += 1;
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    // 主线程等待
    thread::sleep(Duration::from_secs(10));
    println!("Demo completed");
}

这些示例完整展示了deno_broadcast_channel的主要功能:

  1. 基本单进程多线程通信
  2. 跨进程通信模式
  3. 使用自定义消息类型
  4. 多消费者广播模式

注意使用时需要确保:

  1. 所有通信进程使用相同的通道名称
  2. 自定义类型需要实现Serialize和Deserialize
  3. 跨进程通信需要适当的系统权限
回到顶部