Rust异步网络编程库compio-net的使用,高性能IO驱动与网络通信解决方案

Rust异步网络编程库compio-net的使用,高性能IO驱动与网络通信解决方案

compio logo

Compio是一个基于线程每核(thread-per-core)的Rust运行时,使用IOCP/io_uring/polling技术实现。名称来源于"completion-based IO"(基于完成的IO)。

为什么选择compio而不是Tokio?

Tokio是一个优秀的通用异步运行时。然而,它是基于轮询的。我们想要一些新的高级API来执行IOCP/io_uring。

与tokio-uring不同,这个运行时不是基于Tokio的。这主要是因为mio中没有控制IOCP的公共API。

快速开始

添加compio作为依赖:

compio = { version = "0.13.1", features = ["macros"] }
compio-net = "0.8.0"

然后我们可以使用高级API进行文件系统和网络IO操作。以下是内容中提供的示例:

use compio::{fs::File, io::AsyncReadAtExt};

#[compio::main]
async fn main() {
    let file = File::open("Cargo.toml").await.unwrap();
    let (read, buffer) = file.read_to_end_at(Vec::with_capacity(1024), 0).await.unwrap();
    assert_eq!(read, buffer.len());
    let buffer = String::from_utf8(buffer).unwrap();
    println!("{}", buffer);
}

compio-net网络编程完整示例

以下是使用compio-net进行TCP服务器和客户端通信的完整示例:

use compio::net::{TcpListener, TcpStream};
use compio::io::{AsyncReadExt, AsyncWriteExt};

// TCP服务器示例
#[compio::main]
async fn server() {
    let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
    
    loop {
        let (mut socket, _) = listener.accept().await.unwrap();
        
        compio::spawn(async move {
            let mut buf = vec![0; 1024];
            let (n, buf) = socket.read(buf).await.unwrap();
            println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
            
            let (_, _) = socket.write_all(b"Hello from server!").await.unwrap();
        }).detach();
    }
}

// TCP客户端示例
#[compio::main]
async fn client() {
    let mut socket = TcpStream::connect("127.0.0.1:12345").await.unwrap();
    
    // 发送数据到服务器
    let (_, _) = socket.write_all(b"Hello from client!").await.unwrap();
    
    // 接收服务器响应
    let mut buf = vec![0; 1024];
    let (n, buf) = socket.read(buf).await.unwrap();
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
}

UDP网络通信示例

use compio::net::UdpSocket;

// UDP服务器
#[compio::main]
async fn udp_server() {
    let socket = UdpSocket::bind("127.0.0.1:12346").await.unwrap();
    let mut buf = vec![0; 1024];
    
    loop {
        let (n, buf, addr) = socket.recv_from(buf).await.unwrap();
        println!("Received from {}: {}", addr, String::from_utf8_lossy(&buf[..n]));
        
        let (_, _) = socket.send_to(b"Hello UDP client!", addr).await.unwrap();
    }
}

// UDP客户端
#[compio::main]
async fn udp_client() {
    let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
    let server_addr = "127.0.0.1:12346".parse().unwrap();
    
    // 发送数据到服务器
    let (_, _) = socket.send_to(b"Hello UDP server!", server_addr).await.unwrap();
    
    // 接收服务器响应
    let mut buf = vec![0; 1024];
    let (n, buf, _) = socket.recv_from(buf).await.unwrap();
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
}

贡献

无论您刚开始使用Rust还是经验丰富的专家,都有机会为Compio做出贡献。如果您对Compio有任何问题,欢迎加入我们的telegram群组。在贡献之前,请查看我们的贡献指南。


1 回复

以下是基于您提供的内容整理的完整示例代码,包含TCP服务器、TCP客户端和UDP通信的完整实现:

完整TCP服务器示例

use compio_net::{TcpListener, TcpStream};
use futures_util::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 绑定到本地地址8080端口
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server listening on 127.0.0.1:8080");
    
    loop {
        // 接受新连接
        let (mut stream, addr) = listener.accept().await?;
        println!("Accepted connection from: {}", addr);
        
        // 为每个连接创建新任务
        tokio::spawn(async move {
            let mut buf = vec![0; 1024];
            
            // 读取客户端数据
            match stream.read(&mut buf).await {
                Ok(n) if n == 0 => return,  // 连接关闭
                Ok(n) => {
                    let received = String::from_utf8_lossy(&buf[..n]);
                    println!("Received {} bytes: {}", n, received);
                    
                    // 回写相同数据
                    if let Err(e) = stream.write_all(&buf[..n]).await {
                        eprintln!("Write error: {}", e);
                    }
                }
                Err(e) => eprintln!("Read error: {}", e),
            }
        });
    }
}

完整TCP客户端示例

