Flutter MQTT通信插件dart_mqtt的使用
Flutter MQTT通信插件dart_mqtt的使用
快速入门指南
TCP连接
以下是一个使用TCP协议与MQTT服务器通信的完整示例代码。该示例展示了如何连接到MQTT代理、订阅主题并处理接收到的消息。
import 'package:dart_mqtt/dart_mqtt.dart';
void main() async {
// 创建一个TCP传输客户端,连接到指定的MQTT代理
var transport = XTransportTcpClient.from(
"broker.emqx.io", // MQTT代理地址
1883, // 端口号
log: true, // 是否开启日志
);
// 创建MQTT客户端,并设置相关参数
var cli = MqttClient(
transport,
log: true, // 是否开启日志
)
..withKeepalive(10) // 设置心跳间隔时间为10秒
..withClientID("mqttx_test"); // 设置客户端ID
// 处理连接确认消息
cli.onMqttConack((msg) {
print("onMqttConack: $msg"); // 打印连接确认消息
if (msg.returnCode != MqttConnectReturnCode.connectionAccepted) {
// 如果连接被拒绝,关闭客户端
cli.close();
return;
}
// 重新订阅之前订阅的主题
cli.reSub();
});
// 处理重新连接前的回调
cli.onBeforeReconnect(() async {
print("reconnecting..."); // 打印重新连接信息
});
// 启动MQTT客户端
cli.start();
// 订阅主题,并设置消息处理回调
await cli.subscribe(
"test/topic", // 订阅的主题
onMessage: (msg) {
print(msg); // 打印接收到的消息
},
futureWaitData: true, // 等待数据返回
);
}
WebSocket连接
除了TCP连接,dart_mqtt
还支持通过WebSocket进行通信。以下是一个使用WebSocket连接的示例代码:
import 'package:dart_mqtt/dart_mqtt.dart';
void main() async {
// 创建一个WebSocket传输客户端,连接到指定的MQTT代理
var transport = XTransportWsClient.from(
"broker.emqx.io", // MQTT代理地址
"/mqtt", // WebSocket路径
8083, // 端口号
log: true, // 是否开启日志
protocols: ["mqtt"], // 协议类型,必须包含"mqtt"
);
// 创建MQTT客户端,并设置相关参数
var cli = MqttClient(
transport,
log: true, // 是否开启日志
allowReconnect: true, // 允许自动重连
)
..withKeepalive(10) // 设置心跳间隔时间为10秒
..withClientID("mqttx_test"); // 设置客户端ID
// 处理连接确认消息
cli.onMqttConack((msg) {
// print("onMqttConack: $msg"); // 打印连接确认消息
if (msg.returnCode != MqttConnectReturnCode.connectionAccepted) {
// 如果连接被拒绝,关闭客户端
cli.close();
return;
}
// 重新订阅之前订阅的主题
cli.reSub();
});
// 处理重新连接前的回调
cli.onBeforeReconnect(() async {
print("reconnecting..."); // 打印重新连接信息
});
// 启动MQTT客户端
cli.start();
// 订阅主题,并设置消息处理回调
cli.subscribe("test/topic", onMessage: (msg) {
// print(msg); // 打印接收到的消息
});
}
重连属性
除了错误和通知回调外,您还可以在连接选项中设置一些重连属性:
-
allowReconnect (
bool
):启用或禁用重连逻辑。默认值为false
。var cli = MqttClient( transport, log: true, allowReconnect: true, // 启用重连 );
-
reconnectWait (
Duration
):设置在尝试重连失败后等待的时间。默认值为const Duration(seconds: 2)
。var cli = MqttClient( transport, log: true, reconnectWait: const Duration(seconds: 5), // 设置重连等待时间为5秒 );
-
customReconnectDelayCB (
Duration Function()?
):这是一个自定义的重连延迟回调函数。当库尝试每个URL都失败后,会调用此函数。它返回一个Duration
,表示下一次重连前的等待时间。建议在此函数中添加一些随机延迟(jitter),以避免所有连接同时尝试重连。var cli = MqttClient( transport, log: true, customReconnectDelayCB: () { // 返回一个随机的重连延迟时间 return Duration(seconds: (Random().nextInt(5) + 1)); }, );
更多关于Flutter MQTT通信插件dart_mqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html
更多关于Flutter MQTT通信插件dart_mqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html
当然,以下是如何在Flutter项目中使用dart_mqtt
插件来实现MQTT通信的示例代码。
首先,确保你已经在pubspec.yaml
文件中添加了dart_mqtt
依赖:
dependencies:
flutter:
sdk: flutter
dart_mqtt: ^x.y.z # 请替换为最新版本号
然后运行flutter pub get
来安装依赖。
接下来是一个简单的Flutter应用示例,它展示了如何使用dart_mqtt
进行MQTT通信:
import 'package:flutter/material.dart';
import 'package:dart_mqtt/dart_mqtt.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatefulWidget {
@override
_MyAppState createState() => _MyAppState();
}
class _MyAppState extends State<MyApp> {
MqttClient? _client;
String _connectionStatus = 'Disconnected';
String _receivedMessage = '';
@override
void initState() {
super.initState();
connectToMqtt();
}
void connectToMqtt() async {
final String broker = 'mqtt.eclipseprojects.io'; // MQTT broker地址
final int port = 1883; // MQTT broker端口
final String clientId = 'flutter_client_${DateTime.now().millisecondsSinceEpoch}';
final String topic = 'test/topic';
_client = MqttClient(broker, port, clientId);
_client!.logging(on: true);
_client!.keepAliveInterval = 20;
_client!.keepAlivePeriod = 60;
_client!.onConnected = () {
print('Connected to broker');
if (_client!.subscriptionTopics!.isNotEmpty) {
_client!.subscribe(topic, MqttQos.atLeastOnce);
}
setState(() {
_connectionStatus = 'Connected';
});
};
_client!.onDisconnected = () {
print('Disconnected from broker');
setState(() {
_connectionStatus = 'Disconnected';
});
};
_client!.onSubscribed = (String topic) {
print('Subscribed to topic: $topic');
};
_client!.onMessageReceived = (MqttMessage message) async {
final payload =
MqttPublishPayload.bytesToStringAsString(message.payload.message!);
print('Message received: $payload from topic: ${message.variableHeader.topicName}');
setState(() {
_receivedMessage = payload;
});
};
final MqttConnectOptions connOpts = MqttConnectOptions.builder()
.setClientId(clientId)
.setCleanSession(true)
.setAutomaticReconnect(true)
.setName('Flutter MQTT Client')
.build();
try {
await _client!.connect(connOpts).timeout(Duration(seconds: 30), onTimeout: () {
throw TimeoutException('Connection timed out');
});
} catch (e) {
print('Exception while connecting: $e');
setState(() {
_connectionStatus = 'Connection error: $e';
});
}
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('Flutter MQTT Example'),
),
body: Padding(
padding: const EdgeInsets.all(8.0),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
Text('Connection Status: $_connectionStatus'),
SizedBox(height: 16),
Text('Received Message: $_receivedMessage'),
],
),
),
),
);
}
@override
void dispose() {
_client?.disconnect();
super.dispose();
}
}
在这个示例中,我们做了以下几件事:
- 初始化MQTT客户端:创建一个
MqttClient
实例并设置连接选项。 - 定义回调:处理连接、断开连接、订阅和消息接收事件。
- 连接到MQTT broker:使用
connect
方法连接到指定的MQTT broker。 - 订阅主题:在连接成功后订阅指定的主题。
- 显示状态:在UI中显示连接状态和接收到的消息。
确保你已经正确配置了MQTT broker地址和端口,并且网络允许Flutter应用访问该broker。
这个示例展示了基本的MQTT通信流程,你可以根据需要扩展它,比如添加更多的主题订阅、处理QoS级别、处理保留消息等。