Rust异步任务优雅关闭库shutdown-async的使用,实现安全可靠的异步服务终止功能

Rust异步任务优雅关闭库shutdown-async的使用,实现安全可靠的异步服务终止功能

Shutdown Async是一个用于优雅关闭异步应用程序的库。当您希望在所有正在进行的处理完成后再关闭应用程序以保持状态一致性时,这个库会非常有用。

使用方式

在Cargo.toml中添加依赖:

[dependencies]
shutdown-async = "0.1.1"

使用示例如下:

use shutdown_async::ShutdownController;

#[tokio::main]
async fn main() {
  let shutdown = ShutdownController::new();
   
  tokio::task::spawn({
    let mut monitor = shutdown.subscribe();
    async move {
      // 等待某些事情发生
      tokio::select! {
       _ = monitor.recv() => { println!("shutdown initiated"); }
       _ = tokio::time::sleep(ONE_YEAR) => { println!("one year has passed!"); }
      }
    }
  });

  shutdown.shutdown().await;
}

static ONE_YEAR: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24 * 365);

完整示例代码

下面是一个更完整的示例,展示如何在真实场景中使用shutdown-async:

use shutdown_async::ShutdownController;
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建关闭控制器
    let shutdown = ShutdownController::new();
    
    // 创建一个通道用于任务间通信
    let (tx, mut rx) = mpsc::channel(32);
    
    // 启动工作线程1
    tokio::spawn({
        let mut monitor = shutdown.subscribe();
        let tx = tx.clone();
        async move {
            loop {
                tokio::select! {
                    // 收到关闭信号时退出循环
                    _ = monitor.recv() => {
                        println!("Worker 1 received shutdown signal, exiting...");
                        break;
                    }
                    // 模拟工作
                    _ = tokio::time::sleep(Duration::from_millis(500)) => {
                        tx.send("Worker 1 is working").await.unwrap();
                    }
                }
            }
        }
    });
    
    // 启动工作线程2
    tokio::spawn({
        let mut monitor = shutdown.subscribe();
        let tx = tx.clone();
        async move {
            loop {
                tokio::select! {
                    // 收到关闭信号时退出循环
                    _ = monitor.recv() => {
                        println!("Worker 2 received shutdown signal, exiting...");
                        break;
                    }
                    // 模拟工作
                    _ = tokio::time::sleep(Duration::from_millis(800)) => {
                        tx.send("Worker 2 is working").await.unwrap();
                    }
                }
            }
        }
    });
    
    // 启动接收线程
    tokio::spawn({
        let mut monitor = shutdown.subscribe();
        async move {
            while let Some(msg) = rx.recv().await {
                println!("Received: {}", msg);
                
                // 检查是否收到关闭信号
                if monitor.try_recv().is_ok() {
                    println!("Receiver got shutdown signal, exiting...");
                    break;
                }
            }
        }
    });
    
    // 模拟运行一段时间后触发关闭
    tokio::time::sleep(Duration::from_secs(5).await;
    println!("Initiating graceful shutdown...");
    
    // 触发关闭
    shutdown.shutdown().await;
    
    println!("All tasks have shut down gracefully");
}

这个示例展示了:

  1. 创建多个工作线程和一个接收线程
  2. 每个线程都订阅了关闭信号
  3. 主线程在5秒后触发优雅关闭
  4. 所有工作线程在收到关闭信号后完成当前工作并退出
  5. 整个过程确保所有任务都能安全结束,不会丢失数据

特性

  • 轻量级,无额外依赖
  • 线程安全,可在多线程环境中使用
  • 简单易用的API
  • 支持Tokio运行时

通过使用shutdown-async库,您可以确保您的异步应用程序能够在需要关闭时优雅地完成所有正在进行的任务,而不会丢失数据或处于不一致状态。


1 回复

Rust异步任务优雅关闭库shutdown-async使用指南

概述

shutdown-async是一个用于优雅关闭异步任务的Rust库,它提供了一种机制来通知异步任务应该开始关闭过程,并等待它们完成清理工作。这对于需要安全终止的服务器、服务或长时间运行的异步应用特别有用。

主要特性

  • 提供优雅关闭通知机制
  • 支持超时设置
  • 与Tokio、async-std等异步运行时兼容
  • 简单易用的API

安装

在Cargo.toml中添加依赖:

[dependencies]
shutdown-async = "0.3"
tokio = { version = "1.0", features = ["full"] } # 如果你使用tokio运行时

基本使用方法

1. 创建关闭控制器

use shutdown_async::ShutdownController;

#[tokio::main]
async fn main() {
    let shutdown = ShutdownController::new();
    let shutdown_signal = shutdown.subscribe();
    
    // 启动你的异步任务
    tokio::spawn(my_async_task(shutdown_signal));
    
    // 当需要关闭时
    shutdown.shutdown().await;
}

2. 在任务中处理关闭信号

async fn my_async_task(mut shutdown_signal: shutdown_async::ShutdownSignal) {
    loop {
        tokio::select! {
            _ = shutdown_signal.recv() => {
                println!("收到关闭信号,开始清理...");
                // 执行清理工作
                println!("清理完成,退出任务");
                break;
            }
            _ = async {
                // 正常的任务逻辑
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("执行任务中...");
            } => {}
        }
    }
}

高级用法

1. 设置超时

