Rust信号量同步库std-semaphore的使用,std-semaphore提供线程间同步与资源管理的并发控制功能

Rust信号量同步库std-semaphore的使用

std-semaphore是一个提供线程间同步与资源管理的并发控制功能的Rust库。它实现了信号量(Semaphore)机制,可用于控制对共享资源的访问。

安装

在项目目录中运行以下Cargo命令:

cargo add std-semaphore

或者在Cargo.toml中添加:

std-semaphore = "0.1.0"

使用示例

下面是一个使用std-semaphore进行线程同步的完整示例:

use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std_semaphore::Semaphore;

fn main() {
    // 创建一个初始值为3的信号量
    let semaphore = Arc::new(Semaphore::new(3));
    
    // 创建5个工作线程
    for i in 0..5 {
        let semaphore = semaphore.clone();
        thread::spawn(move || {
            // 获取信号量许可
            println!("线程 {} 等待获取许可...", i);
            let _guard = semaphore.access();
            
            println!("线程 {} 获得了许可", i);
            // 模拟工作
            thread::sleep(Duration::from_secs(1));
            
            // _guard离开作用域时会自动释放许可
            println!("线程 {} 释放了许可", i);
        });
    }
    
    // 等待所有线程完成
    thread::sleep(Duration::from_secs(5));
}

示例说明

  1. 我们创建了一个初始值为3的信号量,表示最多允许3个线程同时访问共享资源
  2. 创建了5个工作线程,每个线程尝试获取信号量许可
  3. 只有3个线程能同时获得许可,其他线程需要等待
  4. 当线程完成工作后,许可会自动释放(通过_guard的Drop实现)

输出示例

运行程序可能会看到类似这样的输出(顺序可能不同):

线程 0 等待获取许可...
线程 1 等待获取许可...
线程 2 等待获取许可...
线程 3 等待获取许可...
线程 4 等待获取许可...
线程 0 获得了许可
线程 1 获得了许可 
线程 2 获得了许可
线程 0 释放了许可
线程 3 获得了许可
线程 1 释放了许可
线程 4 获得了许可
线程 2 释放了许可
线程 3 释放了许可
线程 4 释放了许可

这个示例展示了如何使用std-semaphore来限制对共享资源的并发访问。

完整示例代码

use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std_semaphore::Semaphore;

fn main() {
    // 创建信号量,初始值为3
    let sem = Arc::new(Semaphore::new(3));
    
    // 创建10个线程模拟并发访问
    let mut handles = vec![];
    
    for i in 0..10 {
        let sem_clone = sem.clone();
        let handle = thread::spawn(move || {
            // 尝试获取信号量
            println!("[线程 {}] 尝试获取资源...", i);
            let permit = sem_clone.access();
            
            // 获取到资源
            println!("[线程 {}] 获取资源成功", i);
            
            // 模拟资源占用1秒
            thread::sleep(Duration::from_secs(1));
            
            // permit离开作用域自动释放
            println!("[线程 {}] 释放资源", i);
        });
        handles.push(handle);
    }
    
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("所有线程执行完毕");
}

完整示例说明

  1. 创建了一个初始值为3的信号量,表示最多允许3个线程同时访问资源
  2. 创建了10个工作线程模拟高并发场景
  3. 每个线程会先尝试获取信号量,最多3个线程能同时获取成功
  4. 获取到资源的线程会占用资源1秒钟
  5. 资源使用完毕后会自动释放(通过RAII机制)
  6. 主线程等待所有工作线程完成

完整示例输出

可能的输出结果(顺序可能不同):

[线程 0] 尝试获取资源...
[线程 1] 尝试获取资源...
[线程 2] 尝试获取资源...
[线程 3] 尝试获取资源...
[线程 4] 尝试获取资源...
[线程 5] 尝试获取资源...
[线程 6] 尝试获取资源...
[线程 7] 尝试获取资源...
[线程 8] 尝试获取资源...
[线程 9] 尝试获取资源...
[线程 0] 获取资源成功
[线程 1] 获取资源成功
[线程 2] 获取资源成功
[线程 0] 释放资源
[线程 3] 获取资源成功
[线程 1] 释放资源
[线程 4] 获取资源成功
[线程 2] 释放资源
[线程 5] 获取资源成功
[线程 3] 释放资源
[线程 6] 获取资源成功
[线程 4] 释放资源
[线程 7] 获取资源成功
[线程 5] 释放资源
[线程 8] 获取资源成功
[线程 6] 释放资源
[线程 9] 获取资源成功
[线程 7] 释放资源
[线程 8] 释放资源
[线程 9] 释放资源
所有线程执行完毕

1 回复

Rust信号量同步库std-semaphore的使用指南

std-semaphore是Rust中一个提供信号量(Semaphore)同步原语的库,用于线程间的同步与资源管理。信号量是一种经典的并发控制机制,可以限制同时访问某个资源的线程数量。

基本概念

信号量维护了一个计数器,表示可用资源的数量。主要操作包括:

  • acquire(或wait): 获取资源,计数器减1
  • release(或signal): 释放资源,计数器加1

使用方法

首先在Cargo.toml中添加依赖:

[dependencies]
std-semaphore = "0.1"

基本示例

use std_semaphore::Semaphore;
use std::sync::Arc;
use std::thread;

