Rust消息队列库TMQ的使用,TMQ提供高性能异步消息处理与发布订阅功能
Rust消息队列库TMQ的使用,TMQ提供高性能异步消息处理与发布订阅功能
TMQ是一个将Tokio和ZeroMQ桥接起来的Rust库,允许在异步环境中使用ZeroMQ。
当前支持的Socket类型
- Request/Reply
- Publish/Subscribe
- Dealer/Router
- Push/Pull
使用示例
发布(Publish)
发布消息给所有连接的订阅者:
use tmq::{publish, Context, Result};
use futures::SinkExt;
use log::info;
use std::env;
use std::time::Duration;
use tokio::time::delay_for;
#[tokio::main]
async fn main() -> Result<()> {
// 创建发布socket并绑定到本地端口
let mut socket = publish(&Context::new()).bind("tcp://127.0.0.1:7899")?;
let mut i = 0;
loop {
i += 1;
// 发送消息(包含主题和内容)
socket
.send(vec!["topic", &format!("Broadcast #{}", i)])
.await?;
// 每秒发送一次
delay_for(Duration::from_secs(1)).await;
}
}
订阅(Subscribe)
订阅socket是一个Stream
,从发布socket读取值。使用subscribe
方法指定过滤前缀,""
表示接收所有消息:
use futures::StreamExt;
use tmq::{subscribe, Context, Result};
use std::env;
#[tokio::main]
async fn main() -> Result<()> {
// 创建订阅socket并连接到发布者
let mut socket = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")?
.subscribe(b"topic")?; // 只订阅"topic"前缀的消息
// 循环接收消息
while let Some(msg) = socket.next().await {
println!(
"Subscribe: {:?}",
msg?.iter()
.map(|item| item.as_str().unwrap_or("invalid text"))
.collect::<Vec<&str>>()
);
}
Ok(())
}
完整示例
下面是一个完整的发布-订阅模式示例,包含发布者和订阅者:
// 发布者
use tmq::{publish, Context, Result};
use futures::SinkExt;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn publisher() -> Result<()> {
let mut socket = publish(&Context::new()).bind("tcp://127.0.0.1:7899")?;
for i in 1..=5 {
socket
.send(vec!["news", &format!("Breaking news #{}", i)])
.await?;
println!("Published: Breaking news #{}", i);
sleep(Duration::from_secs(2)).await;
}
Ok(())
}
// 订阅者
use tmq::{subscribe, Context, Result};
use futures::StreamExt;
#[tokio::main]
async fn subscriber() -> Result<()> {
let mut socket = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")?
.subscribe(b"news")?;
while let Some(msg) = socket.next().await {
let msg = msg?;
let content = msg.get(1).and_then(|m| m.as_str()).unwrap_or("");
println!("Received: {}", content);
}
Ok(())
}
// 主函数中同时运行发布者和订阅者
#[tokio::main]
async fn main() {
tokio::spawn(async {
publisher().await.unwrap();
});
subscriber().await.unwrap();
}
这个示例展示了TMQ的基本发布订阅功能,发布者每2秒发送一条新闻消息,订阅者接收并打印这些消息。
1 回复
Rust消息队列库TMQ的使用指南
TMQ是一个高性能的Rust异步消息队列库,提供了消息处理和发布/订阅功能。它专为需要高效消息传递的应用程序设计,具有低延迟和高吞吐量的特点。
主要特性
- 异步消息处理
- 发布/订阅模式支持
- 高性能设计
- 线程安全
- 易于使用的API
安装
在Cargo.toml中添加依赖:
[dependencies]
tmq = "0.5" # 请使用最新版本
基本使用方法
创建消息队列
use tmq::{Context, Result};
fn main() -> Result<()> {
// 创建新的上下文
let ctx = Context::new();
// 创建消息队列
let queue = tmq::queue(&ctx)?;
Ok(())
}
发布消息
use tmq::{publish, Context};
let ctx = Context::new();
// 创建发布者并绑定到指定地址
let publisher = publish(&ctx).bind("tcp://*:5556")?;
// 发送消息
publisher.send("Hello, subscribers!".as_bytes())?;
订阅消息
use tmq::{subscribe, Context};
let ctx = Context::new();
// 创建订阅者并连接到发布者地址
let subscriber = subscribe(&ctx).connect("tcp://localhost:5556")?;
// 循环接收消息
loop {
let message = subscriber.recv()?;
println!("Received: {:?}", message);
}
高级功能
多主题订阅
let subscriber = subscribe(&ctx)
.connect("tcp://localhost:5556")?
// 订阅主题1
.subscribe("topic1")?
// 订阅主题2
.subscribe("topic2")?;
异步处理
use tmq::{subscribe, Context};
use tokio::runtime::Runtime;
// 创建Tokio运行时
let rt = Runtime::new()?;
let ctx = Context::new();
let subscriber = subscribe(&ctx).connect("tcp://localhost:5556")?;
// 异步接收消息
rt.block_on(async {
loop {
let message = subscriber.recv_async().await?;
println!("Async received: {:?}", message);
}
});
消息批处理
let publisher = publish(&ctx).bind("tcp://*:5556")?;
// 批量消息
let messages = vec![
"message1".as_bytes(),
"message2".as_bytes(),
"message3".as_bytes(),
];
// 批量发送
for msg in messages {
publisher.send(msg)?;
}
性能优化建议
- 重用Context对象而不是频繁创建新的
- 对于高吞吐量场景,考虑使用批处理发送
- 在可能的情况下使用异步API
- 合理设置缓冲区大小
错误处理
TMQ操作返回Result
类型,建议妥善处理错误:
match subscriber.recv() {
Ok(msg) => println!("Got message: {:?}", msg),
Err(e) => eprintln!("Error receiving message: {}", e),
}
完整示例代码
下面是一个完整的发布者/订阅者示例:
use tmq::{publish, subscribe, Context, Result};
use std::thread;
use std::time::Duration;
fn publisher() -> Result<()> {
let ctx = Context::new();
let publisher = publish(&ctx).bind("tcp://*:5556")?;
for i in 0..10 {
let msg = format!("Message {}", i);
publisher.send(msg.as_bytes())?;
thread::sleep(Duration::from_secs(1));
}
Ok(())
}
fn subscriber() -> Result<()> {
let ctx = Context::new();
let subscriber = subscribe(&ctx)
.connect("tcp://localhost:5556")?
.subscribe("")?; // 订阅所有消息
loop {
let msg = subscriber.recv()?;
println!("Received: {:?}", String::from_utf8_lossy(&msg[0]));
}
}
fn main() -> Result<()> {
// 启动订阅者线程
let sub_thread = thread::spawn(|| {
subscriber().unwrap();
});
// 启动发布者
publisher()?;
sub_thread.join().unwrap();
Ok(())
}
TMQ为Rust开发者提供了一个强大而高效的消息队列解决方案,特别适合需要高性能消息传递的分布式系统和微服务架构。