Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现
Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现
mio-named-pipes是一个将Windows命名管道与mio异步I/O库集成的库。
安装
在Cargo.toml中添加以下依赖:
[dependencies]
mio-named-pipes = "0.1"
mio = "0.6"
使用说明
主要类型NamedPipe
可以通过NamedPipe::new
或IntoRawHandle
类型构造。所有NamedPipe
上的操作都是非阻塞的,如果操作会阻塞则会返回I/O错误(错误会表明这一点)。
通常你可以像在Windows上使用mio的TCP套接字一样使用NamedPipe
。
注意:在Windows上,命名管道在与mio接口(准备就绪,而非完成)一起使用时没有零成本抽象。因此,这个库内部有一些尚未优化的缓冲区管理。建议为你的应用程序基准测试这个库,如果发现任何问题,请随时联系我。
完整示例代码
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_named_pipes::NamedPipe;
use std::io::{self, Read, Write};
use std::time::Duration;
const PIPE_NAME: &str = r"\\.\pipe\mio_named_pipe_example";
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);
fn main() -> io::Result<()> {
// 创建poll实例
let poll = Poll::new()?;
// 创建命名管道服务器
let mut server = NamedPipe::new(PIPE_NAME)?;
// 注册服务器到poll
poll.register(&server, SERVER, Ready::readable(), PollOpt::edge())?;
// 创建命名管道客户端
let mut client = NamedPipe::new(PIPE_NAME)?;
// 注册客户端到poll
poll.register(&client, CLIENT, Ready::writable(), PollOpt::edge())?;
// 创建事件存储
let mut events = Events::with_capacity(128);
// 事件循环
loop {
poll.poll(&mut events, Some(Duration::from_millis(100)))?;
for event in events.iter() {
match event.token() {
SERVER => {
// 服务器端可读
let mut buf = [0; 1024];
match server.read(&mut buf) {
Ok(n) => {
println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Err(e),
}
}
CLIENT => {
// 客户端可写
let msg = b"Hello from client!";
match client.write(msg) {
Ok(n) => {
println!("Client sent {} bytes", n);
// 发送消息后退出
return Ok(());
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Err(e),
}
}
_ => unreachable!(),
}
}
}
}
许可证
该项目采用以下任一许可证:
- Apache License, Version 2.0
- MIT license
贡献
除非你明确声明,否则任何有意提交到mio-named-pipes的贡献,如Apache-2.0许可证中所定义,均应按照上述双重许可,不附加任何额外条款或条件。
Rust异步I/O库mio-named-pipes的使用:高性能跨平台命名管道通信实现
介绍
mio-named-pipes是一个基于mio的Rust库,提供了跨平台的命名管道(Windows)和Unix域套接字(Unix-like系统)的异步I/O支持。它允许在不同进程间进行高性能的进程间通信(IPC)。
主要特点:
- 跨平台支持(Windows命名管道和Unix域套接字)
- 基于mio的事件驱动架构
- 非阻塞I/O操作
- 与Tokio等异步运行时兼容
使用方法
添加依赖
首先在Cargo.toml中添加依赖:
[dependencies]
mio-named-pipes = "0.2"
tokio = { version = "1", features = ["full"] }
基本示例
服务器端
use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 创建命名管道(Windows)或Unix域套接字(Unix)
let pipe_name = if cfg!(windows) {
r"\\.\pipe\my_pipe"
} else {
"/tmp/my_pipe.sock"
};
// 创建并监听管道
let mut pipe = NamedPipe::new(pipe_name)?;
pipe.listen()?;
println!("Server listening on {}", pipe_name);
// 接受客户端连接
let mut client = pipe.accept().await?;
println!("Client connected");
// 读取客户端数据
let mut buf = [0u8; 1024];
let n = client.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
// 发送响应
client.write_all(b"Hello from server!").await?;
Ok(())
}
客户端
use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let pipe_name = if cfg!(windows) {
r"\\.\pipe\my_pipe"
} else {
"/tmp/my_pipe.sock"
};
// 客户端连接
let mut pipe = NamedPipe::new(pipe_name)?;
pipe.connect().await?;
println!("Connected to server");
// 发送数据
pipe.write_all(b"Hello from client!").await?;
// 读取响应
let mut buf = [0u8; 1024];
let n = pipe.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
高级用法
与Tokio运行时集成
use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn handle_client(mut pipe: NamedPipe) -> std::io::Result<()> {
let mut buf = [0u8; 1024];
loop {
let n = pipe.read(&mut buf).await?;
if n == 0 {
break; // 连接关闭
}
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
pipe.write_all(&buf[..n]).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let pipe_name = if cfg!(windows) {
r"\\.\pipe\echo_pipe"
} else {
"/tmp/echo_pipe.sock"
};
let pipe = NamedPipe::new(pipe_name)?;
pipe.listen()?;
println!("Echo server listening on {}", pipe_name);
loop {
let client = pipe.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_client(client).await {
eprintln!("Error handling client: {}", e);
}
});
}
}
配置管道选项
use mio_named_pipes::NamedPipeBuilder;
let pipe = if cfg!(windows) {
NamedPipeBuilder::new(r"\\.\pipe\config_pipe")
.inbound(true) // 允许入站连接
.outbound(true) // 允许出站连接
.first(false) // 不立即创建管道实例
.build()?
} else {
NamedPipeBuilder::new("/tmp/config_pipe.sock")
.mode(0o660) // 设置Unix域套接字权限
.build()?
};
完整示例
下面是一个完整的客户端-服务器通信示例,包含错误处理和资源清理:
服务器端完整示例
use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::signal;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// 设置管道名称
let pipe_name = if cfg!(windows) {
r"\\.\pipe\demo_pipe"
} else {
"/tmp/demo_pipe.sock"
};
// 清理可能存在的旧套接字文件(Unix系统)
if !cfg!(windows) {
let _ = std::fs::remove_file(pipe_name);
}
// 创建并监听管道
let pipe = NamedPipe::new(pipe_name)?;
pipe.listen()?;
println!("[Server] Listening on {}", pipe_name);
// 设置Ctrl+C信号处理
let ctrl_c = signal::ctrl_c();
tokio::pin!(ctrl_c);
loop {
tokio::select! {
// 接受客户端连接
client = pipe.accept() => {
match client {
Ok(mut client) => {
println!("[Server] Client connected");
// 处理客户端通信
let mut buf = [0u8; 1024];
match client.read(&mut buf).await {
Ok(n) if n > 0 => {
println!("[Server] Received: {}", String::from_utf8_lossy(&buf[..n]));
client.write_all(&buf[..n]).await?;
}
Ok(_) => println!("[Server] Client disconnected"),
Err(e) => eprintln!("[Server] Read error: {}", e),
}
}
Err(e) => eprintln!("[Server] Accept error: {}", e),
}
}
// 处理Ctrl+C信号
_ = ctrl_c.as_mut() => {
println!("[Server] Shutting down...");
break;
}
}
}
Ok(())
}
客户端完整示例
use mio_named_pipes::NamedPipe;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::io;
use std::time::Duration;
use tokio::time;
#[tokio::main]
async fn main() -> io::Result<()> {
// 设置管道名称
let pipe_name = if cfg!(windows) {
r"\\.\pipe\demo_pipe"
} else {
"/tmp/demo_pipe.sock"
};
// 添加连接重试逻辑
let mut retries = 5;
let mut pipe = loop {
match NamedPipe::new(pipe_name)?.connect().await {
Ok(pipe) => break pipe,
Err(e) if retries > 0 => {
eprintln!("[Client] Connection failed ({} retries left), retrying...: {}", retries, e);
retries -= 1;
time::sleep(Duration::from_secs(1)).await;
}
Err(e) => return Err(e),
}
};
println!("[Client] Connected to server");
// 发送消息并等待响应
let message = b"Hello from client!";
pipe.write_all(message).await?;
println!("[Client] Sent: {}", String::from_utf8_lossy(message));
let mut buf = [0u8; 1024];
match pipe.read(&mut buf).await {
Ok(n) => println!("[Client] Received: {}", String::from_utf8_lossy(&buf[..n])),
Err(e) => eprintln!("[Client] Read error: {}", e),
}
Ok(())
}
注意事项
-
Windows和Unix-like系统的命名管道实现有差异:
- Windows使用
\\.\pipe\pipename
格式 - Unix-like系统使用文件系统路径
- Windows使用
-
在Unix系统上,如果套接字文件已存在,需要先删除它才能创建新的
-
对于生产环境,需要考虑错误处理和连接重试逻辑
-
性能调优可能需要根据具体使用场景调整缓冲区大小
mio-named-pipes为Rust提供了简单高效的跨平台进程间通信解决方案,特别适合需要高性能IPC的场景。