Rust消息队列库TMQ的使用,TMQ提供高性能异步消息处理与发布订阅功能

Rust消息队列库TMQ的使用,TMQ提供高性能异步消息处理与发布订阅功能

TMQ是一个将Tokio和ZeroMQ桥接起来的Rust库,允许在异步环境中使用ZeroMQ。

当前支持的Socket类型

  • Request/Reply
  • Publish/Subscribe
  • Dealer/Router
  • Push/Pull

使用示例

发布(Publish)

发布消息给所有连接的订阅者:

use tmq::{publish, Context, Result};
use futures::SinkExt;
use log::info;
use std::env;
use std::time::Duration;
use tokio::time::delay_for;

#[tokio::main]
async fn main() -> Result<()> {
    // 创建发布socket并绑定到本地端口
    let mut socket = publish(&Context::new()).bind("tcp://127.0.0.1:7899")?;

    let mut i = 0;

    loop {
        i += 1;

        // 发送消息(包含主题和内容)
        socket
            .send(vec!["topic", &format!("Broadcast #{}", i)])
            .await?;

        // 每秒发送一次
        delay_for(Duration::from_secs(1)).await;
    }
}

订阅(Subscribe)

订阅socket是一个Stream,从发布socket读取值。使用subscribe方法指定过滤前缀,""表示接收所有消息:

use futures::StreamExt;
use tmq::{subscribe, Context, Result};
use std::env;

#[tokio::main]
async fn main() -> Result<()> {
    // 创建订阅socket并连接到发布者
    let mut socket = subscribe(&Context::new())
        .connect("tcp://127.0.0.1:7899")?
        .subscribe(b"topic")?;  // 只订阅"topic"前缀的消息

    // 循环接收消息
    while let Some(msg) = socket.next().await {
        println!(
            "Subscribe: {:?}",
            msg?.iter()
                .map(|item| item.as_str().unwrap_or("invalid text"))
                .collect::<Vec<&str>>()
        );
    }
    Ok(())
}

完整示例

下面是一个完整的发布-订阅模式示例,包含发布者和订阅者:

// 发布者
use tmq::{publish, Context, Result};
use futures::SinkExt;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn publisher() -> Result<()> {
    let mut socket = publish(&Context::new()).bind("tcp://127.0.0.1:7899")?;
    
    for i in 1..=5 {
        socket
            .send(vec!["news", &format!("Breaking news #{}", i)])
            .await?;
        println!("Published: Breaking news #{}", i);
        sleep(Duration::from_secs(2)).await;
    }
    
    Ok(())
}

// 订阅者
use tmq::{subscribe, Context, Result};
use futures::StreamExt;

#[tokio::main] 
async fn subscriber() -> Result<()> {
    let mut socket = subscribe(&Context::new())
        .connect("tcp://127.0.0.1:7899")?
        .subscribe(b"news")?;
        
    while let Some(msg) = socket.next().await {
        let msg = msg?;
        let content = msg.get(1).and_then(|m| m.as_str()).unwrap_or("");
        println!("Received: {}", content);
    }
    
    Ok(())
}

// 主函数中同时运行发布者和订阅者
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        publisher().await.unwrap();
    });
    
    subscriber().await.unwrap();
}

这个示例展示了TMQ的基本发布订阅功能,发布者每2秒发送一条新闻消息,订阅者接收并打印这些消息。


1 回复

Rust消息队列库TMQ的使用指南

TMQ是一个高性能的Rust异步消息队列库,提供了消息处理和发布/订阅功能。它专为需要高效消息传递的应用程序设计,具有低延迟和高吞吐量的特点。

主要特性

  • 异步消息处理
  • 发布/订阅模式支持
  • 高性能设计
  • 线程安全
  • 易于使用的API

安装

在Cargo.toml中添加依赖:

[dependencies]
tmq = "0.5"  # 请使用最新版本

基本使用方法

创建消息队列

use tmq::{Context, Result};

fn main() -> Result<()> {
    // 创建新的上下文
    let ctx = Context::new();
    // 创建消息队列
    let queue = tmq::queue(&ctx)?;
    Ok(())
}

发布消息

use tmq::{publish, Context};

let ctx = Context::new();
// 创建发布者并绑定到指定地址
let publisher = publish(&ctx).bind("tcp://*:5556")?;

// 发送消息
publisher.send("Hello, subscribers!".as_bytes())?;

订阅消息

use tmq::{subscribe, Context};

let ctx = Context::new();
// 创建订阅者并连接到发布者地址
let subscriber = subscribe(&ctx).connect("tcp://localhost:5556")?;

// 循环接收消息
loop {
    let message = subscriber.recv()?;
    println!("Received: {:?}", message);
}

高级功能

多主题订阅

let subscriber = subscribe(&ctx)
    .connect("tcp://localhost:5556")?
    // 订阅主题1
    .subscribe("topic1")?
    // 订阅主题2
    .subscribe("topic2")?;

异步处理

use tmq::{subscribe, Context};
use tokio::runtime::Runtime;

// 创建Tokio运行时
let rt = Runtime::new()?;
let ctx = Context::new();
let subscriber = subscribe(&ctx).connect("tcp://localhost:5556")?;

// 异步接收消息
rt.block_on(async {
    loop {
        let message = subscriber.recv_async().await?;
        println!("Async received: {:?}", message);
    }
});

消息批处理

let publisher = publish(&ctx).bind("tcp://*:5556")?;

// 批量消息
let messages = vec![
    "message1".as_bytes(),
    "message2".as_bytes(),
    "message3".as_bytes(),
];

// 批量发送
for msg in messages {
    publisher.send(msg)?;
}

性能优化建议

  1. 重用Context对象而不是频繁创建新的
  2. 对于高吞吐量场景,考虑使用批处理发送
  3. 在可能的情况下使用异步API
  4. 合理设置缓冲区大小

错误处理

TMQ操作返回Result类型,建议妥善处理错误:

match subscriber.recv() {
    Ok(msg) => println!("Got message: {:?}", msg),
    Err(e) => eprintln!("Error receiving message: {}", e),
}

完整示例代码

下面是一个完整的发布者/订阅者示例:

use tmq::{publish, subscribe, Context, Result};
use std::thread;
use std::time::Duration;

fn publisher() -> Result<()> {
    let ctx = Context::new();
    let publisher = publish(&ctx).bind("tcp://*:5556")?;
    
    for i in 0..10 {
        let msg = format!("Message {}", i);
        publisher.send(msg.as_bytes())?;
        thread::sleep(Duration::from_secs(1));
    }
    
    Ok(())
}

fn subscriber() -> Result<()> {
    let ctx = Context::new();
    let subscriber = subscribe(&ctx)
        .connect("tcp://localhost:5556")?
        .subscribe("")?;  // 订阅所有消息
    
    loop {
        let msg = subscriber.recv()?;
        println!("Received: {:?}", String::from_utf8_lossy(&msg[0]));
    }
}

fn main() -> Result<()> {
    // 启动订阅者线程
    let sub_thread = thread::spawn(|| {
        subscriber().unwrap();
    });
    
    // 启动发布者
    publisher()?;
    
    sub_thread.join().unwrap();
    Ok(())
}

TMQ为Rust开发者提供了一个强大而高效的消息队列解决方案,特别适合需要高性能消息传递的分布式系统和微服务架构。

回到顶部