Rust异步任务优雅关闭库tokio-graceful-shutdown的使用,实现Tokio运行时服务的平滑终止与资源清理

Rust异步任务优雅关闭库tokio-graceful-shutdown的使用,实现Tokio运行时服务的平滑终止与资源清理

简介

tokio-graceful-shutdown 是一个为基于 tokio-rs 的服务提供优雅关闭功能的库。它提供了以下特性:

  • 从子系统内部监听关闭请求
  • 从子系统内部手动启动关闭
  • 自动关闭功能,包括:
    • SIGINT/SIGTERM/Ctrl+C信号
    • 子系统故障
    • 子系统panic
  • 带有超时和错误传播的干净关闭流程
  • 子系统嵌套
  • 部分关闭选定的子系统树

使用示例

下面是一个基本示例,展示如何创建一个简单的异步子系统:

async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
    log::info!("Subsystem1 started.");
    subsys.on_shutdown_requested().await;
    log::info!("Subsystem1 stopped.");
    Ok(())
}

这个子系统启动后,会等待程序关闭触发,然后停止自己。

执行这个子系统的完整示例:

#[tokio::main]
async fn main() -> Result<()> {
    Toplevel::new(|s| async move {
        s.start(SubsystemBuilder::new("Subsys1", subsys1))
    })
    .catch_signals()
    .handle_shutdown_requests(Duration::from_millis(1000))
    .await
    .map_err(Into::into)
}

完整示例代码

use tokio_graceful_shutdown::{Toplevel, SubsystemHandle, SubsystemBuilder};
use std::time::Duration;
use log;

// 定义一个简单的子系统
async fn subsys1(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    log::info!("Subsystem1 started.");
    
    // 等待关闭信号
    subsys.on_shutdown_requested().await;
    
    log::info!("Subsystem1 stopped.");
    Ok(())
}

// 定义另一个子系统
async fn subsys2(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    log::info!("Subsystem2 started.");
    
    tokio::select! {
        _ = subsys.on_shutdown_requested() => {
            log::info!("Subsystem2 received shutdown signal.");
        }
        _ = tokio::time::sleep(Duration::from_secs(5)) => {
            log::info!("Subsystem2 completed its work.");
        }
    }
    
    log::info!("Subsystem2 stopped.");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // 初始化日志
    env_logger::init();
    
    // 创建Toplevel对象并启动子系统
    Toplevel::new(|s| async move {
        // 启动第一个子系统
        s.start(SubsystemBuilder::new("Subsys1", subsys1));
        
        // 启动第二个子系统
        s.start(SubsystemBuilder::new("Subsys2", subsys2));
    })
    // 捕获系统信号(如Ctrl+C)
    .catch_signals()
    // 处理关闭请求,设置超时为1秒
    .handle_shutdown_requests(Duration::from_millis(1000))
    .await
    .map_err(Into::into)
}

构建和使用

要将这个库添加到你的项目中,在你的项目目录中运行:

cargo add tokio-graceful-shutdown

工作原理

Toplevel 对象是子系统树的根对象。可以使用其 SubsystemHandle 对象的 start() 方法在其中启动子系统。

catch_signals() 方法让 Toplevel 对象监听 SIGINT/SIGTERM/Ctrl+C 信号并在收到信号后启动关闭流程。

handle_shutdown_requests()Toplevel 最重要和最后的方法。它会空闲等待直到程序进入关闭模式。然后,它会收集所有子系统的返回值,确定全局错误状态,并确保关闭在给定的超时时间内完成。最后,它会返回一个可以直接用作 main() 返回码的错误值。

动机

在异步程序中执行优雅关闭是一个非平凡的问题。tokio-graceful-shutdown 旨在提供干净的抽象来封装所有这些样板代码,使开发者能够专注于业务逻辑而不是关闭机制。


1 回复

Rust异步任务优雅关闭库tokio-graceful-shutdown的使用

介绍

tokio-graceful-shutdown是一个用于实现Tokio运行时服务平滑终止与资源清理的Rust库。它可以帮助开发者优雅地关闭异步任务,确保所有正在处理的工作能够完成,同时清理资源,避免数据损坏或丢失。

这个库特别适合需要处理长期运行任务的服务,如Web服务器、消息队列消费者或任何需要确保安全关闭的异步应用。

