Rust网络连接管理库stubborn-io的使用:实现可靠、可配置的TCP/UDP连接重连与错误处理

Rust网络连接管理库stubborn-io的使用:实现可靠、可配置的TCP/UDP连接重连与错误处理

stubborn-io介绍

stubborn-io是一个Rust库,提供了自动从潜在断开/中断中恢复的IO特性/结构体。

要在项目中使用它,请在Cargo.toml中添加以下内容:

stubborn-io = "0.3"

使用示例

以下示例展示了如何替代tokio的TcpStream,区别在于当连接失败时它会自动尝试重新连接。

use stubborn_io::StubbornTcpStream;
use tokio::io::AsyncWriteExt;

let addr = "localhost:8080";

// 使用默认内置选项连接TcpStream
// 这些选项也可以自定义(例如重连尝试次数、等待时间等)
// 使用connect_with_options方法
let mut tcp_stream = StubbornTcpStream::connect(addr).await?;
// 一旦获取到包装的IO(本例中是TcpStream)
// 我们可以调用所有常规方法,如下所示
tcp_stream.write_all(b"hello world!").await?;

完整示例代码

use std::time::Duration;
use stubborn_io::{ReconnectOptions, StubbornTcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "localhost:8080";
    
    // 自定义重连选项
    let options = ReconnectOptions::new()
        .with_retries_generator(|| 10) // 最大重试次数
        .with_exponential_backoff(Duration::from_secs(1)) // 初始重连间隔
        .with_max_delay(Duration::from_secs(30)) // 最大重连间隔
        .with_message_callback(|attempt| {
            println!("尝试重新连接,第{}次", attempt);
        });
    
    // 使用自定义选项建立连接
    let mut stream = StubbornTcpStream::connect_with_options(addr, options).await?;
    
    // 持续发送和接收数据
    loop {
        // 发送数据
        if let Err(e) = stream.write_all(b"ping").await {
            eprintln!("发送失败: {}", e);
            continue;
        }
        
        // 接收数据
        let mut buf = [0; 1024];
        match stream.read(&mut buf).await {
            Ok(n) if n > 0 => {
                println!("收到回复: {}", String::from_utf8_lossy(&buf[..n]));
            }
            Ok(_) => {
                println!("连接关闭");
                break;
            }
            Err(e) => {
                eprintln!("接收失败: {}", e);
            }
        }
        
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    
    Ok(())
}

这个完整示例展示了:

  1. 自定义重连选项配置
  2. 自动重连机制
  3. 错误处理和回调
  4. 持续的数据收发功能

stubborn-io特别适合需要稳定网络连接的应用场景,如IoT设备通信、实时数据流等。


1 回复

Rust网络连接管理库stubborn-io的使用指南

简介

stubborn-io是一个Rust库,用于管理网络连接并提供可靠的重连机制。它特别适合需要持久网络连接的应用场景,能够处理连接断开后的自动重连,并提供丰富的配置选项来控制重连行为。

主要特性

  • 自动重连机制
  • 可配置的重连策略
  • 支持TCP和UDP协议
  • 自定义错误处理
  • 连接状态监控

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
stubborn-io = "0.3"
tokio = { version = "1.0", features = ["full"] }

基本TCP连接示例

use std::time::Duration;
use stubborn_io::{ReconnectOptions, StubbornTcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    // 配置重连选项
    let reconnect_options = ReconnectOptions::new()
        .with_retries_growth(1.5) // 每次重连间隔增长因子
        .with_initial_retry_delay(Duration::from_secs(1)) // 初始重连延迟
        .with_max_retry_delay(Duration::from_secs(30)) // 最大重连延迟
        .with_max_retries(10); // 最大重试次数

    // 创建TCP连接
    let addr = "127.0.0.1:8080";
    let mut stream = StubbornTcpStream::connect_with_options(addr, reconnect_options)
        .await
        .expect("Failed to connect");

    // 使用连接
    let message = b"Hello, server!";
    stream.write_all(message).await.expect("Write failed");

    let mut buf = [0; 1024];
    let n = stream.read(&mut buf).await.expect("Read failed");
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
}

UDP连接示例

use stubborn_io::{ReconnectOptions, StubbornUdpSocket};
use tokio::net::ToSocketAddrs;

#[tokio::main]
async fn main() {
    let reconnect_options = ReconnectOptions::new()
        .with_exit_if_first_connect_fails(false); // 允许首次连接失败后重试

    let addr = "127.0.0.1:8080";
    let socket = StubbornUdpSocket::connect_with_options(addr, reconnect_options)
        .await
        .expect("Failed to connect");

    // 使用UDP socket发送数据
    let message = b"Hello, UDP server!";
    socket.send(message).await.expect("Send failed");
}

自定义错误处理

use stubborn_io::{ReconnectOptions, StubbornTcpStream};
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let reconnect_options = ReconnectOptions::new()
        .with_on_connect_callback(|| println!("连接成功!"))
        .with_on_disconnect_callback(|error| {
            println!("连接断开,错误: {:?}", error);
        });

    let addr = "127.0.0.1:8080";
    let mut stream = match timeout(
        Duration::from_secs(5),
        StubbornTcpStream::connect_with_options(addr, reconnect_options)
    ).await {
        Ok(Ok(stream)) => stream,
        Ok(Err(e)) => panic!("连接失败: {:?}", e),
        Err(_) => panic!("连接超时"),
    };

    // 使用连接...
}

