Rust信号量同步库std-semaphore的使用,std-semaphore提供线程间同步与资源管理的并发控制功能
Rust信号量同步库std-semaphore的使用
std-semaphore是一个提供线程间同步与资源管理的并发控制功能的Rust库。它实现了信号量(Semaphore)机制,可用于控制对共享资源的访问。
安装
在项目目录中运行以下Cargo命令:
cargo add std-semaphore
或者在Cargo.toml中添加:
std-semaphore = "0.1.0"
使用示例
下面是一个使用std-semaphore进行线程同步的完整示例:
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std_semaphore::Semaphore;
fn main() {
// 创建一个初始值为3的信号量
let semaphore = Arc::new(Semaphore::new(3));
// 创建5个工作线程
for i in 0..5 {
let semaphore = semaphore.clone();
thread::spawn(move || {
// 获取信号量许可
println!("线程 {} 等待获取许可...", i);
let _guard = semaphore.access();
println!("线程 {} 获得了许可", i);
// 模拟工作
thread::sleep(Duration::from_secs(1));
// _guard离开作用域时会自动释放许可
println!("线程 {} 释放了许可", i);
});
}
// 等待所有线程完成
thread::sleep(Duration::from_secs(5));
}
示例说明
- 我们创建了一个初始值为3的信号量,表示最多允许3个线程同时访问共享资源
- 创建了5个工作线程,每个线程尝试获取信号量许可
- 只有3个线程能同时获得许可,其他线程需要等待
- 当线程完成工作后,许可会自动释放(通过
_guard
的Drop实现)
输出示例
运行程序可能会看到类似这样的输出(顺序可能不同):
线程 0 等待获取许可...
线程 1 等待获取许可...
线程 2 等待获取许可...
线程 3 等待获取许可...
线程 4 等待获取许可...
线程 0 获得了许可
线程 1 获得了许可
线程 2 获得了许可
线程 0 释放了许可
线程 3 获得了许可
线程 1 释放了许可
线程 4 获得了许可
线程 2 释放了许可
线程 3 释放了许可
线程 4 释放了许可
这个示例展示了如何使用std-semaphore来限制对共享资源的并发访问。
完整示例代码
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std_semaphore::Semaphore;
fn main() {
// 创建信号量,初始值为3
let sem = Arc::new(Semaphore::new(3));
// 创建10个线程模拟并发访问
let mut handles = vec![];
for i in 0..10 {
let sem_clone = sem.clone();
let handle = thread::spawn(move || {
// 尝试获取信号量
println!("[线程 {}] 尝试获取资源...", i);
let permit = sem_clone.access();
// 获取到资源
println!("[线程 {}] 获取资源成功", i);
// 模拟资源占用1秒
thread::sleep(Duration::from_secs(1));
// permit离开作用域自动释放
println!("[线程 {}] 释放资源", i);
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
println!("所有线程执行完毕");
}
完整示例说明
- 创建了一个初始值为3的信号量,表示最多允许3个线程同时访问资源
- 创建了10个工作线程模拟高并发场景
- 每个线程会先尝试获取信号量,最多3个线程能同时获取成功
- 获取到资源的线程会占用资源1秒钟
- 资源使用完毕后会自动释放(通过RAII机制)
- 主线程等待所有工作线程完成
完整示例输出
可能的输出结果(顺序可能不同):
[线程 0] 尝试获取资源...
[线程 1] 尝试获取资源...
[线程 2] 尝试获取资源...
[线程 3] 尝试获取资源...
[线程 4] 尝试获取资源...
[线程 5] 尝试获取资源...
[线程 6] 尝试获取资源...
[线程 7] 尝试获取资源...
[线程 8] 尝试获取资源...
[线程 9] 尝试获取资源...
[线程 0] 获取资源成功
[线程 1] 获取资源成功
[线程 2] 获取资源成功
[线程 0] 释放资源
[线程 3] 获取资源成功
[线程 1] 释放资源
[线程 4] 获取资源成功
[线程 2] 释放资源
[线程 5] 获取资源成功
[线程 3] 释放资源
[线程 6] 获取资源成功
[线程 4] 释放资源
[线程 7] 获取资源成功
[线程 5] 释放资源
[线程 8] 获取资源成功
[线程 6] 释放资源
[线程 9] 获取资源成功
[线程 7] 释放资源
[线程 8] 释放资源
[线程 9] 释放资源
所有线程执行完毕
1 回复
Rust信号量同步库std-semaphore的使用指南
std-semaphore
是Rust中一个提供信号量(Semaphore)同步原语的库,用于线程间的同步与资源管理。信号量是一种经典的并发控制机制,可以限制同时访问某个资源的线程数量。
基本概念
信号量维护了一个计数器,表示可用资源的数量。主要操作包括:
acquire
(或wait
): 获取资源,计数器减1release
(或signal
): 释放资源,计数器加1
使用方法
首先在Cargo.toml
中添加依赖:
[dependencies]
std-semaphore = "0.1"
基本示例
use std_semaphore::Semaphore;
use std::sync::Arc;
use std::thread;
fn main() {
// 创建一个初始值为3的信号量
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let sem = semaphore.clone();
handles.push(thread::spawn(move || {
// 获取信号量许可
sem.acquire();
println!("线程 {} 获取了资源", i);
// 模拟工作
thread::sleep(std::time::Duration::from_secs(1));
println!("线程 {} 释放了资源", i);
// 释放信号量许可
sem.release();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
带超时的信号量获取
use std_semaphore::Semaphore;
use std::time::Duration;
fn main() {
let sem = Semaphore::new(1);
// 第一个线程获取信号量
sem.acquire();
// 第二个线程尝试获取,但设置超时
let result = sem.try_acquire(Duration::from_secs(1));
match result {
Ok(_) => println!("成功获取信号量"),
Err(_) => println!("获取信号量超时"),
}
}
使用信号量实现生产者-消费者模式
use std_semaphore::Semaphore;
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let buffer = Arc::new(Mutex::new(Vec::<i32>::new()));
let items = Arc::new(Semaphore::new(0)); // 初始没有可消费的项目
let spaces = Arc::new(Semaphore::new(10)); // 缓冲区大小为10
let producer = {
let buffer = buffer.clone();
let items = items.clone();
let spaces = spaces.clone();
thread::spawn(move || {
for i in 0..20 {
spaces.acquire(); // 等待有空位
{
let mut buf = buffer.lock().unwrap();
buf.push(i);
println!("生产: {}", i);
}
items.release(); // 增加可消费项目
}
})
};
let consumer = {
let buffer = buffer.clone();
let items = items.clone();
let spaces = spaces.clone();
thread::spawn(move || {
for _ in 0..20 {
items.acquire(); // 等待有项目可消费
let item = {
let mut buf = buffer.lock().unwrap();
buf.pop().unwrap()
};
println!("消费: {}", item);
spaces.release(); // 释放空位
}
})
};
producer.join().unwrap();
consumer.join().unwrap();
}
完整示例代码
下面是一个结合了上述所有特性的完整示例:
use std_semaphore::Semaphore;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
// 1. 基本信号量使用示例
basic_semaphore_example();
// 2. 带超时的信号量获取示例
timeout_semaphore_example();
// 3. 生产者-消费者模式示例
producer_consumer_example();
}
fn basic_semaphore_example() {
println!("\n=== 基本信号量示例 ===");
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..5 {
let sem = semaphore.clone();
handles.push(thread::spawn(move || {
sem.acquire();
println!("线程 {} 获取了资源", i);
thread::sleep(Duration::from_millis(500));
println!("线程 {} 释放了资源", i);
sem.release();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
fn timeout_semaphore_example() {
println!("\n=== 带超时的信号量示例 ===");
let sem = Arc::new(Semaphore::new(1));
// 第一个线程获取并持有信号量
let sem1 = sem.clone();
let handle1 = thread::spawn(move || {
sem1.acquire();
println!("线程1获取了信号量并持有2秒");
thread::sleep(Duration::from_secs(2));
sem1.release();
});
// 第二个线程尝试获取但会超时
thread::sleep(Duration::from_millis(500)); // 确保线程1先获取
let sem2 = sem.clone();
let handle2 = thread::spawn(move || {
match sem2.try_acquire(Duration::from_secs(1)) {
Ok(_) => println!("线程2成功获取信号量"),
Err(_) => println!("线程2获取信号量超时"),
}
});
handle1.join().unwrap();
handle2.join().unwrap();
}
fn producer_consumer_example() {
println!("\n=== 生产者-消费者示例 ===");
let buffer = Arc::new(Mutex::new(Vec::new()));
let items = Arc::new(Semaphore::new(0));
let spaces = Arc::new(Semaphore::new(5)); // 限制缓冲区大小为5
let producer = {
let buffer = buffer.clone();
let items = items.clone();
let spaces = spaces.clone();
thread::spawn(move || {
for i in 1..=10 {
spaces.acquire();
{
let mut buf = buffer.lock().unwrap();
buf.push(i);
println!("生产者添加: {}", i);
}
items.release();
thread::sleep(Duration::from_millis(200));
}
})
};
let consumer = {
let buffer = buffer.clone();
let items = items.clone();
let spaces = spaces.clone();
thread::spawn(move || {
for _ in 1..=10 {
items.acquire();
let item = {
let mut buf = buffer.lock().unwrap();
buf.pop().unwrap()
};
println!("消费者取出: {}", item);
spaces.release();
thread::sleep(Duration::from_millis(300));
}
})
};
producer.join().unwrap();
consumer.join().unwrap();
}
注意事项
std-semaphore
不是标准库的一部分,但API设计类似于标准库风格- 信号量通常与
Arc
一起使用,以便在线程间共享 - 注意避免死锁情况,确保所有获取的信号量最终都会被释放
- 对于复杂场景,可以考虑结合
Mutex
或RwLock
一起使用
信号量是强大的并发控制工具,特别适合需要限制资源访问数量的场景,如连接池、限流等。