#[tokio::main]
async fn main() {
    let shutdown = ShutdownController::new();
    let shutdown_signal = shutdown.subscribe();
    
    tokio::spawn(my_async_task(shutdown_signal));
    
    // 设置5秒超时
    if let Err(_) = shutdown.shutdown_timeout(std::time::Duration::from_secs(5)).await {
        eprintln!("部分任务未在超时时间内完成");
    }
}

2. 多个任务管理

#[tokio::main]
async fn main() {
    let shutdown = ShutdownController::new();
    
    // 创建多个任务
    for i in 0..5 {
        let signal = shutdown.subscribe();
        tokio::spawn(async move {
            worker_task(i, signal).await;
        });
    }
    
    // 等待Ctrl+C信号
    tokio::signal::ctrl_c().await.unwrap();
    println!("收到中断信号,开始优雅关闭...");
    
    shutdown.shutdown().await;
    println!("所有任务已关闭");
}

async fn worker_task(id: usize, mut signal: shutdown_async::ShutdownSignal) {
    loop {
        tokio::select! {
            _ = signal.recv() => {
                println!("Worker {}: 收到关闭信号", id);
                // 模拟清理工作
                tokio::time::sleep(std::time::Duration::from_millis(100 * id as u64)).await;
                println!("Worker {}: 清理完成", id);
                break;
            }
            _ = async {
                // 模拟工作
                tokio::time::sleep(std::time::Duration::from_secs(1).await;
                println!("Worker {}: 工作中...", id);
            } => {}
        }
    }
}

实际应用示例:HTTP服务器优雅关闭

use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use shutdown_async::ShutdownController;
use std::convert::Infallible;

async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("Hello World")))
}

#[tokio::main]
async fn main() {
    let shutdown = ShutdownController::new();
    let shutdown_signal = shutdown.subscribe();
    
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle_request))
    });
    
    let server = Server::bind(&"127.0.0.1:3000".parse().unwrap())
        .serve(make_svc)
        .with_graceful_shutdown(async {
            shutdown_signal.recv().await;
        }));
    
    let server_handle = tokio::spawn(server);
    
    // 模拟收到关闭信号
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    shutdown.shutdown().await;
    
    server_handle.await.unwrap().unwrap();
    println!("HTTP服务器已优雅关闭");
}

完整示例代码:带数据库连接的优雅关闭

use shutdown_async::ShutdownController;
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建关闭控制器
    let shutdown = ShutdownController::new();
    let shutdown_signal = shutdown.subscribe();
    
    // 初始化数据库连接池
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://user:pass@localhost/db")
        .await?;
    
    // 创建任务通信通道
    let (tx, mut rx) = mpsc::channel(32);
    
    // 启动数据处理任务
    let db_task = tokio::spawn({
        let pool = pool.clone();
        async move {
            database_worker(pool, shutdown_signal, rx).await
        }
    });
    
    // 启动数据生成任务
    let producer_task = tokio::spawn({
        let tx = tx.clone();
        async move {
            data_producer(tx).await
        }
    });
    
    // 模拟运行一段时间后关闭
    tokio::time::sleep(Duration::from_secs(10)).await;
    
    // 开始优雅关闭
    println!("启动优雅关闭...");
    shutdown.shutdown().await;
    
    // 等待任务完成
    let _ = tokio::join!(db_task, producer_task);
    println!("所有任务已关闭");
    
    Ok(())
}

async fn data_producer(tx: mpsc::Sender<i32>) {
    let mut counter = 0;
    loop {
        // 模拟数据生成
        if let Err(_) = tx.send(counter).await {
            // 通道已关闭,退出任务
            break;
        }
        counter += 1;
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
    println!("数据生产者已关闭");
}

async fn database_worker(
    pool: sqlx::PgPool,
    mut shutdown_signal: shutdown_async::ShutdownSignal,
    mut rx: mpsc::Receiver<i32>,
) {
    loop {
        tokio::select! {
            _ = shutdown_signal.recv() => {
                println!("数据库工作者收到关闭信号");
                // 执行数据库清理工作
                cleanup_database(&pool).await;
                println!("数据库清理完成");
                break;
            }
            Some(data) = rx.recv() => {
                // 处理数据
                if let Err(e) = process_data(&pool, data).await {
                    eprintln!("处理数据出错: {}", e);
                }
            }
        }
    }
    println!("数据库工作者已关闭");
}

async fn process_data(pool: &sqlx::PgPool, data: i32) -> Result<(), sqlx::Error> {
    // 模拟数据库操作
    sqlx::query("INSERT INTO test_table (value) VALUES ($1)")
        .bind(data)
        .execute(pool)
        .await?;
    Ok(())
}

async fn cleanup_database(pool: &sqlx::PgPool) {
    // 模拟清理操作
    println!("开始清理数据库...");
    if let Err(e) = sqlx::query("DELETE FROM temp_data")
        .execute(pool)
        .await
    {
        eprintln!("清理数据库时出错: {}", e);
    }
    tokio::time::sleep(Duration::from_secs(1)).await;
}

注意事项

  1. 确保所有任务都正确处理关闭信号,否则shutdown()可能会无限期等待
  2. 对于关键任务,考虑使用shutdown_timeout()避免无限等待
  3. 在任务中,关闭信号接收后应尽快完成清理工作
  4. 一个ShutdownController可以创建多个ShutdownSignal,所有信号会在shutdown()调用时被触发

shutdown-async库提供了一种简单而强大的方式来管理异步任务的优雅关闭,是构建可靠Rust异步服务的实用工具。

回到顶部