HarmonyOS鸿蒙Next中ArkTS多线程通讯

HarmonyOS鸿蒙Next中ArkTS多线程通讯 并发的多线程间如何通讯、传递值?

6 回复

在ArkTS中,多线程间的通信主要通过TaskPool和Worker两种并发模型实现,它们基于Actor内存隔离机制,支持不同数据对象的跨线程传递,包括序列化拷贝、内存共享和引用传递等方式。以下是具体实现方法及适用场景:

一、TaskPool:任务池模式

原理:TaskPool在Worker基础上封装了任务调度器,支持任务优先级设置、自动扩容/缩容、任务组管理等功能。任务通过序列化传递参数,执行结果通过Promise返回。

通信方式

  • 序列化传递
    • 普通对象、ArrayBuffer等通过Structured Clone算法序列化为引擎无关的数据(如字符串或内存块),在子线程反序列化还原。
    • 适用场景:跨线程传递小数据量或需要深拷贝的对象(如JSON数据)。
    • 示例
      @Concurrent
      function processData(data: string) {
        console.log("子线程处理数据:", data);
        return data.toUpperCase();
      }
      const task = new taskpool.Task(processData, "hello");
      taskpool.execute(task).then((result) => {
        console.log("主线程接收结果:", result); // 输出: "HELLO"
      });
      
  • Transferable对象传递
    • ArrayBuffer支持零拷贝转移,传递后原线程不再拥有访问权限,避免内存重复占用。
    • 适用场景:大内存数据(如图片、音频)的高效传递。
    • 示例
      @Concurrent
      function processArrayBuffer(buffer: ArrayBuffer) {
        // 直接操作buffer,无需拷贝
        const view = new Uint8Array(buffer);
        view[0] = 255; // 修改数据
        return buffer;
      }
      const buffer = new ArrayBuffer(1024);
      const task = new taskpool.Task(processArrayBuffer, buffer);
      task.setTransferList([buffer]); // 标记为Transferable对象
      taskpool.execute(task).then((result) => {
        console.log("主线程接收修改后的buffer:", result);
      });
      

二、Worker:独立线程模式

原理:Worker拥有独立的运行环境(内存、消息队列等),通过postMessage和onmessage实现异步通信。

通信方式

  • 异步消息传递
    • 主线程与Worker通过消息队列交互,支持所有可序列化对象(如字符串、对象、ArrayBuffer)。
    • 适用场景:需要长期运行的后台任务(如文件下载、复杂计算)。
    • 示例
      // 主线程
      const worker = new worker.ThreadWorker("worker.js");
      worker.postMessage({ type: "START", data: "hello" });
      worker.onmessage = (event) => {
        console.log("主线程接收消息:", event.data); // 输出: "HELLO"
      };
      // worker.js
      self.onmessage = (event) => {
        if (event.data.type === "START") {
          const result = event.data.data.toUpperCase();
          self.postMessage(result); // 发送回主线程
        }
      };
      
  • SharedArrayBuffer共享内存
    • 通过AtomicsAPI实现多线程对同一内存区域的原子操作,避免数据竞争。
    • 适用场景:高频数据共享(如游戏帧同步、实时数据处理)。
    • 示例
      // 主线程
      const sharedBuffer = new SharedArrayBuffer(4);
      const worker = new worker.ThreadWorker("worker.js");
      worker.postMessage({ buffer: sharedBuffer });
      // worker.js
      self.onmessage = (event) => {
        const sharedArray = new Int32Array(event.data.buffer);
        Atomics.store(sharedArray, 0, 123); // 写入数据
        Atomics.notify(sharedArray, 0); // 通知主线程
      };
      

三、Sendable对象:引用传递

原理:通过@Sendable装饰器标记对象,允许跨线程引用传递(而非拷贝),减少序列化开销。

通信方式

  • 引用共享
    • Sendable对象在共享堆中分配内存,所有线程可直接访问同一实例。
    • 适用场景:自定义大对象或需要频繁更新的状态(如UI状态、数据库连接)。
    • 示例
      [@Sendable](/user/Sendable)
      class SharedState {
        count: number = 0;
        increment() { this.count++; }
      }
      const sharedState = new SharedState();
      const task = new taskpool.Task((state: SharedState) => {
        state.increment();
        console.log("子线程计数:", state.count); // 输出: 1
      }, sharedState);
      taskpool.execute(task).then(() => {
        console.log("主线程计数:", sharedState.count); // 输出: 1
      });
      
  • 线程安全控制
    • 使用异步锁或对象冻结避免数据竞争:
      [@Sendable](/user/Sendable)
      class SafeState {
        #count: number = 0;
        async increment() {
          await lock.acquire(); // 异步锁
          this.#count++;
          lock.release();
        }
      }
      

四、方案对比与推荐

