HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明
HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明
鸿蒙中如何使用MQTT,提供源码和方案说明
ohos_mqtt
https://gitcode.com/openharmony-tpc/ohos_mqtt
使应用程序能够连接到MQTT代理以发布消息、订阅主题和接收发布的消息。
下载安装
ohpm install [@ohos](/user/ohos)/mqtt
- @ohos/mqtt包已开启ssl功能、已支持sslVersion参数。
使用说明
import { MqttAsync } from '[@ohos](/user/ohos)/mqtt';
Demo
MqttAsync.createMqttAsync({
url: "ip:port",
clientId: "e5fatos4jh3l79lndb0bs",
persistenceType: 1,
MQTTVersion: 5,
}).then((mqttAsyncClient) => {
this.mqttAsyncClient = mqttAsyncClient;
console.log("createMqttAsync success, mqttAsyncClient: " + mqttAsyncClient);
}).catch((error) => {
console.error("createMqttAsync failed, error: " + error);
})
let options: MqttConnectOptions = {
//set userName and password
userName: "",
password: "",
connectTimeout: 30,
MQTTVersion: 5,
// If connecting to an SSL port, the following parameters need to be configured
sslOptions: {
// true: enable server certificate authentication, false: disable,default is true.
enableServerCertAuth: true,
// Sandbox path for CA certificate
// If enableServerCertAuth is true, a CA certificate needs to be passed
// If enableServerCertAuth is false, a CA certificate does not need to be passed
// trustStore default value is "/etc/ssl/certs/cacert.pem"
trustStore: fileDir + "/ca.crt"
}
};
this.mqttAsyncClient.connect(options, (err: Error, data: MqttResponse) => {
// to do Something
});
let options: MqttConnectOptions = {
//set userName and password
userName: "",
password: "",
connectTimeout: 30,
MQTTVersion: 5,
};
this.mqttAsyncClient.connect(options).then((data: MqttResponse) => {
console.log("mqtt connect success "+ JSON.stringify(data));
}).catch((err: MqttResponse) => {
console.log("mqtt connect fail"+JSON.stringify(err))
})
try{
let result: MqttResponse = await this.mqttAsyncClient.connect(options)
console.log("mqtt connect success "+ JSON.stringify(result));
}catch(err){
console.log("mqtt connect fail "+ JSON.stringify(err));
}
let publishOption: MqttPublishOptions = {
topic: "domotopic",
qos: 1,
payload: "haishangdebing",
}
this.mqttAsyncClient.publish(publishOption, (err: Error, data: MqttResponse) => {
// to do Something
});
let subscribeOption: MqttSubscribeOptions = {
topic: "domotopic",
qos: 2
}
this.mqttAsyncClient.subscribe(subscribeOption).then((data: MqttResponse) => {
console.log("mqtt subscribe success "+ JSON.stringify(result));
}).catch((err: MqttResponse) => {
console.log("mqtt subscribe fail "+ JSON.stringify(err));
})
try{
let result: MqttResponse = await this.mqttAsyncClient.subscribe(subscribeOption)
console.log("mqtt subscribe success "+ JSON.stringify(result));
}catch(err){
console.log("mqtt subscribe fail "+ JSON.stringify(err));
}
更多关于HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html
一、结论
目前鸿蒙(HarmonyOS)中使用MQTT协议的核心方式为: 通过官方@ohos/mqtt库实现,优先采用异步客户端(MqttAsync)适配鸿蒙异步编程模型,核心流程为「配置依赖与网络权限→创建MQTT客户端→配置连接参数→建立连接→发布/订阅消息→断开连接」。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的即时通讯协议,专为资源受限的物联网(IoT)设备和低带宽、高延迟或不可靠网络环境设计。
目前在物联网,车载,即时通讯,JG领域用的很多。MQTT模式是有个服务器,若干个客户端,订阅Topic作为事件ID,用来订阅广播,发送广播。类似于EventHub和Emitter的发布订阅机制。使用起来很简单。
二、代码实现和详细解释

