HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明

HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明

鸿蒙中如何使用MQTT,提供源码和方案说明

4 回复

ohos_mqtt

https://gitcode.com/openharmony-tpc/ohos_mqtt

使应用程序能够连接到MQTT代理以发布消息、订阅主题和接收发布的消息。

下载安装

ohpm install [@ohos](/user/ohos)/mqtt
  • @ohos/mqtt包已开启ssl功能、已支持sslVersion参数。

使用说明

    import { MqttAsync } from '[@ohos](/user/ohos)/mqtt';

Demo

  MqttAsync.createMqttAsync({
        url: "ip:port",
        clientId: "e5fatos4jh3l79lndb0bs",
        persistenceType: 1,
        MQTTVersion: 5,
    }).then((mqttAsyncClient) => {
        this.mqttAsyncClient = mqttAsyncClient;
        console.log("createMqttAsync success, mqttAsyncClient: " + mqttAsyncClient);
    }).catch((error) => {
        console.error("createMqttAsync failed, error: " + error);
    })
    let options: MqttConnectOptions = {
        //set userName and password
        userName: "",
        password: "",
        connectTimeout: 30,
        MQTTVersion: 5,
        // If connecting to an SSL port, the following parameters need to be configured
        sslOptions: {
          // true: enable server certificate authentication, false: disable,default is true.
          enableServerCertAuth: true,
          // Sandbox path for CA certificate
          // If enableServerCertAuth is true, a CA certificate needs to be passed
          // If enableServerCertAuth is false, a CA certificate does not need to be passed
          // trustStore default value is "/etc/ssl/certs/cacert.pem"
          trustStore: fileDir + "/ca.crt"
        }
    };
    this.mqttAsyncClient.connect(options, (err: Error, data: MqttResponse) => {
        // to do Something
    });
    let options: MqttConnectOptions = {
        //set userName and password
        userName: "",
        password: "",
        connectTimeout: 30,
        MQTTVersion: 5,
    };
    this.mqttAsyncClient.connect(options).then((data: MqttResponse) => {
        console.log("mqtt connect success "+ JSON.stringify(data));
    }).catch((err: MqttResponse) => {
        console.log("mqtt connect fail"+JSON.stringify(err))
    })

    try{
        let result: MqttResponse = await this.mqttAsyncClient.connect(options)
        console.log("mqtt connect success "+ JSON.stringify(result));
    }catch(err){
        console.log("mqtt connect fail "+ JSON.stringify(err));
    }
    let publishOption: MqttPublishOptions = {
        topic: "domotopic",
        qos: 1,
        payload: "haishangdebing",
    }
    this.mqttAsyncClient.publish(publishOption, (err: Error, data: MqttResponse) => {
        // to do Something
    });
    let subscribeOption: MqttSubscribeOptions = {
        topic: "domotopic",
        qos: 2
    }
    this.mqttAsyncClient.subscribe(subscribeOption).then((data: MqttResponse) => {
        console.log("mqtt subscribe success "+ JSON.stringify(result));
    }).catch((err: MqttResponse) => {
        console.log("mqtt subscribe fail "+ JSON.stringify(err));
    })

    try{
        let result: MqttResponse = await this.mqttAsyncClient.subscribe(subscribeOption)
        console.log("mqtt subscribe success "+ JSON.stringify(result));
    }catch(err){
        console.log("mqtt subscribe fail "+ JSON.stringify(err));
    }

更多关于HarmonyOS鸿蒙Next中如何使用MQTT,提供源码和方案说明的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


一、结论

目前鸿蒙(HarmonyOS)中使用MQTT协议的核心方式为: 通过官方@ohos/mqtt库实现,优先采用异步客户端(MqttAsync)适配鸿蒙异步编程模型,核心流程为「配置依赖与网络权限→创建MQTT客户端→配置连接参数→建立连接→发布/订阅消息→断开连接」。

图片

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的即时通讯协议,专为资源受限的物联网(IoT)设备和低带宽、高延迟或不可靠网络环境设计。

目前在物联网,车载,即时通讯,JG领域用的很多。MQTT模式是有个服务器,若干个客户端,订阅Topic作为事件ID,用来订阅广播,发送广播。类似于EventHub和Emitter的发布订阅机制。使用起来很简单。

二、代码实现和详细解释

图片

