Rust通信库re_sdk_comms的使用:高效实现跨平台进程间通信与网络数据传输

Rust通信库re_sdk_comms的使用:高效实现跨平台进程间通信与网络数据传输

re_sdk_comms是Rerun系列crate的一部分,主要用于Rerun SDK和Rerun服务器之间的TCP通信。

基本信息

  • 最新版本: 0.22.1
  • 许可证: MIT OR Apache-2.0
  • Rust版本要求: v1.81.0
  • 大小: 12.5 KiB

安装

在项目目录中运行以下Cargo命令:

cargo add re_sdk_comms

或者在Cargo.toml中添加:

re_sdk_comms = "0.22.1"

使用示例

以下是一个使用re_sdk_comms进行TCP通信的完整示例:

use re_sdk_comms::{Client, Server};
use std::net::SocketAddr;
use std::thread;

fn main() {
    // 服务器地址
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    
    // 启动服务器线程
    let server_thread = thread::spawn(move || {
        let server = Server::bind(addr).unwrap();
        println!("Server listening on {}", addr);
        
        for stream in server.incoming() {
            let mut stream = stream.unwrap();
            println!("Server: New connection from {}", stream.peer_addr().unwrap());
            
            let mut buf = [0; 1024];
            let n = stream.read(&mut buf).unwrap();
            println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));
            
            stream.write_all(b"Hello from server").unwrap();
        }
    });
    
    // 给服务器一点启动时间
    std::thread::sleep(std::time::Duration::from_millis(100));
    
    // 客户端连接
    let mut client = Client::connect(addr).unwrap();
    println!("Client connected to {}", addr);
    
    // 客户端发送消息
    client.write_all(b"Hello from client").unwrap();
    
    // 客户端接收响应
    let mut buf = [0; 1024];
    let n = client.read(&mut buf).unwrap();
    println!("Client received: {}", String::from_utf8_lossy(&buf[..n]));
    
    // 等待服务器线程结束
    server_thread.join().unwrap();
}

完整示例代码

use re_sdk_comms::{Client, Server};
use std::net::SocketAddr;
use std::thread;
use std::io::{Read, Write};

fn main() {
    // 设置服务器地址
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    
    // 启动服务器线程
    let server_thread = thread::spawn(move || {
        // 绑定服务器到指定地址
        let server = Server::bind(addr).expect("Failed to bind server");
        println!("[Server] Listening on {}", addr);
        
        // 处理客户端连接
        for stream in server.incoming() {
            let mut stream = stream.expect("Failed to accept connection");
            let peer_addr = stream.peer_addr().expect("Failed to get peer address");
            println!("[Server] New connection from {}", peer_addr);
            
            // 读取客户端数据
            let mut buf = [0; 1024];
            let n = stream.read(&mut buf).expect("Failed to read from client");
            let received = String::from_utf8_lossy(&buf[..n]);
            println!("[Server] Received: {}", received);
            
            // 发送响应给客户端
            stream.write_all(b"Hello from server").expect("Failed to write to client");
        }
    });
    
    // 等待服务器启动
    std::thread::sleep(std::time::Duration::from_millis(100));
    
    // 客户端连接服务器
    let mut client = Client::connect(addr).expect("Failed to connect to server");
    println!("[Client] Connected to {}", addr);
    
    // 客户端发送消息
    client.write_all(b"Hello from client").expect("Failed to send message");
    println!("[Client] Sent message to server");
    
    // 客户端接收响应
    let mut buf = [0; 1024];
    let n = client.read(&mut buf).expect("Failed to read response");
    let response = String::from_utf8_lossy(&buf[..n]);
    println!("[Client] Received response: {}", response);
    
    // 等待服务器线程结束
    server_thread.join().expect("Server thread panicked");
}

功能说明

  1. 跨平台支持: 可在不同操作系统上运行
  2. 高效通信: 基于TCP协议实现高效数据传输
  3. 简单API: 提供简洁的客户端/服务器接口

1 回复

Rust通信库re_sdk_comms的使用指南

介绍

re_sdk_comms是一个高效的Rust通信库,专门设计用于简化跨平台进程间通信(IPC)和网络数据传输。它提供了统一的API来处理本地进程间通信和网络通信,支持多种协议和传输方式。

