Rust异步任务管理库zenoh-task的使用:高效处理分布式系统中的并发与任务调度

Rust异步任务管理库zenoh-task的使用:高效处理分布式系统中的并发与任务调度

⚠️ 警告 ⚠️
这个crate是专为Zenoh内部使用而设计的。
不能保证API在任何版本中保持不变,包括补丁更新。
强烈建议仅依赖zenoh和zenoh-ext crates,并使用它们的公共API。

安装

在项目目录中运行以下Cargo命令:

cargo add zenoh-task

或者在Cargo.toml中添加以下行:

zenoh-task = "1.5.0"

示例代码

以下是一个使用zenoh-task进行异步任务调度和管理的完整示例:

use zenoh_task::{AsyncTask, TaskManager};
use std::time::Duration;
use async_std::task;

// 定义一个异步任务
async fn sample_task(id: usize) {
    println!("Task {} started", id);
    task::sleep(Duration::from_secs(1)).await;
    println!("Task {} completed", id);
}

#[async_std::main]
async fn main() {
    // 创建任务管理器
    let task_manager = TaskManager::new();
    
    // 启动多个异步任务
    for i in 0..5 {
        task_manager.spawn(AsyncTask::new(sample_task(i)));
    }
    
    // 等待所有任务完成
    task_manager.join_all().await;
    
    println!("All tasks completed");
}

高级用法示例

use zenoh_task::{AsyncTask, TaskManager, TaskHandle};
use std::time::Duration;
use async_std::task;

// 带返回值的任务
async fn compute_task(id: usize) -> usize {
    println!("Computing task {}...", id);
    task::sleep(Duration::from_secs(1)).await;
    id * 2
}

#[async_std::main]
async fn main() {
    let task_manager = TaskManager::new();
    let mut handles = Vec::new();
    
    // 启动计算任务并保留句柄
    for i in 0..3 {
        let handle: TaskHandle<usize> = task_manager.spawn_with_handle(AsyncTask::new(compute_task(i)));
        handles.push(handle);
    }
    
    // 获取任务结果
    for (i, handle) in handles.into_iter().enumerate() {
        let result = handle.await;
        println!("Task {} result: {}", i, result);
    }
    
    // 关闭任务管理器
    task_manager.shutdown().await;
}

完整示例代码

以下是一个结合基本用法和高级用法的完整示例:

use zenoh_task::{AsyncTask, TaskManager, TaskHandle};
use std::time::Duration;
use async_std::task;

// 简单任务
async fn simple_task(id: usize) {
    println!("[Simple] Task {} started", id);
    task::sleep(Duration::from_secs(1)).await;
    println!("[Simple] Task {} completed", id);
}

// 带返回值的复杂任务
async fn complex_task(id: usize) -> String {
    println!("[Complex] Task {} processing...", id);
    task::sleep(Duration::from_secs(2)).await;
    format!("Result-{}", id * 10)
}

#[async_std::main]
async fn main() {
    // 创建任务管理器
    let manager = TaskManager::new();
    
    // 第一部分:简单任务
    println!("Starting simple tasks...");
    for i in 0..3 {
        manager.spawn(AsyncTask::new(simple_task(i)));
    }
    
    // 第二部分:带返回值的任务
    println!("Starting complex tasks...");
    let mut handles = Vec::new();
    for i in 0..2 {
        let handle = manager.spawn_with_handle(AsyncTask::new(complex_task(i)));
        handles.push(handle);
    }
    
    // 等待所有任务完成
    manager.join_all().await;
    
    // 获取复杂任务的结果
    for handle in handles {
        let result = handle.await;
        println!("Received result: {}", result);
    }
    
    // 关闭任务管理器
    manager.shutdown().await;
    println!("All tasks finished and manager shutdown");
}

注意事项

  1. 这个crate主要用于Zenoh内部使用,不建议在生产环境中直接使用
  2. API可能会在不通知的情况下发生变化
  3. 建议通过zenoh或zenoh-ext crates提供的公共API来使用这些功能