主要特性

  • 提供优雅的关闭机制,允许任务完成当前工作
  • 支持嵌套的子系统和层次化关闭
  • 可以设置超时,防止关闭过程无限期挂起
  • 与Tokio运行时无缝集成
  • 支持自定义关闭信号

完整示例代码

下面是一个结合了多个功能的完整示例,展示如何使用tokio-graceful-shutdown来管理一个包含HTTP服务器和后台任务的应用程序:

use tokio_graceful_shutdown::{Toplevel, SubsystemHandle, NestedSubsystem};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::oneshot;

// HTTP服务器子系统
async fn http_server(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    // 创建HTTP服务
    let make_svc = make_service_fn(|_conn| {
        async {
            Ok::<_, Infallible>(service_fn(|req: Request<Body>| async {
                Ok::<Response<Body>, Infallible>(Response::new(Body::from("Hello World")))
            }))
        }
    });

    // 绑定端口并启动服务器
    let server = Server::bind(&"0.0.0.0:8080".parse().unwrap())
        .serve(make_svc)
        .with_graceful_shutdown(subsys.on_shutdown_requested());

    println!("HTTP server running on port 8080");
    
    // 等待服务器关闭
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
    
    println!("HTTP server shutdown complete");
    Ok(())
}

// 后台工作子系统
async fn background_worker(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    // 模拟后台工作
    let mut count = 0;
    loop {
        tokio::select! {
            // 监听关闭信号
            _ = subsys.on_shutdown_requested() => {
                println!("Background worker cleaning up...");
                return Ok(());
            }
            // 模拟工作循环
            _ = async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                count += 1;
                println!("Background worker processed {} tasks", count);
            } => {}
        }
    }
}

// 数据库连接子系统
async fn database_manager(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    // 模拟数据库连接
    println!("Database manager connecting...");
    
    tokio::select! {
        _ = subsys.on_shutdown_requested() => {
            println!("Database manager closing connections...");
            // 这里可以添加实际的数据库连接关闭逻辑
            Ok(())
        }
        _ = async {
            // 模拟长期运行的数据库连接
            tokio::time::sleep(Duration::from_secs(9999)).await;
            Ok(())
        } => unreachable!(),
    }
}

// 父级子系统,包含多个子子系统
async fn parent_subsystem(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
    // 创建嵌套子系统
    let nested = NestedSubsystem::new()
        .start("Database", database_manager)
        .start("Worker", background_worker);
    
    // 运行嵌套子系统
    nested.run(subsys).await
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // 创建自定义关闭信号通道
    let (shutdown_sender, shutdown_receiver) = oneshot::channel();
    
    // 模拟在10秒后发送关闭信号
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(10)).await;
        println!("Sending custom shutdown signal after 10 seconds...");
        let _ = shutdown_sender.send(());
    });

    // 创建并配置顶层系统
    Toplevel::new()
        // 添加HTTP服务器
        .start("HttpServer", http_server)
        // 添加父级子系统
        .start("ParentSystem", parent_subsystem)
        // 捕获系统信号和自定义信号
        .catch_signals()
        .catch_custom_signals(shutdown_receiver)
        // 设置15秒的超时时间
        .handle_shutdown_requests(Duration::from_secs(15))
        .await
}

代码说明

  1. HTTP服务器:创建一个简单的HTTP服务器,在8080端口响应请求
  2. 后台工作子系统:模拟一个持续运行的后台任务,每秒打印一次状态
  3. 数据库管理子系统:模拟数据库连接管理
  4. 父级子系统:演示如何嵌套多个子系统
  5. 自定义关闭信号:设置10秒后自动触发关闭
  6. 超时设置:整个关闭过程最多允许15秒完成

运行流程

  1. 程序启动后,HTTP服务器和所有子系统开始运行
  2. 10秒后,自定义关闭信号被触发
  3. 所有子系统收到关闭信号,开始清理工作
  4. 如果在15秒内所有子系统都完成清理,程序正常退出
  5. 如果有子系统未能在15秒内完成,将被强制终止

这个示例展示了tokio-graceful-shutdown库的主要功能,包括子系统管理、嵌套子系统、自定义关闭信号和超时控制等。

回到顶部