Flutter AMQP协议通信插件dart_amqp的使用

发布于 1周前 作者 caililin 来自 Flutter

Flutter AMQP协议通信插件dart_amqp的使用

dart_amqp简介

dart_amqp 是一个用于Dart语言的AMQP 0.9.1协议客户端库,支持异步API、多种认证方式、TLS连接、消息确认和心跳检测等功能。它适用于与RabbitMQ等AMQP兼容的消息队列服务器进行交互。

主要特性

  • 异步API:基于Futures和Streams。
  • 认证方式:支持PLAIN和AMQPLAIN,其他认证方案可通过实现接口添加。
  • 协议支持:实现了整个AMQP 0.9.1协议(除basic get和recover-async)。
  • 连接安全:支持明文和TLS连接。
  • 消息确认:支持发布确认。
  • 心跳机制:支持心跳检测以维持连接活跃。

注意事项

目前该驱动程序在重新建立连接时还不支持恢复客户端拓扑结构,这一功能可能会在未来版本中实现。

快速开始

监听队列

以下代码展示了如何监听名为“hello”的队列,并处理接收到的消息:

import "package:dart_amqp/dart_amqp.dart";

void main() async {
  Client client = Client();

  Channel channel = await client.channel(); // 自动连接到本地服务器,默认凭据为guest
  Queue queue = await channel.queue("hello");
  Consumer consumer = await queue.consume();
  consumer.listen((AmqpMessage message) {
    // 获取负载作为字符串
    print(" [x] Received string: ${message.payloadAsString}");

    // 或反序列化为json
    print(" [x] Received json: ${message.payloadAsJson}");

    // 或直接获取原始数据为Uint8List
    print(" [x] Received raw: ${message.payload}");

    // 消息对象包含回复、确认和拒绝的帮助方法
    message.reply("world");
  });
}

发送消息通过交换机

下面的示例演示了如何通过FANOUT类型的交换机发送一条测试消息:

import "package:dart_amqp/dart_amqp.dart";

void main() async {

  // 可以提供设置对象来覆盖默认连接设置
  ConnectionSettings settings = ConnectionSettings(
    host: "remote.amqp.server.com",
    authProvider: PlainAuthenticator("user", "pass")
  );
  Client client = Client(settings: settings);

  Channel channel = await client.channel();
  Exchange exchange = await channel.exchange("logs", ExchangeType.FANOUT);
  // 对于FANOUT类型,我们不关心路由键
  exchange.publish("Testing 1-2-3", null);
  client.close();
}

API文档

完整的API文档请参阅官方GitHub仓库

RPC调用

对于需要RPC功能的应用,可以参考此例子,它展示了如何仅使用提供的API创建基本的RPC服务器/客户端。如果希望简化RPC操作,还可以考虑使用dart_amqp_rpc包。

更多示例

除了上述内容外,项目中的example目录还包含了RabbitMQ入门教程中的六个实例以及一个关于发布确认的额外例子。

贡献与许可

有关贡献指南,请访问Contributing Guidedart_amqp遵循MIT许可证分发,详情见LICENSE文件


以上是dart_amqp插件的基本介绍及其在Flutter项目中的使用方法。如果您有更多问题或需要进一步的帮助,请随时提问!


更多关于Flutter AMQP协议通信插件dart_amqp的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter AMQP协议通信插件dart_amqp的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


当然,以下是一个关于如何在Flutter项目中使用dart_amqp插件进行AMQP协议通信的代码示例。dart_amqp是一个Dart包,用于与AMQP(高级消息队列协议)服务器进行通信。在Flutter中,你可以通过添加依赖来使用它。

步骤 1: 添加依赖

首先,在你的pubspec.yaml文件中添加dart_amqp依赖:

dependencies:
  flutter:
    sdk: flutter
  dart_amqp: ^最新版本号  # 请替换为实际的最新版本号

然后运行flutter pub get来安装依赖。

步骤 2: 导入包并配置连接

在你的Dart文件(例如main.dart)中,导入dart_amqp包并配置AMQP连接:

import 'package:dart_amqp/dart_amqp.dart';
import 'package:flutter/material.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('AMQP Demo'),
        ),
        body: Center(
          child: FutureBuilder<Void>(
            future: connectToAMQP(),
            builder: (context, snapshot) {
              if (snapshot.connectionState == ConnectionState.waiting) {
                return CircularProgressIndicator();
              } else if (snapshot.hasError) {
                return Text('Error: ${snapshot.error}');
              } else {
                return Text('Connected to AMQP Server');
              }
            },
          ),
        ),
      ),
    );
  }

  Future<Void> connectToAMQP() async {
    // AMQP服务器配置
    final amqpUri = Uri.parse('amqp://username:password@hostname:port/vhost');
    final connection = await AmqpConnection.connect(amqpUri);
    
    // 打开通道
    final channel = await connection.createChannel();
    
    // 声明队列(可选)
    await channel.queueDeclare(
      queue: 'my_queue',
      durable: true,
      exclusive: false,
      autoDelete: false,
      arguments: null,
    );
    
    // 消费消息(可选)
    channel.consume(
      queue: 'my_queue',
      autoAck: true,
      consumerTag: 'my_consumer',
      noLocal: false,
      exclusive: false,
      nowait: false,
      arguments: null,
    ).listen((delivery) {
      print('Received message: ${String.fromCharCodes(delivery.body)}');
    });
    
    // 发送消息(可选)
    final properties = MessageProperties(deliveryMode: 2, contentType: 'text/plain');
    final body = 'Hello AMQP!'.codeUnits;
    await channel.basicPublish(exchange: '', routingKey: 'my_queue', body: body, properties: properties);
    
    // 关闭连接(这里为了示例没有关闭,实际应用中应该在适当的时候关闭)
    // await connection.close();
    
    return null;
  }
}

注意事项

  1. 替换配置:确保将amqp://username:password@hostname:port/vhost替换为你的AMQP服务器的实际连接信息。
  2. 错误处理:实际应用中,应添加更多的错误处理和日志记录。
  3. 资源管理:在示例中,连接和通道没有关闭。在实际应用中,你应该在适当的时候关闭它们以释放资源。
  4. 并发处理:如果你的应用需要处理并发消息,考虑使用更复杂的逻辑来管理连接和通道。

这个示例演示了如何使用dart_amqp包连接到AMQP服务器、声明队列、发送和接收消息。根据你的具体需求,你可以进一步扩展和修改这个示例。

回到顶部