Rust分布式系统模拟库madsim-tokio的使用:基于Tokio的确定性仿真与测试框架

Rust分布式系统模拟库madsim-tokio的使用:基于Tokio的确定性仿真与测试框架

madsim-tokio是madsim项目中的Tokio模拟器,它提供了一个与Tokio API兼容的确定性仿真环境。

使用方式

在Cargo.toml中将所有tokio依赖替换为madsim-tokio:

[dependencies]
tokio = { version = "0.2", package = "madsim-tokio" }

完整示例

下面是一个使用madsim-tokio构建简单分布式系统的完整示例:

use madsim_tokio::net::{TcpListener, TcpStream};
use std::io::{self, Read, Write};

// 服务器端代码
async fn server() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    
    loop {
        let (mut socket, _) = listener.accept().await?;
        
        madsim_tokio::spawn(async move {
            let mut buf = [0; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(n) if n == 0 => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };
                
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

// 客户端代码
async fn client() -> io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    
    // 发送数据
    stream.write_all(b"hello world").await?;
    
    // 接收回显
    let mut buf = [0; 11];
    stream.read_exact(&mut buf).await?;
    assert_eq!(&buf, b"hello world");
    
    Ok(())
}

#[madsim_tokio::main]
async fn main() -> io::Result<()> {
    // 启动服务器
    let server_handle = madsim_tokio::spawn(server());
    
    // 运行客户端
    client().await?;
    
    // 关闭服务器
    server_handle.abort();
    
    Ok(())
}

测试示例

madsim-tokio特别适合用于分布式系统的确定性测试:

#[madsim_tokio::test]
async fn test_distributed_system() {
    // 模拟网络分区
    madsim_tokio::net::partition("node1", "node2");
    
    // 节点1尝试与节点2通信
    let result = node1_call_node2().await;
    assert!(result.is_err());
    
    // 恢复网络
    madsim_tokio::net::recover_all();
    
    // 再次尝试通信
    let result = node1_call_node2().await;
    assert!(result.is_ok());
}

特性

  1. 与Tokio API完全兼容
  2. 确定性仿真
  3. 网络故障注入
  4. 时间控制
  5. 适合分布式系统测试

许可证

Apache-2.0

完整示例Demo

基于上述内容,这里提供一个更完整的分布式键值存储模拟示例:

use madsim_tokio::{
    net::{TcpListener, TcpStream},
    time::{sleep, Duration},
};
use std::{
    collections::HashMap,
    io::{self, Read, Write},
    sync::Arc,
};

// 键值存储服务器
struct KvServer {
    store: Arc<parking_lot::RwLock<HashMap<String, String>>>,
}

impl KvServer {
    async fn run(self) -> io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:6379").await?;
        
        loop {
            let (mut socket, _) = listener.accept().await?;
            let store = self.store.clone();
            
            madsim_tokio::spawn(async move {
                let mut buf = [0; 1024];
                
                loop {
                    // 读取客户端请求
                    let n = match socket.read(&mut buf).await {
                        Ok(0) => break,  // 连接关闭
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("读取错误: {:?}", e);
                            break;
                        }
                    };
                    
                    // 解析和处理请求
                    let request = String::from_utf8_lossy(&buf[..n]);
                    let response = match Self::process_request(&store, &request) {
                        Ok(res) => res,
                        Err(e) => format!("ERROR: {}", e),
                    };
                    
                    // 发送响应
                    if let Err(e) = socket.write_all(response.as_bytes()).await {
                        eprintln!("写入错误: {:?}", e);
                        break;
                    }
                }
            });
        }
    }
    
    fn process_request(
        store: &Arc<parking_lot::RwLock<HashMap<String, String>>>,
        request: &str,
    ) -> Result<String, String> {
        let parts: Vec<&str> = request.trim().split(' ').collect();
        match parts.as_slice() {
            ["GET", key] => {
                let store = store.read();
                store.get(*key)
                    .map(|v| v.to_string())
                    .ok_or_else(|| "Key not found".to_string())
            }
            ["SET", key, value] => {
                let mut store = store.write();
                store.insert(key.to_string(), value.to_string());
                Ok("OK".to_string())
            }
            _ => Err("Invalid command".to_string()),
        }
    }
}

// 客户端代码
async fn kv_client() -> io::Result<()> {
    // 模拟3个客户端并发操作
    let client1 = madsim_tokio::spawn(async {
        let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
        stream.write_all(b"SET key1 value1").await.unwrap();
        println!("Client1: SET key1 value1");
    });
    
    let client2 = madsim_tokio::spawn(async {
        sleep(Duration::from_secs(1)).await;
        let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
        stream.write_all(b"GET key1").await.unwrap();
        let mut buf = [0; 128];
        let n = stream.read(&mut buf).await.unwrap();
        println!("Client2: GET key1 -> {}", String::from_utf8_lossy(&buf[..n]));
    });
    
    let client3 = madsim_tokio::spawn(async {
        sleep(Duration::from_secs(2)).await;
        let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
        stream.write_all(b"SET key1 new_value").await.unwrap();
        println!("Client3: SET key1 new_value");
    });
    
    client1.await?;
    client2.await?;
    client3.await?;
    
    Ok(())
}