方案 通信方式 适用场景 性能 复杂度
TaskPool 序列化/Transferable 短期任务、小数据量 中等(序列化)
Worker 消息/SharedArrayBuffer 长期任务、大数据量或共享内存 高(零拷贝)
Sendable 引用传递 自定义大对象、频繁更新状态 最高

推荐选择

  • 简单任务:优先使用TaskPool,代码简洁且自动管理线程生命周期。
  • 大数据处理:使用Worker + SharedArrayBuffer,避免内存拷贝。
  • 高性能共享:Sendable对象,但需注意线程安全设计。

更多关于HarmonyOS鸿蒙Next中ArkTS多线程通讯的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


在ArkTS中,多线程间通信主要基于Actor内存隔离模型实现,通过序列化/反序列化、数据转移或共享机制传递值。以下是核心实现方式和场景解析:

一、通信机制与数据传递方式

根据线程间数据对象的类型差异,通信行为分为三类:

普通JS对象

  • 使用Structured Clone算法(深拷贝)
  • 通过序列化转字符串/内存块 → 反序列化为新对象
  • 特点:安全隔离但效率较低(需完整拷贝)
    // 示例:Worker线程传递JS对象
    workerPort.postMessage({ key: "value" }); // 自动序列化
    

ArrayBuffer/SharedArrayBuffer

  • ArrayBuffer:转移所有权(原线程失去访问权)
  • SharedArrayBuffer:多线程共享内存(需原子操作避免冲突)
    // SharedArrayBuffer共享示例
    const sharedBuffer = new SharedArrayBuffer(1024);
    workerPort.postMessage(sharedBuffer);
    

Native绑定对象(如Context)

  • 通过SendableContext协议转换实现共享:
    import { SendableContext } from '@kit.ArkTS';
    // 主线程转换并传递
    const sendableCtx = SendableContext.from(context);
    taskpool.execute(() => {
      const ctx = sendableCtx.toContext(); // 子线程转回Context
    });
    

二、并发模型与通信场景

ArkTS提供两种并发能力处理不同场景:

并发模型 适用场景 通信方式
Worker 长时CPU密集型后台任务 基于消息队列的异步通信(postMessage/onmessage)
TaskPool 短时单点任务 直接传递参数或使用Emitter事件通知

典型场景示例:

子线程访问主线程资源 通过SendableContext传递Context对象操作数据库/文件:

// 子线程读取主线程数据库
taskpool.execute(async (sendableCtx: SendableContext) => {
  const db = relationalStore.getRdbStore(sendableCtx.toContext(), { name: "test.db" });
  await db.querySql("SELECT * FROM user");
}, [SendableContext.from(context)]);

多线程协同执行 使用Emitter实现线程间事件同步:

// 线程A完成后通知线程B
emitter.emit("taskA_done", result);
emitter.on("taskA_done", (data) => { /* 线程B处理逻辑 */ });

HarmonyOS的分布式文件系统让我在多设备间共享文件变得更加方便。

【背景知识】

首先ArkTS和其他所有基于JS引擎的语言一样,都是基于Actor内存隔离的并发模型。

Actor模型图:

Actor模型图

线程间通讯,狭义的理解,其实就是各线程间相互数据交换,目前JS引擎下的数据交换,有以下几种主流方式:

  • 基于标准的Structure Clone算法(即交换数据序列化反序列化)实现字符串/内存块等和对象互转,然后走深拷贝传递,实现数据交换。
  • 绑定Native的JS对象进行传输,实现数据交换。
  • 借助Sendable对象的共享能力,实现数据交换。

无论哪种方式,都不需要自己手动实现,ArkTS已经帮我们封装了各类API,装饰器,集成能力等等,重点在于理解这些方式,知道哪些对象适用哪些方式等。

对于上面的各种传输方式,需要注意不同类型的数据,在序列化反序列化拷贝,数据转移和数据共享等方面,都具有不同的行为,所以 不同类的通信数据对象,在传输处理过程中注意点各异:

1. 对于普通对象 :(ArkTS基本数据类型boolean string number等以及数组,object等普通js对象) 默认就是序列化反序列化,走深度拷贝传输交换。 但如果使用ArkTS提供的@Sendable装饰器,就可以转成上述的第3种方式走传输。

2. 对于ArrayBuffer对象: :(eg:ArrayBuffer类) 传输的其实是其内部包含的那块Native内存(当然外面还包了一层JS壳),有2种方式:拷贝和转移; 用拷贝传输后,两个线程都可以独立访问ArrayBuffer;有点像把那块Native内存Ctrl+C、Ctrl+V,2个线程间都有了。 用转移传输后原线程无法再使用,跨线程时只需重建JS壳,新线程Native内存无需拷贝,效率更高;类似于Ctrl+X、Ctrl+V。