主要特性:

  • 跨平台支持(Windows/Linux/macOS)
  • 高性能的二进制协议
  • 支持TCP/UDP/Unix域套接字
  • 异步/同步API
  • 内置序列化支持
  • 连接池管理

安装

在Cargo.toml中添加依赖:

[dependencies]
re_sdk_comms = "0.3"

基本使用方法

1. 创建TCP服务器

use re_sdk_comms::{TcpServer, TcpHandler, CommsResult};

struct MyHandler;

#[async_trait::async_trait]
impl TcpHandler for MyHandler {
    async fn handle(&self, mut stream: TcpStream) -> CommsResult<()> {
        let mut buf = [0; 1024];
        let n = stream.read(&mut buf).await?;
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
        stream.write_all(b"Hello from server!").await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> CommsResult<()> {
    let server = TcpServer::bind("127.0.0.1:8080", MyHandler).await?;
    server.serve().await
}

2. 创建TCP客户端

use re_sdk_comms::{TcpClient, CommsResult};

#[tokio::main]
async fn main() -> CommsResult<()> {
    let mut client = TcpClient::connect("127.0.0.1:8080").await?;
    client.write_all(b"Hello from client!").await?;
    
    let mut response = vec![0; 1024];
    let n = client.read(&mut response).await?;
    println!("Server response: {}", String::from_utf8_lossy(&response[..n]));
    
    Ok(())
}

3. 使用UDP通信

use re_sdk_comms::{UdpSocket, CommsResult};

#[tokio::main]
async fn main() -> CommsResult<()> {
    // 服务器
    let server = UdpSocket::bind("127.0.0.1:8081").await?;
    tokio::spawn(async move {
        let mut buf = [0; 1024];
        let (n, addr) = server.recv_from(&mut buf).await.unwrap();
        println!("Received from {}: {}", addr, String::from_utf8_lossy(&buf[..n]));
        server.send_to(b"Hello UDP client!", addr).await.unwrap();
    });

    // 客户端
    let client = UdpSocket::bind("127.0.0.1:0").await?;
    client.send_to(b"Hello UDP server!", "127.0.0.1:8081").await?;
    
    let mut buf = [0; 1024];
    let (n, _) = client.recv_from(&mut buf).await?;
    println!("Server response: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

4. 进程间通信(Unix域套接字)

#[cfg(unix)]
use re_sdk_comms::{UnixListener, UnixStream, CommsResult};

#[cfg(unix)]
#[tokio::main]
async fn main() -> CommsResult<()> {
    let path = "/tmp/re_sdk_comms_example.sock";
    
    // 服务器
    let _ = std::fs::remove_file(path);
    let listener = UnixListener::bind(path)?;
    tokio::spawn(async move {
        let (mut stream, _) = listener.accept().await.unwrap();
        let mut buf = [0; 1024];
        let n = stream.read(&mut buf).await.unwrap();
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
        stream.write_all(b"Hello from Unix server!").await.unwrap();
    });

    // 客户端
    let mut stream = UnixStream::connect(path).await?;
    stream.write_all(b"Hello from Unix client!").await?;
    
    let mut response = vec![0; 1024];
    let n = stream.read(&mut response).await?;
    println!("Server response: {}", String::from_utf8_lossy(&response[..n]));
    
    Ok(())
}

#[cfg(not(unix))]
fn main() {}

高级特性

1. 自定义协议

use re_sdk_comms::{Protocol, Message, CommsResult};
use bytes::{Bytes, BytesMut};

struct MyProtocol;

impl Protocol for MyProtocol {
    type Request = String;
    type Response = String;

    fn encode(&self, msg: Message<Self::Request>) -> CommsResult<Bytes> {
        match msg {
            Message::Request(req) => Ok(Bytes::from(format!("REQ:{}", req))),
            Message::Response(res) => Ok(Bytes::from(format!("RES:{}", res))),
        }
    }

    fn decode(&self, buf: &mut BytesMut) -> CommsResult<Option<Message<Self::Request>>> {
        if buf.len() < 4 {
            return Ok(None);
        }
        
        if buf.starts_with(b"REQ:") {
            let end = buf.iter().position(|&b| b == b'\n').unwrap_or(buf.len());
            let req = String::from_utf8(buf[4..end].to_vec())?;
            buf.advance(end + 1);
            Ok(Some(Message::Request(req)))
        } else if buf.starts_with(b"RES:") {
            let end = buf.iter().position(|&b| b == b'\n').unwrap_or(buf.len());
            let res = String::from_utf8(buf[4..end].to_vec())?;
            buf.advance(end + 1);
            Ok(Some(Message::Response(res)))
        } else {
            Err(CommsError::ProtocolError("Invalid message format".into()))
        }
    }
}

2. 使用连接池

use re_sdk_comms::{TcpConnectionPool, CommsResult};

#[tokio::main]
async fn main() -> CommsResult<()> {
    let pool = TcpConnectionPool::new(
        "127.0.0.1:8080",
        5,  // 最大连接数
        10, // 空闲连接数
    );
    
    // 从池中获取连接
    let mut client = pool.get().await?;
    client.write_all(b"Hello with connection pool").await?;
    
    // 连接会自动返回到池中当client离开作用域时
    
    Ok(())
}

性能优化建议

  1. 对于高吞吐量场景,使用Bytes而不是Vec<u8>来减少内存分配
  2. 考虑使用批处理消息来减少系统调用
  3. 对于固定大小的消息,预分配缓冲区
  4. 在高并发场景下使用连接池

错误处理

所有通信方法都返回CommsResult<T>,可以方便地处理各种通信错误:

match client.write_all(b"data").await {
    Ok(_) => println!("Send success"),
    Err(e) => match e {
        re_sdk_comms::CommsError::IoError(io) => println!("IO error: {}", io),
        re_sdk_comms::CommsError::Timeout => println!("Operation timed out"),
        _ => println!("Other error: {}", e),
    },
}

re_sdk_comms库为Rust开发者提供了强大而灵活的通信能力,无论是本地进程间通信还是网络数据传输,都能以统一的API进行处理,大大简化了开发复杂度。

完整示例demo

下面是一个完整的TCP客户端-服务器通信示例:

// 服务器端代码
use re_sdk_comms::{TcpServer, TcpHandler, CommsResult};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// 自定义处理器
struct EchoHandler;

#[async_trait::async_trait]
impl TcpHandler for EchoHandler {
    async fn handle(&self, mut stream: TcpStream) -> CommsResult<()> {
        let mut buf = [0; 1024];
        
        // 读取客户端数据
        let n = stream.read(&mut buf).await?;
        let received = String::from_utf8_lossy(&buf[..n]);
        println!("Server received: {}", received);
        
        // 回显数据给客户端
        let response = format!("Echo: {}", received);
        stream.write_all(response.as_bytes()).await?;
        
        Ok(())
    }
}

#[tokio::main]
async fn main() -> CommsResult<()> {
    println!("Starting TCP echo server on 127.0.0.1:8080");
    let server = TcpServer::bind("127.0.0.1:8080", EchoHandler).await?;
    server.serve().await
}
// 客户端端代码
use re_sdk_comms::{TcpClient, CommsResult};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> CommsResult<()> {
    // 连接服务器
    println!("Connecting to server at 127.0.0.1:8080");
    let mut client = TcpClient::connect("127.0.0.1:8080").await?;
    
    // 发送消息
    let message = "Hello, TCP Server!";
    println!("Sending: {}", message);
    client.write_all(message.as_bytes()).await?;
    
    // 接收响应
    let mut response = vec![0; 1024];
    let n = client.read(&mut response).await?;
    println!("Received response: {}", String::from_utf8_lossy(&response[..n]));
    
    Ok(())
}

这个完整示例展示了:

  1. 创建一个TCP服务器,使用自定义的EchoHandler处理客户端连接
  2. 服务器接收客户端消息并回显
  3. 客户端连接服务器并发送消息
  4. 客户端接收并打印服务器响应

要运行这个示例:

  1. 首先启动服务器程序
  2. 然后运行客户端程序
  3. 观察控制台输出,验证通信是否成功
回到顶部