Rust POSIX消息队列库posixmq的使用:实现高效跨进程通信的IPC消息队列功能

Rust POSIX消息队列库posixmq的使用:实现高效跨进程通信的IPC消息队列功能

posixmq是一个用于使用POSIX消息队列的Rust库。

let mq = posixmq::PosixMq::open("/queue").expect("cannot open /queue");
let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
loop {
    let (priority, len) = mq.recv(&mut buf).expect("recv() failed");
    let msg = std::str::from_utf8(&buf[..len]).expect("not UTF-8");
    println!("priority: {:3}, message: {}", priority, msg);
}

支持的操作系统

posixmq已在Linux、FreeBSD、NetBSD、DragonFly BSD和OmniOSce上测试通过,但并非所有功能在所有平台上都可用。详细信息请参阅rustdoc。 macOS、OpenBSD、Android和Windows没有POSIX消息队列,此库在这些平台上将无法编译。

可选的mio集成

在Linux、FreeBSD和DragonFly BSD上,POSIX消息队列可以注册到epoll/kqueue,因此可以与mio一起使用。 通过可选功能mio_06mio_07支持mio的0.6和0.7版本。例如,在Cargo.toml中启用相应功能:

[dependencies]
mio = {version="0.7", features=["os-poll"]}  # 你可能需要os-poll
posixmq = {version="1.0", features=["mio_07"]}

同时记得以非阻塞模式打开消息队列。

完整示例

下面是一个完整的POSIX消息队列使用示例,包含发送和接收两个进程:

示例1:发送消息

use posixmq::OpenOptions;

fn main() {
    // 创建或打开消息队列
    let mq = OpenOptions::readwrite()
        .create()
        .open("/test_queue")
        .expect("Failed to create/open queue");
    
    // 发送消息
    mq.send(0, b"Hello from sender!").expect("Failed to send message");
    println!("Message sent");
}

示例2:接收消息

use posixmq::PosixMq;

fn main() {
    // 打开消息队列
    let mq = PosixMq::open("/test_queue").expect("Failed to open queue");
    let mut buf = vec![0; mq.attributes().unwrap().max_msg_len];
    
    // 接收消息
    let (priority, len) = mq.recv(&mut buf).expect("Failed to receive message");
    let msg = std::str::from_utf8(&buf[..len]).unwrap();
    println!("Received message (priority {}): {}", priority, msg);
    
    // 删除队列
    posixmq::remove_queue("/test_queue").expect("Failed to remove queue");
}

最低支持的Rust版本

1.0.版本的最低Rust版本为1.39.0(如果启用了mio_07功能),否则为1.31.1。 后续的1..0版本可能会提高这个要求。

许可证

可选择以下任一许可证:

  • Apache许可证2.0版
  • MIT许可证

完整示例demo

基于上述内容,这里提供一个更完整的跨进程通信示例,包含错误处理和清理逻辑:

发送进程示例

use posixmq::OpenOptions;
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建或打开消息队列,设置非阻塞模式和最大消息数
    let mq = OpenOptions::readwrite()
        .nonblocking()  // 设置为非阻塞模式
        .capacity(10)   // 最大消息数
        .create()
        .open("/demo_queue")?;
    
    // 发送5条不同优先级的消息
    for i in 0..5 {
        let msg = format!("Message {}", i);
        mq.send(i, msg.as_bytes())?;
        println!("Sent: {} (priority {})", msg, i);
        std::thread::sleep(Duration::from_secs(1));
    }
    
    Ok(())
}

接收进程示例

use posixmq::{PosixMq, OpenOptions};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 打开消息队列
    let mq = OpenOptions::readonly()
        .nonblocking()  // 设置为非阻塞模式
        .open("/demo_queue")?;
    
    let mut buf = vec![0; mq.attributes()?.max_msg_len];
    
    loop {
        match mq.recv(&mut buf) {
            Ok((priority, len)) => {
                let msg = String::from_utf8_lossy(&buf[..len]);
                println!("Received (priority {}): {}", priority, msg);
                
                // 如果收到特定消息则退出
                if msg.contains("exit") {
                    break;
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // 非阻塞模式下没有消息可读时的处理
                println!("No messages available, waiting...");
                std::thread::sleep(Duration::from_millis(500));
            }
            Err(e) => return Err(e.into()),
        }
    }
    
    // 清理队列
    posixmq::remove_queue("/demo_queue")?;
    Ok(())
}

