Rust系统编程利器linux-futex的使用:Linux快速用户空间互斥锁的高效实现
Rust系统编程利器linux-futex的使用:Linux快速用户空间互斥锁的高效实现
linux-futex是一个提供Linux快速用户空间互斥锁(Futex)易用包装的Rust crate,它封装了不太容易使用的SYS_futex
Linux系统调用。
主要特性
该crate提供了两个主要类型:
Futex
: 基本futex操作PiFutex
: 优先级继承futex操作
这两个类型都是包含AtomicU32
的简单包装器,暴露了Linux可以应用的所有futex操作。
现有的AtomicU32
可以通过AsFutex
trait作为futex使用,而无需改变其类型。
安装
在Cargo.toml中添加:
linux-futex = "1.0.0"
或者运行:
cargo add linux-futex
示例代码
下面是一个使用linux-futex实现简单互斥锁的完整示例:
use linux_futex::Futex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
// 使用Futex实现一个简单的互斥锁
struct Mutex {
futex: AtomicU32,
}
impl Mutex {
const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;
const CONTENDED: u32 = 2;
fn new() -> Self {
Mutex {
futex: AtomicU32::new(Self::UNLOCKED),
}
}
fn lock(&self) {
// 尝试原子性地将状态从UNLOCKED改为LOCKED
if self.futex.compare_exchange(
Self::UNLOCKED,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
).is_err() {
// 如果失败,说明锁已被持有,进入等待状态
self.lock_contended();
}
}
fn lock_contended(&self) {
// 循环尝试获取锁
loop {
// 设置状态为CONTENDED表示有等待者
let state = self.futex.swap(Self::CONTENDED, Ordering::Acquire);
if state == Self::UNLOCKED {
// 成功获取锁
return;
}
// 等待直到状态改变
Futex::wait(&self.futex, Self::CONTENDED, None);
}
}
fn unlock(&self) {
// 释放锁并唤醒一个等待者
if self.futex.swap(Self::UNLOCKED, Ordering::Release) == Self::CONTENDED {
Futex::wake(&self.futex, 1); // 唤醒一个等待线程
}
}
}
fn main() {
let mutex = Mutex::new();
let mutex_ref = &mutex;
thread::scope(|s| {
for i in 0..4 {
s.spawn(move || {
mutex_ref.lock();
println!("Thread {} got the lock", i);
thread::sleep(std::time::Duration::from_millis(100));
mutex_ref.unlock();
});
}
});
}
工作原理
这个示例展示了如何使用linux-futex实现一个简单的互斥锁:
UNLOCKED
状态表示锁可用LOCKED
状态表示锁被占用但没有等待者CONTENDED
状态表示锁被占用且有等待者
通过Futex::wait
和Futex::wake
系统调用,避免了忙等待(busy waiting),提高了效率。
完整示例代码
以下是一个更完整的示例,展示了如何使用linux-futex实现一个线程安全的计数器:
use linux_futex::Futex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
use std::time::Duration;
// 线程安全的计数器,使用Futex实现同步
struct ThreadSafeCounter {
value: AtomicU32,
lock: AtomicU32, // 使用AtomicU32作为futex
}
impl ThreadSafeCounter {
const UNLOCKED: u32 = 0;
const LOCKED: u32 = 1;
const CONTENDED: u32 = 2;
fn new(initial_value: u32) -> Self {
ThreadSafeCounter {
value: AtomicU32::new(initial_value),
lock: AtomicU32::new(Self::UNLOCKED),
}
}
// 获取锁
fn lock(&self) {
if self.lock.compare_exchange(
Self::UNLOCKED,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
).is_err() {
self.lock_contended();
}
}
// 竞争锁时的处理
fn lock_contended(&self) {
loop {
let state = self.lock.swap(Self::CONTENDED, Ordering::Acquire);
if state == Self::UNLOCKED {
return;
}
Futex::wait(&self.lock, Self::CONTENDED, None);
}
}
// 释放锁
fn unlock(&self) {
if self.lock.swap(Self::UNLOCKED, Ordering::Release) == Self::CONTENDED {
Futex::wake(&self.lock, 1);
}
}
// 线程安全地增加计数器
fn increment(&self) -> u32 {
self.lock();
let val = self.value.fetch_add(1, Ordering::Relaxed) + 1;
self.unlock();
val
}
// 线程安全地获取计数器值
fn get(&self) -> u32 {
self.lock();
let val = self.value.load(Ordering::Relaxed);
self.unlock();
val
}
}
fn main() {
let counter = ThreadSafeCounter::new(0);
let counter_ref = &counter;
thread::scope(|s| {
// 创建10个线程并发增加计数器
for i in 0..10 {
s.spawn(move || {
for _ in 0..100 {
let new_val = counter_ref.increment();
if i == 0 && new_val % 50 == 0 {
println!("Counter reached: {}", new_val);
}
thread::sleep(Duration::from_micros(100));
}
});
}
});
println!("Final counter value: {}", counter.get());
}
工作原理说明
这个完整示例展示了linux-futex的更多实际用法:
- 实现了一个线程安全的计数器
- 使用Futex同步机制保护共享数据
- 展示了更复杂的锁获取和释放逻辑
- 演示了多线程环境下的数据竞争保护
通过Futex::wait
和Futex::wake
系统调用,线程在无法获取锁时会进入睡眠状态,而不是忙等待,从而减少了CPU资源的浪费。
许可证
BSD-2-Clause许可证
1 回复
Rust系统编程利器:linux-futex的使用指南
什么是linux-futex?
linux-futex
(Fast Userspace Mutex)是Linux提供的一种高效同步原语,它结合了用户空间的快速路径和内核空间的等待队列管理,是构建高性能锁的基础。Rust的linux-futex
crate提供了对Linux futex系统调用的安全包装。
主要特性
- 用户空间快速路径:无竞争情况下完全在用户空间操作
- 内核辅助的阻塞:有竞争时利用内核管理等待队列
- 低延迟和高吞吐量
- 支持多种操作:等待、唤醒、比较等待等
使用方法
首先在Cargo.toml中添加依赖:
[dependencies]
linux-futex = "0.1"
基本示例:简单的互斥锁
use linux_futex::{Futex, WakeWaiters};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
struct SimpleMutex {
futex: Futex,
state: AtomicU32, // 0=unlocked, 1=locked
}
impl SimpleMutex {
pub fn new() -> Self {
Self {
futex: Futex::new(0),
state: AtomicU32::new(0),
}
}
pub fn lock(&self) {
// 尝试原子交换,如果原值是0(未锁定),则设置为1(锁定)
while self.state.compare_exchange_weak(
0,
1,
Ordering::Acquire,
Ordering::Relaxed
).is_err() {
// 如果锁定失败,进入等待
self.futex.wait(
1, // 期望值是1(表示锁被占用)
None // 无限等待
).unwrap();
}
}
pub fn unlock(&self) {
self.state.store(0, Ordering::Release);
// 唤醒一个等待线程
self.futex.wake(WakeWaiters::One);
}
}
// 使用示例
fn main() {
let mutex = std::sync::Arc::new(SimpleMutex::new());
let mut handles = vec![];
for i in 0..5 {
let mutex_clone = mutex.clone();
handles.push(thread::spawn(move || {
mutex_clone.lock();
println!("Thread {} acquired the lock", i);
thread::sleep(std::time::Duration::from_millis(100));
println!("Thread {} releasing the lock", i);
mutex_clone.unlock();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
高级用法:条件变量实现
use linux_futex::{Futex, WakeWaiters};
use std::sync::atomic::{AtomicU32, Ordering};
struct Condvar {
futex: Futex,
counter: AtomicU32,
}
impl Condvar {
pub fn new() -> Self {
Self {
futex: Futex::new(0),
counter: AtomicU32::new(0),
}
}
pub fn wait(&self, mutex: &SimpleMutex) {
let current_counter = self.counter.load(Ordering::Relaxed);
mutex.unlock();
self.futex.wait(current_counter, None).unwrap();
mutex.lock();
}
pub fn notify_one(&self) {
self.counter.fetch_add(1, Ordering::Release);
self.futex.wake(WakeWaiters::One);
}
pub fn notify_all(&self) {
self.counter.fetch_add