Rust异步网络库lightning-net-tokio的使用,支持高性能Tokio运行时驱动的TCP/UDP通信

Rust异步网络库lightning-net-tokio的使用,支持高性能Tokio运行时驱动的TCP/UDP通信

安装

在项目目录中运行以下Cargo命令:

cargo add lightning-net-tokio

或者在Cargo.toml中添加以下行:

lightning-net-tokio = "0.1.0"

使用示例

TCP服务器示例

use lightning_net_tokio::socket::TcpSocket;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建TCP服务器
    let socket = TcpSocket::new();
    let listener = socket.bind("127.0.0.1:8080").await?;
    
    println!("TCP server listening on 127.0.0.1:8080");
    
    loop {
        // 接受新连接
        let (mut stream, _) = listener.accept().await?;
        
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            
            // 读取客户端数据
            let n = match stream.read(&mut buf).await {
                Ok(n) if n == 0 => return,
                Ok(n) => n,
                Err(e) => {
                    eprintln!("failed to read from socket; err = {:?}", e);
                    return;
                }
            };
            
            // 回写数据给客户端
            if let Err(e) = stream.write_all(&buf[0..n].await {
                eprintln!("failed to write to socket; err = {:?}", e);
                return;
            }
        });
    }
}

TCP客户端示例

use lightning_net_tokio::socket::TcpSocket;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建TCP客户端
    let socket = TcpSocket::new();
    let mut stream = socket.connect("127.0.0.1:8080").await?;
    
    // 发送数据
    stream.write_all(b"hello world").await?;
    
    // 接收响应
    let mut buf = [0; 1024];
    let n = stream.read(&mut buf).await?;
    
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

UDP示例

use lightning_net_tokio::socket::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建UDP服务器
    let server_socket = UdpSocket::new();
    server_socket.bind("127.0.0.1:8081").await?;
    
    tokio::spawn(async move {
        let mut buf = [0; 1024];
        
        // 接收UDP数据包
        let (len, addr) = server_socket.recv_from(&mut buf).await.unwrap();
        println!("Received {} bytes from {}", len, addr);
        
        // 发送响应
        server_socket.send_to(&buf[..len], addr).await.unwrap();
    });
    
    // 创建UDP客户端
    let client_socket = UdpSocket::new();
    client_socket.bind("127.0.0.1:0").await?;
    
    // 发送数据
    client_socket.send_to(b"hello udp", "127.0.0.1:8081").await?;
    
    // 接收响应
    let mut buf = [0; 1024];
    let (len, _) = client_socket.recv_from(&mut buf).await?;
    println!("Received: {}", String::from_utf8_lossy(&buf[..len]));
    
    Ok(())
}

完整示例

完整TCP Echo服务器和客户端示例

TCP服务器端 (tcp_server.rs):

use lightning_net_tokio::socket::TcpSocket;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建TCP Socket
    let socket = TcpSocket::new();
    // 绑定到本地8080端口
    let listener = socket.bind("127.0.0.1:8080").await?;
    
    println!("TCP Echo Server listening on 127.0.0.1:8080");
    
    loop {
        // 接受新连接
        let (mut stream, addr) = listener.accept().await?;
        println!("New connection from: {}", addr);
        
        // 为每个连接创建独立任务
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                // 读取客户端数据
                let n = match stream.read(&mut buf).await {
                    Ok(n) if n == 0 => break,  // 连接关闭
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Read error from {}: {:?}", addr, e);
                        break;
                    }
                };
                
                // 回显数据给客户端
                if let Err(e) = stream.write_all(&buf[..n]).await {
                    eprintln!("Write error to {}: {:?}", addr, e);
                    break;
                }
            }
            
            println!("Connection closed: {}", addr);
        });
    }
}

TCP客户端端 (tcp_client.rs):

use lightning_net_tokio::socket::TcpSocket;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建TCP Socket
    let socket = TcpSocket::new();
    // 连接到服务器
    let mut stream = socket.connect("127.0.0.1:8080").await?;
    println!("Connected to server");
    
    // 发送测试消息
    let message = "Hello, TCP Echo Server!";
    stream.write_all(message.as_bytes()).await?;
    println!("Sent: {}", message);
    
    // 接收响应
    let mut buf = [0; 1024];
    let n = stream.read(&mut buf).await?;
    let response = String::from_utf8_lossy(&buf[..n]);
    println!("Received: {}", response);
    
    Ok(())
}

完整UDP Echo服务器和客户端示例

UDP服务器端 (udp_server.rs):

use lightning_net_tokio::socket::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建UDP Socket并绑定端口
    let socket = UdpSocket::new();
    socket.bind("127.0.0.1:8081").await?;
    println!("UDP Echo Server listening on 127.0.0.1:8081");
    
    let mut buf = [0; 1024];
    
    loop {
        // 接收数据
        let (len, addr) = socket.recv_from(&mut buf).await?;
        let message = String::from_utf8_lossy(&buf[..len]);
        println!("Received {} bytes from {}: {}", len, addr, message);
        
        // 发送回响应
        socket.send_to(&buf[..len], addr).await?;
        println!("Echoed back to {}", addr);
    }
}

UDP客户端端 (udp_client.rs):

use lightning_net_tokio::socket::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 创建UDP Socket并绑定随机端口
    let socket = UdpSocket::new();
    socket.bind("127.0.0.1:0").await?;
    
    // 服务器地址
    let server_addr = "127.0.0.1:8081";
    
    // 发送消息
    let message = "Hello, UDP Echo Server!";
    socket.send_to(message.as_bytes(), server_addr).await?;
    println!("Sent to {}: {}", server_addr, message);
    
    // 接收响应
    let mut buf = [0; 1024];
    let (len, _) = socket.recv_from(&mut buf).await?;
    let response = String::from_utf8_lossy(&buf[..len]);
    println!("Received: {}", response);
    
    Ok(())
}

