Using pulsar-rs, the Rust client for Apache Pulsar: high-performance distributed messaging and event stream processing


pulsar-rs is a Future-based Rust client for Apache Pulsar. It does not depend on the C++ Pulsar library. It provides an async/await API that works with both Tokio and async-std.

Features

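  • URL-based connections (pulsar:// and pulsar+ssl://) with DNS lookup
  • Multi-topic consumers (selected by a regular expression or a list of topics)
  • TLS connections
  • Configurable executor (Tokio or async-std)
  • Automatic reconnection with exponential backoff
  • Message batching
  • Compression with LZ4, zlib, zstd or Snappy (can be disabled via Cargo features)
  • Telemetry via the tracing crate (can be enabled via a Cargo feature)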
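The service URL passed to Pulsar::builder selects either plain TCP (pulsar://) or TLS (pulsar+ssl://). The minimal standard-library sketch below shows how such a URL breaks down into its parts. ServiceUrl and parse_service_url are hypothetical names and are not part of pulsar-rs, which does its own parsing and DNS lookup. The sketch assumes Pulsar's conventional default ports: 6650 for plain connections and 6651 for TLS. It does not handle multi-host URLs or IPv6 literals.

```rust
/// Hypothetical parsed form of a Pulsar service URL (not a pulsar-rs type).
#[derive(Debug, PartialEq)]
pub struct ServiceUrl {
    pub tls: bool,
    pub host: String,
    pub port: u16,
}

/// Splits "pulsar://host[:port]" or "pulsar+ssl://host[:port]".
/// Without an explicit port, falls back to 6650 (plain) or 6651 (TLS).
pub fn parse_service_url(url: &str) -> Result<ServiceUrl, String> {
    let (tls, rest) = if let Some(rest) = url.strip_prefix("pulsar+ssl://") {
        (true, rest)
    } else if let Some(rest) = url.strip_prefix("pulsar://") {
        (false, rest)
    } else {
        return Err(format!("unsupported scheme in {url}"));
    };
    // Keep only "host[:port]" and drop any trailing path.
    let authority = rest.split('/').next().unwrap_or("");
    let (host, port) = match authority.rsplit_once(':') {
        Some((h, p)) => (h, p.parse::<u16>().map_err(|e| format!("bad port {p}: {e}"))?),
        None => (authority, if tls { 6651 } else { 6650 }),
    };
    if host.is_empty() {
        return Err(format!("missing host in {url}"));
    }
    Ok(ServiceUrl { tls, host: host.to_string(), port })
}
```

For example, "pulsar+ssl://broker.example.com" resolves to TLS on port 6651.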

Quick start

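Add the following dependencies to Cargo.toml. Only the full examples need serde, serde_json and chrono. Tokio needs the macros and rt-multi-thread features for #[tokio::main].

[dependencies]
futures = "0.3"
pulsar = "5.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"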

Example code

Producer example

use pulsar::{
    producer, 
    Pulsar, 
    TokioExecutor
};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    // Create the Pulsar client
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
    
    // Create a producer
    let mut producer = pulsar
        .producer()
        .with_topic("non-persistent://public/default/my-topic")
        .with_name("my-producer")
        .with_options(producer::ProducerOptions {
            batch_size: Some(100),
            ..Default::default()
        })
        .build()
        .await?;
    
    // Send messages: the first `.await?` queues the message,
    // the second waits for the broker's receipt
    for i in 0..10 {
        let msg = format!("message-{}", i);
        producer
            .send(msg)
            .await?
            .await?;
    }
    
    Ok(())
}
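In the producer above, send is awaited twice. The first await completes once the producer has accepted the message, and the second completes when the broker's receipt arrives. The sketch below is a synchronous toy model of this two-stage pattern, built on std channels. ToyProducer and Receipt are hypothetical and do not reflect how pulsar-rs is implemented internally.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a broker receipt.
#[derive(Debug, PartialEq)]
pub struct Receipt {
    pub sequence_id: u64,
}

/// A toy producer: `send` only enqueues and hands back a receiver for the receipt.
pub struct ToyProducer {
    queue: Sender<(u64, String, Sender<Receipt>)>,
    next_seq: u64,
}

impl ToyProducer {
    /// Spawns a "broker" thread that confirms every queued message in order.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel::<(u64, String, Sender<Receipt>)>();
        thread::spawn(move || {
            for (seq, _payload, reply) in rx {
                // A real broker would persist the payload here; we only confirm it.
                let _ = reply.send(Receipt { sequence_id: seq });
            }
        });
        ToyProducer { queue: tx, next_seq: 0 }
    }

    /// Stage 1: enqueue the message. Stage 2 is waiting on the returned receiver.
    pub fn send(&mut self, payload: String) -> Result<Receiver<Receipt>, String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        let seq = self.next_seq;
        self.next_seq += 1;
        self.queue
            .send((seq, payload, reply_tx))
            .map_err(|_| "broker thread stopped".to_string())?;
        Ok(reply_rx)
    }
}
```

You can collect several receivers before waiting on any of them. This mirrors sending several messages before awaiting their receipts, so each message does not cost a full round trip.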

Consumer example

use futures::StreamExt;
use pulsar::{
    consumer::{ConsumerOptions, InitialPosition},
    Consumer,
    Pulsar,
    SubType,
    TokioExecutor
};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    // Create the Pulsar client
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;

    // Create a consumer; the first type parameter is the message type (UTF-8 strings here)
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic("non-persistent://public/default/my-topic")
        .with_consumer_name("my-consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("my-subscription")
        .with_options(ConsumerOptions {
            initial_position: InitialPosition::Latest,
            ..Default::default()
        })
        .build()
        .await?;

    // Receive messages
    while let Some(msg) = consumer.next().await {
        let msg = msg?;
        println!("Received message: {:?}", msg.deserialize());
        consumer.ack(&msg).await?;
    }

    Ok(())
}
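The initial_position option only matters when a subscription is created for the first time. Latest starts after the newest message already in the topic, and Earliest starts from the oldest retained message. An existing subscription ignores the option and resumes from its stored cursor. The minimal model below treats the topic as a slice. Position, start_index and visible are hypothetical names.

```rust
/// Hypothetical mirror of the two start positions.
#[derive(Clone, Copy, Debug)]
pub enum Position {
    Earliest,
    Latest,
}

/// Index of the first message a consumer will see.
/// `cursor` is the stored position of an already existing subscription, if any.
pub fn start_index(log_len: usize, initial: Position, cursor: Option<usize>) -> usize {
    match cursor {
        // An existing subscription resumes where it left off; `initial` is ignored.
        Some(pos) => pos.min(log_len),
        None => match initial {
            Position::Earliest => 0,
            Position::Latest => log_len,
        },
    }
}

/// Messages delivered to a consumer that attaches now.
pub fn visible<'a>(log: &'a [&'a str], initial: Position, cursor: Option<usize>) -> &'a [&'a str] {
    &log[start_index(log.len(), initial, cursor)..]
}
```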

Full examples

Full producer implementation

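use pulsar::{
    producer,
    Error as PulsarError,
    Pulsar,
    SerializeMessage,
    TokioExecutor
};
use serde::Serialize;

// Custom message structure
#[derive(Serialize)]
struct MyMessage {
    id: u64,
    content: String,
    timestamp: i64,
}

// pulsar-rs can send any type that implements SerializeMessage; here the struct is encoded as JSON
impl SerializeMessage for MyMessage {
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        let payload = serde_json::to_vec(&input)
            .map_err(|e| PulsarError::Custom(e.to_string()))?;
        Ok(producer::Message {
            payload,
            ..Default::default()
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), PulsarError> {
    // 1. Create the Pulsar client connection
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
        .build()
        .await?;

    // 2. Configure the producer options
    let producer_options = producer::ProducerOptions {
        batch_size: Some(100),    // maximum number of messages per batch
        ..Default::default()
    };

    // 3. Create the producer
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://public/default/rust-topic")
        .with_name("rust-producer")
        .with_options(producer_options)
        .build()
        .await?;

    // 4. Send the messages
    for i in 1..=50 {
        let message = MyMessage {
            id: i,
            content: format!("This is message number {}", i),
            timestamp: chrono::Utc::now().timestamp(),
        };

        // Serialize (via SerializeMessage) and queue the message
        let send_result = producer.send(message).await?;

        // Wait for the broker's acknowledgement
        send_result.await?;
        println!("Sent message ID: {}", i);
    }

    // 5. Close the producer
    producer.close().await?;
    println!("Producer closed");

    Ok(())
}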
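The examples name topics in two ways. Some use fully qualified names such as persistent://public/default/rust-topic, and others use short names such as test-topic. Pulsar expands a short name into the persistent domain, the public tenant and the default namespace. The small sketch below illustrates that expansion rule. normalize_topic is a hypothetical helper, and it ignores legacy name formats that contain a cluster segment.

```rust
/// Expands a topic name to its fully qualified form (hypothetical helper).
pub fn normalize_topic(name: &str) -> Result<String, String> {
    if let Some((domain, rest)) = name.split_once("://") {
        if domain != "persistent" && domain != "non-persistent" {
            return Err(format!("unknown domain: {domain}"));
        }
        if rest.split('/').count() != 3 {
            return Err(format!("expected tenant/namespace/topic in {name}"));
        }
        return Ok(name.to_string());
    }
    match name.split('/').count() {
        // "my-topic" -> default tenant and namespace
        1 => Ok(format!("persistent://public/default/{name}")),
        // "tenant/namespace/topic" -> only the domain is missing
        3 => Ok(format!("persistent://{name}")),
        _ => Err(format!("cannot normalize topic name: {name}")),
    }
}
```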

Full consumer implementation

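use futures::StreamExt;
use pulsar::{
    consumer::{ConsumerOptions, InitialPosition},
    message::Payload,
    Consumer,
    DeserializeMessage,
    Pulsar,
    SubType,
    TokioExecutor
};
use serde::Deserialize;

// Same message structure as on the producer side
#[derive(Debug, Deserialize)]
struct MyMessage {
    id: u64,
    content: String,
    timestamp: i64,
}

// Tells the consumer how to decode the JSON payload written by the producer
impl DeserializeMessage for MyMessage {
    type Output = Result<MyMessage, serde_json::Error>;

    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(&payload.data)
    }
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    // 1. Create the Pulsar client connection
    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
        .build()
        .await?;

    // 2. Configure the consumer options
    let consumer_options = ConsumerOptions {
        initial_position: InitialPosition::Earliest,  // start from the oldest retained message
        read_compacted: Some(false),                  // read the full topic, not the compacted view
        ..Default::default()
    };

    // 3. Create the consumer
    let mut consumer: Consumer<MyMessage, _> = pulsar
        .consumer()
        .with_topic("persistent://public/default/rust-topic")
        .with_consumer_name("rust-consumer")
        .with_subscription_type(SubType::Shared)     // shared subscription
        .with_subscription("rust-subscription")
        .with_options(consumer_options)
        .build()
        .await?;

    println!("Consumer started, waiting for messages...");

    // 4. Consume messages
    while let Some(msg) = consumer.next().await {
        // An Err here is a client/connection error, not a message, so there is nothing to nack
        let msg = match msg {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("Error while receiving a message: {:?}", e);
                continue;
            }
        };

        match msg.deserialize() {
            Ok(payload) => {
                println!(
                    "Received message [ID: {}]: {}, time: {}",
                    payload.id,
                    payload.content,
                    chrono::DateTime::from_timestamp(payload.timestamp, 0)
                        .map(|t| t.to_rfc2822())
                        .unwrap_or_else(|| "invalid timestamp".to_string())
                );

                // Acknowledge the message
                consumer.ack(&msg).await?;
            }
            Err(e) => {
                // The payload could not be decoded: nack it so the broker redelivers it
                eprintln!("Failed to deserialize message: {:?}", e);
                consumer.nack(&msg).await?;
            }
        }
    }

    Ok(())
}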
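When a message cannot be processed, the consumer negatively acknowledges it so the broker redelivers it. A payload that can never be decoded would then be redelivered forever. Pulsar's dead-letter topic policy addresses this by moving a message aside after a maximum number of redeliveries. The in-memory sketch below models only that policy. Outcome and process are hypothetical. Redelivery here is immediate, while a real broker applies a delay. The sketch does not show how pulsar-rs configures dead-letter topics.

```rust
use std::collections::VecDeque;

/// What finally happened to each message (hypothetical model).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Acked(String),
    DeadLettered(String),
}

/// Processes `messages`, re-queueing (nack-ing) failures until a message has been
/// delivered `max_deliveries` times, after which it is moved to the dead-letter list.
pub fn process<F>(messages: Vec<String>, max_deliveries: u32, handler: F) -> Vec<Outcome>
where
    F: Fn(&str) -> Result<(), String>,
{
    // Each entry carries how often it has been delivered so far.
    let mut queue: VecDeque<(String, u32)> = messages.into_iter().map(|m| (m, 0)).collect();
    let mut outcomes = Vec::new();
    while let Some((msg, deliveries)) = queue.pop_front() {
        let deliveries = deliveries + 1;
        match handler(&msg) {
            Ok(()) => outcomes.push(Outcome::Acked(msg)),
            Err(_) if deliveries >= max_deliveries => outcomes.push(Outcome::DeadLettered(msg)),
            // Negative acknowledgement: the message goes back for another attempt.
            Err(_) => queue.push_back((msg, deliveries)),
        }
    }
    outcomes
}
```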


License

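This library is licensed under the terms of both the MIT license and the Apache License (Version 2.0). It may include packages written by third parties, which carry their own copyright notices and license terms.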

History

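This project was originally created in 2018 by @stearnsc and others at Wyyerd. In 2022 the original creators transferred the repository to StreamNative, and the project is now actively maintained under the StreamNative organization.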



Overview

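Pulsar is a high-performance distributed messaging platform that combines the strengths of traditional message queues and stream processing systems. Its main features include: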

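  • Multi-tenancy
  • Persistent message storage
  • Low latency
  • Horizontal scalability
  • Multiple subscription types (Exclusive, Shared, Failover, Key_Shared)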
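Pulsar's subscription types differ in how the messages of one subscription are spread over its consumers:

  • Exclusive admits only one consumer.
  • Failover delivers everything to one active consumer while the others stand by.
  • Shared distributes messages across all consumers.
  • Key_Shared keeps messages with the same key on the same consumer.

The sketch below is a deliberately simplified model of these rules. SubMode and dispatch are hypothetical. Shared is modeled as plain round-robin, Key_Shared as a hash taken modulo the consumer count, and Failover always picks consumer 0. A real broker also takes flow-control permits, priorities and hash ranges into account.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical mirror of Pulsar's subscription types.
#[derive(Clone, Copy, Debug)]
pub enum SubMode {
    Exclusive,
    Failover,
    Shared,
    KeyShared,
}

/// For each message key, returns the index of the consumer that receives it.
pub fn dispatch(mode: SubMode, consumers: usize, keys: &[&str]) -> Result<Vec<usize>, String> {
    if consumers == 0 {
        return Err("no consumers attached".to_string());
    }
    match mode {
        SubMode::Exclusive if consumers > 1 => {
            Err("exclusive subscription allows only one consumer".to_string())
        }
        // One active consumer receives everything.
        SubMode::Exclusive | SubMode::Failover => Ok(vec![0; keys.len()]),
        // Spread messages round-robin.
        SubMode::Shared => Ok((0..keys.len()).map(|i| i % consumers).collect()),
        // The same key always maps to the same consumer.
        SubMode::KeyShared => Ok(keys
            .iter()
            .map(|k| {
                let mut h = DefaultHasher::new();
                k.hash(&mut h);
                (h.finish() % consumers as u64) as usize
            })
            .collect()),
    }
}
```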

Installation

Add the dependencies to Cargo.toml:

[dependencies]
pulsar = "5.1"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Basic usage

1. Sending messages with a producer

use pulsar::{
    producer,
    Pulsar,
    TokioExecutor
};
use serde::Serialize;

// Message data structure
#[derive(Serialize, Debug)]
struct TestData {
    id: u32,
    name: String,
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    // Connect to the Pulsar service
    let addr = "pulsar://localhost:6650";
    let pulsar = Pulsar::builder(addr, TokioExecutor).build().await?;

    // Create a producer
    let mut producer = pulsar
        .producer()
        .with_topic("test-topic")
        .with_name("test-producer")
        .with_options(producer::ProducerOptions::default())
        .build()
        .await?;

    // Prepare the data and encode it as JSON bytes (a Vec<u8> can be sent directly)
    let data = TestData {
        id: 1,
        name: "test".to_string(),
    };
    let payload = serde_json::to_vec(&data)
        .map_err(|e| pulsar::Error::Custom(e.to_string()))?;

    // Send the message: the first await queues it, the second waits for the broker's receipt
    let receipt = producer
        .send(payload)
        .await?
        .await?;

    println!("Message published, receipt: {:?}", receipt);

    Ok(())
}

2. Receiving messages with a consumer

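use futures::TryStreamExt;
use pulsar::{
    consumer,
    message::Payload,
    Consumer,
    DeserializeMessage,
    Pulsar,
    SubType,
    TokioExecutor
};
use serde::Deserialize;

// Same message structure as on the producer side
#[derive(Deserialize, Debug)]
struct TestData {
    id: u32,
    name: String,
}

// Decode the JSON bytes written by the producer
impl DeserializeMessage for TestData {
    type Output = Result<TestData, serde_json::Error>;

    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(&payload.data)
    }
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    // Connect to the Pulsar service
    let addr = "pulsar://localhost:6650";
    let pulsar = Pulsar::builder(addr, TokioExecutor).build().await?;

    // Create a consumer
    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        .with_topic("test-topic")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test-subscription")
        .with_options(consumer::ConsumerOptions::default())
        .build()
        .await?;

    // Keep receiving messages
    while let Some(msg) = consumer.try_next().await? {
        match msg.deserialize() {
            Ok(data) => println!("Received message: {:?}", data),
            Err(e) => eprintln!("Could not deserialize message: {:?}", e),
        }
        consumer.ack(&msg).await?;  // acknowledge that the message has been processed
    }

    Ok(())
}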

Advanced examples

1. Sending messages in batches

use pulsar::producer::Message;

// ...Pulsar connection and producer creation omitted (see above)...

// Prepare a batch of messages
let messages = vec![
    Message {
        payload: serde_json::to_vec(&TestData { id: 1, name: "msg1".to_string() }).unwrap(),
        ..Default::default()
    },
    Message {
        payload: serde_json::to_vec(&TestData { id: 2, name: "msg2".to_string() }).unwrap(),
        ..Default::default()
    },
];

// send_all queues every message and returns one receipt future per message
let receipts = producer.send_all(messages).await?;
for receipt in receipts {
    println!("Batch message sent, receipt: {:?}", receipt.await?);
}
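When batching is enabled through batch_size in ProducerOptions, the producer groups queued messages and sends them to the broker together, instead of using one frame per message. The sketch below is a minimal model of a count-based batch. Batcher is a hypothetical type. Real producers may also flush on other triggers, which the explicit flush method stands in for.

```rust
/// Hypothetical batch accumulator: flushes once `batch_size` messages are queued.
pub struct Batcher {
    batch_size: usize,
    pending: Vec<Vec<u8>>,
    /// Each inner Vec stands for one network frame sent to the broker.
    pub frames: Vec<Vec<Vec<u8>>>,
}

impl Batcher {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Batcher { batch_size, pending: Vec::new(), frames: Vec::new() }
    }

    /// Queues a payload and ships the batch as soon as it is full.
    pub fn push(&mut self, payload: Vec<u8>) {
        self.pending.push(payload);
        if self.pending.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Sends whatever is pending as one frame.
    pub fn flush(&mut self) {
        if !self.pending.is_empty() {
            self.frames.push(std::mem::take(&mut self.pending));
        }
    }
}
```

With a batch size of 2, five messages travel in three frames rather than five, and the final frame is the partial batch emptied by flush.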

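2. Using an Avro schema

// Requires the avro-rs crate (avro-rs = "0.13") in Cargo.toml
use avro_rs::{to_avro_datum, types::Value, Schema};
use pulsar::{message::proto, producer};

// ...Pulsar connection omitted (see above)...

// Define the Avro schema
let raw_schema = r#"
    {
        "type": "record",
        "name": "TestData",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"}
        ]
    }
"#;

// pulsar-rs takes the schema through ProducerOptions::schema as a protocol-level proto::Schema
let mut producer = pulsar
    .producer()
    .with_topic("schema-topic")
    .with_options(producer::ProducerOptions {
        schema: Some(proto::Schema {
            r#type: proto::schema::Type::Avro as i32,
            schema_data: raw_schema.as_bytes().to_vec(),
            ..Default::default()
        }),
        ..Default::default()
    })
    .build()
    .await?;

// Build an Avro record and encode it as a single binary datum
let schema = Schema::parse_str(raw_schema).map_err(|e| pulsar::Error::Custom(e.to_string()))?;
let value = Value::Record(vec![
    ("id".to_string(), Value::Int(1)),
    ("name".to_string(), Value::String("test".to_string())),
]);
let payload = to_avro_datum(&schema, value).map_err(|e| pulsar::Error::Custom(e.to_string()))?;

// Send the Avro-encoded bytes and wait for the broker's receipt
producer.send(payload).await?.await?;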
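For reference, the Avro binary encoding writes the fields of the TestData record ({id: int, name: string}) in schema order, with no header:

  • an int is written as a zig-zag varint;
  • a string is written as its zig-zag-encoded byte length, followed by the UTF-8 bytes.

The hand-rolled sketch below covers only this record and is meant to show what the payload bytes look like. write_long, write_string and encode_test_data are hypothetical names, and a real application would use an Avro library.

```rust
/// Appends `n` in Avro's zig-zag variable-length encoding (used for int and long).
pub fn write_long(buf: &mut Vec<u8>, n: i64) {
    // Zig-zag maps signed to unsigned so small magnitudes stay short: 0->0, -1->1, 1->2, ...
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    // Little-endian base-128 varint: 7 bits per byte, high bit set means "more bytes follow".
    while z >= 0x80 {
        buf.push(((z & 0x7f) as u8) | 0x80);
        z >>= 7;
    }
    buf.push(z as u8);
}

/// Appends an Avro string: zig-zag length followed by the UTF-8 bytes.
pub fn write_string(buf: &mut Vec<u8>, s: &str) {
    write_long(buf, s.len() as i64);
    buf.extend_from_slice(s.as_bytes());
}

/// Encodes the TestData record {id: int, name: string}: fields in schema order, no header.
pub fn encode_test_data(id: i32, name: &str) -> Vec<u8> {
    let mut buf = Vec::new();
    write_long(&mut buf, id as i64);
    write_string(&mut buf, name);
    buf
}
```

For {id: 1, name: "test"} this yields six bytes: 02 for the id, 08 for the length 4, then the bytes of "test".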

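3. Subscribing to multiple topics

use futures::TryStreamExt;
use pulsar::{Consumer, SubType};

// ...Pulsar connection omitted (see above)...
// Order and Payment are assumed to be structs deriving serde::Deserialize and Debug

// A consumer for several topics; messages are received as raw bytes and decoded per topic
let mut consumer: Consumer<Vec<u8>, _> = pulsar
    .consumer()
    .with_topics(vec!["orders", "payments"])  // subscribe to two topics
    .with_subscription_type(SubType::Shared)  // shared subscription
    .with_subscription("multi-topic-sub")
    .build()
    .await?;

// Handle messages from the different topics. msg.topic holds the fully qualified
// topic name, so the short names are matched in their expanded form
while let Some(msg) = consumer.try_next().await? {
    match msg.topic.as_str() {
        "persistent://public/default/orders" => {
            println!("Order message: {:?}", serde_json::from_slice::<Order>(&msg.payload.data))
        }
        "persistent://public/default/payments" => {
            println!("Payment message: {:?}", serde_json::from_slice::<Payment>(&msg.payload.data))
        }
        other => println!("Message from unexpected topic {}", other),
    }
    consumer.ack(&msg).await?;
}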
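A received message carries its full topic name, not the short name used at subscription time. For partitioned topics the name also carries a -partition-N suffix, so routing on a name like "orders" requires reducing the full name first. The helpers below sketch that step. local_topic_name and route are hypothetical.

```rust
/// Reduces "persistent://public/default/orders-partition-3" to "orders" (hypothetical helper).
pub fn local_topic_name(full: &str) -> &str {
    // Keep only the part after the last '/', dropping domain, tenant and namespace.
    let local = full.rsplit('/').next().unwrap_or(full);
    // Strip a trailing "-partition-<n>" added for partitioned topics.
    match local.rsplit_once("-partition-") {
        Some((base, idx)) if !idx.is_empty() && idx.bytes().all(|b| b.is_ascii_digit()) => base,
        _ => local,
    }
}

/// Picks a handler by the logical topic name.
pub fn route(full_topic: &str) -> &'static str {
    match local_topic_name(full_topic) {
        "orders" => "order handler",
        "payments" => "payment handler",
        _ => "unknown topic",
    }
}
```

Because the tenant and namespace are dropped, two topics with the same local name in different namespaces would be routed to the same handler. Keep the full prefix when that matters.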

Full producer-consumer example

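use futures::TryStreamExt;
use pulsar::{
    consumer::{ConsumerOptions, InitialPosition},
    message::Payload,
    Consumer, DeserializeMessage, Pulsar, TokioExecutor,
};
use serde::{Serialize, Deserialize};
use tokio::time::{sleep, Duration};

#[derive(Serialize, Deserialize, Debug)]
struct OrderEvent {
    order_id: String,
    amount: f64,
    items: Vec<String>,
}

// Lets the consumer decode the JSON payload into an OrderEvent
impl DeserializeMessage for OrderEvent {
    type Output = Result<OrderEvent, serde_json::Error>;

    fn deserialize_message(payload: &Payload) -> Self::Output {
        serde_json::from_slice(&payload.data)
    }
}

// Converts a JSON error into the client's error type so `?` can be used
fn json_err(e: serde_json::Error) -> pulsar::Error {
    pulsar::Error::Custom(e.to_string())
}

// Producer task
async fn produce_orders(pulsar: Pulsar<TokioExecutor>) -> Result<(), pulsar::Error> {
    let mut producer = pulsar
        .producer()
        .with_topic("orders")
        .build()
        .await?;

    for i in 1..=5 {
        let order = OrderEvent {
            order_id: format!("ORD-{}", i),
            amount: 100.0 * i as f64,
            items: vec![format!("item{}", i), format!("item{}", i + 1)]
        };

        let payload = serde_json::to_vec(&order).map_err(json_err)?;
        // First await: message queued; second await: broker receipt
        producer.send(payload).await?.await?;
        println!("Order sent: {:?}", order);
        sleep(Duration::from_secs(1)).await;
    }

    Ok(())
}

// Consumer task
async fn consume_orders(pulsar: Pulsar<TokioExecutor>) -> Result<(), pulsar::Error> {
    let mut consumer: Consumer<OrderEvent, _> = pulsar
        .consumer()
        .with_topic("orders")
        .with_subscription("order-processor")
        // Start from the oldest message so orders sent before the subscription existed are not skipped
        .with_options(ConsumerOptions {
            initial_position: InitialPosition::Earliest,
            ..Default::default()
        })
        .build()
        .await?;

    while let Some(msg) = consumer.try_next().await? {
        let order = msg.deserialize().map_err(json_err)?;
        println!("Processing order: {:?}", order);
        // Simulate order processing
        sleep(Duration::from_millis(500)).await;
        consumer.ack(&msg).await?;
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    let pulsar = Pulsar::builder("pulsar://localhost:6650", TokioExecutor).build().await?;

    // Start the producer and consumer tasks
    let producer_task = tokio::spawn(produce_orders(pulsar.clone()));
    let consumer_task = tokio::spawn(consume_orders(pulsar));

    // Each JoinHandle yields Result<Result<(), pulsar::Error>, JoinError>; the consumer
    // keeps running until its stream ends, so this waits for both tasks to finish
    let (produced, consumed) = tokio::join!(producer_task, consumer_task);
    produced.map_err(|e| pulsar::Error::Custom(e.to_string()))??;
    consumed.map_err(|e| pulsar::Error::Custom(e.to_string()))??;

    Ok(())
}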

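Performance tips

  1. Batch sending: send several messages at once with send_all and enable batching (batch_size in ProducerOptions) to reduce per-message network overhead
  2. Connection reuse: reuse one Pulsar client instance (for example by cloning it, as the full example does) instead of opening new connections
  3. Asynchronous processing: handle messages in asynchronous tasks to increase throughput
  4. Tuning: adjust producer batching parameters (such as batch_size) and consumer-side settings to the message size and load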

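Error-handling best practices

async fn safe_send(producer: &mut pulsar::Producer<pulsar::TokioExecutor>, data: &TestData) {
    let payload = match serde_json::to_vec(data) {
        Ok(p) => p,
        Err(e) => {
            eprintln!("Serialization failed: {}", e);
            return;
        }
    };

    match producer.send(payload).await {
        // The message was queued; now wait for the broker's receipt
        Ok(msg_future) => {
            match msg_future.await {
                Ok(receipt) => println!("Message sent successfully: {:?}", receipt),
                Err(e) => eprintln!("Message was not confirmed by the broker: {}", e),
            }
        },
        Err(e) => {
            eprintln!("Failed to send message: {}", e);
            // Retry logic could be added here
        }
    }
}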
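The comment in safe_send leaves retry logic open. The sketch below fills that gap with capped exponential backoff: the delay before retry number attempt is base * 2^attempt, limited to max. retry_with_backoff and backoff_delay are hypothetical helpers. The sketch is synchronous (std::thread::sleep) to stay dependency-free. Inside async code such as safe_send, you would sleep with tokio::time::sleep instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow / checked_mul avoid overflow for large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Calls `op` up to `max_attempts` times, sleeping with exponential backoff between failures.
/// Returns the first success, or the last error once the attempts are used up.
pub fn retry_with_backoff<T, E, F>(max_attempts: u32, base: Duration, max: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    assert!(max_attempts > 0, "need at least one attempt");
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(v) => return Ok(v),
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

A retried send can publish a message twice if the first attempt actually reached the broker, so consumers should tolerate duplicates.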