Rust系统编程利器linux-futex的使用:Linux快速用户空间互斥锁的高效实现

Rust系统编程利器linux-futex的使用:Linux快速用户空间互斥锁的高效实现

linux-futex是一个提供Linux快速用户空间互斥锁(Futex)易用包装的Rust crate,它封装了不太容易使用的SYS_futex Linux系统调用。

主要特性

该crate提供了两个主要类型:

  • Futex: 基本futex操作
  • PiFutex: 优先级继承futex操作

这两个类型都是包含AtomicU32的简单包装器,暴露了Linux可以应用的所有futex操作。

现有的AtomicU32可以通过AsFutex trait作为futex使用,而无需改变其类型。

安装

在Cargo.toml中添加:

linux-futex = "1.0.0"

或者运行:

cargo add linux-futex

示例代码

下面是一个使用linux-futex实现简单互斥锁的完整示例:

use linux_futex::Futex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

// 使用Futex实现一个简单的互斥锁
struct Mutex {
    futex: AtomicU32,
}

impl Mutex {
    const UNLOCKED: u32 = 0;
    const LOCKED: u32 = 1;
    const CONTENDED: u32 = 2;

    fn new() -> Self {
        Mutex {
            futex: AtomicU32::new(Self::UNLOCKED),
        }
    }

    fn lock(&self) {
        // 尝试原子性地将状态从UNLOCKED改为LOCKED
        if self.futex.compare_exchange(
            Self::UNLOCKED,
            Self::LOCKED,
            Ordering::Acquire,
            Ordering::Relaxed,
        ).is_err() {
            // 如果失败,说明锁已被持有,进入等待状态
            self.lock_contended();
        }
    }

    fn lock_contended(&self) {
        // 循环尝试获取锁
        loop {
            // 设置状态为CONTENDED表示有等待者
            let state = self.futex.swap(Self::CONTENDED, Ordering::Acquire);
            
            if state == Self::UNLOCKED {
                // 成功获取锁
                return;
            }
            
            // 等待直到状态改变
            Futex::wait(&self.futex, Self::CONTENDED, None);
        }
    }

    fn unlock(&self) {
        // 释放锁并唤醒一个等待者
        if self.futex.swap(Self::UNLOCKED, Ordering::Release) == Self::CONTENDED {
            Futex::wake(&self.futex, 1); // 唤醒一个等待线程
        }
    }
}

fn main() {
    let mutex = Mutex::new();
    let mutex_ref = &mutex;
    
    thread::scope(|s| {
        for i in 0..4 {
            s.spawn(move || {
                mutex_ref.lock();
                println!("Thread {} got the lock", i);
                thread::sleep(std::time::Duration::from_millis(100));
                mutex_ref.unlock();
            });
        }
    });
}

工作原理

这个示例展示了如何使用linux-futex实现一个简单的互斥锁:

  1. UNLOCKED状态表示锁可用
  2. LOCKED状态表示锁被占用但没有等待者
  3. CONTENDED状态表示锁被占用且有等待者

通过Futex::waitFutex::wake系统调用,避免了忙等待(busy waiting),提高了效率。

完整示例代码

以下是一个更完整的示例,展示了如何使用linux-futex实现一个线程安全的计数器:

use linux_futex::Futex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
use std::time::Duration;

// 线程安全的计数器,使用Futex实现同步
struct ThreadSafeCounter {
    value: AtomicU32,
    lock: AtomicU32,  // 使用AtomicU32作为futex
}

impl ThreadSafeCounter {
    const UNLOCKED: u32 = 0;
    const LOCKED: u32 = 1;
    const CONTENDED: u32 = 2;

    fn new(initial_value: u32) -> Self {
        ThreadSafeCounter {
            value: AtomicU32::new(initial_value),
            lock: AtomicU32::new(Self::UNLOCKED),
        }
    }

    // 获取锁
    fn lock(&self) {
        if self.lock.compare_exchange(
            Self::UNLOCKED,
            Self::LOCKED,
            Ordering::Acquire,
            Ordering::Relaxed,
        ).is_err() {
            self.lock_contended();
        }
    }

    // 竞争锁时的处理
    fn lock_contended(&self) {
        loop {
            let state = self.lock.swap(Self::CONTENDED, Ordering::Acquire);
            if state == Self::UNLOCKED {
                return;
            }
            Futex::wait(&self.lock, Self::CONTENDED, None);
        }
    }

    // 释放锁
    fn unlock(&self) {
        if self.lock.swap(Self::UNLOCKED, Ordering::Release) == Self::CONTENDED {
            Futex::wake(&self.lock, 1);
        }
    }

