Rust通信库re_sdk_comms的使用:高效实现跨平台进程间通信与网络数据传输
Rust通信库re_sdk_comms的使用:高效实现跨平台进程间通信与网络数据传输
re_sdk_comms是Rerun系列crate的一部分,主要用于Rerun SDK和Rerun服务器之间的TCP通信。
基本信息
- 最新版本: 0.22.1
- 许可证: MIT OR Apache-2.0
- Rust版本要求: v1.81.0
- 大小: 12.5 KiB
安装
在项目目录中运行以下Cargo命令:
cargo add re_sdk_comms
或者在Cargo.toml中添加:
re_sdk_comms = "0.22.1"
使用示例
以下是一个使用re_sdk_comms进行TCP通信的完整示例:
use re_sdk_comms::{Client, Server};
use std::net::SocketAddr;
use std::thread;
fn main() {
// 服务器地址
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
// 启动服务器线程
let server_thread = thread::spawn(move || {
let server = Server::bind(addr).unwrap();
println!("Server listening on {}", addr);
for stream in server.incoming() {
let mut stream = stream.unwrap();
println!("Server: New connection from {}", stream.peer_addr().unwrap());
let mut buf = [0; 1024];
let n = stream.read(&mut buf).unwrap();
println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));
stream.write_all(b"Hello from server").unwrap();
}
});
// 给服务器一点启动时间
std::thread::sleep(std::time::Duration::from_millis(100));
// 客户端连接
let mut client = Client::connect(addr).unwrap();
println!("Client connected to {}", addr);
// 客户端发送消息
client.write_all(b"Hello from client").unwrap();
// 客户端接收响应
let mut buf = [0; 1024];
let n = client.read(&mut buf).unwrap();
println!("Client received: {}", String::from_utf8_lossy(&buf[..n]));
// 等待服务器线程结束
server_thread.join().unwrap();
}
完整示例代码
use re_sdk_comms::{Client, Server};
use std::net::SocketAddr;
use std::thread;
use std::io::{Read, Write};
fn main() {
// 设置服务器地址
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
// 启动服务器线程
let server_thread = thread::spawn(move || {
// 绑定服务器到指定地址
let server = Server::bind(addr).expect("Failed to bind server");
println!("[Server] Listening on {}", addr);
// 处理客户端连接
for stream in server.incoming() {
let mut stream = stream.expect("Failed to accept connection");
let peer_addr = stream.peer_addr().expect("Failed to get peer address");
println!("[Server] New connection from {}", peer_addr);
// 读取客户端数据
let mut buf = [0; 1024];
let n = stream.read(&mut buf).expect("Failed to read from client");
let received = String::from_utf8_lossy(&buf[..n]);
println!("[Server] Received: {}", received);
// 发送响应给客户端
stream.write_all(b"Hello from server").expect("Failed to write to client");
}
});
// 等待服务器启动
std::thread::sleep(std::time::Duration::from_millis(100));
// 客户端连接服务器
let mut client = Client::connect(addr).expect("Failed to connect to server");
println!("[Client] Connected to {}", addr);
// 客户端发送消息
client.write_all(b"Hello from client").expect("Failed to send message");
println!("[Client] Sent message to server");
// 客户端接收响应
let mut buf = [0; 1024];
let n = client.read(&mut buf).expect("Failed to read response");
let response = String::from_utf8_lossy(&buf[..n]);
println!("[Client] Received response: {}", response);
// 等待服务器线程结束
server_thread.join().expect("Server thread panicked");
}
功能说明
- 跨平台支持: 可在不同操作系统上运行
- 高效通信: 基于TCP协议实现高效数据传输
- 简单API: 提供简洁的客户端/服务器接口
1 回复
Rust通信库re_sdk_comms的使用指南
介绍
re_sdk_comms是一个高效的Rust通信库,专门设计用于简化跨平台进程间通信(IPC)和网络数据传输。它提供了统一的API来处理本地进程间通信和网络通信,支持多种协议和传输方式。
主要特性:
- 跨平台支持(Windows/Linux/macOS)
- 高性能的二进制协议
- 支持TCP/UDP/Unix域套接字
- 异步/同步API
- 内置序列化支持
- 连接池管理
安装
在Cargo.toml中添加依赖:
[dependencies]
re_sdk_comms = "0.3"
基本使用方法
1. 创建TCP服务器
use re_sdk_comms::{TcpServer, TcpHandler, CommsResult};
struct MyHandler;
#[async_trait::async_trait]
impl TcpHandler for MyHandler {
async fn handle(&self, mut stream: TcpStream) -> CommsResult<()> {
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
stream.write_all(b"Hello from server!").await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> CommsResult<()> {
let server = TcpServer::bind("127.0.0.1:8080", MyHandler).await?;
server.serve().await
}
2. 创建TCP客户端
use re_sdk_comms::{TcpClient, CommsResult};
#[tokio::main]
async fn main() -> CommsResult<()> {
let mut client = TcpClient::connect("127.0.0.1:8080").await?;
client.write_all(b"Hello from client!").await?;
let mut response = vec![0; 1024];
let n = client.read(&mut response).await?;
println!("Server response: {}", String::from_utf8_lossy(&response[..n]));
Ok(())
}
3. 使用UDP通信
use re_sdk_comms::{UdpSocket, CommsResult};
#[tokio::main]
async fn main() -> CommsResult<()> {
// 服务器
let server = UdpSocket::bind("127.0.0.1:8081").await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
let (n, addr) = server.recv_from(&mut buf).await.unwrap();
println!("Received from {}: {}", addr, String::from_utf8_lossy(&buf[..n]));
server.send_to(b"Hello UDP client!", addr).await.unwrap();
});
// 客户端
let client = UdpSocket::bind("127.0.0.1:0").await?;
client.send_to(b"Hello UDP server!", "127.0.0.1:8081").await?;
let mut buf = [0; 1024];
let (n, _) = client.recv_from(&mut buf).await?;
println!("Server response: {}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
4. 进程间通信(Unix域套接字)
#[cfg(unix)]
use re_sdk_comms::{UnixListener, UnixStream, CommsResult};
#[cfg(unix)]
#[tokio::main]
async fn main() -> CommsResult<()> {
let path = "/tmp/re_sdk_comms_example.sock";
// 服务器
let _ = std::fs::remove_file(path);
let listener = UnixListener::bind(path)?;
tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await.unwrap();
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
stream.write_all(b"Hello from Unix server!").await.unwrap();
});
// 客户端
let mut stream = UnixStream::connect(path).await?;
stream.write_all(b"Hello from Unix client!").await?;
let mut response = vec![0; 1024];
let n = stream.read(&mut response).await?;
println!("Server response: {}", String::from_utf8_lossy(&response[..n]));
Ok(())
}
#[cfg(not(unix))]
fn main() {}
高级特性
1. 自定义协议
use re_sdk_comms::{Protocol, Message, CommsResult};
use bytes::{Bytes, BytesMut};
struct MyProtocol;
impl Protocol for MyProtocol {
type Request = String;
type Response = String;
fn encode(&self, msg: Message<Self::Request>) -> CommsResult<Bytes> {
match msg {
Message::Request(req) => Ok(Bytes::from(format!("REQ:{}", req))),
Message::Response(res) => Ok(Bytes::from(format!("RES:{}", res))),
}
}
fn decode(&self, buf: &mut BytesMut) -> CommsResult<Option<Message<Self::Request>>> {
if buf.len() < 4 {
return Ok(None);
}
if buf.starts_with(b"REQ:") {
let end = buf.iter().position(|&b| b == b'\n').unwrap_or(buf.len());
let req = String::from_utf8(buf[4..end].to_vec())?;
buf.advance(end + 1);
Ok(Some(Message::Request(req)))
} else if buf.starts_with(b"RES:") {
let end = buf.iter().position(|&b| b == b'\n').unwrap_or(buf.len());
let res = String::from_utf8(buf[4..end].to_vec())?;
buf.advance(end + 1);
Ok(Some(Message::Response(res)))
} else {
Err(CommsError::ProtocolError("Invalid message format".into()))
}
}
}
2. 使用连接池
use re_sdk_comms::{TcpConnectionPool, CommsResult};
#[tokio::main]
async fn main() -> CommsResult<()> {
let pool = TcpConnectionPool::new(
"127.0.0.1:8080",
5, // 最大连接数
10, // 空闲连接数
);
// 从池中获取连接
let mut client = pool.get().await?;
client.write_all(b"Hello with connection pool").await?;
// 连接会自动返回到池中当client离开作用域时
Ok(())
}
性能优化建议
- 对于高吞吐量场景,使用
Bytes
而不是Vec<u8>
来减少内存分配 - 考虑使用批处理消息来减少系统调用
- 对于固定大小的消息,预分配缓冲区
- 在高并发场景下使用连接池
错误处理
所有通信方法都返回CommsResult<T>
,可以方便地处理各种通信错误:
match client.write_all(b"data").await {
Ok(_) => println!("Send success"),
Err(e) => match e {
re_sdk_comms::CommsError::IoError(io) => println!("IO error: {}", io),
re_sdk_comms::CommsError::Timeout => println!("Operation timed out"),
_ => println!("Other error: {}", e),
},
}
re_sdk_comms库为Rust开发者提供了强大而灵活的通信能力,无论是本地进程间通信还是网络数据传输,都能以统一的API进行处理,大大简化了开发复杂度。
完整示例demo
下面是一个完整的TCP客户端-服务器通信示例:
// 服务器端代码
use re_sdk_comms::{TcpServer, TcpHandler, CommsResult};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// 自定义处理器
struct EchoHandler;
#[async_trait::async_trait]
impl TcpHandler for EchoHandler {
async fn handle(&self, mut stream: TcpStream) -> CommsResult<()> {
let mut buf = [0; 1024];
// 读取客户端数据
let n = stream.read(&mut buf).await?;
let received = String::from_utf8_lossy(&buf[..n]);
println!("Server received: {}", received);
// 回显数据给客户端
let response = format!("Echo: {}", received);
stream.write_all(response.as_bytes()).await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> CommsResult<()> {
println!("Starting TCP echo server on 127.0.0.1:8080");
let server = TcpServer::bind("127.0.0.1:8080", EchoHandler).await?;
server.serve().await
}
// 客户端端代码
use re_sdk_comms::{TcpClient, CommsResult};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> CommsResult<()> {
// 连接服务器
println!("Connecting to server at 127.0.0.1:8080");
let mut client = TcpClient::connect("127.0.0.1:8080").await?;
// 发送消息
let message = "Hello, TCP Server!";
println!("Sending: {}", message);
client.write_all(message.as_bytes()).await?;
// 接收响应
let mut response = vec![0; 1024];
let n = client.read(&mut response).await?;
println!("Received response: {}", String::from_utf8_lossy(&response[..n]));
Ok(())
}
这个完整示例展示了:
- 创建一个TCP服务器,使用自定义的EchoHandler处理客户端连接
- 服务器接收客户端消息并回显
- 客户端连接服务器并发送消息
- 客户端接收并打印服务器响应
要运行这个示例:
- 首先启动服务器程序
- 然后运行客户端程序
- 观察控制台输出,验证通信是否成功