Rust AMQP客户端库fe2o3-amqp的使用,实现高效消息队列通信与异步任务处理

fe2o3-amqp

一个基于serde和tokio的AMQP 1.0协议的Rust实现。

crate_version docs_version discord

  • 快速开始
  • 文档
  • 变更日志
  • 示例

特性标志

default = []
特性 描述
"rustls" 启用与tokio-rustlsrustls的TLS集成
"native-tls" 启用与tokio-native-tlsnative-tls的TLS集成
"acceptor" 启用ConnectionAcceptorSessionAcceptorLinkAcceptor
"transaction" 启用ControllerTransactionOwnedTransactioncontrol_link_acceptor
"scram" 启用SCRAM认证
"tracing" 启用使用tracing的日志记录
"log" 启用使用log的日志记录

快速开始

  1. 客户端
  2. 监听器
  3. WebSocket绑定

更多示例包括与Azure Service Bus一起使用的示例可以在GitHub仓库中找到。

客户端

以下是一个使用本地代理的示例,该代理在本地主机上监听。代理使用以下命令执行:

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

以下代码需要在依赖项中添加[tokio]异步运行时。

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    // 打开连接
    let mut connection = Connection::open(
        "connection-1",                     // 容器ID
        "amqp://guest:guest@localhost:5672" // URL
    ).await.unwrap();

    // 开始会话
    let mut session = Session::begin(&mut connection).await.unwrap();

    // 创建发送者
    let mut sender = Sender::attach(
        &mut session,           // 会话
        "rust-sender-link-1",   // 链接名称
        "q1"                    // 目标地址
    ).await.unwrap();

    // 创建接收者
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // 链接名称
        "q1"                    // 源地址
    ).await.unwrap();

    // 发送消息到代理并等待结果(处置)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // 处理传递结果

    // 发送一个可批处理字段设置为true的消息
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // 等待结果(处置)
    outcome.accepted_or_else(|state| state).unwrap(); // 处理传递结果

    // 从代理接收消息
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // 使用关闭分离执行分离发送者
    receiver.close().await.unwrap(); // 使用关闭分离执行分离接收者
    session.end().await.unwrap(); // 结束会话
    connection.close().await.unwrap(); // 关闭连接
}

监听器

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

WebSocket

WebSocket绑定需要[fe2o3-amqp-ws]

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

更多示例

更多发送和接收的示例可以在GitHub仓库中找到。请注意,大多数示例需要运行本地代理。Windows上可以使用的一个代理是TestAmqpBroker。

WebAssembly支持

自"0.8.11"版本起,实验性支持wasm32-unknown-unknown目标,需要使用fe2o3-amqp-ws与代理建立WebSocket连接。在浏览器标签页中发送和接收消息的示例可以在examples/wasm32-in-browser中找到。

组件

名称 描述
serde_amqp_derive 自定义派生宏,用于AMQP1.0协议中定义的描述类型
serde_amqp AMQP1.0序列化器和反序列化器以及原始类型
fe2o3-amqp-types AMQP1.0数据类型
fe2o3-amqp AMQP1.0 ConnectionSessionLink的实现
fe2o3-amqp-ext 扩展类型和实现
fe2o3-amqp-ws fe2o3-amqp传输的WebSocket绑定
fe2o3-amqp-management AMQP1.0管理的实验性实现
fe2o3-amqp-cbs AMQP1.0 CBS的实验性实现

支持的最低Rust版本

1.75.0

许可证:MIT/Apache-2.0

完整示例代码

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    // 建立AMQP连接
    let mut connection = Connection::open(
        "connection-1",
        "amqp://guest:guest@localhost:5672"
    ).await.unwrap();

    // 创建会话
    let mut session = Session::begin(&mut connection).await.unwrap();

    // 附加发送者链接
    let mut sender = Sender::attach(
        &mut session,
        "rust-sender-link-1",
        "q1"
    ).await.unwrap();

    // 附加接收者链接
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1",
        "q1"
    ).await.unwrap();

    // 发送消息并等待确认
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap();

    // 发送可批处理消息
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap();

    // 接收消息
    let delivery = receiver.recv::<String>().await.unwrap();
    println!("Received: {}", delivery.body());
    receiver.accept(&delivery).await.unwrap();

    // 清理资源
    sender.close().await.unwrap();
    receiver.close().await.unwrap();
    session.end().await.unwrap();
    connection.close().await.unwrap();
}

1 回复

Rust AMQP客户端库fe2o3-amqp的使用指南

概述

fe2o3-amqp是一个基于Rust语言的AMQP 1.0协议客户端库,专门为高性能消息队列通信和异步任务处理而设计。该库提供了完整的AMQP协议实现,支持消息的发布/订阅、请求/响应等模式,并充分利用Rust的异步特性实现高效并发处理。

主要特性

  • 完整的AMQP 1.0协议支持
  • 基于async/await的异步API
  • 连接池和会话管理
  • 消息确认机制
  • TLS/SSL安全连接
  • 高性能消息序列化

安装方法

在Cargo.toml中添加依赖:

[dependencies]
fe2o3-amqp = "0.8"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 建立连接和会话

use fe2o3_amqp::{
    Connection, Session, Sender, Receiver,
    types::primitives::Value,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建连接
    let mut connection = Connection::open(
        "my-connection",
        "amqp://localhost:5672"
    ).await?;

    // 创建会话
    let mut session = Session::begin(&mut connection).await?;
    
    Ok(())
}

