Flutter MQTT通信插件dart_mqtt的使用

发布于 1周前 作者 eggper 来自 Flutter

Flutter MQTT通信插件dart_mqtt的使用

快速入门指南

TCP连接

以下是一个使用TCP协议与MQTT服务器通信的完整示例代码。该示例展示了如何连接到MQTT代理、订阅主题并处理接收到的消息。

import 'package:dart_mqtt/dart_mqtt.dart';

void main() async {
  // 创建一个TCP传输客户端,连接到指定的MQTT代理
  var transport = XTransportTcpClient.from(
    "broker.emqx.io",  // MQTT代理地址
    1883,              // 端口号
    log: true,         // 是否开启日志
  );

  // 创建MQTT客户端,并设置相关参数
  var cli = MqttClient(
    transport,
    log: true,         // 是否开启日志
  )
    ..withKeepalive(10)        // 设置心跳间隔时间为10秒
    ..withClientID("mqttx_test");  // 设置客户端ID

  // 处理连接确认消息
  cli.onMqttConack((msg) {
    print("onMqttConack: $msg");  // 打印连接确认消息
    if (msg.returnCode != MqttConnectReturnCode.connectionAccepted) {
      // 如果连接被拒绝,关闭客户端
      cli.close();
      return;
    }
    // 重新订阅之前订阅的主题
    cli.reSub();
  });

  // 处理重新连接前的回调
  cli.onBeforeReconnect(() async {
    print("reconnecting...");  // 打印重新连接信息
  });

  // 启动MQTT客户端
  cli.start();

  // 订阅主题,并设置消息处理回调
  await cli.subscribe(
    "test/topic",  // 订阅的主题
    onMessage: (msg) {
      print(msg);  // 打印接收到的消息
    },
    futureWaitData: true,  // 等待数据返回
  );
}
WebSocket连接

除了TCP连接,dart_mqtt还支持通过WebSocket进行通信。以下是一个使用WebSocket连接的示例代码:

import 'package:dart_mqtt/dart_mqtt.dart';

void main() async {
  // 创建一个WebSocket传输客户端,连接到指定的MQTT代理
  var transport = XTransportWsClient.from(
    "broker.emqx.io",  // MQTT代理地址
    "/mqtt",           // WebSocket路径
    8083,              // 端口号
    log: true,         // 是否开启日志
    protocols: ["mqtt"],  // 协议类型,必须包含"mqtt"
  );

  // 创建MQTT客户端,并设置相关参数
  var cli = MqttClient(
    transport,
    log: true,         // 是否开启日志
    allowReconnect: true,  // 允许自动重连
  )
    ..withKeepalive(10)        // 设置心跳间隔时间为10秒
    ..withClientID("mqttx_test");  // 设置客户端ID

  // 处理连接确认消息
  cli.onMqttConack((msg) {
    // print("onMqttConack: $msg");  // 打印连接确认消息
    if (msg.returnCode != MqttConnectReturnCode.connectionAccepted) {
      // 如果连接被拒绝,关闭客户端
      cli.close();
      return;
    }
    // 重新订阅之前订阅的主题
    cli.reSub();
  });

  // 处理重新连接前的回调
  cli.onBeforeReconnect(() async {
    print("reconnecting...");  // 打印重新连接信息
  });

  // 启动MQTT客户端
  cli.start();

  // 订阅主题,并设置消息处理回调
  cli.subscribe("test/topic", onMessage: (msg) {
    // print(msg);  // 打印接收到的消息
  });
}

重连属性

除了错误和通知回调外,您还可以在连接选项中设置一些重连属性:

  • allowReconnect (bool):启用或禁用重连逻辑。默认值为false

    var cli = MqttClient(
      transport,
      log: true,
      allowReconnect: true,  // 启用重连
    );
    
  • reconnectWait (Duration):设置在尝试重连失败后等待的时间。默认值为const Duration(seconds: 2)

    var cli = MqttClient(
      transport,
      log: true,
      reconnectWait: const Duration(seconds: 5),  // 设置重连等待时间为5秒
    );
    
  • customReconnectDelayCB (Duration Function()?):这是一个自定义的重连延迟回调函数。当库尝试每个URL都失败后,会调用此函数。它返回一个Duration,表示下一次重连前的等待时间。建议在此函数中添加一些随机延迟(jitter),以避免所有连接同时尝试重连。

    var cli = MqttClient(
      transport,
      log: true,
      customReconnectDelayCB: () {
        // 返回一个随机的重连延迟时间
        return Duration(seconds: (Random().nextInt(5) + 1));
      },
    );
    