许可证

EPL-2.0 或 Apache-2.0


1 回复

Rust异步任务管理库zenoh-task的使用:高效处理分布式系统中的并发与任务调度

介绍

zenoh-task是一个基于zenoh协议的Rust异步任务管理库,专门设计用于简化分布式系统中的并发编程和任务调度。它构建在tokio运行时之上,提供了高级抽象来处理分布式环境中的任务执行、协调和监控。

zenoh-task的主要特点包括:

  • 分布式任务调度和执行
  • 自动负载均衡
  • 任务依赖管理
  • 容错和重试机制
  • 与zenoh网络协议无缝集成

安装

在Cargo.toml中添加依赖:

[dependencies]
zenoh-task = "0.7"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 创建简单任务

use zenoh_task::{Task, TaskContext};

#[tokio::main]
async fn main() {
    // 定义一个简单任务
    let task = Task::builder()
        .name("example_task")
        .spawn(|_ctx: TaskContext| async {
            println!("Hello from zenoh-task!");
            Ok(())
        })
        .unwrap();

    // 等待任务完成
    task.await.unwrap();
}

2. 分布式任务执行

use zenoh_task::{DistributedTask, TaskContext};
use zenoh::config::Config;

#[tokio::main]
async fn main() {
    // 创建zenoh配置
    let config = Config::default();
    
    // 创建分布式任务
    let task = DistributedTask::builder()
        .name("distributed_calculation")
        .with_zenoh_config(config)
        .spawn(|ctx: TaskContext| async move {
            let worker_id = ctx.worker_id();
            println!("Worker {} is processing data", worker_id);
            
            // 模拟工作负载
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            
            Ok(worker_id)
        })
        .unwrap();

    // 获取任务结果
    let result = task.await.unwrap();
    println!("Task completed with result: {:?}", result);
}

3. 任务依赖和调度

use zenoh_task::{DependencyGraph, Task, TaskContext};

#[tokio::main]
async fn main() {
    // 创建任务图
    let mut graph = DependencyGraph::new();
    
    // 添加任务1
    let task1 = Task::builder()
        .name("task1")
        .spawn(|_ctx| async {
            println!("Task 1 executing");
            Ok(42)
        })
        .unwrap();
    
    // 添加任务2,依赖于任务1
    let task2 = Task::builder()
        .name("task2")
        .depends_on(&task1)
        .spawn(|ctx| async move {
            let result = ctx.dependency_result::<i32>("task1").unwrap();
            println!("Task 2 received {} from Task 1", result);
            Ok(result * 2)
        })
        .unwrap();
    
    // 将任务添加到图中
    graph.add_task(task1);
    graph.add_task(task2);
    
    // 执行任务图
    let results = graph.run().await.unwrap();
    println!("Final results: {:?}", results);
}

高级功能

1. 自定义任务调度策略

use zenoh_task::{DistributedTask, SchedulerPolicy, TaskContext};

#[tokio::main]
async fn main() {
    let task = DistributedTask::builder()
        .name("custom_scheduled_task")
        .scheduler_policy(SchedulerPolicy::RoundRobin)
        .spawn(|ctx| async move {
            println!("Processing on worker {}", ctx.worker_id());
            Ok(())
        })
        .unwrap();
    
    task.await.unwrap();
}

2. 任务监控

use zenoh_task::{DistributedTask, TaskMonitor, TaskContext};

#[tokio::main]
async fn main() {
    // 创建监控器
    let monitor = TaskMonitor::new();
    
    let task = DistributedTask::builder()
        .name="monitored_task"
        .with_monitor(&monitor)
        .spawn(|ctx| async move {
            for i in 0..10 {
                ctx.report_progress(i, 10).await;
                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            }
            Ok(())
        })
        .unwrap();
    
    // 在另一个任务中监控进度
    tokio::spawn(async move {
        while let Some(update) = monitor.recv().await {
            println!("Task progress: {}/{}", update.current, update.total);
        }
    });
    
    task.await.unwrap();
}

