Rust异步ZeroMQ库async_zmq的使用:高性能消息传递与分布式系统开发
Rust异步ZeroMQ库async_zmq的使用:高性能消息传递与分布式系统开发
async-zmq是ZeroMQ的高层异步绑定库,兼容所有异步运行时(如tokio、async-stio等)。无需配置或调整功能,只需直接使用即可。
基本用法
用户可以通过async_zmq::*
初始化任何类型的socket,然后根据场景调用bind()
或connect()
。例如,创建一个发布(publish) socket:
let zmq = async_zmq::publish("tcp://127.0.0.1:5555")?.bind();
如果需要在不同socket间共享上下文,可以在构建socket时设置:
let context = Context::new();
let xpub = async_zmq::xpublish("inproc://example")?.with_context(&context).bind();
let sub = subscribe("inproc://example")?.with_context(&context).connect()?;
完整示例
下面是一个完整的发布者/订阅者模式的示例:
发布者 (Publisher)
use async_zmq::{publish, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建发布者socket并绑定到5555端口
let mut zmq = publish("tcp://127.0.0.1:5555")?.bind()?;
// 发送消息
for i in 0..10 {
zmq.send(vec![format!("Message {}", i).into()]).await?;
println!("Sent message {}", i);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
}
订阅者 (Subscriber)
use async_zmq::{subscribe, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建订阅者socket并连接到发布者
let mut zmq = subscribe("tcp://127.0.0.1:5555")?.connect()?;
// 订阅所有消息
zmq.set_subscribe(b"").unwrap();
// 接收消息
while let Some(msg) = zmq.next().await {
let msg = msg?;
println!("Received: {:?}", msg);
}
Ok(())
}
特性
- 完全异步:兼容所有主流异步运行时
- 高性能:基于ZeroMQ的高性能消息传递
- 多部分消息:支持发送/接收多部分消息(Multipart messages)
- 上下文共享:多个socket可以共享同一个上下文
安装
在项目中添加以下依赖:
[dependencies]
async_zmq = "0.4.0"
或者运行以下命令:
cargo add async_zmq
async-zmq提供了简单易用的API来进行高性能的异步消息传递,非常适合构建分布式系统和微服务架构。
完整请求-响应模式示例
请求端 (Requester)
use async_zmq::{request, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建请求socket并连接到响应端
let mut zmq = request("tcp://127.0.0.1:5556")?.connect()?;
// 发送请求
for i in 0..5 {
zmq.send(vec![format!("Request {}", i).into()]).await?;
println!("Sent request {}", i);
// 等待响应
if let Some(reply) = zmq.next().await {
let reply = reply?;
println!("Received reply: {:?}", reply);
}
}
Ok(())
}
响应端 (Responder)
use async_zmq::{reply, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建响应socket并绑定到5556端口
let mut zmq = reply("tcp://127.0.0.1:5556")?.bind()?;
// 处理请求
while let Some(request) = zmq.next().await {
let request = request?;
println!("Received request: {:?}", request);
// 发送响应
zmq.send(vec!["Response".into()]).await?;
println!("Sent response");
}
Ok(())
}
完整推送-拉取模式示例
推送端 (Pusher)
use async_zmq::{push, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建推送socket并绑定到5557端口
let mut zmq = push("tcp://127.0.0.1:5557")?.bind()?;
// 推送消息
for i in 0..10 {
zmq.send(vec![format!("Work item {}", i).into()]).await?;
println!("Pushed work item {}", i);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Ok(())
}
拉取端 (Puller)
use async_zmq::{pull, Result};
use futures::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// 创建拉取socket并连接到推送端
let mut zmq = pull("tcp://127.0.0.1:5557")?.connect()?;
// 处理工作项
while let Some(work) = zmq.next().await {
let work = work?;
println!("Processing work item: {:?}", work);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
}
1 回复
以下是根据提供的内容整理的async_zmq
完整示例,包含请求/回复、发布/订阅和推送/拉取三种模式的完整实现:
完整示例1:请求/回复模式(增强版)
use async_zmq::{Result, Multipart};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<()> {
// 启动服务端
tokio::spawn(async move {
let server = async_zmq::rep("tcp://127.0.0.1:5555")?.bind()?;
println!("Server started");
loop {
// 接收请求
if let Ok(request) = server.recv().await {
println!("Server received: {:?}", request);
// 处理请求并回复
sleep(Duration::from_millis(100)).await; // 模拟处理耗时
server.send(vec!["Response"].into()).await?;
}
}
});
// 启动多个客户端
for i in 0..3 {
tokio::spawn(async move {
let client = async_zmq::req("tcp://127.0.0.1:5555")?.connect()?;
println!("Client {} connected", i);
// 发送请求
client.send(vec![format!("Request {}", i)].into()).await?;
// 接收回复
if let Ok(reply) = client.recv().await {
println!("Client {} received: {:?}", i, reply);
}
});
}
sleep(Duration::from_secs(2)).await; // 等待所有操作完成
Ok(())
}
完整示例2:发布/订阅模式(增强版)
use async_zmq::{Result, Multipart};
use futures::StreamExt;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<()> {
// 发布者
let publisher = async_zmq::pub("tcp://127.0.0.1:5556")?.bind()?;
// 启动3个不同主题的订阅者
for topic in ["news", "weather", "sports"] {
tokio::spawn({
let topic = topic.to_string();
async move {
let mut subscriber = async_zmq::sub("tcp://127.0.0.1:5556")?
.connect()?
.subscribe(&topic)?;
println!("Subscriber for {} started", topic);
while let Some(msg) = subscriber.next().await {
println!("[{}] received: {:?}", topic, msg);
}
Ok::<(), async_zmq::Error>(())
}
});
}
// 发布不同主题的消息
let topics = ["news", "weather", "sports"];
for i in 0..10 {
let topic = topics[i % 3];
publisher
.send(vec![topic, &format!("Update {}", i)].into())
.await?;
sleep(Duration::from_millis(200)).await;
}
sleep(Duration::from_secs(1)).await; // 确保消息被接收
Ok(())
}
完整示例3:推送/拉取模式(工作队列)
use async_zmq::{Result, Multipart};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<()> {
// 任务生产者
let pusher = async_zmq::push("tcp://127.0.0.1:5557")?.bind()?;
// 启动3个工作线程
for worker_id in 0..3 {
tokio::spawn({
async move {
let puller = async_zmq::pull("tcp://127.0.0.1:5557")?.connect()?;
println!("Worker {} started", worker_id);
while let Ok(task) = puller.recv().await {
println!("Worker {} processing: {:?}", worker_id, task);
sleep(Duration::from_millis(500)).await; // 模拟处理耗时
println!("Worker {} finished processing", worker_id);
}
Ok::<(), async_zmq::Error>(())
}
});
}
// 生产10个任务
for i in 0..10 {
pusher.send(vec![format!("Task {}", i)].into()).await?;
sleep(Duration::from_millis(100)).await;
}
sleep(Duration::from_secs(3)).await; // 等待所有任务完成
Ok(())
}
完整示例4:多部分消息(文件传输模拟)
use async_zmq::{Result, Multipart};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<()> {
// 发送方
let sender = async_zmq::push("tcp://127.0.0.1:5558")?.bind()?;
// 接收方
let receiver = async_zmq::pull("tcp://127.0.0.1:5558")?.connect()?;
tokio::spawn(async move {
// 模拟发送文件分片
let file_name = "example.txt";
let chunks = vec![
b"First part of the file".to_vec(),
b"Second part of the file".to_vec(),
b"Final part of the file".to_vec(),
];
for (i, chunk) in chunks.into_iter().enumerate() {
let message = Multipart::from(vec![
file_name.as_bytes().to_vec(),
format!("{}/3", i + 1).as_bytes().to_vec(),
chunk,
]);
sender.send(message).await?;
sleep(Duration::from_millis(100)).await;
}
Ok::<(), async_zmq::Error>(())
});
// 接收并组装文件分片
while let Ok(Multart(parts)) = receiver.recv().await {
let file_name = String::from_utf8_lossy(&parts[0]);
let progress = String::from_utf8_lossy(&parts[1]);
let data = String::from_utf8_lossy(&parts[2]);
println!(
"Received {} ({}): {} bytes",
file_name,
progress,
data.len()
);
}
Ok(())
}
这些示例展示了async_zmq
在不同场景下的实际应用,包括:
- 客户端-服务器通信模式
- 发布/订阅模式的消息广播
- 工作队列的任务分发
- 多部分消息的分块传输
每个示例都包含了错误处理、异步任务管理和实际应用场景的模拟,可以直接运行测试。