编译配置示例

在Cargo.toml中添加依赖:

[dependencies]
posixmq = "1.0"

如果要使用mio集成:

[dependencies]
posixmq = { version = "1.0", features = ["mio_07"] }
mio = { version = "0.7", features = ["os-poll"] }

1 回复

Rust POSIX消息队列库posixmq使用指南

POSIX消息队列是一种高效的进程间通信(IPC)机制,Rust的posixmq库提供了对它的安全封装。下面介绍如何使用这个库。

基本概念

POSIX消息队列允许不相关的进程通过消息进行通信,特点包括:

  • 消息是有优先级的
  • 消息是持久化的(即使没有进程打开队列也会存在)
  • 支持异步通知机制

安装

在Cargo.toml中添加依赖:

[dependencies]
posixmq = "0.2"
libc = "0.2"  # 用于一些常量

基本用法

1. 创建或打开消息队列

use posixmq::{PosixMq, OpenOptions};
use libc::{S_IRUSR, S_IWUSR};

// 创建或打开一个消息队列
let mq = OpenOptions::readwrite()
    .create()  // 如果不存在则创建
    .mode(S_IRUSR | S_IWUSR)  // 权限设置
    .capacity(10)  // 队列中最大消息数
    .max_msg_len(1024)  // 每条消息最大长度
    .open("/my_queue")?;

2. 发送消息

let msg = b"Hello from process 1";
mq.send(1, msg)?;  // 1是消息优先级(0最低,越大优先级越高)

3. 接收消息

let mut buf = vec![0; mq.attributes()?.max_msg_len as usize];
let (priority, len) = mq.recv(&mut buf)?;
println!("Received: {:?} with priority {}", &buf[..len], priority);

高级功能

异步通知

use posixmq::Notification;

// 设置消息到达通知
mq.notify(Notification::Signal {
    signal: libc::SIGUSR1,
    si_value: 42,
})?;

// 在信号处理程序中处理通知
unsafe {
    libc::signal(libc::SIGUSR1, handle_notification as libc::sighandler_t);
}

extern "C" fn handle_notification(signo: libc::c_int) {
    println!("Received notification for signal {}", signo);
    // 在这里处理消息
}

获取队列属性

let attr = mq.attributes()?;
println!(
    "Queue attributes: max_msgs={}, max_msg_len={}, current_msgs={}",
    attr.capacity,
    attr.max_msg_len,
    attr.current_msgs
);

完整示例

下面是一个完整的生产者-消费者示例:

生产者代码:

use posixmq::{OpenOptions, PosixMq};
use libc::{S_IRUSR, S_IWUSR};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mq = OpenOptions::writeonly()
        .create()
        .mode(S_IRUSR | S_IWUSR)
        .capacity(10)
        .max_msg_len(128)
        .open("/example_queue")?;
    
    for i in 0..5 {
        let msg = format!("Message {}", i).into_bytes();
        mq.send(i as u32, &msg)?;
        println!("Sent: {}", String::from_utf8_lossy(&msg));
    }
    
    Ok(())
}

消费者代码:

use posixmq::{OpenOptions, PosixMq};
use libc::{S_IRUSR, S_IWUSR};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mq = OpenOptions::readonly()
        .open("/example_queue")?;
    
    let mut buf = vec![0; mq.attributes()?.max_msg_len as usize];
    
    loop {
        let (priority, len) = mq.recv(&mut buf)?;
        let msg = String::from_utf8_lossy(&buf[..len]);
        println!("Received: {} (priority: {})", msg, priority);
        
        if msg == "Message 4" {
            break;
        }
    }
    
    Ok(())
}

注意事项

  1. POSIX消息队列有系统限制,可以通过/proc/sys/fs/mqueue/查看和修改
  2. 队列名称必须以斜杠开头,且只包含一个斜杠(如/myqueue)
  3. 使用完毕后,队列不会自动删除,需要显式调用posixmq::remove_queue("/queue_name")

POSIX消息队列适用于需要可靠、有序且带优先级的进程间通信场景,比管道或共享内存提供了更结构化的通信方式。

回到顶部