Rust AMQP扩展库fe2o3-amqp-ext的使用:高性能异步消息队列协议实现与扩展功能

fe2o3-amqp-ext

fe2o3-amqp-extfe2o3-amqp的扩展库,提供了高性能的异步AMQP(高级消息队列协议)实现和扩展功能。

许可证: MIT/Apache-2.0

安装

在项目目录中运行以下Cargo命令:

cargo add fe2o3-amqp-ext

或者在Cargo.toml中添加以下行:

fe2o3-amqp-ext = "0.14.0"

示例代码

以下是一个使用fe2o3-amqp-ext的完整示例,展示了如何创建AMQP连接、发送和接收消息:

use fe2o3_amqp::{
    connection::Connection,
    session::Session,
    sender::Sender,
    receiver::Receiver,
    types::primitives::Value,
};
use fe2o3_amqp_ext::SharedConnection; // 扩展功能

#[tokio::main]
async fn main() {
    // 创建AMQP连接
    let mut connection = Connection::builder()
        .container_id("example-container")
        .open("amqp://localhost:5672")
        .await
        .unwrap();

    // 创建会话
    let mut session = Session::begin(&mut connection).await.unwrap();

    // 创建发送者
    let mut sender = Sender::attach(&mut session, "example-sender", "example-queue")
        .await
        .unwrap();

    // 发送消息
    let message = Value::String("Hello AMQP!".to_string());
    sender.send(message).await.unwrap();
    println!("Message sent");

    // 创建接收者
    let mut receiver = Receiver::attach(&mut session, "example-receiver", "example-queue")
        .await
        .unwrap();

    // 接收消息
    let received = receiver.recv::<Value>().await.unwrap();
    println!("Received: {:?}", received);

    // 使用扩展的共享连接功能
    let shared_conn = SharedConnection::new(connection);
    let mut conn1 = shared_conn.clone();
    let mut conn2 = shared_conn.clone();

    // 可以在多个任务中使用共享连接
    tokio::spawn(async move {
        let mut session = Session::begin(&mut conn1).await.unwrap();
        // 使用连接1...
    });

    tokio::spawn(async move {
        let mut session = Session::begin(&mut conn2).await.unwrap();
        // 使用连接2...
    });

    // 关闭会话和连接
    session.end().await.unwrap();
    connection.close().await.unwrap();
}

主要功能

  1. 高性能异步AMQP协议实现
  2. 扩展功能如共享连接(SharedConnection)
  3. 支持AMQP 1.0协议
  4. 提供发送者和接收者API
  5. 支持多种消息类型

完整示例代码

以下是一个更完整的示例,展示了如何使用fe2o3-amqp-ext进行错误处理和消息确认:

use fe2o3_amqp::{
    connection::Connection,
    session::Session,
    sender::Sender,
    receiver::Receiver,
    types::{
        primitives::Value,
        messaging::{Outcome, DeliveryState},
    },
    DeliveryTag,
};
use fe2o3_amqp_ext::SharedConnection;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建AMQP连接
    let mut connection = Connection::builder()
        .container_id("example-container")
        .open("amqp://localhost:5672")
        .await?;

    // 创建会话
    let mut session = Session::begin(&mut connection).await?;

    // 创建发送者
    let mut sender = Sender::attach(&mut session, "example-sender", "example-queue")
        .await?;

    // 发送多条消息
    for i in 0..5 {
        let message = Value::String(format!("Message {}", i));
        sender.send(message).await?;
        println!("Sent message {}", i);
        sleep(Duration::from_millis(100)).await;
    }

    // 创建接收者
    let mut receiver = Receiver::attach(&mut session, "example-receiver", "example-queue")
        .await?;

    // 接收并处理消息
    while let Ok(delivery) = receiver.recv::<Value>().await {
        println!("Received: {:?}", delivery);
        
        // 确认消息
        let outcome = Outcome::Accepted;
        let state = DeliveryState::from(outcome);
        receiver.disposition(DeliveryTag(delivery.tag), state).await?;
    }

    // 使用共享连接
    let shared_conn = SharedConnection::new(connection);
    let mut conn1 = shared_conn.clone();
    let mut conn2 = shared_conn.clone();

    let task1 = tokio::spawn(async move {
        let mut session = Session::begin(&mut conn1).await.unwrap();
        // 使用连接1发送消息...
    });

    let task2 = tokio::spawn(async move {
        let mut session = Session::begin(&mut conn2).await.unwrap();
        // 使用连接2接收消息...
    });

    let _ = tokio::join!(task1, task2);

    // 关闭会话和连接
    session.end().await?;
    connection.close().await?;

    Ok(())
}

1 回复

Rust AMQP扩展库fe2o3-amqp-ext的使用指南

概述

fe2o3-amqp-ext是一个基于Rust的高性能异步AMQP(高级消息队列协议)扩展库,提供了对AMQP协议的核心实现和扩展功能。该库构建在tokio异步运行时之上,专为需要高性能消息队列通信的应用程序设计。

主要特性

  • 完全异步的AMQP 1.0协议实现
  • 支持消息发布/订阅模式
  • 提供连接池管理
  • 支持自定义扩展功能
  • 高性能的消息处理
  • 完善的错误处理机制

安装

在Cargo.toml中添加依赖:

[dependencies]
fe2o3-amqp-ext = "0.1"  # 请使用最新版本
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 创建连接

use fe2o3_amqp_ext::{Connection, ConnectionOptions};

#[tokio::main]
async fn main() {
    let options = ConnectionOptions::builder()
        .host("localhost")
        .port(5672)
        .username("guest")
        .password("guest")
        .container_id("rust-client")
        .build();
    
    let connection = Connection::connect(options).await.unwrap();
    println!("Connected to AMQP broker!");
    
    // ... 使用连接
    
    connection.close().await.unwrap();
}

