HarmonyOS鸿蒙Next应用中如何实现多线程处理?

HarmonyOS鸿蒙Next应用中如何实现多线程处理? 在HarmonyOS应用中如何实现多线程处理?如何使用Worker、TaskPool和异步编程避免UI卡顿?

3 回复

解决方案

1. Worker线程基础

import worker from '@ohos.worker'

// 主线程代码
@Entry
@Component
struct WorkerDemo {
  @State result: string = ''
  @State isProcessing: boolean = false
  private workerInstance?: worker.ThreadWorker

  aboutToAppear() {
    this.createWorker()
  }

  aboutToDisappear() {
    this.destroyWorker()
  }

  build() {
    Column({ space: 16 }) {
      Text('Worker多线程')
        .fontSize(20)
        .fontWeight(FontWeight.Bold)

      Button('开始计算')
        .width('100%')
        .enabled(!this.isProcessing)
        .onClick(() => {
          this.startHeavyTask()
        })

      if (this.isProcessing) {
        Row({ space: 8 }) {
          LoadingProgress()
            .width(20)
            .height(20)
          Text('计算中...')
        }
      }

      if (this.result) {
        Text('结果:')
          .fontSize(16)
          .fontWeight(FontWeight.Bold)
        Text(this.result)
          .fontSize(14)
          .fontColor('#1890ff')
      }
    }
    .padding(16)
  }

  private createWorker() {
    try {
      // 创建Worker实例
      this.workerInstance = new worker.ThreadWorker('entry/ets/workers/TaskWorker.ts')

      // 监听Worker消息
      this.workerInstance.onmessage = (event) => {
        console.log('收到Worker消息:', event.data)
        this.result = event.data
        this.isProcessing = false
      }

      // 监听Worker错误
      this.workerInstance.onerror = (error) => {
        console.error('Worker错误:', error.message)
        this.isProcessing = false
      }
    } catch (error) {
      console.error('创建Worker失败:', error)
    }
  }

  private startHeavyTask() {
    if (!this.workerInstance) return

    this.isProcessing = true
    this.result = ''

    // 向Worker发送消息
    this.workerInstance.postMessage({
      type: 'calculate',
      data: {
        start: 1,
        end: 1000000
      }
    })
  }

  private destroyWorker() {
    if (this.workerInstance) {
      this.workerInstance.terminate()
      this.workerInstance = undefined
    }
  }
}

// Worker线程代码
// entry/ets/workers/TaskWorker.ts
import worker from '@ohos.worker'

const workerPort = worker.workerPort

// 监听主线程消息
workerPort.onmessage = (event) => {
  console.log('Worker收到消息:', event.data)

  const { type, data } = event.data

  if (type === 'calculate') {
    // 执行耗时计算
    const result = calculateSum(data.start, data.end)

    // 发送结果回主线程
    workerPort.postMessage(`计算结果: ${result}`)
  }
}

function calculateSum(start: number, end: number): number {
  let sum = 0
  for (let i = start; i <= end; i++) {
    sum += i
  }
  return sum
}

// 监听错误
workerPort.onerror = (error) => {
  console.error('Worker内部错误:', error.message)
}

2. TaskPool任务池

import taskpool from '@ohos.taskpool'

// 定义并发函数
[@Concurrent](/user/Concurrent)
function processLargeData(data: number[]): number[] {
  // 耗时的数据处理
  return data.map(item => item * 2)
}

[@Concurrent](/user/Concurrent)
function calculateFibonacci(n: number): number {
  if (n <= 1) return n
  return calculateFibonacci(n - 1) + calculateFibonacci(n - 2)
}

@Entry
@Component
struct TaskPoolDemo {
  @State result: string = ''
  @State isProcessing: boolean = false

  build() {
    Column({ space: 16 }) {
      Text('TaskPool任务池')
        .fontSize(20)
        .fontWeight(FontWeight.Bold)

      Button('执行单个任务')
        .width('100%')
        .onClick(() => {
          this.executeSingleTask()
        })

      Button('执行并发任务')
        .width('100%')
        .onClick(() => {
          this.executeConcurrentTasks()
        })

      Button('执行任务组')
        .width('100%')
        .onClick(() => {
          this.executeTaskGroup()
        })

      if (this.isProcessing) {
        LoadingProgress()
          .width(40)
          .height(40)
      }

      if (this.result) {
        Text(this.result)
          .fontSize(14)
          .fontColor('#666666')
          .padding(12)
          .backgroundColor('#f5f5f5')
          .borderRadius(8)
          .width('100%')
      }
    }
    .padding(16)
  }