去鸿蒙三方库中心,搜索mqtt的最新版本进行集成,配置依赖:

ohpm install [@ohos](/user/ohos)/mqtt

或者在oh-package.json5中配置:

  "dependencies": {
        "[@ohos](/user/ohos)/mqtt":"2.0.18",
  }

记得配置网络权限:

在module.json5中配置:
    "requestPermissions":[
      {
        "name" : "ohos.permission.INTERNET",
        "reason": "$string:reason_net",
        "usedScene": {
          "abilities": [
            "FormAbility"
          ],
          "when":"inuse"
        }
      }
    ]

详细DEMO步骤:

import { MqttAsync } from '[@ohos](/user/ohos)/mqtt';

// ---------------------- 基础配置 ----------------------
const BROKER_URL = 'mqtt://test.mosquitto.org:1883'; // 公共测试Broker(支持MQTT 3.1.1)
const CLIENT_ID = 'HarmonyOS-MQTT3-Demo';           // 客户端ID(需唯一)
const TOPIC = 'harmonyos/classic/test';             // 订阅/发布主题
const QOS = 1;                                      // 服务质量等级(0/1/2)

// ---------------------- 创建异步客户端 ----------------------
    let client = MqttAsync.createMqtt({
      url: BROKER_URL,
      clientId: CLIENT_ID,
      // 客户端持久化类型(0=文件系统,1=内存,2=自定义)
      persistenceType: 1 // 使用内存持久化(轻量设备推荐)
    })

// ---------------------- 核心功能实现 ----------------------
async function mqttCommunication() {
  try {
    // 1. 连接到Broker(异步方法,返回Promise)
    await client.connect({
      // MQTT 3.1.1 连接参数
      cleanSession: true,       // 清除会话(断开后不保留订阅和消息)
      connectTimeout: 30,       // 连接超时时间(秒)
      keepAliveInterval: 60,    // 心跳间隔(秒),维持长连接
      // 认证信息(若Broker需要)
      userName: 'user',         // 用户名(可选)
      password: 'password',     // 密码(可选)
    });
    console.log('[MQTT 3.1.1] 连接成功');

    // 2. 订阅主题(支持通配符,如 "home/+/temp")
      await client.subscribe({
        topic: TOPIC,
        qos: QOS
      });
    console.log(`[MQTT 3.1.1] 已订阅主题:${TOPIC}(QoS ${QOS})`);

    // 3. 发布消息(字符串或二进制 payload)
    const message = 'Hello from HarmonyOS with MQTT 3.1.1!';
    await client.publish({
      topic: TOPIC,
      payload: message,
      qos: QOS,
      retained: false, // 是否保留消息(Broker存储最后一条消息)
    });
    console.log(`[MQTT 3.1.1] 消息已发布:${message}`);

    // 4. 监听消息接收事件
      client.subscribe({
        topic: TOPIC,
        qos: QOS,
      },(MqttResponse)=>{
        console.log(`[接收消息] 主题:${MqttResponse}`);
      })


  } catch (err) {
    console.error('[MQTT 3.1.1] 操作失败:', err.message);
  }
}

// ---------------------- 启动连接 ----------------------
mqttCommunication();

// ---------------------- 断开连接(如页面销毁时调用) ----------------------
// client.disconnect();

三、引用资料地址

1、鸿蒙@ohos/mqtt官方文档:https://ohpm.openharmony.cn/#/cn/detail/@ohos%2Fmqtt

在HarmonyOS Next中,使用MQTT需通过@ohos/net.mqtt模块。首先在module.json5中添加网络权限和ohos.permission.INTERNET权限。使用mqtt.createMqtt()创建客户端,通过connect()连接服务器,参数包括URI、clientId等。订阅主题用subscribe(),发布消息用publish()。核心代码示例如下:

import mqtt from '@ohos/net.mqtt';

let client = mqtt.createMqtt();
client.connect({ uri: 'tcp://your-server:1883', clientId: 'harmony-client' });
client.subscribe({ topic: 'test/topic' });
client.publish({ topic: 'test/topic', payload: 'Hello MQTT' });

需注意Next版本API可能调整,具体参考官方文档。

在HarmonyOS Next中使用MQTT进行设备间通信,可以参考以下方案和核心代码实现:

一、方案概述

HarmonyOS Next推荐使用系统内置的@ohos/net.mqtt模块实现MQTT客户端功能,支持TLS加密连接和消息发布/订阅。