#[madsim_tokio::test]
async fn test_kv_store() {
    // 启动KV服务器
    let server = KvServer {
        store: Arc::new(parking_lot::RwLock::new(HashMap::new())),
    };
    let server_handle = madsim_tokio::spawn(server.run());
    
    // 模拟网络延迟
    madsim_tokio::net::set_delay(Duration::from_millis(100));
    
    // 运行客户端测试
    kv_client().await.unwrap();
    
    // 关闭服务器
    server_handle.abort();
}

这个示例展示了:

  1. 一个简单的分布式键值存储服务器实现
  2. 多个客户端并发操作
  3. 模拟网络延迟
  4. 完整的测试用例
  5. 使用madsim-tokio提供的各种特性

1 回复

以下是基于madsim-tokio的完整示例demo,展示了一个分布式键值存储的模拟测试场景:

use madsim_tokio::{
    net::{TcpListener, TcpStream},
    runtime::Runtime,
    time,
};
use std::{
    collections::HashMap,
    io::{Read, Write},
    net::SocketAddr,
    sync::{Arc, Mutex},
    time::Duration,
};

// 简单的内存键值存储服务
#[derive(Clone, Default)]
struct KvStore {
    data: Arc<Mutex<HashMap<String, String>>>,
}

impl KvStore {
    fn new() -> Self {
        Self::default()
    }

    async fn handle_connection(&self, mut stream: TcpStream) {
        let mut buf = [0; 1024];
        loop {
            let n = stream.read(&mut buf).await.unwrap();
            if n == 0 {
                break; // 连接关闭
            }

            let request = String::from_utf8_lossy(&buf[..n]);
            let response = match request.trim() {
                cmd if cmd.starts_with("GET ") => {
                    let key = cmd.strip_prefix("GET ").unwrap();
                    let data = self.data.lock().unwrap();
                    data.get(key).cloned().unwrap_or_else(|| "Not Found".into())
                }
                cmd if cmd.starts_with("SET ") => {
                    let parts: Vec<&str> = cmd.splitn(3, ' ').collect();
                    if parts.len() != 3 {
                        "Invalid SET command".into()
                    } else {
                        let mut data = self.data.lock().unwrap();
                        data.insert(parts[1].to_string(), parts[2].to_string());
                        "OK".into()
                    }
                }
                _ => "Unknown command".into(),
            };

            stream.write_all(response.as_bytes()).await.unwrap();
        }
    }
}

#[test]
fn distributed_kv_store_test() {
    let runtime = Runtime::new();

    runtime.block_on(async {
        // 创建3个节点模拟分布式环境
        let node1 = runtime.create_node();
        let node2 = runtime.create_node();
        let node3 = runtime.create_node();

        let addr: SocketAddr = "127.0.0.1:6379".parse().unwrap();

        // 在节点1上启动KV服务器
        let kv_store = KvStore::new();
        let server_handle = node1.spawn({
            let kv_store = kv_store.clone();
            async move {
                let listener = TcpListener::bind(addr).await.unwrap();
                println!("Server started on {}", addr);

                loop {
                    let (stream, _) = listener.accept().await.unwrap();
                    let kv_store = kv_store.clone();
                    node1.spawn(async move {
                        kv_store.handle_connection(stream).await;
                    });
                }
            }
        });

        // 在节点2和节点3上运行客户端
        let client1_handle = node2.spawn(async move {
            // 模拟网络延迟
            time::sleep(Duration::from_millis(100)).await;

            // 连接到服务器
            let mut stream = TcpStream::connect(addr).await.unwrap();

            // 设置键值
            stream.write_all(b"SET foo bar").await.unwrap();
            let mut buf = [0; 2];
            stream.read_exact(&mut buf).await.unwrap();
            println!("Client1 SET response: {}", String::from_utf8_lossy(&buf));

            // 获取值
            stream.write_all(b"GET foo").await.unwrap();
            let mut buf = [0; 3];
            stream.read_exact(&mut buf).await.unwrap();
            println!("Client1 GET response: {}", String::from_utf8_lossy(&buf));
        });

        let client2_handle = node3.spawn(async move {
            // 更长的延迟
            time::sleep(Duration::from_millis(200)).await;

            // 注入网络故障 - 50%丢包率
            madsim_tokio::net::set_loss_rate(0.5);

            let mut stream = TcpStream::connect(addr).await.unwrap();
            stream.write_all(b"GET foo").await.unwrap();
            let mut buf = [0; 3];
            stream.read_exact(&mut buf).await.unwrap();
            println!("Client2 GET response: {}", String::from_utf8_lossy(&buf));
        });

        // 等待所有任务完成
        tokio::join!(server_handle, client1_handle, client2_handle);
    });
}

这个示例演示了:

  1. 创建一个简单的分布式键值存储服务
  2. 在模拟环境中运行多个节点(1个服务器,2个客户端)
  3. 模拟网络延迟和丢包等真实场景
  4. 测试键值存储的基本操作

测试输出会是确定性的,即使有随机网络故障注入。每次运行测试都会得到相同的结果,方便调试分布式系统中的问题。

回到顶部