Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现

Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现

mio-named-pipes是一个将Windows命名管道与mio异步I/O库集成的库。

安装

在Cargo.toml中添加以下依赖:

[dependencies]
mio-named-pipes = "0.1"
mio = "0.6"

使用说明

主要类型NamedPipe可以通过NamedPipe::newIntoRawHandle类型构造。所有NamedPipe上的操作都是非阻塞的,如果操作会阻塞则会返回I/O错误(错误会表明这一点)。

通常你可以像在Windows上使用mio的TCP套接字一样使用NamedPipe

注意:在Windows上,命名管道在与mio接口(准备就绪,而非完成)一起使用时没有零成本抽象。因此,这个库内部有一些尚未优化的缓冲区管理。建议为你的应用程序基准测试这个库,如果发现任何问题,请随时联系我。

完整示例代码

use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_named_pipes::NamedPipe;
use std::io::{self, Read, Write};
use std::time::Duration;

const PIPE_NAME: &str = r"\\.\pipe\mio_named_pipe_example";
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);

fn main() -> io::Result<()> {
    // 创建poll实例
    let poll = Poll::new()?;
    
    // 创建命名管道服务器
    let mut server = NamedPipe::new(PIPE_NAME)?;
    
    // 注册服务器到poll
    poll.register(&server, SERVER, Ready::readable(), PollOpt::edge())?;
    
    // 创建命名管道客户端
    let mut client = NamedPipe::new(PIPE_NAME)?;
    
    // 注册客户端到poll
    poll.register(&client, CLIENT, Ready::writable(), PollOpt::edge())?;
    
    // 创建事件存储
    let mut events = Events::with_capacity(128);
    
    // 事件循环
    loop {
        poll.poll(&mut events, Some(Duration::from_millis(100)))?;
        
        for event in events.iter() {
            match event.token() {
                SERVER => {
                    // 服务器端可读
                    let mut buf = [0; 1024];
                    match server.read(&mut buf) {
                        Ok(n) => {
                            println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));
                        }
                        Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                        Err(e) => return Err(e),
                    }
                }
                CLIENT => {
                    // 客户端可写
                    let msg = b"Hello from client!";
                    match client.write(msg) {
                        Ok(n) => {
                            println!("Client sent {} bytes", n);
                            // 发送消息后退出
                            return Ok(());
                        }
                        Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                        Err(e) => return Err(e),
                    }
                }
                _ => unreachable!(),
            }
        }
    }
}

许可证

该项目采用以下任一许可证:

  • Apache License, Version 2.0
  • MIT license

贡献

除非你明确声明,否则任何有意提交到mio-named-pipes的贡献,如Apache-2.0许可证中所定义,均应按照上述双重许可,不附加任何额外条款或条件。


1 回复

Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现

介绍

mio-named-pipes是一个基于mio的Rust库,提供了跨平台的命名管道(Windows)和Unix域套接字(Unix-like系统)的异步I/O支持。它允许在不同进程间进行高性能的进程间通信(IPC)。

主要特点:

  • 跨平台支持(Windows命名管道和Unix域套接字)
  • 基于mio的事件驱动架构
  • 非阻塞I/O操作
  • 与Tokio等异步运行时兼容

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
mio-named-pipes = "0.2"
tokio = { version = "1", features = ["full"] }

基本示例

服务器端

use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建命名管道(Windows)或Unix域套接字(Unix)
    let pipe_name = if cfg!(windows) {
        r"\\.\pipe\my_pipe"
    } else {
        "/tmp/my_pipe.sock"
    };
    
    // 创建并监听管道
    let mut pipe = NamedPipe::new(pipe_name)?;
    pipe.listen()?;
    
    println!("Server listening on {}", pipe_name);
    
    // 接受客户端连接
    let mut client = pipe.accept().await?;
    println!("Client connected");
    
    // 读取客户端数据
    let mut buf = [0u8; 1024];
    let n = client.read(&mut buf).await?;
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    // 发送响应
    client.write_all(b"Hello from server!").await?;
    
    Ok(())
}

客户端

use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let pipe_name = if cfg!(windows) {
        r"\\.\pipe\my_pipe"
    } else {
        "/tmp/my_pipe.sock"
    };
    
    // 客户端连接
    let mut pipe = NamedPipe::new(pipe_name)?;
    pipe.connect().await?;
    println!("Connected to server");
    
    // 发送数据
    pipe.write_all(b"Hello from client!").await?;
    
    // 读取响应
    let mut buf = [0u8; 1024];
    let n = pipe.read(&mut buf).await?;
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

高级用法

与Tokio运行时集成