高级配置

自定义重连策略

use stubborn_io::{ReconnectOptions, StubbornTcpStream};

#[tokio::main]
async fn main() {
    let reconnect_options = ReconnectOptions::new()
        .with_custom_reconnect_policy(|attempt, previous_delay| {
            // 自定义重连策略:指数退避,但不超过10秒
            Some(previous_delay.unwrap_or(Duration::from_secs(1)).min(Duration::from_secs(10)))
        });

    let stream = StubbornTcpStream::connect_with_options("127.0.0.1:8080", reconnect_options)
        .await
        .expect("Failed to connect");
}

获取连接状态

use stubborn_io::{StubbornTcpStream, ConnectionState};

#[tokio::main]
async fn main() {
    let stream = StubbornTcpStream::connect("127.0.0.1:8080")
        .await
        .expect("Failed to connect");
    
    match stream.get_connection_state() {
        ConnectionState::Connected => println!("已连接"),
        ConnectionState::Disconnected => println!("已断开"),
        ConnectionState::Connecting => println!("正在连接"),
    }
}

完整示例

下面是一个完整的TCP客户端示例,包含连接、读写、重连和状态监控:

use std::time::Duration;
use stubborn_io::{ReconnectOptions, StubbornTcpStream, ConnectionState};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    time::sleep,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 配置重连选项
    let reconnect_options = ReconnectOptions::new()
        .with_retries_growth(1.5) // 重连间隔增长因子
        .with_initial_retry_delay(Duration::from_secs(1)) // 初始重连延迟
        .with_max_retry_delay(Duration::from_secs(30)) // 最大重连延迟
        .with_max_retries(10) // 最大重试次数
        .with_on_connect_callback(|| println!("成功连接到服务器!"))
        .with_on_disconnect_callback(|err| println!("连接断开: {:?}", err));

    // 创建TCP连接
    let addr = "127.0.0.1:8080";
    let mut stream = StubbornTcpStream::connect_with_options(addr, reconnect_options)
        .await
        .expect("Failed to connect");

    // 主循环 - 持续发送和接收数据
    let mut counter = 0;
    loop {
        // 检查连接状态
        match stream.get_connection_state() {
            ConnectionState::Connected => {
                // 发送数据
                let message = format!("Message {}", counter);
                stream.write_all(message.as_bytes()).await?;
                println!("已发送: {}", message);

                // 接收数据
                let mut buf = [0; 1024];
                match stream.read(&mut buf).await {
                    Ok(n) if n > 0 => {
                        println!("收到响应: {}", String::from_utf8_lossy(&buf[..n]));
                    }
                    _ => println!("连接可能已断开"),
                }

                counter += 1;
            }
            ConnectionState::Connecting => println!("正在尝试连接..."),
            ConnectionState::Disconnected => println!("连接已断开"),
        }

        // 等待一段时间
        sleep(Duration::from_secs(2)).await;
    }
}

注意事项

  1. stubborn-io基于tokio运行时,确保你的项目使用了tokio运行时
  2. 对于生产环境,建议仔细配置重连策略以避免过度重试
  3. 连接状态回调函数中避免执行耗时操作,以免影响重连逻辑

stubborn-io为Rust网络应用提供了强大的连接管理能力,特别适合需要高可靠性的网络通信场景。通过合理配置,可以构建出既健壮又灵活的网络连接处理逻辑。

回到顶部