Rust线程池管理库deadpool-sync的使用,deadpool-sync提供高效同步连接池解决方案

Rust线程池管理库deadpool-sync的使用,deadpool-sync提供高效同步连接池解决方案

Deadpool是一个用于任何类型连接和对象的极其简单的异步池。

这个crate提供了用于为不支持异步且需要在线程内运行的对象编写池的辅助工具。

注意: 这个crate旨在使deadpool-* crate的开发更容易。其他库和二进制项目通常不应直接使用它,而应使用使用它的crate提供的重新导出。

特性

特性 描述 额外依赖 默认
tracing 通过在interact()调用中传播Span来支持tracing。如果您使用tracing crate并且希望从interact()调用中获得有用的跟踪信息,请启用此功能。 tracing

许可证

根据以下任一许可证授权

  • Apache License, Version 2.0
  • MIT license

由您选择。

完整示例代码

use deadpool_sync::SyncWrapper;
use std::thread;
use std::time::Duration;

// 创建一个同步对象包装器
let wrapper = SyncWrapper::new(|| {
    // 这里创建或初始化您的同步对象
    // 例如:数据库连接、HTTP客户端等
    let connection = SomeExpensiveSyncResource::new();
    connection
});

// 在线程池中使用同步对象
let handle = thread::spawn(move || {
    // 获取同步对象
    let mut guard = wrapper.get().unwrap();
    
    // 使用同步对象执行操作
    guard.do_something_sync();
    
    // 执行一些耗时操作
    thread::sleep(Duration::from_secs(1));
});

// 等待线程完成
handle.join().unwrap();
use deadpool_sync::Pool;
use std::sync::Arc;
use std::thread;

// 创建连接池配置
let pool = Pool::builder()
    .max_size(10) // 最大连接数
    .build(|| {
        // 连接工厂函数
        Arc::new(ExpensiveSyncConnection::new())
    })
    .unwrap();

// 在多线程环境中使用连接池
let handles: Vec<_> = (0..5)
    .map(|i| {
        let pool = pool.clone();
        thread::spawn(move || {
            // 从池中获取连接
            let connection = pool.get().unwrap();
            
            // 使用连接执行操作
            println!("Thread {} using connection: {:?}", i, connection);
            
            // 连接会在离开作用域时自动返回池中
        })
    })
    .collect();

// 等待所有线程完成
for handle in handles {
    handle.join().unwrap();
}
use deadpool_sync::SyncWrapper;
use std::sync::{Arc, Mutex};
use std::thread;

// 创建线程安全的计数器示例
let counter = Arc::new(SyncWrapper::new(|| Mutex::new(0)));

let mut handles = vec![];

for _ in 0..10 {
    let counter = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        let mut guard = counter.get().unwrap();
        let mut num = guard.lock().unwrap();
        *num += 1;
    });
    handles.push(handle);
}

for handle in handles {
    handle.join().unwrap();
}

// 验证结果
let result = counter.get().unwrap().lock().unwrap();
println!("Final counter value: {}", *result);

完整示例demo

use deadpool_sync::Pool;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// 定义一个模拟的昂贵同步连接
struct ExpensiveConnection {
    id: u32,
}

impl ExpensiveConnection {
    fn new(id: u32) -> Self {
        println!("Creating new connection {}", id);
        // 模拟创建连接的耗时操作
        thread::sleep(Duration::from_millis(100));
        ExpensiveConnection { id }
    }

    fn execute_query(&self, query: &str) {
        println!("Connection {} executing query: {}", self.id, query);
        // 模拟查询执行时间
        thread::sleep(Duration::from_millis(50));
    }
}