  private async executeSingleTask() {
    try {
      this.isProcessing = true
      this.result = ''

      // 创建任务
      const task = new taskpool.Task(calculateFibonacci, 30)

      // 执行任务
      const result = await taskpool.execute(task) as number

      this.result = `斐波那契数列第30项: ${result}`
      console.log('任务执行完成:', result)
    } catch (error) {
      console.error('任务执行失败:', error)
      this.result = '任务执行失败'
    } finally {
      this.isProcessing = false
    }
  }

  private async executeConcurrentTasks() {
    try {
      this.isProcessing = true
      this.result = ''

      // 创建多个任务
      const tasks = [
        new taskpool.Task(calculateFibonacci, 25),
        new taskpool.Task(calculateFibonacci, 26),
        new taskpool.Task(calculateFibonacci, 27)
      ]

      // 并发执行所有任务
      const results = await Promise.all(
        tasks.map(task => taskpool.execute(task))
      )

      this.result = `并发结果: ${results.join(', ')}`
      console.log('并发任务完成:', results)
    } catch (error) {
      console.error('并发任务失败:', error)
      this.result = '并发任务失败'
    } finally {
      this.isProcessing = false
    }
  }

  private async executeTaskGroup() {
    try {
      this.isProcessing = true
      this.result = ''

      // 创建任务组
      const taskGroup = new taskpool.TaskGroup()
      
      // 添加任务到组
      taskGroup.addTask(new taskpool.Task(calculateFibonacci, 20))
      taskGroup.addTask(new taskpool.Task(calculateFibonacci, 21))
      taskGroup.addTask(new taskpool.Task(calculateFibonacci, 22))

      // 执行任务组
      const results = await taskpool.execute(taskGroup)

      this.result = `任务组结果: ${results.join(', ')}`
      console.log('任务组完成:', results)
    } catch (error) {
      console.error('任务组失败:', error)
      this.result = '任务组失败'
    } finally {
      this.isProcessing = false
    }
  }
}

3. Promise异步编程

// 异步工具类
export class AsyncUtil {
  /**
   * 延时执行
   */
  static delay(ms: number): Promise<void> {
    return new Promise(resolve => {
      setTimeout(resolve, ms)
    })
  }

  /**
   * 带超时的Promise
   */
  static timeout<T>(
    promise: Promise<T>,
    timeoutMs: number
  ): Promise<T> {
    return Promise.race([
      promise,
      new Promise<T>((_, reject) => {
        setTimeout(() => {
          reject(new Error('操作超时'))
        }, timeoutMs)
      })
    ])
  }

  /**
   * 重试执行
   */
  static async retry<T>(
    fn: () => Promise<T>,
    maxRetries: number = 3,
    delayMs: number = 1000
  ): Promise<T> {
    let lastError: Error | undefined

    for (let i = 0; i < maxRetries; i++) {
      try {
        return await fn()
      } catch (error) {
        lastError = error as Error
        console.log(`第${i + 1}次尝试失败:`, lastError.message)

        if (i < maxRetries - 1) {
          await this.delay(delayMs)
        }
      }
    }

    throw lastError
  }

  /**
   * 批量并发执行(限制并发数)
   */
  static async parallelLimit<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>,
    limit: number = 5
  ): Promise<R[]> {
    const results: R[] = []
    const executing: Promise<void>[] = []

    for (const item of items) {
      const promise = fn(item).then(result => {
        results.push(result)
      })

      executing.push(promise)

      if (executing.length >= limit) {
        await Promise.race(executing)
        executing.splice(executing.findIndex(p => p === promise), 1)
      }
    }

    await Promise.all(executing)
    return results
  }

  /**
   * 串行执行
   */
  static async serial<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    const results: R[] = []

    for (const item of items) {
      const result = await fn(item)
      results.push(result)
    }

    return results
  }
}