fn main() {
    // 创建一个初始值为3的信号量
    let semaphore = Arc::new(Semaphore::new(3));
    
    let mut handles = vec![];

    for i in 0..10 {
        let sem = semaphore.clone();
        handles.push(thread::spawn(move || {
            // 获取信号量许可
            sem.acquire();
            
            println!("线程 {} 获取了资源", i);
            // 模拟工作
            thread::sleep(std::time::Duration::from_secs(1));
            println!("线程 {} 释放了资源", i);
            
            // 释放信号量许可
            sem.release();
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

带超时的信号量获取

use std_semaphore::Semaphore;
use std::time::Duration;

fn main() {
    let sem = Semaphore::new(1);
    
    // 第一个线程获取信号量
    sem.acquire();
    
    // 第二个线程尝试获取,但设置超时
    let result = sem.try_acquire(Duration::from_secs(1));
    
    match result {
        Ok(_) => println!("成功获取信号量"),
        Err(_) => println!("获取信号量超时"),
    }
}

使用信号量实现生产者-消费者模式

use std_semaphore::Semaphore;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let buffer = Arc::new(Mutex::new(Vec::<i32>::new()));
    let items = Arc::new(Semaphore::new(0)); // 初始没有可消费的项目
    let spaces = Arc::new(Semaphore::new(10)); // 缓冲区大小为10

    let producer = {
        let buffer = buffer.clone();
        let items = items.clone();
        let spaces = spaces.clone();
        thread::spawn(move || {
            for i in 0..20 {
                spaces.acquire(); // 等待有空位
                {
                    let mut buf = buffer.lock().unwrap();
                    buf.push(i);
                    println!("生产: {}", i);
                }
                items.release(); // 增加可消费项目
            }
        })
    };

    let consumer = {
        let buffer = buffer.clone();
        let items = items.clone();
        let spaces = spaces.clone();
        thread::spawn(move || {
            for _ in 0..20 {
                items.acquire(); // 等待有项目可消费
                let item = {
                    let mut buf = buffer.lock().unwrap();
                    buf.pop().unwrap()
                };
                println!("消费: {}", item);
                spaces.release(); // 释放空位
            }
        })
    };

    producer.join().unwrap();
    consumer.join().unwrap();
}

完整示例代码

下面是一个结合了上述所有特性的完整示例:

use std_semaphore::Semaphore;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 1. 基本信号量使用示例
    basic_semaphore_example();
    
    // 2. 带超时的信号量获取示例
    timeout_semaphore_example();
    
    // 3. 生产者-消费者模式示例
    producer_consumer_example();
}

fn basic_semaphore_example() {
    println!("\n=== 基本信号量示例 ===");
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..5 {
        let sem = semaphore.clone();
        handles.push(thread::spawn(move || {
            sem.acquire();
            println!("线程 {} 获取了资源", i);
            thread::sleep(Duration::from_millis(500));
            println!("线程 {} 释放了资源", i);
            sem.release();
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

fn timeout_semaphore_example() {
    println!("\n=== 带超时的信号量示例 ===");
    let sem = Arc::new(Semaphore::new(1));
    
    // 第一个线程获取并持有信号量
    let sem1 = sem.clone();
    let handle1 = thread::spawn(move || {
        sem1.acquire();
        println!("线程1获取了信号量并持有2秒");
        thread::sleep(Duration::from_secs(2));
        sem1.release();
    });
    
    // 第二个线程尝试获取但会超时
    thread::sleep(Duration::from_millis(500)); // 确保线程1先获取
    let sem2 = sem.clone();
    let handle2 = thread::spawn(move || {
        match sem2.try_acquire(Duration::from_secs(1)) {
            Ok(_) => println!("线程2成功获取信号量"),
            Err(_) => println!("线程2获取信号量超时"),
        }
    });
    
    handle1.join().unwrap();
    handle2.join().unwrap();
}

fn producer_consumer_example() {
    println!("\n=== 生产者-消费者示例 ===");
    let buffer = Arc::new(Mutex::new(Vec::new()));
    let items = Arc::new(Semaphore::new(0));
    let spaces = Arc::new(Semaphore::new(5)); // 限制缓冲区大小为5
    
    let producer = {
        let buffer = buffer.clone();
        let items = items.clone();
        let spaces = spaces.clone();
        thread::spawn(move || {
            for i in 1..=10 {
                spaces.acquire();
                {
                    let mut buf = buffer.lock().unwrap();
                    buf.push(i);
                    println!("生产者添加: {}", i);
                }
                items.release();
                thread::sleep(Duration::from_millis(200));
            }
        })
    };
    
    let consumer = {
        let buffer = buffer.clone();
        let items = items.clone();
        let spaces = spaces.clone();
        thread::spawn(move || {
            for _ in 1..=10 {
                items.acquire();
                let item = {
                    let mut buf = buffer.lock().unwrap();
                    buf.pop().unwrap()
                };
                println!("消费者取出: {}", item);
                spaces.release();
                thread::sleep(Duration::from_millis(300));
            }
        })
    };
    
    producer.join().unwrap();
    consumer.join().unwrap();
}

注意事项

  1. std-semaphore不是标准库的一部分,但API设计类似于标准库风格
  2. 信号量通常与Arc一起使用,以便在线程间共享
  3. 注意避免死锁情况,确保所有获取的信号量最终都会被释放
  4. 对于复杂场景,可以考虑结合MutexRwLock一起使用

信号量是强大的并发控制工具,特别适合需要限制资源访问数量的场景,如连接池、限流等。

回到顶部