Rust异步网络编程库compio-net的使用,高性能IO驱动与网络通信解决方案
Rust异步网络编程库compio-net的使用,高性能IO驱动与网络通信解决方案
Compio是一个基于线程每核(thread-per-core)的Rust运行时,使用IOCP/io_uring/polling技术实现。名称来源于"completion-based IO"(基于完成的IO)。
为什么选择compio而不是Tokio?
Tokio是一个优秀的通用异步运行时。然而,它是基于轮询的。我们想要一些新的高级API来执行IOCP/io_uring。
与tokio-uring不同,这个运行时不是基于Tokio的。这主要是因为mio中没有控制IOCP的公共API。
快速开始
添加compio作为依赖:
compio = { version = "0.13.1", features = ["macros"] }
compio-net = "0.8.0"
然后我们可以使用高级API进行文件系统和网络IO操作。以下是内容中提供的示例:
use compio::{fs::File, io::AsyncReadAtExt};
#[compio::main]
async fn main() {
let file = File::open("Cargo.toml").await.unwrap();
let (read, buffer) = file.read_to_end_at(Vec::with_capacity(1024), 0).await.unwrap();
assert_eq!(read, buffer.len());
let buffer = String::from_utf8(buffer).unwrap();
println!("{}", buffer);
}
compio-net网络编程完整示例
以下是使用compio-net进行TCP服务器和客户端通信的完整示例:
use compio::net::{TcpListener, TcpStream};
use compio::io::{AsyncReadExt, AsyncWriteExt};
// TCP服务器示例
#[compio::main]
async fn server() {
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await.unwrap();
compio::spawn(async move {
let mut buf = vec![0; 1024];
let (n, buf) = socket.read(buf).await.unwrap();
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
let (_, _) = socket.write_all(b"Hello from server!").await.unwrap();
}).detach();
}
}
// TCP客户端示例
#[compio::main]
async fn client() {
let mut socket = TcpStream::connect("127.0.0.1:12345").await.unwrap();
// 发送数据到服务器
let (_, _) = socket.write_all(b"Hello from client!").await.unwrap();
// 接收服务器响应
let mut buf = vec![0; 1024];
let (n, buf) = socket.read(buf).await.unwrap();
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
}
UDP网络通信示例
use compio::net::UdpSocket;
// UDP服务器
#[compio::main]
async fn udp_server() {
let socket = UdpSocket::bind("127.0.0.1:12346").await.unwrap();
let mut buf = vec![0; 1024];
loop {
let (n, buf, addr) = socket.recv_from(buf).await.unwrap();
println!("Received from {}: {}", addr, String::from_utf8_lossy(&buf[..n]));
let (_, _) = socket.send_to(b"Hello UDP client!", addr).await.unwrap();
}
}
// UDP客户端
#[compio::main]
async fn udp_client() {
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let server_addr = "127.0.0.1:12346".parse().unwrap();
// 发送数据到服务器
let (_, _) = socket.send_to(b"Hello UDP server!", server_addr).await.unwrap();
// 接收服务器响应
let mut buf = vec![0; 1024];
let (n, buf, _) = socket.recv_from(buf).await.unwrap();
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
}
贡献
无论您刚开始使用Rust还是经验丰富的专家,都有机会为Compio做出贡献。如果您对Compio有任何问题,欢迎加入我们的telegram群组。在贡献之前,请查看我们的贡献指南。
1 回复
以下是基于您提供的内容整理的完整示例代码,包含TCP服务器、TCP客户端和UDP通信的完整实现:
完整TCP服务器示例
use compio_net::{TcpListener, TcpStream};
use futures_util::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 绑定到本地地址8080端口
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
loop {
// 接受新连接
let (mut stream, addr) = listener.accept().await?;
println!("Accepted connection from: {}", addr);
// 为每个连接创建新任务
tokio::spawn(async move {
let mut buf = vec![0; 1024];
// 读取客户端数据
match stream.read(&mut buf).await {
Ok(n) if n == 0 => return, // 连接关闭
Ok(n) => {
let received = String::from_utf8_lossy(&buf[..n]);
println!("Received {} bytes: {}", n, received);
// 回写相同数据
if let Err(e) = stream.write_all(&buf[..n]).await {
eprintln!("Write error: {}", e);
}
}
Err(e) => eprintln!("Read error: {}", e),
}
});
}
}
完整TCP客户端示例
use compio_net::TcpStream;
use futures_util::{AsyncReadExt, AsyncWriteExt};
use std::time::Duration;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 连接到服务器
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
println!("Connected to server at 127.0.0.1:8080");
// 要发送的消息
let messages = [
"Hello",
"This is compio-net",
"Testing TCP connection",
"Goodbye"
];
for msg in messages {
// 发送数据
println!("Sending: {}", msg);
stream.write_all(msg.as_bytes()).await?;
// 接收响应
let mut buf = vec![0; 1024];
let n = stream.read(&mut buf).await?;
let response = String::from_utf8_lossy(&buf[..n]);
println!("Received: {}", response);
// 等待1秒
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(())
}
完整UDP通信示例
UDP服务器端
use compio_net::UdpSocket;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 绑定到本地端口8081
let socket = UdpSocket::bind("127.0.0.1:8081").await?;
println!("UDP server listening on 127.0.0.1:8081");
let mut buf = vec![0; 1024];
loop {
// 接收数据
let (n, addr) = socket.recv_from(&mut buf).await?;
let received = String::from_utf8_lossy(&buf[..n]);
println!("Received {} bytes from {}: {}", n, addr, received);
// 发送响应
let response = format!("Echo: {}", received);
socket.send_to(response.as_bytes(), addr).await?;
}
}
UDP客户端端
use compio_net::UdpSocket;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 创建UDP socket并绑定到随机端口
let socket = UdpSocket::bind("127.0.0.1:0").await?;
println!("Local address: {}", socket.local_addr()?);
// 连接到服务器
let server_addr = "127.0.0.1:8081";
socket.connect(server_addr).await?;
println!("Connected to server at {}", server_addr);
// 要发送的消息
let messages = [
"Hello UDP",
"This is a test",
"From compio-net",
"Goodbye UDP"
];
for msg in messages {
// 发送数据
println!("Sending: {}", msg);
socket.send(msg.as_bytes()).await?;
// 接收响应
let mut buf = vec![0; 1024];
let n = socket.recv(&mut buf).await?;
let response = String::from_utf8_lossy(&buf[..n]);
println!("Received: {}", response);
}
Ok(())
}
零拷贝操作完整示例
use compio_net::TcpStream;
use compio::buf::{BufResult, IntoInner};
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 连接到服务器
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// 准备要发送的数据
let send_data = vec![1, 2, 3, 4, 5];
println!("Sending data: {:?}", send_data);
// 使用零拷贝发送
let BufResult(res, buf) = stream.send_all(send_data).await;
res?;
// 取回缓冲区(如果需要重用)
let original_buf = buf.into_inner();
println!("Original buffer after send: {:?}", original_buf);
// 准备接收缓冲区
let mut recv_buf = Vec::with_capacity(1024);
// 使用零拷贝接收精确5字节
let BufResult(res, buf) = stream.recv_exact(recv_buf).await;
let n = res?;
// 取回填充了数据的缓冲区
let filled_buf = buf.into_inner();
println!("Received {} bytes: {:?}", n, &filled_buf[..n]);
Ok(())
}
多连接处理完整示例
use compio_net::{TcpListener, TcpStream};
use futures_util::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
async fn handle_connection(mut stream: TcpStream) -> std::io::Result<()> {
let mut buf = vec![0; 1024];
loop {
// 读取数据
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(()), // 连接关闭
Ok(n) => n,
Err(e) => {
eprintln!("Read error: {}", e);
return Err(e);
}
};
// 处理数据
let received = String::from_utf8_lossy(&buf[..n]);
println!("Received: {}", received);
// 回写数据
if let Err(e) = stream.write_all(&buf[..n]).await {
eprintln!("Write error: {}", e);
return Err(e);
}
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 绑定到本地地址
let listener = TcpListener::bind("127.0.0.1:8080").await?;
let listener = Arc::new(listener);
println!("Server started on 127.0.0.1:8080");
// 根据CPU核心数创建工作线程
let num_workers = num_cpus::get();
println!("Starting {} worker threads", num_workers);
for _ in 0..num_workers {
let listener = listener.clone();
tokio::spawn(async move {
loop {
// 接受新连接
match listener.accept().await {
Ok((stream, addr)) => {
println!("Accepted connection from {}", addr);
// 处理连接
if let Err(e) = handle_connection(stream).await {
eprintln!("Connection error: {}", e);
}
}
Err(e) => eprintln!("Accept error: {}", e),
}
}
});
}
// 等待Ctrl+C信号
tokio::signal::ctrl_c().await?;
println!("Shutting down server...");
Ok(())
}
这些完整示例展示了compio-net库的主要功能,包括TCP/UDP通信、零拷贝操作和多连接处理。您可以根据实际需求调整缓冲区大小、并发工作线程数等参数以获得最佳性能。