Rust异步编程库n0-future的使用:高性能并发和Future抽象工具包

Rust异步编程库n0-future的使用:高性能并发和Future抽象工具包

n0-future是number 0的异步Rust实现方式,这个库有两个主要目的:

  1. 提供一个更容易获取的库,重新暴露一些合理的future/stream组合器,这些组合器不使用太多unsafe代码,看起来是安全的,而不需要安装很多小库。
  2. 使编写与Wasm兼容的异步代码更容易。

关于更安全的future相关代码

n0-future重新暴露了futures-litefutures-bufferedfutures-util(但主要用于Sink及其组合器)。

关于更简单的Wasm兼容代码

编写在wasm*-*-unknown目标上工作的代码并不容易:

  • std::time::Instant::now()在使用时会panic
  • 不能生成线程
  • 如果使用wasm-bindgen(实际上是你唯一的选择),像JsValue这样的结构是!Send

n0-future通过提供类似的API来解决这些问题,这些API很容易在Wasm和非Wasm目标之间使用#[cfg(...)],理想情况下不需要任何cfg,而是将cfg限制只发生在库内部。

示例代码

use n0_future::task;
use n0_future::time::{sleep, Duration};

async fn example_task(id: u32) {
    println!("Task {} started", id);
    // 模拟异步工作
    sleep(Duration::from_secs(1)).await;
    println!("Task {} finished", id);
}

#[tokio::main]
async fn main() {
    // 在非Wasm环境下使用tokio的任务生成
    #[cfg(not(target_arch = "wasm32"))] {
        let handle = task::spawn(example_task(1));
        handle.await.unwrap();
    }
    
    // 在Wasm环境下使用wasm-bindgen-futures的任务生成
    #[cfg(target_arch = "wasm32")] {
        let _ = task::spawn_local(example_task(2));
    }
    
    // 跨平台定时器示例
    println!("Waiting for 2 seconds...");
    sleep(Duration::from_secs(2)).await;
    println!("Done waiting!");
}

完整示例:跨平台异步HTTP请求

use n0_future::boxed::BoxFuture;
use n0_future::task;
use std::error::Error;

// 定义一个跨平台的异步HTTP GET函数
fn fetch_url(url: &str) -> BoxFuture<Result<String, Box<dyn Error>>> {
    #[cfg(not(target_arch = "wasm32"))] {
        // 使用reqwest的异步实现
        Box::pin(async move {
            let resp = reqwest::get(url).await?;
            let body = resp.text().await?;
            Ok(body)
        })
    }
    
    #[cfg(target_arch = "wasm32")] {
        // 使用wasm-bindgen和web_sys的实现
        Box::pin(async move {
            use wasm_bindgen::JsValue;
            use wasm_bindgen_futures::JsFuture;
            use web_sys::{Request, RequestInit, Response};
            
            let mut opts = RequestInit::new();
            opts.method("GET");
            
            let request = Request::new_with_str_and_init(url, &opts)?;
            let window = web_sys::window().unwrap();
            let resp_value = JsFuture::from(window.fetch_with_request(&request)).await?;
            let resp: Response = resp_value.dyn_into()?;
            let text = JsFuture::from(resp.text()?).await?;
            Ok(text.as_string().unwrap())
        })
    }
}

#[tokio::main]
#[cfg(not(target_arch = "wasm32"))]
async fn main() -> Result<(), Box<dyn Error>> {
    let handle = task::spawn(async {
        let body = fetch_url("https://httpbin.org/get").await?;
        println!("Response: {}", body);
        Ok(())
    });
    
    handle.await?
}

#[cfg(target_arch = "wasm32")]
fn main() {
    wasm_bindgen_futures::spawn_local(async {
        match fetch_url("https://httpbin.org/get").await {
            Ok(body) => log::info!("Response: {}", body),
            Err(e) => log::error!("Error: {}", e),
        }
    });
}

完整示例:跨平台文件读写

use n0_future::io::{read_to_string, write};
use n0_future::task;
use std::error::Error;

async fn process_file(input_path: &str, output_path: &str) -> Result<(), Box<dyn Error>> {
    // 读取文件内容
    let content = read_to_string(input_path).await?;
    
    // 处理内容(示例:转换为大写)
    let processed = content.to_uppercase();
    
    // 写入新文件
    write(output_path, processed.as_bytes()).await?;
    
    Ok(())
}

#[tokio::main]
#[cfg(not(target_arch = "wasm32"))]
async fn main() -> Result<(), Box<dyn Error>> {
    task::spawn(async {
        process_file("input.txt", "output.txt").await
    }).await?
}

#[cfg(target_arch = "wasm32")]
fn main() {
    wasm_bindgen_futures::spawn_local(async {
        if let Err(e) = process_file("/input.txt", "/output.txt").await {
            log::error!("文件处理错误: {}", e);
        }
    });
}

完整示例:跨平台定时任务

use n0_future::time::{interval, Duration, Instant};
use n0_future::task;

async fn run_timer(seconds: u64) {
    let start = Instant::now();
    let mut intv = interval(Duration::from_secs(1));
    
    for _ in 0..seconds {
        intv.next().await;
        let elapsed = start.elapsed().as_secs();
        println!("已运行 {} 秒", elapsed);
    }
}

#[tokio::main]
#[cfg(not(target_arch = "wasm32"))]
async fn main() {
    task::spawn(run_timer(5)).await.unwrap();
}

#[cfg(target_arch = "wasm32")]
fn main() {
    wasm_bindgen_futures::spawn_local(run_timer(5));
}

许可证