use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn handle_client(mut pipe: NamedPipe) -> std::io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let n = pipe.read(&mut buf).await?;
        if n == 0 {
            break; // 连接关闭
        }
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
        pipe.write_all(&buf[..n]).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let pipe_name = if cfg!(windows) {
        r"\\.\pipe\echo_pipe"
    } else {
        "/tmp/echo_pipe.sock"
    };
    
    let pipe = NamedPipe::new(pipe_name)?;
    pipe.listen()?;
    
    println!("Echo server listening on {}", pipe_name);
    
    loop {
        let client = pipe.accept().await?;
        tokio::spawn(async move {
            if let Err(e) = handle_client(client).await {
                eprintln!("Error handling client: {}", e);
            }
        });
    }
}

配置管道选项

use mio_named_pipes::NamedPipeBuilder;

let pipe = if cfg!(windows) {
    NamedPipeBuilder::new(r"\\.\pipe\config_pipe")
        .inbound(true)  // 允许入站连接
        .outbound(true) // 允许出站连接
        .first(false)   // 不立即创建管道实例
        .build()?
} else {
    NamedPipeBuilder::new("/tmp/config_pipe.sock")
        .mode(0o660)    // 设置Unix域套接字权限
        .build()?
};

完整示例

下面是一个完整的客户端-服务器通信示例,包含错误处理和资源清理:

服务器端完整示例

use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::signal;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // 设置管道名称
    let pipe_name = if cfg!(windows) {
        r"\\.\pipe\demo_pipe"
    } else {
        "/tmp/demo_pipe.sock"
    };

    // 清理可能存在的旧套接字文件(Unix系统)
    if !cfg!(windows) {
        let _ = std::fs::remove_file(pipe_name);
    }

    // 创建并监听管道
    let pipe = NamedPipe::new(pipe_name)?;
    pipe.listen()?;
    println!("[Server] Listening on {}", pipe_name);

    // 设置Ctrl+C信号处理
    let ctrl_c = signal::ctrl_c();
    tokio::pin!(ctrl_c);

    loop {
        tokio::select! {
            // 接受客户端连接
            client = pipe.accept() => {
                match client {
                    Ok(mut client) => {
                        println!("[Server] Client connected");
                        
                        // 处理客户端通信
                        let mut buf = [0u8; 1024];
                        match client.read(&mut buf).await {
                            Ok(n) if n > 0 => {
                                println!("[Server] Received: {}", String::from_utf8_lossy(&buf[..n]));
                                client.write_all(&buf[..n]).await?;
                            }
                            Ok(_) => println!("[Server] Client disconnected"),
                            Err(e) => eprintln!("[Server] Read error: {}", e),
                        }
                    }
                    Err(e) => eprintln!("[Server] Accept error: {}", e),
                }
            }
            // 处理Ctrl+C信号
            _ = ctrl_c.as_mut() => {
                println!("[Server] Shutting down...");
                break;
            }
        }
    }

    Ok(())
}

客户端完整示例

use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::io;
use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() -> io::Result<()> {
    // 设置管道名称
    let pipe_name = if cfg!(windows) {
        r"\\.\pipe\demo_pipe"
    } else {
        "/tmp/demo_pipe.sock"
    };

    // 添加连接重试逻辑
    let mut retries = 5;
    let mut pipe = loop {
        match NamedPipe::new(pipe_name)?.connect().await {
            Ok(pipe) => break pipe,
            Err(e) if retries > 0 => {
                eprintln!("[Client] Connection failed ({} retries left), retrying...: {}", retries, e);
                retries -= 1;
                time::sleep(Duration::from_secs(1)).await;
            }
            Err(e) => return Err(e),
        }
    };

    println!("[Client] Connected to server");

    // 发送消息并等待响应
    let message = b"Hello from client!";
    pipe.write_all(message).await?;
    println!("[Client] Sent: {}", String::from_utf8_lossy(message));

    let mut buf = [0u8; 1024];
    match pipe.read(&mut buf).await {
        Ok(n) => println!("[Client] Received: {}", String::from_utf8_lossy(&buf[..n])),
        Err(e) => eprintln!("[Client] Read error: {}", e),
    }

    Ok(())
}

注意事项

  1. Windows和Unix-like系统的命名管道实现有差异:

    • Windows使用\\.\pipe\pipename格式
    • Unix-like系统使用文件系统路径
  2. 在Unix系统上,如果套接字文件已存在,需要先删除它才能创建新的

  3. 对于生产环境,需要考虑错误处理和连接重试逻辑

  4. 性能调优可能需要根据具体使用场景调整缓冲区大小

mio-named-pipes为Rust提供了简单高效的跨平台进程间通信解决方案,特别适合需要高性能IPC的场景。

回到顶部