Using the Rust Async I/O Library futures-io-preview: Core Building Blocks for High-Performance Non-Blocking Networking and Stream Processing

Installation

Run the following Cargo command in your project directory:

cargo add futures-io-preview

Or add the following line to your Cargo.toml:

futures-io-preview = "0.2.2"
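The examples below use paths from the futures facade (futures::io, futures::prelude), run on the tokio runtime, and bridge between the two trait families with tokio-util's compat layer. A Cargo.toml along these lines is assumed, with illustrative version numbers:

[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }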

Example Code

Below is a complete example of performing asynchronous I/O with futures-io-preview, here driven by the tokio runtime:

use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::TcpStream;
// Bridges tokio's own I/O traits to the futures-io traits
// (requires the "compat" feature of tokio-util)
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Connect to the TCP server and wrap the stream so it exposes
    // the futures-io AsyncRead/AsyncWrite interface
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?.compat();

    // Write data asynchronously
    let write_buf = b"Hello, server!";
    stream.write_all(write_buf).await?;
    println!("Sent: {}", String::from_utf8_lossy(write_buf));

    // Read the response asynchronously
    let mut read_buf = vec![0; 1024];
    let n = stream.read(&mut read_buf).await?;
    println!("Received: {}", String::from_utf8_lossy(&read_buf[..n]));

    Ok(())
}
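
Note that tokio ships its own AsyncRead/AsyncWrite traits, so the tokio-util compat() wrapper is what makes the futures-io extension methods (read, write_all) callable on a tokio TcpStream. Runtimes such as async-std implement the futures-io traits directly and need no wrapper.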

Advanced Stream-Processing Example

The example below wraps an AsyncRead source in a custom type that can also be consumed as a Stream of byte chunks:

use futures::prelude::*;
use futures::io::AsyncRead;
use std::pin::Pin;
use std::task::{Context, Poll};

// A custom stream processor: it forwards reads to the wrapped reader and
// can also be consumed as a Stream of byte chunks
struct MyStreamProcessor<R> {
    reader: R,
}

impl<R: AsyncRead + Unpin> AsyncRead for MyStreamProcessor<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        // Delegate directly to the inner reader
        Pin::new(&mut self.reader).poll_read(cx, buf)
    }
}

impl<R: AsyncRead + Unpin> Stream for MyStreamProcessor<R> {
    type Item = std::io::Result<Vec<u8>>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Each poll reads up to 1024 bytes and yields them as one chunk
        let mut buf = vec![0; 1024];
        match Pin::new(&mut self.reader).poll_read(cx, &mut buf) {
            // A zero-byte read means EOF, which ends the stream
            Poll::Ready(Ok(0)) => Poll::Ready(None),
            Poll::Ready(Ok(n)) => Poll::Ready(Some(Ok(buf[..n].to_vec()))),
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}
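
The processor can then be consumed like any other Stream. Below is a minimal driver, placed in the same module as MyStreamProcessor and using futures' in-memory Cursor as the reader (both assumptions for illustration):

use futures::executor::block_on;
use futures::io::Cursor;
use futures::stream::StreamExt;

fn main() {
    block_on(async {
        // Feed the processor from an in-memory reader and pull chunks off it
        let mut processor = MyStreamProcessor {
            reader: Cursor::new(vec![1u8; 3000]),
        };
        while let Some(chunk) = processor.next().await {
            match chunk {
                Ok(bytes) => println!("got a chunk of {} bytes", bytes.len()),
                Err(e) => eprintln!("read error: {}", e),
            }
        }
    });
}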

Complete Example Code

Below is a complete TCP client/server example showing futures-io-preview in practice:

// Client code: client.rs
use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Connect to the TCP server and bridge to the futures-io traits
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?.compat();

    // Write data asynchronously
    let write_buf = b"Ping from client";
    stream.write_all(write_buf).await?;
    println!("Client sent: {}", String::from_utf8_lossy(write_buf));

    // Read the response asynchronously
    let mut read_buf = vec![0; 1024];
    let n = stream.read(&mut read_buf).await?;
    println!("Client received: {}", String::from_utf8_lossy(&read_buf[..n]));

    Ok(())
}

// Server code: server.rs
use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::TcpListener;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create the TCP listener
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server listening on 127.0.0.1:8080");

    loop {
        // Accept a new connection
        let (socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Bridge the accepted socket to the futures-io traits
            let mut socket = socket.compat();
            let mut buf = vec![0; 1024];

            // Read data from the client
            match socket.read(&mut buf).await {
                Ok(0) => return, // connection closed
                Ok(n) => {
                    println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));

                    // Write the response
                    let response = b"Pong from server";
                    if let Err(e) = socket.write_all(response).await {
                        eprintln!("Write error: {}", e);
                    }
                }
                Err(e) => {
                    eprintln!("Read error: {}", e);
                }
            }
        });
    }
}

Metadata

  • License: MIT OR Apache-2.0
  • Size: 7.64 KiB
  • Owners: rust-lang-nursery / the futures team
  • Top contributor: Taylor Cramer

1 Reply

A Usage Guide to the Rust Async I/O Library futures-io-preview

Introduction

futures-io-preview is one of the core components of Rust's asynchronous I/O ecosystem and is part of the futures-preview suite. It provides the foundational abstractions for non-blocking I/O and is a key building block for high-performance asynchronous network applications and stream processing.

The library centers on two core traits (a simplified sketch follows the list below):

  • AsyncRead - the asynchronous read interface
  • AsyncWrite - the asynchronous write interface
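
As a rough sketch, simplified from the 0.3-alpha line of the API (vectored and other provided methods omitted), the two traits boil down to poll-based read and write hooks:

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

// Simplified shape of the two core traits; the real definitions carry
// additional provided methods (for example vectored variants)
pub trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}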

Basic Usage

First, add the dependency to Cargo.toml:

[dependencies]
futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] }

Example 1: Basic Asynchronous Reading and Writing

use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};

async fn copy_data<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
    reader: &mut R,
    writer: &mut W,
) -> std::io::Result<()> {
    let mut buf = [0u8; 8]; // 8-byte buffer, deliberately small for demonstration
    loop {
        let n = reader.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        writer.write_all(&buf[..n]).await?;
    }
    writer.flush().await?;
    Ok(())
}
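
For a quick check without touching the network, the sketch below (assuming futures' in-memory Cursor type, which implements both traits) drives copy_data from the futures executor:

use futures::executor::block_on;
use futures::io::Cursor;

fn main() -> std::io::Result<()> {
    // In-memory reader and writer stand in for real sockets or files
    let mut reader = Cursor::new(b"hello futures-io".to_vec());
    let mut writer = Cursor::new(Vec::<u8>::new());

    block_on(copy_data(&mut reader, &mut writer))?;
    assert_eq!(writer.into_inner(), b"hello futures-io".to_vec());
    Ok(())
}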

Example 2: Interoperating with Standard-Library I/O Types

Interoperability with the standard library's blocking std::io types is provided through the AllowStdIo wrapper:

use futures::io::{AllowStdIo, AsyncReadExt};
use std::fs::File;

async fn read_file() -> std::io::Result<()> {
    let file = File::open("example.txt")?;
    let mut async_file = AllowStdIo::new(file);
    
    let mut contents = Vec::new();
    async_file.read_to_end(&mut contents).await?;
    
    println!("File content length: {}", contents.len());
    Ok(())
}
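
AllowStdIo works in the write direction as well; keep in mind that the wrapped calls still block the current thread, so it is best suited to tests and small files. A sketch, with the file name purely illustrative:

use futures::io::{AllowStdIo, AsyncWriteExt};
use std::fs::File;

async fn write_file() -> std::io::Result<()> {
    // Wrap a blocking std::io::Write type so it can be driven through the
    // futures AsyncWrite interface (the underlying calls still block)
    let file = File::create("output.txt")?;
    let mut async_file = AllowStdIo::new(file);

    async_file.write_all(b"written through futures-io").await?;
    async_file.flush().await?;
    Ok(())
}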

Advanced Features

Example 3: Composing Asynchronous Read and Write Operations

use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

async fn process_stream<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
    input: &mut R,
    output: &mut W,
) -> std::io::Result<()> {
    // Read the 4-byte header
    let mut header = [0u8; 4];
    input.read_exact(&mut header).await?;
    
    // Transform the header and write it out
    let processed_header = header.iter().map(|b| b.wrapping_add(1)).collect::<Vec<_>>();
    output.write_all(&processed_header).await?;
    
    // Copy the remaining payload unchanged
    let mut buffer = [0u8; 1024];
    loop {
        let n = input.read(&mut buffer).await?;
        if n == 0 {
            break;
        }
        output.write_all(&buffer[..n]).await?;
    }
    
    output.flush().await?;
    Ok(())
}
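
As with the earlier copy example, process_stream can be exercised entirely in memory; the sketch below assumes futures' Cursor type and makes the header transformation visible:

use futures::executor::block_on;
use futures::io::Cursor;

fn main() -> std::io::Result<()> {
    // A 4-byte header followed by a small payload
    let mut input = Cursor::new(vec![1u8, 2, 3, 4, 10, 20, 30]);
    let mut output = Cursor::new(Vec::<u8>::new());

    block_on(process_stream(&mut input, &mut output))?;

    // The header bytes were incremented by one, the payload copied verbatim
    assert_eq!(output.into_inner(), vec![2u8, 3, 4, 5, 10, 20, 30]);
    Ok(())
}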

Example 4: Timeout Handling

use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncReadExt};
use futures::pin_mut;
use std::time::Duration;

async fn read_with_timeout<R: AsyncRead + Unpin>(
    reader: &mut R,
    timeout: Duration,
) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();

    {
        // Race the read against a timer (the timer here comes from async-std)
        let read_future = reader.read_to_end(&mut buf).fuse();
        let timeout_future = async_std::task::sleep(timeout).fuse();
        pin_mut!(read_future, timeout_future);

        let result = futures::select! {
            res = read_future => res.map(|_| ()),
            _ = timeout_future => Err(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Read operation timed out",
            )),
        };
        result?;
    } // the read future, and with it the borrow of `buf`, ends here

    Ok(buf)
}
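
The futures crate itself does not ship a timer, which is why the sleep future above is borrowed from async-std; any other timer future (for example tokio::time::sleep under a tokio runtime) can be substituted inside select!.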

Complete Demo

Below is a complete asynchronous TCP client/server example showing how to do network programming with futures-io-preview, this time on async-std, whose networking types implement the futures-io traits natively:

use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use async_std::net::{TcpListener, TcpStream};
use async_std::task;

// Asynchronous TCP echo server
async fn server() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server listening on port 8080");
    
    loop {
        let (mut stream, _) = listener.accept().await?;
        println!("New client connected");
        
        task::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match stream.read(&mut buf).await {
                    Ok(n) if n == 0 => break, // connection closed
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Read error: {}", e);
                        break;
                    }
                };
                
                // Echo the received data back to the client
                if let Err(e) = stream.write_all(&buf[..n]).await {
                    eprintln!("Write error: {}", e);
                    break;
                }
            }
        });
    }
}

// Asynchronous TCP client
async fn client() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("Connected to server");
    
    // Send data
    let message = b"Hello, server!";
    stream.write_all(message).await?;
    println!("Sent: {}", String::from_utf8_lossy(message));
    
    // Receive the echoed response
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf).await?;
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

// Main function: runs the server and the client on the async-std executor
fn main() -> std::io::Result<()> {
    // Start the server on the global executor
    let server_handle = task::spawn(server());
    
    // Give the server a moment to start up
    task::block_on(async {
        async_std::task::sleep(std::time::Duration::from_secs(1)).await;
    });
    
    // 运行客户端
    task::block_on(client())?;
    
    // Keep the server running (blocks forever)
    task::block_on(server_handle)
}

Performance Tips

  1. Buffer size matters - 8 KB to 64 KB is usually a good range
  2. Batched operations are far more efficient than byte-at-a-time I/O (see the buffered-adapter sketch after this list)
  3. Avoid blocking operations inside asynchronous tasks
  4. Use flush() to make sure data actually gets written out, but do not call it excessively
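
One way to get that batching is to wrap raw handles in buffered adapters; a minimal sketch, assuming the BufReader and BufWriter types from futures::io:

use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};

// Buffered adapters coalesce many small reads and writes into fewer large ones
async fn buffered_copy<R, W>(reader: R, writer: W) -> std::io::Result<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    let mut reader = BufReader::with_capacity(16 * 1024, reader);
    let mut writer = BufWriter::with_capacity(16 * 1024, writer);

    let mut chunk = [0u8; 1024];
    loop {
        let n = reader.read(&mut chunk).await?;
        if n == 0 {
            break;
        }
        writer.write_all(&chunk[..n]).await?;
    }

    // Flush once at the end rather than after every write
    writer.flush().await?;
    Ok(())
}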

Integration with Other Libraries

futures-io-preview works well with the following popular libraries:

  • tokio - via the compatibility layer in tokio-util (see the sketch after this list)
  • async-std - native support
  • hyper - for HTTP clients and servers
  • tungstenite - a WebSocket implementation
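
For the tokio entry above, the bridge lives in tokio-util's compat module (enable its compat feature); a minimal sketch of both directions, with crate versions assumed rather than prescribed:

use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};

async fn bridges() -> std::io::Result<()> {
    // tokio -> futures-io: wrap a tokio type so futures' extension methods apply
    let tokio_stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?;
    let _futures_stream = tokio_stream.compat();

    // futures-io -> tokio: the reverse direction, for APIs that expect tokio's traits
    let futures_reader = futures::io::Cursor::new(vec![0u8; 16]);
    let _tokio_reader = futures_reader.compat();

    Ok(())
}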

Caveats

  1. This is a preview crate, and its API may change
  2. Requires Rust 1.39 or later (for async/await support)
  3. In practice it is usually paired with an executor (runtime) such as tokio or async-std

Used well, futures-io-preview lets you build high-performance, non-blocking network applications and stream-processing systems.