二、核心实现步骤

1. 权限配置

module.json5中声明网络权限:

{
  "module": {
    "requestPermissions": [{
      "name": "ohos.permission.INTERNET"
    }]
  }
}

2. MQTT客户端实现

import { mqtt } from '@ohos/net.mqtt';
import { BusinessError } from '@ohos.base';

class MQTTClient {
  private client: mqtt.MQTTClient | null = null;
  
  // 创建客户端
  async createClient(options: mqtt.MQTTOptions): Promise<void> {
    try {
      this.client = mqtt.createMqttClient(options);
      console.info('MQTT client created successfully');
    } catch (error) {
      console.error(`Failed to create client: ${JSON.stringify(error)}`);
    }
  }

  // 连接服务器
  async connect(config: mqtt.ConnectOptions): Promise<void> {
    if (!this.client) {
      console.error('Client not initialized');
      return;
    }
    
    this.client.connect(config, (err: BusinessError) => {
      if (err) {
        console.error(`Connect failed: ${JSON.stringify(err)}`);
      } else {
        console.info('Connected to MQTT broker');
      }
    });
  }

  // 订阅主题
  async subscribe(topic: string, qos: mqtt.QoS): Promise<void> {
    if (!this.client) return;
    
    this.client.subscribe(topic, qos, (err: BusinessError) => {
      if (err) {
        console.error(`Subscribe failed: ${JSON.stringify(err)}`);
      } else {
        console.info(`Subscribed to topic: ${topic}`);
      }
    });
  }

  // 发布消息
  async publish(topic: string, message: string, qos: mqtt.QoS): Promise<void> {
    if (!this.client) return;
    
    this.client.publish(topic, message, qos, (err: BusinessError) => {
      if (err) {
        console.error(`Publish failed: ${JSON.stringify(err)}`);
      } else {
        console.info(`Message published to ${topic}`);
      }
    });
  }

  // 设置消息回调
  setMessageCallback(callback: (topic: string, message: Uint8Array) => void): void {
    if (!this.client) return;
    
    this.client.on('message', (data: mqtt.MessageData) => {
      callback(data.topic, data.message);
    });
  }

  // 断开连接
  async disconnect(): Promise<void> {
    if (!this.client) return;
    
    this.client.disconnect((err: BusinessError) => {
      if (err) {
        console.error(`Disconnect failed: ${JSON.stringify(err)}`);
      }
    });
  }
}

3. 使用示例

// 初始化配置
const mqttClient = new MQTTClient();

// 创建客户端
await mqttClient.createClient({
  clientId: 'harmonyos_device_001',
  cleanSession: true
});

// 连接配置
const connectConfig: mqtt.ConnectOptions = {
  host: 'broker.emqx.io',
  port: 1883,
  userName: 'optional_username',
  password: 'optional_password',
  keepAlive: 60,
  timeout: 30
};

// 建立连接
await mqttClient.connect(connectConfig);

// 设置消息接收回调
mqttClient.setMessageCallback((topic: string, message: Uint8Array) => {
  const decodedMsg = String.fromCharCode.apply(null, Array.from(message));
  console.info(`Received message from ${topic}: ${decodedMsg}`);
});

// 订阅主题
await mqttClient.subscribe('test/topic', mqtt.QoS.AT_LEAST_ONCE);

// 发布消息
await mqttClient.publish('test/topic', 'Hello HarmonyOS', mqtt.QoS.AT_LEAST_ONCE);

// 断开连接(使用后调用)
// await mqttClient.disconnect();

三、关键配置说明

  1. QoS级别

    • AT_MOST_ONCE(0):最多一次
    • AT_LEAST_ONCE(1):至少一次
    • EXACTLY_ONCE(2):仅一次
  2. TLS连接: 在ConnectOptions中配置ssl: true并设置CA证书路径。

  3. 连接保持keepAlive参数控制心跳间隔(秒),建议设置为30-60秒。

四、注意事项

  1. 确保设备网络连接正常
  2. 生产环境建议使用TLS加密
  3. 合理设置客户端ID避免冲突
  4. 及时处理连接断开和重连逻辑

此方案基于HarmonyOS Next的官方MQTT API实现,可根据实际业务需求扩展重连机制、消息持久化等功能。

回到顶部