该项目采用以下任一许可证:

  • Apache License, Version 2.0
  • MIT license

1 回复

Rust异步编程库n0-future的使用:高性能并发和Future抽象工具包

介绍

n0-future是一个轻量级、高性能的Rust异步编程库,专注于提供高效的Future抽象和并发工具。它特别适合需要精细控制异步任务执行和资源管理的场景。

主要特性

  • 轻量级Future实现
  • 高性能任务调度
  • 灵活的并发控制
  • 零成本抽象
  • 与async/await语法良好集成

使用方法

基本依赖配置

首先在Cargo.toml中添加依赖:

[dependencies]
n0-future = "0.3"

基本Future使用

use n0_future::{Future, Poll};

// 自定义Future实现
struct MyFuture {
    count: usize,
}

impl Future for MyFuture {
    type Output = usize;
    
    fn poll(&mut self) -> Poll<Self::Output> {
        self.count += 1;
        if self.count >= 10 {
            Poll::Ready(self.count)
        } else {
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let fut = MyFuture { count: 0 };
    let result = fut.await;
    println!("Final count: {}", result);
}

并发任务处理

use n0_future::join_all;

// 模拟异步任务
async fn task(id: u32, delay_ms: u64) -> u32 {
    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
    println!("Task {} completed", id);
    id
}

#[tokio::main]
async fn main() {
    let tasks = vec![
        task(1, 100),
        task(2, 50),
        task(3, 200),
        task(4, 30),
    ];
    
    // 使用join_all并发执行所有任务
    let results = join_all(tasks).await;
    println!("All tasks completed: {:?}", results);
}

自定义执行器

use n0_future::{Executor, Spawner};

// 自定义执行器实现
struct SimpleExecutor;

impl Executor for SimpleExecutor {
    fn spawn(&mut self, future: Box<dyn Future<Output = ()> + Send>) {
        tokio::spawn(async move {
            future.await;
        });
    }
}

#[tokio::main]
async fn main() {
    let executor = SimpleExecutor;
    let spawner = Spawner::new(executor);
    
    // 使用自定义执行器生成任务
    spawner.spawn(async {
        println!("Running in custom executor");
    });
}

高级用法:Future组合

use n0_future::{FutureExt, TryFutureExt};

// 模拟数据获取
async fn fetch_data() -> Result<String, &'static str> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    Ok("Data loaded".to_string())
}

// 模拟数据处理
async fn process_data(data: String) -> Result<usize, &'static str> {
    Ok(data.len())
}

#[tokio::main]
async fn main() {
    // 组合多个Future操作
    let result = fetch_data()
        .and_then(|data| process_data(data))
        .map_ok(|len| println!("Processed data length: {}", len))
        .await;
    
    if let Err(e) = result {
        eprintln!("Error occurred: {}", e);
    }
}

性能建议

  1. 对于大量小任务,使用n0_future::join_all比单独await每个任务更高效
  2. 考虑使用n0_future::select代替tokio::select以获得更精细的控制
  3. 对于CPU密集型任务,使用n0_future::block_in_place避免阻塞运行时

注意事项

  • n0-future目前仍在活跃开发中,API可能会有变动
  • 与tokio生态系统集成良好,但也可以与其他运行时一起使用
  • 对于简单的异步需求,标准库或tokio可能更合适

完整示例Demo

下面是一个完整的n0-future使用示例,展示了多个特性的综合应用:

use n0_future::{Future, Poll, join_all, FutureExt, TryFutureExt, Executor, Spawner};
use tokio::time;

// 自定义Future实现
struct CounterFuture {
    count: usize,
    max: usize,
}

impl Future for CounterFuture {
    type Output = usize;
    
    fn poll(&mut self) -> Poll<Self::Output> {
        self.count += 1;
        if self.count >= self.max {
            Poll::Ready(self.count)
        } else {
            Poll::Pending
        }
    }
}

// 自定义执行器
struct MyExecutor;

impl Executor for MyExecutor {
    fn spawn(&mut self, future: Box<dyn Future<Output = ()> + Send>) {
        tokio::spawn(async move {
            future.await;
        });
    }
}

// 模拟API请求
async fn fetch_api_data(id: u32) -> Result<String, &'static str> {
    let delay = time::Duration::from_millis(50 * id as u64);
    time::sleep(delay).await;
    Ok(format!("Data from API {}", id))
}

#[tokio::main]
async fn main() {
    // 1. 基本Future使用
    let counter = CounterFuture { count: 0, max: 5 };
    let count = counter.await;
    println!("Counter reached: {}", count);
    
    // 2. 并发任务处理
    let tasks = (1..=5).map(|i| fetch_api_data(i));
    let results = join_all(tasks).await;
    println!("API results: {:?}", results);
    
    // 3. 自定义执行器
    let executor = MyExecutor;
    let spawner = Spawner::new(executor);
    
    spawner.spawn(async {
        println!("Task running in custom executor");
    });
    
    // 4. Future组合
    let processed = fetch_api_data(1)
        .and_then(|data| async move {
            Ok(data.len())
        })
        .map_ok(|len| println!("Data length: {}", len))
        .await;
    
    if let Err(e) = processed {
        eprintln!("Processing error: {}", e);
    }
}

这个完整示例展示了:

  1. 自定义Future实现
  2. 并发任务处理
  3. 自定义执行器
  4. Future组合操作

运行此示例需要确保在Cargo.toml中添加了n0-future和tokio的依赖:

[dependencies]
n0-future = "0.3"
tokio = { version = "1.0", features = ["full"] }
回到顶部