    // 线程安全地增加计数器
    fn increment(&self) -> u32 {
        self.lock();
        let val = self.value.fetch_add(1, Ordering::Relaxed) + 1;
        self.unlock();
        val
    }

    // 线程安全地获取计数器值
    fn get(&self) -> u32 {
        self.lock();
        let val = self.value.load(Ordering::Relaxed);
        self.unlock();
        val
    }
}

fn main() {
    let counter = ThreadSafeCounter::new(0);
    let counter_ref = &counter;

    thread::scope(|s| {
        // 创建10个线程并发增加计数器
        for i in 0..10 {
            s.spawn(move || {
                for _ in 0..100 {
                    let new_val = counter_ref.increment();
                    if i == 0 && new_val % 50 == 0 {
                        println!("Counter reached: {}", new_val);
                    }
                    thread::sleep(Duration::from_micros(100));
                }
            });
        }
    });

    println!("Final counter value: {}", counter.get());
}

工作原理说明

这个完整示例展示了linux-futex的更多实际用法:

  1. 实现了一个线程安全的计数器
  2. 使用Futex同步机制保护共享数据
  3. 展示了更复杂的锁获取和释放逻辑
  4. 演示了多线程环境下的数据竞争保护

通过Futex::waitFutex::wake系统调用,线程在无法获取锁时会进入睡眠状态,而不是忙等待,从而减少了CPU资源的浪费。

许可证

BSD-2-Clause许可证


1 回复

Rust系统编程利器:linux-futex的使用指南

什么是linux-futex?

linux-futex(Fast Userspace Mutex)是Linux提供的一种高效同步原语,它结合了用户空间的快速路径和内核空间的等待队列管理,是构建高性能锁的基础。Rust的linux-futex crate提供了对Linux futex系统调用的安全包装。

主要特性

  • 用户空间快速路径:无竞争情况下完全在用户空间操作
  • 内核辅助的阻塞:有竞争时利用内核管理等待队列
  • 低延迟和高吞吐量
  • 支持多种操作:等待、唤醒、比较等待等

使用方法

首先在Cargo.toml中添加依赖:

[dependencies]
linux-futex = "0.1"

基本示例:简单的互斥锁

use linux_futex::{Futex, WakeWaiters};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

struct SimpleMutex {
    futex: Futex,
    state: AtomicU32, // 0=unlocked, 1=locked
}

impl SimpleMutex {
    pub fn new() -> Self {
        Self {
            futex: Futex::new(0),
            state: AtomicU32::new(0),
        }
    }

    pub fn lock(&self) {
        // 尝试原子交换,如果原值是0(未锁定),则设置为1(锁定)
        while self.state.compare_exchange_weak(
            0, 
            1, 
            Ordering::Acquire, 
            Ordering::Relaxed
        ).is_err() {
            // 如果锁定失败,进入等待
            self.futex.wait(
                1,    // 期望值是1(表示锁被占用)
                None  // 无限等待
            ).unwrap();
        }
    }

    pub fn unlock(&self) {
        self.state.store(0, Ordering::Release);
        // 唤醒一个等待线程
        self.futex.wake(WakeWaiters::One);
    }
}

// 使用示例
fn main() {
    let mutex = std::sync::Arc::new(SimpleMutex::new());
    let mut handles = vec![];

    for i in 0..5 {
        let mutex_clone = mutex.clone();
        handles.push(thread::spawn(move || {
            mutex_clone.lock();
            println!("Thread {} acquired the lock", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("Thread {} releasing the lock", i);
            mutex_clone.unlock();
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

高级用法:条件变量实现

use linux_futex::{Futex, WakeWaiters};
use std::sync::atomic::{AtomicU32, Ordering};

struct Condvar {
    futex: Futex,
    counter: AtomicU32,
}

impl Condvar {
    pub fn new() -> Self {
        Self {
            futex: Futex::new(0),
            counter: AtomicU32::new(0),
        }
    }

    pub fn wait(&self, mutex: &SimpleMutex) {
        let current_counter = self.counter.load(Ordering::Relaxed);
        mutex.unlock();
        
        self.futex.wait(current_counter, None).unwrap();
        
        mutex.lock();
    }

    pub fn notify_one(&self) {
        self.counter.fetch_add(1, Ordering::Release);
        self.futex.wake(WakeWaiters::One);
    }

    pub fn notify_all(&self) {
        self.counter.fetch_add
回到顶部