// 使用示例
@Entry
@Component
struct AsyncDemo {
  @State status: string = ''

  build() {
    Column({ space: 16 }) {
      Text('异步编程')
        .fontSize(20)
        .fontWeight(FontWeight.Bold)

      Button('带超时的请求')
        .width('100%')
        .onClick(() => {
          this.fetchWithTimeout()
        })

      Button('重试请求')
        .width('100%')
        .onClick(() => {
          this.retryRequest()
        })

      Button('限制并发数')
        .width('100%')
        .onClick(() => {
          this.limitedParallel()
        })

      if (this.status) {
        Text(this.status)
          .fontSize(14)
          .padding(12)
          .backgroundColor('#f5f5f5')
          .borderRadius(8)
          .width('100%')
      }
    }
    .padding(16)
  }

  private async fetchWithTimeout() {
    try {
      this.status = '请求中...'

      const result = await AsyncUtil.timeout(
        this.mockApiCall(),
        3000 // 3秒超时
      )

      this.status = `请求成功: ${result}`
    } catch (error) {
      this.status = `请求失败: ${error.message}`
    }
  }

  private async retryRequest() {
    try {
      this.status = '尝试请求...'

      const result = await AsyncUtil.retry(
        () => this.mockFailableApiCall(),
        3, // 最多重试3次
        1000 // 间隔1秒
      )

      this.status = `请求成功: ${result}`
    } catch (error) {
      this.status = `请求失败: ${error.message}`
    }
  }

  private async limitedParallel() {
    try {
      this.status = '批量处理中...'

      const items = Array.from({ length: 20 }, (_, i) => i)

      const results = await AsyncUtil.parallelLimit(
        items,
        async (item) => {
          await AsyncUtil.delay(1000)
          return item * 2
        },
        5 // 最多同时执行5个
      )

      this.status = `处理完成,结果数量: ${results.length}`
    } catch (error) {
      this.status = `处理失败: ${error.message}`
    }
  }

  private async mockApiCall(): Promise<string> {
    await AsyncUtil.delay(1000)
    return '数据加载成功'
  }

  private async mockFailableApiCall(): Promise<string> {
    await AsyncUtil.delay(500)
    
    // 70%概率失败
    if (Math.random() > 0.3) {
      throw new Error('网络错误')
    }

    return '请求成功'
  }
}

4. 多线程工具类

// utils/ThreadUtil.ets
import worker from '@ohos.worker'
import taskpool from '@ohos.taskpool'

export class ThreadUtil {
  private static workerPool: Map<string, worker.ThreadWorker> = new Map()

  /**
   * 创建Worker
   */
  static createWorker(name: string, scriptPath: string): worker.ThreadWorker {
    if (this.workerPool.has(name)) {
      return this.workerPool.get(name)!
    }

    const workerInstance = new worker.ThreadWorker(scriptPath)
    this.workerPool.set(name, workerInstance)
    return workerInstance
  }

  /**
   * 销毁Worker
   */
  static destroyWorker(name: string): void {
    const workerInstance = this.workerPool.get(name)
    if (workerInstance) {
      workerInstance.terminate()
      this.workerPool.delete(name)
    }
  }

  /**
   * 销毁所有Worker
   */
  static destroyAllWorkers(): void {
    this.workerPool.forEach(worker => {
      worker.terminate()
    })
    this.workerPool.clear()
  }

  /**
   * Worker通信包装
   */
  static postToWorker<T>(
    workerName: string,
    message: any
  ): Promise<T> {
    return new Promise((resolve, reject) => {
      const workerInstance = this.workerPool.get(workerName)
      if (!workerInstance) {
        reject(new Error('Worker不存在'))
        return
      }

      // 设置单次消息监听
      const messageHandler = (event: MessageEvent<T>) => {
        workerInstance.off('message', messageHandler)
        resolve(event.data)
      }

      const errorHandler = (error: ErrorEvent) => {
        workerInstance.off('error', errorHandler)
        reject(error)
      }

      workerInstance.on('message', messageHandler)
      workerInstance.on('error', errorHandler)
      workerInstance.postMessage(message)
    })
  }

