Rust异步编程库futures-preview的使用,提供高效并发和Future抽象支持

Rust异步编程库futures-preview的使用,提供高效并发和Future抽象支持

公告

当前futures crate的开发正在0.3分支上进行。

futures-rs简介

这是一个Rust中零成本futures的实现库。

使用方式

首先,在您的Cargo.toml中添加:

[dependencies]
futures = "0.2.2"

然后在您的crate中添加:

extern crate futures;

use futures::Future;

no_std支持

futures-rs可以在没有标准库的环境下工作,比如裸机环境。但此时它的API会大幅减少。要在#[no_std]环境中使用futures-rs,请使用:

[dependencies]
futures = { version = "0.2.2", default-features = false }

完整示例代码

extern crate futures;

use futures::Future;
use futures::future::lazy;

fn main() {
    // 创建一个简单的future
    let future = lazy(|| {
        println!("Executing in the future!");
        Ok::<(), ()>(())
    });

    // 执行future
    futures::executor::block_on(future).unwrap();
    
    // 使用组合器
    let combined = future.and_then(|_| {
        println!("Chained future execution");
        Ok(())
    });
    
    futures::executor::block_on(combined).unwrap();
    
    // 并发执行多个future
    let futures = vec![
        lazy(|| { println!("Future 1"); Ok(1) }),
        lazy(|| { println!("Future 2"); Ok(2) }),
        lazy(|| { println!("Future 3"); Ok(3) }),
    ];
    
    let joined = futures::future::join_all(futures);
    let results = futures::executor::block_on(joined).unwrap();
    println!("Results: {:?}", results);
}

示例说明

  1. 首先创建一个简单的lazy future,它会在执行时打印消息
  2. 使用block_on执行单个future
  3. 使用and_then组合器将多个future串联执行
  4. 使用join_all并发执行多个future并收集结果

许可证

本项目采用以下任一许可证:

  • Apache License, Version 2.0
  • MIT license

贡献

除非您明确声明,否则您有意为Futures提交的任何贡献(如Apache-2.0许可证中所定义)都应按照上述方式双重许可,且不附加任何其他条款或条件。

扩展示例代码

extern crate futures;

use futures::Future;
use futures::future::{lazy, ok, err};
use futures::stream::{self, Stream};
use futures::future::Either;

fn main() {
    // 示例1: 基本Future使用
    let future1 = ok::<i32, i32>(42)
        .and_then(|val| {
            println!("Got value: {}", val);
            Ok(val + 1)
        })
        .map(|val| {
            println!("Mapped value: {}", val);
            val
        });
    
    let result1 = futures::executor::block_on(future1).unwrap();
    println!("Result1: {}", result1);

    // 示例2: 错误处理
    let future2 = err::<i32, i32>(0)
        .or_else(|err| {
            println!("Got error: {}", err);
            Ok(err + 1)
        });
    
    let result2 = futures::executor::block_on(future2).unwrap();
    println!("Result2: {}", result2);

    // 示例3: Stream使用
    let stream = stream::iter_ok::<_, i32>(vec![1, 2, 3])
        .for_each(|val| {
            println!("Stream value: {}", val);
            Ok(())
        });
    
    futures::executor::block_on(stream).unwrap();

    // 示例4: 选择第一个完成的Future
    let future_a = lazy(|| {
        std::thread::sleep(std::time::Duration::from_millis(100));
        Ok::<&str, &str>("future a")
    });
    
    let future_b = lazy(|| {
        std::thread::sleep(std::time::Duration::from_millis(50));
        Ok::<&str, &str>("future b")
    });
    
    let future_select = future_a.select(future_b)
        .then(|res| match res {
            Ok(Either::Left((a, _))) => Ok(a),
            Ok(Either::Right((b, _))) => Ok(b),
            Err(_) => Err("error"),
        });
    
    let result_select = futures::executor::block_on(future_select).unwrap();
    println!("First completed: {}", result_select);
}

扩展示例说明

  1. 展示了基本的Future链式操作,包括and_thenmap方法
  2. 演示了错误处理流程,使用or_else处理错误情况
  3. 展示了Stream的使用,通过for_each处理流中的每个元素
  4. 演示了使用select选择第一个完成的Future

1 回复

Rust异步编程库futures-preview的使用指南

介绍