2. 消息发送示例

// 创建发送者
let mut sender = Sender::attach(
    &mut session,
    "my-sender",
    "my-queue"
).await?;

// 发送消息
let message = fe2o3_amqp::Message::builder()
    .body("Hello AMQP!")
    .build();
    
sender.send(message).await?;

// 关闭发送者
sender.close().await?;

3. 消息接收示例

// 创建接收者
let mut receiver = Receiver::attach(
    &mut session,
    "my-receiver",
    "my-queue"
).await?;

// 接收消息
let message = receiver.recv::<String>().await?;
println!("Received: {}", message.body());

// 确认消息
receiver.accept(&message).await?;

// 关闭接收者
receiver.close().await?;

4. 异步任务处理

use fe2o3_amqp::types::primitives::Value;

async fn process_messages(mut receiver: Receiver) {
    while let Ok(message) = receiver.recv::<Value>().await {
        println!("Processing message: {:?}", message.body());
        
        // 模拟异步处理
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        
        receiver.accept(&message).await.unwrap();
    }
}

5. 错误处理和重连机制

use fe2o3_amqp::error::Error;

async fn create_connection_with_retry() -> Result<Connection, Error> {
    let mut attempts = 0;
    loop {
        match Connection::open("retry-connection", "amqp://localhost:5672").await {
            Ok(conn) => return Ok(conn),
            Err(e) => {
                attempts += 1;
                if attempts > 5 {
                    return Err(e);
                }
                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            }
        }
    }
}

高级特性

消息属性设置

let message = fe2o3_amqp::Message::builder()
    .body("Important message")
    .message_id("12345")
    .content_type("text/plain")
    .subject("notification")
    .reply_to("response-queue")
    .build();

批量消息处理

// 批量发送
let messages = vec![
    Message::builder().body("msg1").build(),
    Message::builder().body("msg2").build(),
];

for message in messages {
    sender.send(message).await?;
}

// 批量接收
let batch = receiver.recv_batch::<String>(10).await?;
for message in batch {
    println!("Batch message: {}", message.body());
    receiver.accept(&message).await?;
}

完整示例demo

use fe2o3_amqp::{
    Connection, Session, Sender, Receiver,
    types::primitives::Value,
    error::Error,
};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建带重试机制的连接
    let mut connection = create_connection_with_retry().await?;
    
    // 创建会话
    let mut session = Session::begin(&mut connection).await?;
    
    // 创建发送者
    let mut sender = Sender::attach(
        &mut session,
        "demo-sender",
        "demo-queue"
    ).await?;

    // 发送带属性的消息
    let message = fe2o3_amqp::Message::builder()
        .body("Hello AMQP from demo!")
        .message_id("demo-001")
        .content_type("text/plain")
        .subject("demo-message")
        .build();
    
    sender.send(message).await?;
    println!("Message sent successfully");

    // 创建接收者
    let mut receiver = Receiver::attach(
        &mut session,
        "demo-receiver",
        "demo-queue"
    ).await?;

    // 异步处理消息
    let handle = tokio::spawn(process_messages(receiver));

    // 发送批量消息
    let messages = vec![
        fe2o3_amqp::Message::builder().body("batch-msg-1").build(),
        fe2o3_amqp::Message::builder().body("batch-msg-2").build(),
        fe2o3_amqp::Message::builder().body("batch-msg-3").build(),
    ];

    for message in messages {
        sender.send(message).await?;
    }
    println!("Batch messages sent");

    // 等待消息处理完成
    handle.await?;

    // 清理资源
    sender.close().await?;
    session.end().await?;
    connection.close().await?;

    Ok(())
}

async fn process_messages(mut receiver: Receiver) {
    while let Ok(message) = receiver.recv::<String>().await {
        println!("Processing message: {}", message.body());
        
        // 模拟异步处理任务
        sleep(Duration::from_secs(1)).await;
        
        // 确认消息处理完成
        receiver.accept(&message).await.unwrap();
        println!("Message processed and accepted");
    }
}

async fn create_connection_with_retry() -> Result<Connection, Error> {
    let mut attempts = 0;
    loop {
        match Connection::open("demo-connection", "amqp://localhost:5672").await {
            Ok(conn) => {
                println!("Connection established successfully");
                return Ok(conn);
            },
            Err(e) => {
                attempts += 1;
                println!("Connection attempt {} failed: {}", attempts, e);
                if attempts > 3 {
                    return Err(e);
                }
                sleep(Duration::from_secs(2)).await;
            }
        }
    }
}

性能优化建议

  1. 连接池管理:重用连接和会话以减少开销
  2. 批量操作:使用批量发送和接收提高吞吐量
  3. 适当的预取设置:调整预取计数以平衡延迟和吞吐量
  4. 异步处理:使用tokio的spawn来并行处理消息

注意事项

  • 确保AMQP broker(如RabbitMQ或ActiveMQ)已正确配置
  • 处理网络中断和重连逻辑
  • 适当处理消息确认和拒绝
  • 监控内存使用,特别是在处理大量消息时

这个库为Rust开发者提供了强大的AMQP消息队列功能,特别适合需要高性能异步消息处理的应用程序。

回到顶部