更多关于Flutter MQTT通信插件dart_mqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter MQTT通信插件dart_mqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


当然,以下是如何在Flutter项目中使用dart_mqtt插件来实现MQTT通信的示例代码。

首先,确保你已经在pubspec.yaml文件中添加了dart_mqtt依赖:

dependencies:
  flutter:
    sdk: flutter
  dart_mqtt: ^x.y.z  # 请替换为最新版本号

然后运行flutter pub get来安装依赖。

接下来是一个简单的Flutter应用示例,它展示了如何使用dart_mqtt进行MQTT通信:

import 'package:flutter/material.dart';
import 'package:dart_mqtt/dart_mqtt.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatefulWidget {
  @override
  _MyAppState createState() => _MyAppState();
}

class _MyAppState extends State<MyApp> {
  MqttClient? _client;
  String _connectionStatus = 'Disconnected';
  String _receivedMessage = '';

  @override
  void initState() {
    super.initState();
    connectToMqtt();
  }

  void connectToMqtt() async {
    final String broker = 'mqtt.eclipseprojects.io'; // MQTT broker地址
    final int port = 1883; // MQTT broker端口
    final String clientId = 'flutter_client_${DateTime.now().millisecondsSinceEpoch}';
    final String topic = 'test/topic';

    _client = MqttClient(broker, port, clientId);
    _client!.logging(on: true);

    _client!.keepAliveInterval = 20;
    _client!.keepAlivePeriod = 60;

    _client!.onConnected = () {
      print('Connected to broker');
      if (_client!.subscriptionTopics!.isNotEmpty) {
        _client!.subscribe(topic, MqttQos.atLeastOnce);
      }
      setState(() {
        _connectionStatus = 'Connected';
      });
    };

    _client!.onDisconnected = () {
      print('Disconnected from broker');
      setState(() {
        _connectionStatus = 'Disconnected';
      });
    };

    _client!.onSubscribed = (String topic) {
      print('Subscribed to topic: $topic');
    };

    _client!.onMessageReceived = (MqttMessage message) async {
      final payload =
          MqttPublishPayload.bytesToStringAsString(message.payload.message!);
      print('Message received: $payload from topic: ${message.variableHeader.topicName}');
      setState(() {
        _receivedMessage = payload;
      });
    };

    final MqttConnectOptions connOpts = MqttConnectOptions.builder()
        .setClientId(clientId)
        .setCleanSession(true)
        .setAutomaticReconnect(true)
        .setName('Flutter MQTT Client')
        .build();

    try {
      await _client!.connect(connOpts).timeout(Duration(seconds: 30), onTimeout: () {
        throw TimeoutException('Connection timed out');
      });
    } catch (e) {
      print('Exception while connecting: $e');
      setState(() {
        _connectionStatus = 'Connection error: $e';
      });
    }
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Flutter MQTT Example'),
        ),
        body: Padding(
          padding: const EdgeInsets.all(8.0),
          child: Column(
            crossAxisAlignment: CrossAxisAlignment.start,
            children: [
              Text('Connection Status: $_connectionStatus'),
              SizedBox(height: 16),
              Text('Received Message: $_receivedMessage'),
            ],
          ),
        ),
      ),
    );
  }

  @override
  void dispose() {
    _client?.disconnect();
    super.dispose();
  }
}

在这个示例中,我们做了以下几件事:

  1. 初始化MQTT客户端:创建一个MqttClient实例并设置连接选项。
  2. 定义回调:处理连接、断开连接、订阅和消息接收事件。
  3. 连接到MQTT broker:使用connect方法连接到指定的MQTT broker。
  4. 订阅主题:在连接成功后订阅指定的主题。
  5. 显示状态:在UI中显示连接状态和接收到的消息。

确保你已经正确配置了MQTT broker地址和端口,并且网络允许Flutter应用访问该broker。

这个示例展示了基本的MQTT通信流程,你可以根据需要扩展它,比如添加更多的主题订阅、处理QoS级别、处理保留消息等。

回到顶部