3. 对于SharedArrayBuffer对象: :(eg:Int32Array类) 和ArrayBuffer对象一样,都有native内存块,多了个shared也就是支持共享,所以访问及修改需要采用Atomics类,以保证内存原子性。

4. 对于Transferable/NativeBinding对象: :(eg: context/PixelMap)

Transferable/NativeBinding对象

有共享和转移2种模式:

  • 常见的共享模式NativeBinding对象包括Context,Context对象包含应用程序组件的上下文信息,它提供了一种访问系统服务和资源的方式,使得应用程序组件可以与系统进行交互。
  • 常见的转移模式NativeBinding对象包括PixelMap,PixelMap对象可以读取或写入图像数据以及获取图像信息,常用于在应用或系统中显示图片。

5. 对于Sendable对象: 相对于上面的普通对象(深拷贝数据对象),ArrayBuffer对象(拷贝native内存块)Sendable则提供了数据对象引用拷贝传递的新思路。

Sendable对象

可以看到,上面介绍的各种通信对象,主流的3种传输方式都有涉及。 ArkTS目前主要提供两种并发能力支持线程间通信:TaskPool和Worker。也就是说TaskPool和Worker里面,可以通过已有API,配合装饰器等,实现了各种主流的数据传输方式,来配合线程间数据交换。

【解决方案】

通过上面背景知识介绍,相信大家对各种类型数据线程间传输方式及原理有了个大体了解;现在我们根据各种具体跨线程交互场景,看下怎么实现数据传递,数据通信,以及各场景需要注意的点:

【宿主js进程】即启动子线程的线程,通常可以理解成UI主线程,【TaskPool/Worker线程】可以理解成子线程。

线程模型

情形1:UI主线程->TaskPool子线程:

情形1

tips: 被@Concurrent修饰的就是TaskPool子线程要执行的方法。

new taskpool.Task(func, data) 直接就把数据data从UI主线程传给了TaskPool子线程,截图中示例,传了个string对象,这里的data支持 ArkTS基本数据类型boolean,number,null等以及数组,类对象 等,如果没有其他特殊处理,data就直接通过默认的序列化反序列化传输了。

注意:普通类实例对象跨线程通过拷贝形式传递,只能传递数据,类实例上的方法会丢失。可以使用@Sendable装饰器标识为Sendable类,类实例对象跨线程传递后,可携带类方法。

除了上述的普通类对象,这里data还支持 图片类数据(如ArrayBuffer等类型) ,和上面一样,直接调用new taskpool.Task(func, data),把ArrayBuffer对象作为参数直接传过去;注意ArrayBuffer提供了2种方式:拷贝和转移(见上面的背景知识)。

TaskPool中 默认传输方式为转移 (可通过接口setTransferList()设置转移列表)转移这种方式,适用于某一方线程不再需要当前数据, 这种方式性能更高。

如果线程双方都还会用到data,建议用拷贝方式 (比如子线程需要对主线程中的图片进行调整处理,后续UI主线程还要访问到这些图片)使用时可以让转移列表数组置空,task.setTransferList([])即可实现拷贝方式传输native内存块。

具体可参考下代码:

// Index.ets
import { taskpool } from '@kit.ArkTS';
import { BusinessError } from '@kit.BasicServicesKit';

[@Concurrent](/user/Concurrent)
function adjustImageValue(arrayBuffer: ArrayBuffer): ArrayBuffer {
  // 对arrayBuffer进行操作
  return arrayBuffer;  // 返回值默认转移
}

function createImageTask(arrayBuffer: ArrayBuffer, isParamsByTransfer: boolean): taskpool.Task {
  let task: taskpool.Task = new taskpool.Task(adjustImageValue, arrayBuffer);
  if (!isParamsByTransfer) { // 是否使用转移方式
    // 传递空数组[],全部arrayBuffer参数传递均采用拷贝方式
    task.setTransferList([]);
  }
  return task;
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World';

  build() {
    RelativeContainer() {
      Text(this.message)        
        .onClick(() => {
          let taskNum = 4;
          let arrayBuffer = new ArrayBuffer(1024 * 1024);
          let taskPoolGroup = new taskpool.TaskGroup();
          // 创建taskNum个Task
          for (let i: number = 0; i < taskNum; i++) {
            let arrayBufferSlice: ArrayBuffer = arrayBuffer.slice(arrayBuffer.byteLength / taskNum * i, arrayBuffer.byteLength / taskNum * (i + 1));
            // 使用拷贝方式传入ArrayBuffer,所以isParamsByTransfer为false
            taskPoolGroup.addTask(createImageTask(arrayBufferSlice, false));
          }
          // 执行Task
          taskpool.execute(taskPoolGroup).then((data) => {
            // 返回结果,对数组拼接,获得最终结果
          }).catch((e: BusinessError) => {
            console.error(e.message);
          })
        })
    }
    .height('100%')
    .width('100%')
  }
}

