Rust异步I/O库futures-io-preview的使用,高性能非阻塞网络编程与流处理核心组件
Rust异步I/O库futures-io-preview的使用,高性能非阻塞网络编程与流处理核心组件
安装
在项目目录中运行以下Cargo命令:
cargo add futures-io-preview
或者在Cargo.toml中添加以下行:
futures-io-preview = "0.2.2"
示例代码
以下是一个使用futures-io-preview进行异步I/O操作的完整示例:
use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 连接到TCP服务器
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// 异步写入数据
let write_buf = b"Hello, server!";
stream.write_all(write_buf).await?;
println!("Sent: {}", String::from_utf8_lossy(write_buf));
// 异步读取响应
let mut read_buf = vec![0; 1024];
let n = stream.read(&mut read_buf).await?;
println!("Received: {}", String::from_utf8_lossy(&read_buf[..n]));
Ok(())
}
高级流处理示例
use futures::prelude::*;
use futures::io::{AsyncRead, AsyncWrite};
use std::pin::Pin;
use std::task::{Context, Poll};
// 自定义异步流处理器
struct MyStreamProcessor<R> {
reader: R,
}
impl<R: AsyncRead + Unpin> AsyncRead for MyStreamProcessor<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}
impl<R: AsyncRead + Unpin> Stream for MyStreamProcessor<R> {
type Item = std::io::Result<Vec<u8>>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut buf = vec![0; 1024];
match Pin::new(&mut self.reader).poll_read(cx, &mut buf) {
Poll::Ready(Ok(n)) if n == 0 => Poll::Ready(None),
Poll::Ready(Ok(n)) => Poll::Ready(Some(Ok(buf[..n].to_vec()))),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}
完整示例代码
以下是一个完整的TCP客户端/服务器示例,展示了futures-io-preview的实际应用:
// 客户端代码 client.rs
use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 连接到TCP服务器
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// 异步写入数据
let write_buf = b"Ping from client";
stream.write_all(write_buf).await?;
println!("Client sent: {}", String::from_utf8_lossy(write_buf));
// 异步读取响应
let mut read_buf = vec![0; 1024];
let n = stream.read(&mut read_buf).await?;
println!("Client received: {}", String::from_utf8_lossy(&read_buf[..n]));
Ok(())
}
// 服务器代码 server.rs
use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 创建TCP监听器
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
loop {
// 接受新连接
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
// 读取客户端数据
match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => {
println!("Server received: {}", String::from_utf8_lossy(&buf[..n]));
// 写入响应
let response = b"Pong from server";
if let Err(e) = socket.write_all(response).await {
eprintln!("Write error: {}", e);
}
}
Err(e) => {
eprintln!("Read error: {}", e);
}
}
});
}
}
元数据
- 许可证: MIT OR Apache-2.0
- 大小: 7.64 KiB
- 所有者: rust-lang-nursery/futures团队
- 主要贡献者: Taylor Cramer
1 回复
Rust异步I/O库futures-io-preview使用指南
简介
futures-io-preview
是Rust异步I/O生态系统的核心组件之一,属于futures-preview
套件的一部分。它提供了非阻塞I/O的基础抽象,是构建高性能异步网络应用和流处理的关键库。
这个库主要包含两个核心trait:
AsyncRead
- 异步读取接口AsyncWrite
- 异步写入接口
基本用法
首先需要在Cargo.toml
中添加依赖:
[dependencies]
futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] }
示例1:基本异步读写
use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
async fn copy_data<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
reader: &mut R,
writer: &mut W,
) -> std::io::Result<()> {
let mut buf = [0u8; 8]; // 8字节缓冲区
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
writer.write_all(&buf[..n]).await?;
}
writer.flush().await?;
Ok(())
}
示例2:与标准库I/O类型互操作
futures-io-preview
提供了与标准库std::io
类型的互操作性:
use futures::io::AllowStdIo;
use std::fs::File;
async fn read_file() -> std::io::Result<()> {
let file = File::open("example.txt")?;
let mut async_file = AllowStdIo::new(file);
let mut contents = Vec::new();
async_file.read_to_end(&mut contents).await?;
println!("File content length: {}", contents.len());
Ok(())
}
高级特性
示例3:组合异步读写操作
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::executor::block_on;
async fn process_stream<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
input: &mut R,
output: &mut W,
) -> std::io::Result<()> {
// 读取头部
let mut header = [0u8; 4];
input.read_exact(&mut header).await?;
// 处理并写入头部
let processed_header = header.iter().map(|b| b.wrapping_add(1)).collect::<Vec<_>>();
output.write_all(&processed_header).await?;
// 复制剩余内容
let mut buffer = [0u8; 1024];
loop {
let n = input.read(&mut buffer).await?;
if n == 0 {
break;
}
output.write_all(&buffer[..n]).await?;
}
output.flush().await?;
Ok(())
}
示例4:超时处理
use futures::io::AsyncReadExt;
use futures::future::FutureExt;
use std::time::Duration;
async fn read_with_timeout<R: AsyncRead + Unpin>(
reader: &mut R,
timeout: Duration,
) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::new();
let read_future = reader.read_to_end(&mut buf);
let timeout_future = async_std::task::sleep(timeout);
futures::select! {
res = read_future.fuse() => res.map(|_| buf),
_ = timeout_future.fuse() => Err std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Read operation timed out",
)),
}
}
完整示例Demo
下面是一个完整的异步TCP客户端/服务器示例,展示了如何使用futures-io-preview
进行网络编程:
use futures::prelude::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
// 异步TCP服务器
async fn server() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on port 8080");
loop {
let (mut stream, _) = listener.accept().await?;
println!("New client connected");
task::spawn(async move {
let mut buf = [0u8; 1024];
loop {
let n = match stream.read(&mut buf).await {
Ok(n) if n == 0 => break, // 连接关闭
Ok(n) => n,
Err(e) => {
eprintln!("Read error: {}", e);
break;
}
};
// 处理数据并回显
if let Err(e) = stream.write_all(&buf[..n]).await {
eprintln!("Write error: {}", e);
break;
}
}
});
}
}
// 异步TCP客户端
async fn client() -> std::io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
println!("Connected to server");
// 发送数据
let message = b"Hello, server!";
stream.write_all(message).await?;
println!("Sent: {}", String::from_utf8_lossy(message));
// 接收回显
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
// 主函数
fn main() -> std::io::Result<()> {
// 启动服务器
let server_handle = task::spawn(server());
// 等待服务器启动
task::block_on(async {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
});
// 运行客户端
task::block_on(client())?;
// 保持服务器运行
task::block_on(server_handle)
}
性能提示
- 缓冲区大小选择很重要 - 通常8KB-64KB是不错的选择
- 批量操作比单字节操作高效得多
- 避免在异步任务中执行阻塞操作
- 使用
flush()
确保数据真正写入,但不要过度调用
与其他库集成
futures-io-preview
可以与以下流行库良好配合:
tokio
- 通过tokio-util
的兼容层async-std
- 原生支持hyper
- 用于HTTP客户端/服务器tungstenite
- WebSocket实现
注意事项
- 这是一个预览版库,API可能会发生变化
- 需要Rust 1.39+版本(支持async/await语法)
- 实际使用时通常需要配合执行器(runtime)如
tokio
或async-std
通过合理使用futures-io-preview
,可以构建出高性能、非阻塞的网络应用和流处理系统。