Rust异步任务优雅关闭库tokio-graceful-shutdown的使用,实现Tokio运行时服务的平滑终止与资源清理
Rust异步任务优雅关闭库tokio-graceful-shutdown的使用,实现Tokio运行时服务的平滑终止与资源清理
简介
tokio-graceful-shutdown 是一个为基于 tokio-rs 的服务提供优雅关闭功能的库。它提供了以下特性:
- 从子系统内部监听关闭请求
- 从子系统内部手动启动关闭
- 自动关闭功能,包括:
- SIGINT/SIGTERM/Ctrl+C信号
- 子系统故障
- 子系统panic
- 带有超时和错误传播的干净关闭流程
- 子系统嵌套
- 部分关闭选定的子系统树
使用示例
下面是一个基本示例,展示如何创建一个简单的异步子系统:
async fn subsys1(subsys: SubsystemHandle) -> Result<()> {
log::info!("Subsystem1 started.");
subsys.on_shutdown_requested().await;
log::info!("Subsystem1 stopped.");
Ok(())
}
这个子系统启动后,会等待程序关闭触发,然后停止自己。
执行这个子系统的完整示例:
#[tokio::main]
async fn main() -> Result<()> {
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("Subsys1", subsys1))
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
完整示例代码
use tokio_graceful_shutdown::{Toplevel, SubsystemHandle, SubsystemBuilder};
use std::time::Duration;
use log;
// 定义一个简单的子系统
async fn subsys1(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
log::info!("Subsystem1 started.");
// 等待关闭信号
subsys.on_shutdown_requested().await;
log::info!("Subsystem1 stopped.");
Ok(())
}
// 定义另一个子系统
async fn subsys2(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
log::info!("Subsystem2 started.");
tokio::select! {
_ = subsys.on_shutdown_requested() => {
log::info!("Subsystem2 received shutdown signal.");
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
log::info!("Subsystem2 completed its work.");
}
}
log::info!("Subsystem2 stopped.");
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// 初始化日志
env_logger::init();
// 创建Toplevel对象并启动子系统
Toplevel::new(|s| async move {
// 启动第一个子系统
s.start(SubsystemBuilder::new("Subsys1", subsys1));
// 启动第二个子系统
s.start(SubsystemBuilder::new("Subsys2", subsys2));
})
// 捕获系统信号(如Ctrl+C)
.catch_signals()
// 处理关闭请求,设置超时为1秒
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
构建和使用
要将这个库添加到你的项目中,在你的项目目录中运行:
cargo add tokio-graceful-shutdown
工作原理
Toplevel
对象是子系统树的根对象。可以使用其 SubsystemHandle
对象的 start()
方法在其中启动子系统。
catch_signals()
方法让 Toplevel
对象监听 SIGINT/SIGTERM/Ctrl+C 信号并在收到信号后启动关闭流程。
handle_shutdown_requests()
是 Toplevel
最重要和最后的方法。它会空闲等待直到程序进入关闭模式。然后,它会收集所有子系统的返回值,确定全局错误状态,并确保关闭在给定的超时时间内完成。最后,它会返回一个可以直接用作 main()
返回码的错误值。
动机
在异步程序中执行优雅关闭是一个非平凡的问题。tokio-graceful-shutdown 旨在提供干净的抽象来封装所有这些样板代码,使开发者能够专注于业务逻辑而不是关闭机制。
Rust异步任务优雅关闭库tokio-graceful-shutdown的使用
介绍
tokio-graceful-shutdown
是一个用于实现Tokio运行时服务平滑终止与资源清理的Rust库。它可以帮助开发者优雅地关闭异步任务,确保所有正在处理的工作能够完成,同时清理资源,避免数据损坏或丢失。
这个库特别适合需要处理长期运行任务的服务,如Web服务器、消息队列消费者或任何需要确保安全关闭的异步应用。
主要特性
- 提供优雅的关闭机制,允许任务完成当前工作
- 支持嵌套的子系统和层次化关闭
- 可以设置超时,防止关闭过程无限期挂起
- 与Tokio运行时无缝集成
- 支持自定义关闭信号
完整示例代码
下面是一个结合了多个功能的完整示例,展示如何使用tokio-graceful-shutdown
来管理一个包含HTTP服务器和后台任务的应用程序:
use tokio_graceful_shutdown::{Toplevel, SubsystemHandle, NestedSubsystem};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::oneshot;
// HTTP服务器子系统
async fn http_server(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
// 创建HTTP服务
let make_svc = make_service_fn(|_conn| {
async {
Ok::<_, Infallible>(service_fn(|req: Request<Body>| async {
Ok::<Response<Body>, Infallible>(Response::new(Body::from("Hello World")))
}))
}
});
// 绑定端口并启动服务器
let server = Server::bind(&"0.0.0.0:8080".parse().unwrap())
.serve(make_svc)
.with_graceful_shutdown(subsys.on_shutdown_requested());
println!("HTTP server running on port 8080");
// 等待服务器关闭
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
println!("HTTP server shutdown complete");
Ok(())
}
// 后台工作子系统
async fn background_worker(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
// 模拟后台工作
let mut count = 0;
loop {
tokio::select! {
// 监听关闭信号
_ = subsys.on_shutdown_requested() => {
println!("Background worker cleaning up...");
return Ok(());
}
// 模拟工作循环
_ = async {
tokio::time::sleep(Duration::from_secs(1)).await;
count += 1;
println!("Background worker processed {} tasks", count);
} => {}
}
}
}
// 数据库连接子系统
async fn database_manager(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
// 模拟数据库连接
println!("Database manager connecting...");
tokio::select! {
_ = subsys.on_shutdown_requested() => {
println!("Database manager closing connections...");
// 这里可以添加实际的数据库连接关闭逻辑
Ok(())
}
_ = async {
// 模拟长期运行的数据库连接
tokio::time::sleep(Duration::from_secs(9999)).await;
Ok(())
} => unreachable!(),
}
}
// 父级子系统,包含多个子子系统
async fn parent_subsystem(subsys: SubsystemHandle) -> Result<(), anyhow::Error> {
// 创建嵌套子系统
let nested = NestedSubsystem::new()
.start("Database", database_manager)
.start("Worker", background_worker);
// 运行嵌套子系统
nested.run(subsys).await
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// 创建自定义关闭信号通道
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
// 模拟在10秒后发送关闭信号
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
println!("Sending custom shutdown signal after 10 seconds...");
let _ = shutdown_sender.send(());
});
// 创建并配置顶层系统
Toplevel::new()
// 添加HTTP服务器
.start("HttpServer", http_server)
// 添加父级子系统
.start("ParentSystem", parent_subsystem)
// 捕获系统信号和自定义信号
.catch_signals()
.catch_custom_signals(shutdown_receiver)
// 设置15秒的超时时间
.handle_shutdown_requests(Duration::from_secs(15))
.await
}
代码说明
- HTTP服务器:创建一个简单的HTTP服务器,在8080端口响应请求
- 后台工作子系统:模拟一个持续运行的后台任务,每秒打印一次状态
- 数据库管理子系统:模拟数据库连接管理
- 父级子系统:演示如何嵌套多个子系统
- 自定义关闭信号:设置10秒后自动触发关闭
- 超时设置:整个关闭过程最多允许15秒完成
运行流程
- 程序启动后,HTTP服务器和所有子系统开始运行
- 10秒后,自定义关闭信号被触发
- 所有子系统收到关闭信号,开始清理工作
- 如果在15秒内所有子系统都完成清理,程序正常退出
- 如果有子系统未能在15秒内完成,将被强制终止
这个示例展示了tokio-graceful-shutdown
库的主要功能,包括子系统管理、嵌套子系统、自定义关闭信号和超时控制等。