HarmonyOS 鸿蒙Next 多线程并发编程实践-IM场景

发布于 1周前 作者 gougou168 最后一次编辑是 5天前 来自 鸿蒙OS

HarmonyOS 鸿蒙Next 多线程并发编程实践-IM场景

介绍

本示例分别利用TaskPool和Worker开发多线程并发场景,以此来展示两者的不同之处以及相似之处。

多线程并发案例源码链接

效果图

使用说明

打开首页消息列表实时刷新消息列表,点击任一消息进入对话框,文件下载保存本地。

实现思路

  1. aboutToAppear,在组件即将出现时被调用,用于执行初始化、数据准备操作。
  • workerInstance.postMessage,向workerInstance发送一条消息,消息是一个对象,包含type属性设置为true和context属性设置为this.context。使用postMessage方法将消息发送给Web Worker,用于开启子线程任务,并且可以根据业务需求传递自定义的上下文信息。
  • workerInstance.onmessage ,workerInstance注册一个消息处理函数,当从子线程接收到消息时会调用该函数。
  • let newMessage: string = e.data.toString();:将接收到的数据(e.data)转换为字符串类型,并存储在 newMessage 变量中。
  aboutToAppear(): void { 
    workerInstance.postMessage({ type: true, context: this.context }) 
    workerInstance.onmessage = (e) => {
      let newMessage: string = e.data.toString();
      console.info("page received message:" + newMessage)
      if (!this.messageArr.includes(newMessage)) {
        this.messageArr.unshift(newMessage);
      }
      console.info("page received message:" + JSON.stringify(this.messageArr))
    }
  }
  1. 在每个页面点击下载后,执行以下操作:
  • 创建一个taskpool.Task对象task,将downloadFile函数作为任务,使用this.params.name 作为任务名称,指定一个文件的下载链接,并传入this.context作为任务的上下文。

  • 为emitter对象注册event事件的监听器,在事件触发时,从eventData中提取progress信息。

  • 打印下载进度信息到控制台。

  • 当进度达到 100 时,使用 promptAction.showToast 显示一个下载完成的提示,位置在底部 100 像素处。

    这里使用了TaskPool方式,这种方式偏向独立任务维度,该任务在线程中执行,无需关注线程的生命周期,超长任务(大于3分钟)会被系统自动回收。

    let task = new taskpool.Task(downloadFile, this.params.name,
    "", this.context);
    emitter.on(event, (eventData) => {
    let progress: number = eventData.data?.progress;
    console.info('receive download progress:' + progress);
    if (progress == 100) {
        promptAction.showToast({ message: `下载完成`, bottom: 100 });
     }
    }
  1. 构造messageReceive()函数,调用systemDateTime.getCurrentTime()方法获取当前时间,使用await等待结果并存储在time变量中。将获取到的当前时间打印到控制台。检查time是否是5000的倍数,如果是,则返回一个字符串name-${time},否则返回一个空字符串。
export async function messageReceive(): Promise<string> {
  let time = await systemDateTime.getCurrentTime()
  console.info(`Succeeded in getting currentTime : ${time}`);
  if (time % 5000 === 0) {
    return `name-${time}`
  } else {
    return '';
  }
}
  1. 为workerPort的onmessage事件添加一个异步处理函数,当接收到消息时会执行以下操作:
  • 打印execute worker task到控制台。

  • 从e.data.type中获取flag并将其转换为布尔类型,用于控制while循环。

  • 当flag为true时,进入while循环。

  • 在循环中调用messageReceive函数获取消息,并使用await等待结果。

  • 如果receiveMessage不为空,打印消息并使用workerPort.postMessage将消息发送回宿主线程。

    这里使用了Worker方式,这种方式偏向线程的维度,支持长时间占据线程执行,需要主动管理线程生命周期。

workerPort.onmessage = async (e: MessageEvents) => {
  console.log("execute worker task")
  let flag = e.data.type as Boolean;
  while (flag) {
    let receiveMessage = await messageReceive();
<span class="hljs-keyword">if</span> (receiveMessage) {
  <span class="hljs-built_in">console</span>.info(<span class="hljs-string">"success get message: "</span> + receiveMessage)
  workerPort.postMessage(receiveMessage);
}

} }

工程目录

│──entry/src/main/ets             // hap包
│  ├──entryability
│  │  └──EntryAbility.ts          // 程序入口类
│  ├──pages
│  │  └──Index.ets                // 首页消息列表页
        └── MessageDetail.ets     // 对话框页面下载文件
        └── BaseUsage.ets         // 基本用法
        └── SendableTest.ets      // 基本用法
└──entry/src/main/resource        // 应用静态资源目录
│──thread/src/main/ets            // har包
│  ├──components
│  │  └──task
            └──ThreadTasks        // 业务代码
        └──util
            └──WorkerUtil         // worker实例
    ├──workers
        └── NewMessageObserveWorker.ets  // worker线程

更多关于HarmonyOS 鸿蒙Next 多线程并发编程实践-IM场景的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html

1 回复

更多关于HarmonyOS 鸿蒙Next 多线程并发编程实践-IM场景的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


在HarmonyOS 鸿蒙Next多线程并发编程实践中,针对IM(即时通讯)场景,可以利用鸿蒙提供的多线程机制及同步原语来实现高效的消息处理。以下是一个简化的场景化代码示例,展示如何在IM应用中管理多线程并发:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>

std::queue<std::string> messageQueue;
std::mutex mtx;
std::condition_variable cv;
bool stopFlag = false;

void messageProducer() {
    while (!stopFlag) {
        std::unique_lock<std::mutex> lock(mtx);
        messageQueue.push("New Message");
        cv.notify_one();
        // Simulate message generation delay
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void messageConsumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !messageQueue.empty() || stopFlag; });
        
        while (!messageQueue.empty()) {
            std::string msg = messageQueue.front();
            messageQueue.pop();
            lock.unlock();
            // Process message
            std::cout << "Received: " << msg << std::endl;
            lock.lock();
        }
        
        if (stopFlag && messageQueue.empty()) break;
    }
}

int main() {
    std::thread producer(messageProducer);
    std::thread consumer(messageConsumer);
    
    // Run for some time then stop
    std::this_thread::sleep_for(std::chrono::seconds(5));
    stopFlag = true;
    cv.notify_all();
    
    producer.join();
    consumer.join();
    
    return 0;
}
回到顶部