2. 创建会话和发送消息

use fe2o3_amqp_ext::{Session, Sender, Receiver};
use fe2o3_amqp_ext::types::messaging::{Message, Data};

#[tokio::main]
async fn main() {
    // 先建立连接...
    let connection = /* 如前面示例 */;
    
    // 创建会话
    let session = Session::begin(&connection).await.unwrap();
    
    // 创建发送者
    let sender = Sender::attach(&session, "rust-sender", "example-queue").await.unwrap();
    
    // 发送消息
    let message = Message::builder()
        .body(Data::from("Hello AMQP!"))
        .build();
    sender.send(message).await.unwrap();
    
    // 创建接收者
    let receiver = Receiver::attach(&session, "rust-receiver", "example-queue").await.unwrap();
    
    // 接收消息
    let received = receiver.recv::<String>().await.unwrap();
    println!("Received: {}", received);
    
    // 关闭资源
    sender.close().await.unwrap();
    receiver.close().await.unwrap();
    session.end().await.unwrap();
    connection.close().await.unwrap();
}

高级功能

1. 使用连接池

use fe2o3_amqp_ext::{ConnectionPool, ConnectionPoolOptions};

#[tokio::main]
async fn main() {
    let pool_options = ConnectionPoolOptions::builder()
        .host("localhost")
        .port(5672)
        .username("guest")
        .password("guest")
        .max_size(5)
        .build();
    
    let pool = ConnectionPool::new(pool_options);
    
    // 从池中获取连接
    let connection = pool.get().await.unwrap();
    
    // 使用连接...
    
    // 连接会自动返回池中
}

2. 自定义扩展功能

use fe2o3_amqp_ext::{Connection, Extension, ExtensionRegistry};

// 自定义扩展
struct MyCustomExtension;

impl Extension for MyCustomExtension {
    fn name(&self) -> &str {
        "my-custom-extension"
    }
    
    // 实现其他扩展方法...
}

#[tokio::main]
async fn main() {
    let mut registry = ExtensionRegistry::new();
    registry.register(MyCustomExtension);
    
    let options = ConnectionOptions::builder()
        // ...其他配置
        .extensions(registry)
        .build();
    
    let connection = Connection::connect(options).await.unwrap();
    
    // 现在连接支持自定义扩展功能
}

3. 批量消息处理

use fe2o3_amqp_ext::{Sender, Receiver, BatchOptions};
use fe2o3_amqp_ext::types::messaging::Message;

#[tokio::main]
async fn main() {
    // 建立连接和会话...
    
    let sender = Sender::attach(&session, "batch-sender", "batch-queue").await.unwrap();
    
    // 批量发送
    let messages = vec![
        Message::builder().body("Message 1").build(),
        Message::builder().body("Message 2").build(),
        Message::builder().body("Message 3").build(),
    ];
    
    let batch_options = BatchOptions::default()
        .max_size(10)
        .timeout(std::time::Duration::from_secs(1));
    
    sender.send_batch(messages, batch_options).await.unwrap();
    
    // 批量接收
    let receiver = Receiver::attach(&session, "batch-receiver", "batch-queue").await.unwrap();
    
    let batch = receiver.recv_batch::<String>(3).await.unwrap();
    for msg in batch {
        println!("Received: {}", msg);
    }
    
    // 关闭资源...
}

错误处理

use fe2o3_amqp_ext::{Connection, ConnectionError};

#[tokio::main]
async fn main() -> Result<(), ConnectionError> {
    let options = ConnectionOptions::builder()
        .host("localhost")
        .port(5672)
        .build();
    
    let connection = match Connection::connect(options).await {
        Ok(conn) => conn,
        Err(e) => {
            eprintln!("Failed to connect: {}", e);
            return Err(e);
        }
    };
    
    // 使用连接...
    
    connection.close().await?;
    Ok(())
}

性能优化建议

  1. 重用连接和会话对象
  2. 使用连接池管理连接
  3. 对大量小消息使用批量操作
  4. 根据场景调整预取计数(prefetch)
  5. 合理设置心跳间隔

fe2o3-amqp-ext库为Rust开发者提供了强大的AMQP协议支持,特别适合构建高性能、可靠的分布式消息系统。

完整示例代码

下面是一个完整的消息生产者和消费者的示例:

use fe2o3_amqp_ext::{
    Connection, ConnectionOptions, 
    Session, Sender, Receiver,
    types::messaging::{Message, Data}
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建连接
    let options = ConnectionOptions::builder()
        .host("localhost")
        .port(5672)
        .username("guest")
        .password("guest")
        .container_id("full-demo-client")
        .build();
    
    let connection = Connection::connect(options).await?;
    println!("Connected to AMQP broker!");

    // 2. 创建会话
    let session = Session::begin(&connection).await?;

    // 3. 创建发送者并发送消息
    let sender = Sender::attach(&session, "full-demo-sender", "demo-queue").await?;
    
    let message = Message::builder()
        .body(Data::from("This is a full demo message"))
        .subject("demo")
        .build();
    
    sender.send(message).await?;
    println!("Message sent successfully");

    // 4. 创建接收者并接收消息
    let receiver = Receiver::attach(&session, "full-demo-receiver", "demo-queue").await?;
    
    let received = receiver.recv::<String>().await?;
    println!("Received message: {}", received);

    // 5. 关闭所有资源
    sender.close().await?;
    receiver.close().await?;
    session.end().await?;
    connection.close().await?;

    Ok(())
}

这个完整示例展示了:

  1. 如何建立AMQP连接
  2. 如何创建会话
  3. 如何发送消息
  4. 如何接收消息
  5. 如何正确关闭所有资源

您可以根据实际需求修改队列名称、消息内容和其他参数。

回到顶部