去鸿蒙三方库中心,搜索mqtt的最新版本进行集成,配置依赖:
ohpm install [@ohos](/user/ohos)/mqtt
或者在oh-package.json5中配置:
"dependencies": {
"[@ohos](/user/ohos)/mqtt":"2.0.18",
}
记得配置网络权限:
在module.json5中配置:
"requestPermissions":[
{
"name" : "ohos.permission.INTERNET",
"reason": "$string:reason_net",
"usedScene": {
"abilities": [
"FormAbility"
],
"when":"inuse"
}
}
]
详细DEMO步骤:
import { MqttAsync } from '[@ohos](/user/ohos)/mqtt';
// ---------------------- 基础配置 ----------------------
const BROKER_URL = 'mqtt://test.mosquitto.org:1883'; // 公共测试Broker(支持MQTT 3.1.1)
const CLIENT_ID = 'HarmonyOS-MQTT3-Demo'; // 客户端ID(需唯一)
const TOPIC = 'harmonyos/classic/test'; // 订阅/发布主题
const QOS = 1; // 服务质量等级(0/1/2)
// ---------------------- 创建异步客户端 ----------------------
let client = MqttAsync.createMqtt({
url: BROKER_URL,
clientId: CLIENT_ID,
// 客户端持久化类型(0=文件系统,1=内存,2=自定义)
persistenceType: 1 // 使用内存持久化(轻量设备推荐)
})
// ---------------------- 核心功能实现 ----------------------
async function mqttCommunication() {
try {
// 1. 连接到Broker(异步方法,返回Promise)
await client.connect({
// MQTT 3.1.1 连接参数
cleanSession: true, // 清除会话(断开后不保留订阅和消息)
connectTimeout: 30, // 连接超时时间(秒)
keepAliveInterval: 60, // 心跳间隔(秒),维持长连接
// 认证信息(若Broker需要)
userName: 'user', // 用户名(可选)
password: 'password', // 密码(可选)
});
console.log('[MQTT 3.1.1] 连接成功');
// 2. 订阅主题(支持通配符,如 "home/+/temp")
await client.subscribe({
topic: TOPIC,
qos: QOS
});
console.log(`[MQTT 3.1.1] 已订阅主题:${TOPIC}(QoS ${QOS})`);
// 3. 发布消息(字符串或二进制 payload)
const message = 'Hello from HarmonyOS with MQTT 3.1.1!';
await client.publish({
topic: TOPIC,
payload: message,
qos: QOS,
retained: false, // 是否保留消息(Broker存储最后一条消息)
});
console.log(`[MQTT 3.1.1] 消息已发布:${message}`);
// 4. 监听消息接收事件
client.subscribe({
topic: TOPIC,
qos: QOS,
},(MqttResponse)=>{
console.log(`[接收消息] 主题:${MqttResponse}`);
})
} catch (err) {
console.error('[MQTT 3.1.1] 操作失败:', err.message);
}
}
// ---------------------- 启动连接 ----------------------
mqttCommunication();
// ---------------------- 断开连接(如页面销毁时调用) ----------------------
// client.disconnect();
三、引用资料地址
1、鸿蒙@ohos/mqtt官方文档:https://ohpm.openharmony.cn/#/cn/detail/@ohos%2Fmqtt
在HarmonyOS Next中使用MQTT进行设备间通信,可以参考以下方案和核心代码实现:
一、方案概述
HarmonyOS Next推荐使用系统内置的@ohos/net.mqtt模块实现MQTT客户端功能,支持TLS加密连接和消息发布/订阅。
二、核心实现步骤
1. 权限配置
在module.json5中声明网络权限:
{
"module": {
"requestPermissions": [{
"name": "ohos.permission.INTERNET"
}]
}
}
2. MQTT客户端实现
import { mqtt } from '@ohos/net.mqtt';
import { BusinessError } from '@ohos.base';
class MQTTClient {
private client: mqtt.MQTTClient | null = null;
// 创建客户端
async createClient(options: mqtt.MQTTOptions): Promise<void> {
try {
this.client = mqtt.createMqttClient(options);
console.info('MQTT client created successfully');
} catch (error) {
console.error(`Failed to create client: ${JSON.stringify(error)}`);
}
}
// 连接服务器
async connect(config: mqtt.ConnectOptions): Promise<void> {
if (!this.client) {
console.error('Client not initialized');
return;
}
this.client.connect(config, (err: BusinessError) => {
if (err) {
console.error(`Connect failed: ${JSON.stringify(err)}`);
} else {
console.info('Connected to MQTT broker');
}
});
}
// 订阅主题
async subscribe(topic: string, qos: mqtt.QoS): Promise<void> {
if (!this.client) return;
this.client.subscribe(topic, qos, (err: BusinessError) => {
if (err) {
console.error(`Subscribe failed: ${JSON.stringify(err)}`);
} else {
console.info(`Subscribed to topic: ${topic}`);
}
});
}
// 发布消息
async publish(topic: string, message: string, qos: mqtt.QoS): Promise<void> {
if (!this.client) return;
this.client.publish(topic, message, qos, (err: BusinessError) => {
if (err) {
console.error(`Publish failed: ${JSON.stringify(err)}`);
} else {
console.info(`Message published to ${topic}`);
}
});
}
// 设置消息回调
setMessageCallback(callback: (topic: string, message: Uint8Array) => void): void {
if (!this.client) return;
this.client.on('message', (data: mqtt.MessageData) => {
callback(data.topic, data.message);
});
}
// 断开连接
async disconnect(): Promise<void> {
if (!this.client) return;
this.client.disconnect((err: BusinessError) => {
if (err) {
console.error(`Disconnect failed: ${JSON.stringify(err)}`);
}
});
}
}
3. 使用示例
// 初始化配置
const mqttClient = new MQTTClient();
// 创建客户端
await mqttClient.createClient({
clientId: 'harmonyos_device_001',
cleanSession: true
});
// 连接配置
const connectConfig: mqtt.ConnectOptions = {
host: 'broker.emqx.io',
port: 1883,
userName: 'optional_username',
password: 'optional_password',
keepAlive: 60,
timeout: 30
};
// 建立连接
await mqttClient.connect(connectConfig);
// 设置消息接收回调
mqttClient.setMessageCallback((topic: string, message: Uint8Array) => {
const decodedMsg = String.fromCharCode.apply(null, Array.from(message));
console.info(`Received message from ${topic}: ${decodedMsg}`);
});
// 订阅主题
await mqttClient.subscribe('test/topic', mqtt.QoS.AT_LEAST_ONCE);
// 发布消息
await mqttClient.publish('test/topic', 'Hello HarmonyOS', mqtt.QoS.AT_LEAST_ONCE);
// 断开连接(使用后调用)
// await mqttClient.disconnect();
三、关键配置说明
-
QoS级别:
AT_MOST_ONCE(0):最多一次AT_LEAST_ONCE(1):至少一次EXACTLY_ONCE(2):仅一次
-
TLS连接: 在
ConnectOptions中配置ssl: true并设置CA证书路径。 -
连接保持:
keepAlive参数控制心跳间隔(秒),建议设置为30-60秒。
四、注意事项
- 确保设备网络连接正常
- 生产环境建议使用TLS加密
- 合理设置客户端ID避免冲突
- 及时处理连接断开和重连逻辑
此方案基于HarmonyOS Next的官方MQTT API实现,可根据实际业务需求扩展重连机制、消息持久化等功能。