use compio_net::TcpStream;
use futures_util::{AsyncReadExt, AsyncWriteExt};
use std::time::Duration;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 连接到服务器
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("Connected to server at 127.0.0.1:8080");
    
    // 要发送的消息
    let messages = [
        "Hello", 
        "This is compio-net", 
        "Testing TCP connection",
        "Goodbye"
    ];
    
    for msg in messages {
        // 发送数据
        println!("Sending: {}", msg);
        stream.write_all(msg.as_bytes()).await?;
        
        // 接收响应
        let mut buf = vec![0; 1024];
        let n = stream.read(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);
        println!("Received: {}", response);
        
        // 等待1秒
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    
    Ok(())
}

完整UDP通信示例

UDP服务器端

use compio_net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 绑定到本地端口8081
    let socket = UdpSocket::bind("127.0.0.1:8081").await?;
    println!("UDP server listening on 127.0.0.1:8081");
    
    let mut buf = vec![0; 1024];
    
    loop {
        // 接收数据
        let (n, addr) = socket.recv_from(&mut buf).await?;
        let received = String::from_utf8_lossy(&buf[..n]);
        println!("Received {} bytes from {}: {}", n, addr, received);
        
        // 发送响应
        let response = format!("Echo: {}", received);
        socket.send_to(response.as_bytes(), addr).await?;
    }
}

UDP客户端端

use compio_net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建UDP socket并绑定到随机端口
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    println!("Local address: {}", socket.local_addr()?);
    
    // 连接到服务器
    let server_addr = "127.0.0.1:8081";
    socket.connect(server_addr).await?;
    println!("Connected to server at {}", server_addr);
    
    // 要发送的消息
    let messages = [
        "Hello UDP", 
        "This is a test", 
        "From compio-net",
        "Goodbye UDP"
    ];
    
    for msg in messages {
        // 发送数据
        println!("Sending: {}", msg);
        socket.send(msg.as_bytes()).await?;
        
        // 接收响应
        let mut buf = vec![0; 1024];
        let n = socket.recv(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);
        println!("Received: {}", response);
    }
    
    Ok(())
}

零拷贝操作完整示例

use compio_net::TcpStream;
use compio::buf::{BufResult, IntoInner};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 连接到服务器
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    
    // 准备要发送的数据
    let send_data = vec![1, 2, 3, 4, 5];
    println!("Sending data: {:?}", send_data);
    
    // 使用零拷贝发送
    let BufResult(res, buf) = stream.send_all(send_data).await;
    res?;
    
    // 取回缓冲区(如果需要重用)
    let original_buf = buf.into_inner();
    println!("Original buffer after send: {:?}", original_buf);
    
    // 准备接收缓冲区
    let mut recv_buf = Vec::with_capacity(1024);
    
    // 使用零拷贝接收精确5字节
    let BufResult(res, buf) = stream.recv_exact(recv_buf).await;
    let n = res?;
    
    // 取回填充了数据的缓冲区
    let filled_buf = buf.into_inner();
    println!("Received {} bytes: {:?}", n, &filled_buf[..n]);
    
    Ok(())
}

多连接处理完整示例

use compio_net::{TcpListener, TcpStream};
use futures_util::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;

async fn handle_connection(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buf = vec![0; 1024];
    
    loop {
        // 读取数据
        let n = match stream.read(&mut buf).await {
            Ok(n) if n == 0 => return Ok(()),  // 连接关闭
            Ok(n) => n,
            Err(e) => {
                eprintln!("Read error: {}", e);
                return Err(e);
            }
        };
        
        // 处理数据
        let received = String::from_utf8_lossy(&buf[..n]);
        println!("Received: {}", received);
        
        // 回写数据
        if let Err(e) = stream.write_all(&buf[..n]).await {
            eprintln!("Write error: {}", e);
            return Err(e);
        }
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 绑定到本地地址
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    let listener = Arc::new(listener);
    println!("Server started on 127.0.0.1:8080");
    
    // 根据CPU核心数创建工作线程
    let num_workers = num_cpus::get();
    println!("Starting {} worker threads", num_workers);
    
    for _ in 0..num_workers {
        let listener = listener.clone();
        
        tokio::spawn(async move {
            loop {
                // 接受新连接
                match listener.accept().await {
                    Ok((stream, addr)) => {
                        println!("Accepted connection from {}", addr);
                        
                        // 处理连接
                        if let Err(e) = handle_connection(stream).await {
                            eprintln!("Connection error: {}", e);
                        }
                    }
                    Err(e) => eprintln!("Accept error: {}", e),
                }
            }
        });
    }
    
    // 等待Ctrl+C信号
    tokio::signal::ctrl_c().await?;
    println!("Shutting down server...");
    
    Ok(())
}

这些完整示例展示了compio-net库的主要功能,包括TCP/UDP通信、零拷贝操作和多连接处理。您可以根据实际需求调整缓冲区大小、并发工作线程数等参数以获得最佳性能。

回到顶部