  /**
   * TaskPool执行
   */
  static async executeTask<T>(
    fn: Function,
    ...args: any[]
  ): Promise<T> {
    const task = new taskpool.Task(fn, ...args)
    return await taskpool.execute(task) as T
  }

  /**
   * TaskPool批量执行
   */
  static async executeTasks<T>(
    tasks: Array<{ fn: Function, args: any[] }>
  ): Promise<T[]> {
    const taskList = tasks.map(({ fn, args }) => 
      new taskpool.Task(fn, ...args)
    )

    const results = await Promise.all(
      taskList.map(task => taskpool.execute(task))
    )

    return results as T[]
  }
}

关键要点

  1. Worker线程: 适合长时间运行的耗时任务
  2. TaskPool: 适合短时间的并发任务
  3. @Concurrent: 标记可并发执行的函数
  4. Promise: 异步编程的基础
  5. 避免阻塞: 耗时操作必须在子线程执行

最佳实践

  1. 合理选择: Worker用于持久任务,TaskPool用于短任务
  2. 资源管理: 及时销毁Worker,释放资源
  3. 错误处理: 捕获Worker和Task的错误
  4. 数据传递: 避免传递大对象,使用序列化数据
  5. 线程安全: 避免共享可变状态

更多关于HarmonyOS鸿蒙Next应用中如何实现多线程处理?的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


在HarmonyOS鸿蒙Next应用中,多线程处理主要通过TaskPool和Worker模块实现。

TaskPool适用于轻量级并发任务,支持任务优先级管理和取消操作。Worker用于长时间运行的后台任务,通过消息传递与主线程通信。

使用TaskPool时,通过execute()方法提交任务。Worker需在配置文件中声明,通过postMessage()和onmessage()进行线程间通信。

这两种方式都遵循Actor模型,避免共享内存,确保线程安全。

在HarmonyOS Next中,实现多线程处理以提升性能并避免UI卡顿,主要有以下几种核心方式:

1. Worker

用于执行长时间运行的独立任务,拥有独立的线程和上下文。

  • 适用场景:计算密集型、长时间运行且与UI无关的任务。
  • 基本步骤
    • 主线程通过new worker.ThreadWorker()创建Worker实例。
    • 通过postMessage()发送任务数据。
    • 在Worker脚本(例如entry/src/main/ets/workers/MyWorker.ts)中通过onmessage接收并处理。
    • 处理完成后,通过postMessage()将结果返回主线程。
    • 主线程通过onmessage接收结果,任务完成后调用terminate()销毁Worker。
  • 特点:线程独立,通信通过序列化消息进行,开销相对较大。

2. TaskPool

轻量级的任务池,适用于短时、可并发的任务。

  • 适用场景:大量、短生命周期、无依赖的异步任务。
  • 基本使用
    • 将任务函数封装为task: taskpool.Task
    • 使用taskpool.execute(task)执行任务,返回一个Promise
    • 通过thenawait获取结果。
  • 特点:由系统管理线程池,自动调度和复用线程,开销小,效率高。任务函数及其参数需可序列化。

3. 异步编程(Async/Await)

用于处理单线程内的异步操作,避免阻塞UI线程。

  • 适用场景:I/O操作、网络请求等非CPU密集型异步任务。
  • 使用方式
    • 使用async关键字声明异步函数。
    • 在函数内使用await等待Promise完成。
    • 结合系统API(如网络、文件操作)的异步接口使用。
  • 作用:让UI线程在等待异步结果时不被阻塞,保持响应。

选择建议与避免UI卡顿

  • 避免在UI线程进行耗时操作:将计算、I/O等耗时操作移至Worker或TaskPool。
  • 任务类型选择
    • 长时间、复杂计算 → Worker
    • 大量短任务、并行处理 → TaskPool
    • 异步I/O、网络请求 → 异步编程
  • 线程间通信:Worker通信有序列化开销,应减少频繁的大数据传递。TaskPool任务参数也需可序列化。
  • 资源管理:及时销毁不再使用的Worker实例。TaskPool由系统管理,通常无需手动释放。

通过合理组合使用这些机制,可以有效将耗时任务分流,确保UI线程的流畅响应。

回到顶部