同理,SharedArrayBuffer类型(常见的有Int32Array对象)也可一样通过new taskpool.Task(func, data)调用传递,默认绑定Native的JS对象进行传输,实现数据交换,但是要注意由于是可共享对象,操作时得加原子性限制,防止数据竞争。 详见下代码:

import { taskpool } from '@kit.ArkTS';

[@Concurrent](/user/Concurrent)
function transferAtomics(arg1: Int32Array) {
  console.info("wait begin::");
  // 使用Atomics进行操作
  let res = Atomics.wait(arg1, 0, 0, 3000);
  return res;
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World';

  build() {
    RelativeContainer() {
      Text(this.message)        
        .onClick(() => {
    // 定义可共享对象
    let sab: SharedArrayBuffer = new SharedArrayBuffer(20);
    let int32 = new Int32Array(sab);
    let task: taskpool.Task = new taskpool.Task(transferAtomics, int32);
    taskpool.execute(task).then((res) => {
      console.info("this res is: " + res);
    });
    setTimeout(() => {
      Atomics.notify(int32, 0, 1);
    }, 1000);
    })
    }
    .height('100%')
    .width('100%')
  }
}

Transferable/NativeBinding类型,共享模式下常见的有Context对象(应用程序组件的上下文信息)。 转移模式,常见的有PixelMap对象(用于读写图像信息,显示图片)。

情形2:TaskPool子线程->UI主线程:

情形2

反过来,TaskPool子线程借助taskpool.Task.sendData(data)向UI主线程发消息数据;主线程通过task.onReceiveData()回调函数,处理接收到的消息数据。

情形3:UI主线程->Worker子线程: 情形4:Worker子线程->UI主线程:

情形3和情形4

如截图所示:Worker子线程通过postMessage向宿主线程发送消息,宿主线程通过onMessage接收子线程消息;反之宿主线程也可以用postMessage发消息,onMessage接收消息:

postMessage(message: Object, transfer: ArrayBuffer[]): void postMessage(message: Object, options?: PostMessageOptions): void

postMessage支持普通、ArrayBuffer等类型数据传输详见【 postMessage接口

如果想用Sendable方案,可以调用

postMessageWithSharedSendable(message: Object, transfer?: ArrayBuffer[]): void 详见【 postmessagewithsharedsendable接口

情形5:任意线程->任意线程:

情形5

如截图所示,emitter.on持续订阅指定的事件,并在接收到该事件时,执行对应的回调处理函数。

emitter.emit发送指定的带eventData的事件:其中eventdata支持数据类型包括Array、ArrayBuffer、Boolean、DataView、Date、Error、Map、Number、Object、Primitive(除了symbol)、RegExp、Set、String、TypedArray等。

详见: [@ohos.events.emitter (Emitter)](https://developer.huawei.com/consumer/cn/doc/harmonyos-references/js-apis-emitter)

鸿蒙Next中ArkTS多线程通信主要使用Worker线程模型。通过Worker模块创建后台线程,主线程与Worker线程通过postMessage()和onmessage()进行消息传递,支持对象序列化传输。线程间数据共享需通过消息传递实现,避免直接共享内存。

在HarmonyOS Next的ArkTS中,多线程间通信主要依靠TaskPoolWorker两种并发能力,它们都支持线程间的消息传递。

1. TaskPool(任务池) 适用于轻量级、独立的并行任务。通过execute()方法执行任务并传递参数,通过Promiseasync/await获取返回值。

import taskpool from '@ohos.taskpool';

@Concurrent
function add(x: number, y: number): number {
  return x + y;
}

async function taskPoolDemo() {
  let task = new taskpool.Task(add, 1, 2);
  let result = await taskpool.execute(task); // result = 3
}

2. Worker(工作线程) 适用于长时间运行、有状态或需要持续通信的后台任务。线程间通过postMessage()发送消息、onmessage接收消息进行通信。

// 主线程
let worker = new worker.ThreadWorker("entry/ets/workers/MyWorker.ets");
worker.postMessage({ data: 'Hello' });
worker.onmessage = (message: MessageEvents) => {
  console.log('Main: ' + message.data); // 接收Worker回复
};

// Worker线程 (MyWorker.ets)
import worker, { MessageEvents } from '@ohos.worker';

let parentPort = worker.parentPort;
parentPort.onmessage = (message: MessageEvents) => {
  console.log('Worker: ' + message.data); // 接收主线程消息
  parentPort.postMessage('World'); // 向主线程发送消息
};

关键区别:

  • TaskPool:任务执行后线程可能销毁,适合无状态、一次性的计算。
  • Worker:线程生命周期独立,可维持状态并进行多次消息交换。

根据任务性质(是否需持续通信、是否需保持状态)选择合适方式即可实现线程间值传递。

回到顶部