fn main() {
    // 创建连接池,最大容量为5
    let pool = Pool::builder()
        .max_size(5)
        .build(|| {
            static COUNTER: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(1);
            let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Arc::new(ExpensiveConnection::new(id))
        })
        .unwrap();

    println!("连接池创建完成,开始模拟多线程查询...");

    let mut handles = vec![];

    // 创建10个线程并发执行查询
    for i in 0..10 {
        let pool = pool.clone();
        let handle = thread::spawn(move || {
            // 从连接池获取连接
            let connection = pool.get().unwrap();
            
            // 执行查询操作
            connection.execute_query(&format!("SELECT * FROM table WHERE id = {}", i));
            
            // 连接会在作用域结束时自动返回池中
            println!("Thread {} finished using connection {}", i, connection.id);
        });
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("所有查询执行完成!");
    println!("当前池中连接数: {}", pool.status().size);
    println!("可用连接数: {}", pool.status().available);
}
use deadpool_sync::SyncWrapper;
use std::sync::{Arc, Mutex};
use std::thread;

// 使用SyncWrapper包装线程安全的数据结构示例
struct SharedData {
    counter: i32,
    data: Vec<String>,
}

impl SharedData {
    fn new() -> Self {
        SharedData {
            counter: 0,
            data: Vec::new(),
        }
    }

    fn increment(&mut self) {
        self.counter += 1;
    }

    fn add_data(&mut self, item: String) {
        self.data.push(item);
    }

    fn get_stats(&self) -> (i32, usize) {
        (self.counter, self.data.len())
    }
}

fn main() {
    // 使用SyncWrapper包装共享数据
    let shared_data = Arc::new(SyncWrapper::new(|| Mutex::new(SharedData::new())));

    let mut handles = vec![];

    // 创建多个线程并发修改共享数据
    for i in 0..8 {
        let data_ref = Arc::clone(&shared_data);
        let handle = thread::spawn(move || {
            // 获取数据的可变访问权
            let mut guard = data_ref.get().unwrap();
            let mut data = guard.lock().unwrap();
            
            // 修改数据
            data.increment();
            data.add_data(format!("Item from thread {}", i));
            
            println!("Thread {} modified shared data", i);
        });
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 读取最终结果
    let guard = shared_data.get().unwrap();
    let data = guard.lock().unwrap();
    let (counter, items_count) = data.get_stats();
    
    println!("最终计数器值: {}", counter);
    println!("数据项数量: {}", items_count);
    println!("所有数据: {:?}", data.data);
}

1 回复

deadpool-sync:Rust高效同步连接池管理库

概述

deadpool-sync是一个专为Rust设计的轻量级同步连接池库,提供简单高效的连接池管理解决方案。该库特别适合需要管理数据库连接、网络连接或其他昂贵资源的情况,通过重用连接来减少创建和销毁的开销。

核心特性

  • 线程安全的同步连接池
  • 自动连接回收和重用
  • 可配置的连接超时和最大连接数
  • 简单的API设计
  • 零成本抽象

安装方法

在Cargo.toml中添加依赖:

[dependencies]
deadpool-sync = "0.10"

基本使用方法

1. 创建连接池

use deadpool_sync::SyncPool;
use std::sync::{Arc, Mutex};

// 定义连接类型
type Connection = Arc<Mutex<()>>;

// 创建连接池
let pool = SyncPool::new(
    || Connection::new(Mutex::new(())), // 连接创建函数
    10,                                  // 最大连接数
);

2. 获取和释放连接

// 从池中获取连接
let connection = pool.get().unwrap();

// 使用连接
{
    let guard = connection.lock().unwrap();
    // 执行操作...
} // 连接自动释放回池中

3. 带超时的连接获取

use std::time::Duration;

// 设置超时时间
let connection = pool.get_timeout(Duration::from_secs(5)).unwrap();

完整示例

use deadpool_sync::SyncPool;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// 模拟数据库连接
struct DatabaseConnection {
    id: u32,
}

impl DatabaseConnection {
    fn new(id: u32) -> Self {
        println!("创建新连接: {}", id);
        DatabaseConnection { id }
    }
    
    fn query(&self, sql: &str) {
        println!("连接 {} 执行查询: {}", self.id, sql);
    }
}

fn main() {
    // 创建连接池
    let mut next_id = 1;
    let pool = SyncPool::new(
        || {
            let id = next_id;
            next_id += 1;
            Arc::new(Mutex::new(DatabaseConnection::new(id)))
        },
        3, // 最大3个连接
    );

    // 在多线程中使用连接池
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let pool = pool.clone();
            thread::spawn(move || {
                let connection = pool.get().unwrap();
                let conn = connection.lock().unwrap();
                conn.query(&format!("SELECT * FROM table WHERE id = {}", i));
                // 连接自动释放
            })
        })
        .collect();

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
}

高级配置

自定义连接管理器

use deadpool_sync::{SyncPool, Manager};
use std::sync::{Arc, Mutex};

struct CustomManager;

impl Manager for CustomManager {
    type Connection = Arc<Mutex<DatabaseConnection>>;
    type Error = std::io::Error;

    fn create(&self) -> Result<Self::Connection, Self::Error> {
        Ok(Arc::new(Mutex::new(DatabaseConnection::new(1))))
    }

    fn validate(&self, _: &Self::Connection) -> bool {
        true
    }
}

let pool = SyncPool::custom(CustomManager, 10);

最佳实践

  1. 根据实际需求调整最大连接数
  2. 合理设置超时时间以避免死锁
  3. 定期检查连接有效性
  4. 使用RAII模式确保连接正确释放

注意事项

  • 确保连接类型实现Send + Sync
  • 避免在连接使用过程中持有锁过长时间
  • 适当处理连接创建失败的情况

deadpool-sync提供了简单而强大的同步连接池管理功能,能够显著提升应用程序的性能和资源利用率。

完整示例代码

use deadpool_sync::SyncPool;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// 模拟数据库连接结构
struct DatabaseConnection {
    id: u32,
}

impl DatabaseConnection {
    // 创建新的数据库连接
    fn new(id: u32) -> Self {
        println!("创建新连接: {}", id);
        DatabaseConnection { id }
    }
    
    // 执行查询操作
    fn query(&self, sql: &str) {
        println!("连接 {} 执行查询: {}", self.id, sql);
        // 模拟查询耗时
        thread::sleep(Duration::from_millis(100));
    }
}

fn main() {
    // 创建连接池
    let mut next_id = 1;
    let pool = SyncPool::new(
        || {
            let id = next_id;
            next_id += 1;
            Arc::new(Mutex::new(DatabaseConnection::new(id)))
        },
        3, // 最大连接数限制为3个
    );

    println!("开始多线程数据库操作演示...");

    // 创建5个线程模拟并发请求
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let pool = pool.clone();
            thread::spawn(move || {
                // 从连接池获取连接
                let connection = pool.get().unwrap();
                let conn = connection.lock().unwrap();
                
                // 执行查询操作
                conn.query(&format!("SELECT * FROM users WHERE id = {}", i));
                
                // 连接在使用完毕后自动释放回连接池
                println!("线程 {} 完成操作,连接已释放", i);
            })
        })
        .collect();

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("所有操作完成!");
    
    // 显示连接池统计信息
    println!("连接池状态: {:?}", pool.state());
}

这个完整示例展示了:

  1. 创建了一个最大容量为3的连接池
  2. 模拟了5个并发线程使用数据库连接
  3. 演示了连接的自动获取和释放
  4. 展示了连接的重用机制(只会创建3个连接)
  5. 提供了连接池状态监控

运行此示例可以看到连接池如何有效地重用连接,避免频繁创建和销毁连接的开销。

回到顶部