futures-preview是Rust生态中一个强大的异步编程库,它提供了高效的并发抽象和Future支持。这个库是标准库中std::future的基础,也是async/await语法的底层支撑。

主要特点:

  • 提供Future trait的稳定实现
  • 支持高效的异步I/O操作
  • 包含各种组合器用于处理异步操作
  • 与async/await语法无缝集成
  • 支持任务调度和执行器

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }

基本示例

use futures::executor::block_on;
use futures::future::FutureExt; // 用于组合器方法

async fn hello_world() {
    println!("Hello, world!");
}

fn main() {
    let future = hello_world(); // 创建一个Future
    block_on(future); // 执行Future直到完成
}

Future组合器

use futures::future::{self, FutureExt, TryFutureExt};

async fn compute() -> Result<i32, String> {
    Ok(42)
}

async fn app() -> Result<(), String> {
    let result = compute()
        .map_ok(|val| val * 2) // 成功时转换值
        .map_err(|极e| format!("Error: {}", e)) // 错误时转换错误
        .await?;
    
    println!("Result: {}", result);
    Ok(())
}

fn main() {
    futures::executor::block_on(app()).unwrap();
}

并发执行多个Future

use futures::future::{join, try_join};
use futures::executor::block_on;

async fn get_user() -> Result<String, String> {
    Ok("Alice".to_string())
}

async fn get_age() -> Result<u32, String> {
    Ok(30)
}

async fn app() -> Result<(), String> {
    // 并行执行两个future,等待两者都完成
    let (user, age) = try_join(get_user(), get_age()).await?;
    println!("User: {}, Age: {}", user, age);
    Ok(())
}

fn main() {
    block_on(app()).unwrap();
}

流处理(Streams)

use futures::stream::{self, StreamExt};
use futures::executor::block_on;

async fn process_stream() {
    let stream = stream::iter(1..=5);
    
    // 处理流中的每个元素
    let sum = stream
        .map(|x| x * 2)
        .fold(0, |acc, x| async move { acc + x })
        .await;
    
    println!("Sum: {}", sum);
}

fn main() {
    block_on(process_stream());
}

选择第一个完成的Future

use futures::future::{self, FutureExt, select};
use futures::pin_mut;

async fn task_one() -> &'static str {
    // 模拟耗时操作
    future::pending().await;
    "one"
}

async fn task_two() -> &'static str {
    "two"
}

async fn app() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();
    
    pin_mut!(t1);
    pin_mut!(t2);
    
    match future::select(t1, t2).await {
        future::Either::Left((res, _)) => println!("Task one finished first: {:?}", res),
        future::Either::Right((res, _)) => println!("Task two finished first: {:?}", res),
    }
}

fn main() {
    futures::executor::block_on(app());
}

高级用法

自定义Future

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::executor::block_on;

struct MyFuture {
    count: u32,
}

impl Future for MyFuture {
    type Output = u32;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.count += 1;
        
        if self.count >= 3 {
            Poll::Ready(self.count)
        } else {
            cx.waker().wake_by_ref(); // 通知执行器再次poll
            Poll::Pending
        }
    }
}

fn main() {
    let future = MyFuture { count: 0 };
    let result = block_on(future);
    println!("Result: {}", result); // 输出: Result: 3
}

使用通道进行任务间通信

use futures::channel极::mpsc;
use futures::executor::block_on;
use futures::SinkExt;
use futures::StreamExt;

async fn sender(mut tx: mpsc::Sender<i32>) {
    for i in 0..5 {
        tx.send(i).await.unwrap();
    }
}

async fn receiver(mut rx: mpsc::Receiver<i32>) {
    while let Some(item) = rx.next().await {
        println!("Received: {}", item);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel(5);
    
    let sender_task = sender(tx);
    let receiver_task = receiver(rx);
    
    block_on(async {
        futures::join!(sender_task, receiver_task);
    });
}

注意事项

  1. futures-preview是早期版本,建议新项目使用futures crate的稳定版本
  2. 异步代码需要执行器(executor)来驱动,可以使用block_on或集成Tokio等运行时
  3. 注意Pin和Unpin trait在异步编程中的重要性
  4. 资源清理通常使用AsyncDrop模式处理

通过futures-preview库,你可以构建高效的异步应用,利用Rust强大的类型系统和零成本抽象来实现高性能并发编程。

回到顶部