特性

  1. 基于Tokio运行时的高性能异步网络通信
  2. 提供TCP和UDP套接字支持
  3. 简洁的API设计,易于使用
  4. 适用于需要高性能网络通信的场景

许可证

该项目采用MIT或Apache-2.0许可证。


1 回复

Rust异步网络库lightning-net-tokio使用指南

简介

lightning-net-tokio是一个基于Tokio运行时的高性能异步网络库,提供了TCP和UDP通信的简化抽象。它构建在Tokio生态系统之上,为开发者提供了更简洁的API来处理网络通信。

主要特性

  • 基于Tokio的高性能异步IO
  • 简洁的TCP和UDP抽象接口
  • 支持异步读写操作
  • 内置连接管理和错误处理
  • 与Tokio生态系统无缝集成

完整示例代码

TCP回显服务器和客户端

TCP服务器 (完整增强版)

use lightning_net_tokio::tcp::TcpListener;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    signal,
};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 绑定到本地8080端口
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("TCP Echo Server running on 127.0.0.1:8080");
    
    // 处理Ctrl+C信号
    let ctrl_c = async {
        signal::ctrl_c().await.expect("failed to listen for event");
        println!("Shutting down server...");
    };
    
    tokio::select! {
        _ = run_server(listener) => {},
        _ = ctrl_c => {},
    }
    
    Ok(())
}

async fn run_server(listener: TcpListener) -> Result<(), Box<dyn Error>> {
    loop {
        // 接受新连接
        let (mut socket, addr) = listener.accept().await?;
        println!("New connection from: {}", addr);
        
        // 为每个连接生成新任务
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                // 读取数据
                match socket.read(&mut buf).await {
                    Ok(0) => {  // 连接关闭
                        println!("Connection closed by client: {}", addr);
                        break;
                    }
                    Ok(n) => {
                        // 回显数据
                        if let Err(e) = socket.write_all(&buf[..n]).await {
                            eprintln!("Write error for {}: {}", addr, e);
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("Read error for {}: {}", addr, e);
                        break;
                    }
                }
            }
        });
    }
}

TCP客户端 (完整增强版)

use lightning_net_tokio::tcp::TcpStream;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    time::{sleep, Duration},
};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 连接到服务器
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("Connected to server at 127.0.0.1:8080");
    
    // 发送5条测试消息
    for i in 1..=5 {
        let msg = format!("Message {} from client", i);
        
        // 发送消息
        stream.write_all(msg.as_bytes()).await?;
        println!("Sent: {}", msg);
        
        // 接收回显
        let mut buf = [0; 1024];
        let n = stream.read(&mut buf).await?;
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
        
        // 间隔1秒
        sleep(Duration::from_secs(1)).await;
    }
    
    Ok(())
}

UDP聊天服务器和客户端

UDP服务器 (完整增强版)

use lightning_net_tokio::udp::UdpSocket;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 绑定到本地8080端口
    let socket = UdpSocket::bind("127.0.0.1:8080").await?;
    println!("UDP Server running on 127.0.0.1:8080");
    
    let mut buf = [0; 1024];
    
    loop {
        // 接收数据
        let (len, addr) = socket.recv_from(&mut buf).await?;
        let msg = String::from_utf8_lossy(&buf[..len]);
        println!("Received from {}: {}", addr, msg);
        
        // 发送回复
        let reply = format!("Echo: {}", msg);
        socket.send_to(reply.as_bytes(), addr).await?;
    }
}

UDP客户端 (完整增强版)

use lightning_net_tokio::udp::UdpSocket;
use tokio::{
    io::{stdin, AsyncBufReadExt, BufReader},
    time::{sleep, Duration},
};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 绑定到任意可用端口
    let socket = UdpSocket::bind("0.0.0.0:0").await?;
    let server_addr = "127.0.0.1:8080";
    println!("UDP Client connected to server at {}", server_addr);
    
    // 从标准输入读取消息
    let stdin = BufReader::new(stdin());
    let mut lines = stdin.lines();
    
    loop {
        println!("Enter message (or 'quit' to exit):");
        let msg = match lines.next_line().await? {
            Some(msg) if msg == "quit" => break,
            Some(msg) => msg,
            None => break,
        };
        
        // 发送消息
        socket.send_to(msg.as_bytes(), server_addr).await?;
        
        // 接收回复
        let mut buf = [0; 1024];
        let (len, _) = socket.recv_from(&mut buf).await?;
        println!("Server reply: {}", String::from_utf8_lossy(&buf[..len]));
        
        // 短暂延迟
        sleep(Duration::from_millis(100)).await;
    }
    
    println!("Client shutting down...");
    Ok(())
}

性能优化建议

  1. 连接池管理:对于高频TCP连接,考虑实现连接池
  2. 缓冲区大小:根据实际数据量调整缓冲区大小
  3. 批处理:对UDP小数据包考虑批处理发送
  4. 错误恢复:实现自动重连机制
  5. 日志记录:添加详细日志记录连接状态和错误

总结

lightning-net-tokio通过简洁的API提供了强大的异步网络编程能力。这些完整示例展示了如何构建健壮的TCP/UDP应用,包括错误处理、并发控制和用户交互等关键功能。您可以根据实际需求进一步扩展这些基础示例。

回到顶部