Rust分布式系统模拟库madsim-tokio的使用:基于Tokio的确定性仿真与测试框架
Rust分布式系统模拟库madsim-tokio的使用:基于Tokio的确定性仿真与测试框架
madsim-tokio是madsim项目中的Tokio模拟器,它提供了一个与Tokio API兼容的确定性仿真环境。
使用方式
在Cargo.toml中将所有tokio依赖替换为madsim-tokio:
[dependencies]
tokio = { version = "0.2", package = "madsim-tokio" }
完整示例
下面是一个使用madsim-tokio构建简单分布式系统的完整示例:
use madsim_tokio::net::{TcpListener, TcpStream};
use std::io::{self, Read, Write};
// 服务器端代码
async fn server() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
madsim_tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
}
}
});
}
}
// 客户端代码
async fn client() -> io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
// 发送数据
stream.write_all(b"hello world").await?;
// 接收回显
let mut buf = [0; 11];
stream.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello world");
Ok(())
}
#[madsim_tokio::main]
async fn main() -> io::Result<()> {
// 启动服务器
let server_handle = madsim_tokio::spawn(server());
// 运行客户端
client().await?;
// 关闭服务器
server_handle.abort();
Ok(())
}
测试示例
madsim-tokio特别适合用于分布式系统的确定性测试:
#[madsim_tokio::test]
async fn test_distributed_system() {
// 模拟网络分区
madsim_tokio::net::partition("node1", "node2");
// 节点1尝试与节点2通信
let result = node1_call_node2().await;
assert!(result.is_err());
// 恢复网络
madsim_tokio::net::recover_all();
// 再次尝试通信
let result = node1_call_node2().await;
assert!(result.is_ok());
}
特性
- 与Tokio API完全兼容
- 确定性仿真
- 网络故障注入
- 时间控制
- 适合分布式系统测试
许可证
Apache-2.0
完整示例Demo
基于上述内容,这里提供一个更完整的分布式键值存储模拟示例:
use madsim_tokio::{
net::{TcpListener, TcpStream},
time::{sleep, Duration},
};
use std::{
collections::HashMap,
io::{self, Read, Write},
sync::Arc,
};
// 键值存储服务器
struct KvServer {
store: Arc<parking_lot::RwLock<HashMap<String, String>>>,
}
impl KvServer {
async fn run(self) -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6379").await?;
loop {
let (mut socket, _) = listener.accept().await?;
let store = self.store.clone();
madsim_tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
// 读取客户端请求
let n = match socket.read(&mut buf).await {
Ok(0) => break, // 连接关闭
Ok(n) => n,
Err(e) => {
eprintln!("读取错误: {:?}", e);
break;
}
};
// 解析和处理请求
let request = String::from_utf8_lossy(&buf[..n]);
let response = match Self::process_request(&store, &request) {
Ok(res) => res,
Err(e) => format!("ERROR: {}", e),
};
// 发送响应
if let Err(e) = socket.write_all(response.as_bytes()).await {
eprintln!("写入错误: {:?}", e);
break;
}
}
});
}
}
fn process_request(
store: &Arc<parking_lot::RwLock<HashMap<String, String>>>,
request: &str,
) -> Result<String, String> {
let parts: Vec<&str> = request.trim().split(' ').collect();
match parts.as_slice() {
["GET", key] => {
let store = store.read();
store.get(*key)
.map(|v| v.to_string())
.ok_or_else(|| "Key not found".to_string())
}
["SET", key, value] => {
let mut store = store.write();
store.insert(key.to_string(), value.to_string());
Ok("OK".to_string())
}
_ => Err("Invalid command".to_string()),
}
}
}
// 客户端代码
async fn kv_client() -> io::Result<()> {
// 模拟3个客户端并发操作
let client1 = madsim_tokio::spawn(async {
let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
stream.write_all(b"SET key1 value1").await.unwrap();
println!("Client1: SET key1 value1");
});
let client2 = madsim_tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
stream.write_all(b"GET key1").await.unwrap();
let mut buf = [0; 128];
let n = stream.read(&mut buf).await.unwrap();
println!("Client2: GET key1 -> {}", String::from_utf8_lossy(&buf[..n]));
});
let client3 = madsim_tokio::spawn(async {
sleep(Duration::from_secs(2)).await;
let mut stream = TcpStream::connect("127.0.0.1:6379").await.unwrap();
stream.write_all(b"SET key1 new_value").await.unwrap();
println!("Client3: SET key1 new_value");
});
client1.await?;
client2.await?;
client3.await?;
Ok(())
}
#[madsim_tokio::test]
async fn test_kv_store() {
// 启动KV服务器
let server = KvServer {
store: Arc::new(parking_lot::RwLock::new(HashMap::new())),
};
let server_handle = madsim_tokio::spawn(server.run());
// 模拟网络延迟
madsim_tokio::net::set_delay(Duration::from_millis(100));
// 运行客户端测试
kv_client().await.unwrap();
// 关闭服务器
server_handle.abort();
}
这个示例展示了:
- 一个简单的分布式键值存储服务器实现
- 多个客户端并发操作
- 模拟网络延迟
- 完整的测试用例
- 使用madsim-tokio提供的各种特性
1 回复
以下是基于madsim-tokio的完整示例demo,展示了一个分布式键值存储的模拟测试场景:
use madsim_tokio::{
net::{TcpListener, TcpStream},
runtime::Runtime,
time,
};
use std::{
collections::HashMap,
io::{Read, Write},
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
};
// 简单的内存键值存储服务
#[derive(Clone, Default)]
struct KvStore {
data: Arc<Mutex<HashMap<String, String>>>,
}
impl KvStore {
fn new() -> Self {
Self::default()
}
async fn handle_connection(&self, mut stream: TcpStream) {
let mut buf = [0; 1024];
loop {
let n = stream.read(&mut buf).await.unwrap();
if n == 0 {
break; // 连接关闭
}
let request = String::from_utf8_lossy(&buf[..n]);
let response = match request.trim() {
cmd if cmd.starts_with("GET ") => {
let key = cmd.strip_prefix("GET ").unwrap();
let data = self.data.lock().unwrap();
data.get(key).cloned().unwrap_or_else(|| "Not Found".into())
}
cmd if cmd.starts_with("SET ") => {
let parts: Vec<&str> = cmd.splitn(3, ' ').collect();
if parts.len() != 3 {
"Invalid SET command".into()
} else {
let mut data = self.data.lock().unwrap();
data.insert(parts[1].to_string(), parts[2].to_string());
"OK".into()
}
}
_ => "Unknown command".into(),
};
stream.write_all(response.as_bytes()).await.unwrap();
}
}
}
#[test]
fn distributed_kv_store_test() {
let runtime = Runtime::new();
runtime.block_on(async {
// 创建3个节点模拟分布式环境
let node1 = runtime.create_node();
let node2 = runtime.create_node();
let node3 = runtime.create_node();
let addr: SocketAddr = "127.0.0.1:6379".parse().unwrap();
// 在节点1上启动KV服务器
let kv_store = KvStore::new();
let server_handle = node1.spawn({
let kv_store = kv_store.clone();
async move {
let listener = TcpListener::bind(addr).await.unwrap();
println!("Server started on {}", addr);
loop {
let (stream, _) = listener.accept().await.unwrap();
let kv_store = kv_store.clone();
node1.spawn(async move {
kv_store.handle_connection(stream).await;
});
}
}
});
// 在节点2和节点3上运行客户端
let client1_handle = node2.spawn(async move {
// 模拟网络延迟
time::sleep(Duration::from_millis(100)).await;
// 连接到服务器
let mut stream = TcpStream::connect(addr).await.unwrap();
// 设置键值
stream.write_all(b"SET foo bar").await.unwrap();
let mut buf = [0; 2];
stream.read_exact(&mut buf).await.unwrap();
println!("Client1 SET response: {}", String::from_utf8_lossy(&buf));
// 获取值
stream.write_all(b"GET foo").await.unwrap();
let mut buf = [0; 3];
stream.read_exact(&mut buf).await.unwrap();
println!("Client1 GET response: {}", String::from_utf8_lossy(&buf));
});
let client2_handle = node3.spawn(async move {
// 更长的延迟
time::sleep(Duration::from_millis(200)).await;
// 注入网络故障 - 50%丢包率
madsim_tokio::net::set_loss_rate(0.5);
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write_all(b"GET foo").await.unwrap();
let mut buf = [0; 3];
stream.read_exact(&mut buf).await.unwrap();
println!("Client2 GET response: {}", String::from_utf8_lossy(&buf));
});
// 等待所有任务完成
tokio::join!(server_handle, client1_handle, client2_handle);
});
}
这个示例演示了:
- 创建一个简单的分布式键值存储服务
- 在模拟环境中运行多个节点(1个服务器,2个客户端)
- 模拟网络延迟和丢包等真实场景
- 测试键值存储的基本操作
测试输出会是确定性的,即使有随机网络故障注入。每次运行测试都会得到相同的结果,方便调试分布式系统中的问题。