3. 错误处理和重试

use zenoh_task::{DistributedTask, RetryPolicy, TaskContext};

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    
    let task = DistributedTask::builder()
        .name="retryable_task"
        .retry_policy(RetryPolicy::fixed_delay(3, std::time::Duration::from_secs(1)))
        .spawn(move |_ctx| async move {
            attempts += 1;
            if attempts < 3 {
                println!("Simulating failure (attempt {})", attempts);
                Err("Temporary failure".into())
            } else {
                println!("Success on attempt {}", attempts);
                Ok(())
            }
        })
        .unwrap();
    
    task.await.unwrap();
}

完整示例

以下是一个结合了任务依赖、监控和错误处理的完整示例:

use zenoh_task::{DependencyGraph, Task, TaskContext, TaskMonitor, RetryPolicy};
use zenoh::config::Config;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建任务图和监控器
    let mut graph = DependencyGraph::new();
    let monitor = TaskMonitor::new();
    
    // 任务1 - 数据准备任务
    let task1 = Task::builder()
        .name="data_preparation"
        .spawn(|_ctx| async {
            println!("Preparing initial data...");
            sleep(Duration::from_secs(1)).await;
            Ok(vec![1, 2, 3, 4, 5]) // 返回一些初始数据
        })
        .unwrap();
    
    // 任务2 - 数据处理任务(依赖于任务1)
    let task2 = Task::builder()
        .name="data_processing"
        .depends_on(&task1)
        .spawn(|ctx| async move {
            // 获取任务1的结果
            let data = ctx.dependency_result::<Vec<i32>>("data_preparation").unwrap();
            println!("Processing data: {:?}", data);
            
            // 处理数据
            let processed: Vec<i32> = data.into_iter().map(|x| x * 2).collect();
            Ok(processed)
        })
        .unwrap();
    
    // 任务3 - 最终任务(带重试和监控)
    let task3 = Task::builder()
        .name="final_task"
        .depends_on(&task2)
        .with_monitor(&monitor)
        .retry_policy(RetryPolicy::fixed_delay(2, Duration::from_secs(1)))
        .spawn(move |ctx| async move {
            // 获取任务2的结果
            let processed = ctx.dependency_result::<Vec<i32>>("data_processing").unwrap();
            
            // 模拟工作进度
            for i in 1..=5 {
                ctx.report_progress(i, 5).await;
                sleep(Duration::from_millis(300)).await;
            }
            
            // 计算结果
            let sum: i32 = processed.iter().sum();
            println!("Final sum: {}", sum);
            Ok(sum)
        })
        .unwrap();
    
    // 添加所有任务到图中
    graph.add_task(task1);
    graph.add_task(task2);
    graph.add_task(task3);
    
    // 启动监控任务
    tokio::spawn(async move {
        while let Some(update) = monitor.recv().await {
            println!("[Monitor] {} progress: {}/{}", 
                   update.task_name, update.current, update.total);
        }
    });
    
    // 执行任务图
    match graph.run().await {
        Ok(results) => {
            println!("All tasks completed successfully!");
            println!("Final results: {:?}", results);
        }
        Err(e) => {
            eprintln!("Task execution failed: {}", e);
        }
    }
}

最佳实践

  1. 任务粒度:保持任务适度大小,不要太小导致调度开销,也不要太大导致负载不均衡

  2. 错误处理:充分利用zenoh-task提供的重试和错误处理机制

  3. 资源管理:对于资源密集型任务,使用适当的调度策略

  4. 监控:为关键任务实现监控以跟踪进度和性能

  5. 依赖设计:合理规划任务依赖关系,避免循环依赖

zenoh-task为Rust分布式系统提供了强大的任务管理能力,结合zenoh协议的通信功能,可以构建高效可靠的分布式应用。

回到顶部