HarmonyOS鸿蒙Next应用中如何实现多线程处理?
HarmonyOS鸿蒙Next应用中如何实现多线程处理? 在HarmonyOS应用中如何实现多线程处理?如何使用Worker、TaskPool和异步编程避免UI卡顿?
3 回复
解决方案
1. Worker线程基础
import worker from '@ohos.worker'
// 主线程代码
@Entry
@Component
struct WorkerDemo {
@State result: string = ''
@State isProcessing: boolean = false
private workerInstance?: worker.ThreadWorker
aboutToAppear() {
this.createWorker()
}
aboutToDisappear() {
this.destroyWorker()
}
build() {
Column({ space: 16 }) {
Text('Worker多线程')
.fontSize(20)
.fontWeight(FontWeight.Bold)
Button('开始计算')
.width('100%')
.enabled(!this.isProcessing)
.onClick(() => {
this.startHeavyTask()
})
if (this.isProcessing) {
Row({ space: 8 }) {
LoadingProgress()
.width(20)
.height(20)
Text('计算中...')
}
}
if (this.result) {
Text('结果:')
.fontSize(16)
.fontWeight(FontWeight.Bold)
Text(this.result)
.fontSize(14)
.fontColor('#1890ff')
}
}
.padding(16)
}
private createWorker() {
try {
// 创建Worker实例
this.workerInstance = new worker.ThreadWorker('entry/ets/workers/TaskWorker.ts')
// 监听Worker消息
this.workerInstance.onmessage = (event) => {
console.log('收到Worker消息:', event.data)
this.result = event.data
this.isProcessing = false
}
// 监听Worker错误
this.workerInstance.onerror = (error) => {
console.error('Worker错误:', error.message)
this.isProcessing = false
}
} catch (error) {
console.error('创建Worker失败:', error)
}
}
private startHeavyTask() {
if (!this.workerInstance) return
this.isProcessing = true
this.result = ''
// 向Worker发送消息
this.workerInstance.postMessage({
type: 'calculate',
data: {
start: 1,
end: 1000000
}
})
}
private destroyWorker() {
if (this.workerInstance) {
this.workerInstance.terminate()
this.workerInstance = undefined
}
}
}
// Worker线程代码
// entry/ets/workers/TaskWorker.ts
import worker from '@ohos.worker'
const workerPort = worker.workerPort
// 监听主线程消息
workerPort.onmessage = (event) => {
console.log('Worker收到消息:', event.data)
const { type, data } = event.data
if (type === 'calculate') {
// 执行耗时计算
const result = calculateSum(data.start, data.end)
// 发送结果回主线程
workerPort.postMessage(`计算结果: ${result}`)
}
}
function calculateSum(start: number, end: number): number {
let sum = 0
for (let i = start; i <= end; i++) {
sum += i
}
return sum
}
// 监听错误
workerPort.onerror = (error) => {
console.error('Worker内部错误:', error.message)
}
2. TaskPool任务池
import taskpool from '@ohos.taskpool'
// 定义并发函数
[@Concurrent](/user/Concurrent)
function processLargeData(data: number[]): number[] {
// 耗时的数据处理
return data.map(item => item * 2)
}
[@Concurrent](/user/Concurrent)
function calculateFibonacci(n: number): number {
if (n <= 1) return n
return calculateFibonacci(n - 1) + calculateFibonacci(n - 2)
}
@Entry
@Component
struct TaskPoolDemo {
@State result: string = ''
@State isProcessing: boolean = false
build() {
Column({ space: 16 }) {
Text('TaskPool任务池')
.fontSize(20)
.fontWeight(FontWeight.Bold)
Button('执行单个任务')
.width('100%')
.onClick(() => {
this.executeSingleTask()
})
Button('执行并发任务')
.width('100%')
.onClick(() => {
this.executeConcurrentTasks()
})
Button('执行任务组')
.width('100%')
.onClick(() => {
this.executeTaskGroup()
})
if (this.isProcessing) {
LoadingProgress()
.width(40)
.height(40)
}
if (this.result) {
Text(this.result)
.fontSize(14)
.fontColor('#666666')
.padding(12)
.backgroundColor('#f5f5f5')
.borderRadius(8)
.width('100%')
}
}
.padding(16)
}
private async executeSingleTask() {
try {
this.isProcessing = true
this.result = ''
// 创建任务
const task = new taskpool.Task(calculateFibonacci, 30)
// 执行任务
const result = await taskpool.execute(task) as number
this.result = `斐波那契数列第30项: ${result}`
console.log('任务执行完成:', result)
} catch (error) {
console.error('任务执行失败:', error)
this.result = '任务执行失败'
} finally {
this.isProcessing = false
}
}
private async executeConcurrentTasks() {
try {
this.isProcessing = true
this.result = ''
// 创建多个任务
const tasks = [
new taskpool.Task(calculateFibonacci, 25),
new taskpool.Task(calculateFibonacci, 26),
new taskpool.Task(calculateFibonacci, 27)
]
// 并发执行所有任务
const results = await Promise.all(
tasks.map(task => taskpool.execute(task))
)
this.result = `并发结果: ${results.join(', ')}`
console.log('并发任务完成:', results)
} catch (error) {
console.error('并发任务失败:', error)
this.result = '并发任务失败'
} finally {
this.isProcessing = false
}
}
private async executeTaskGroup() {
try {
this.isProcessing = true
this.result = ''
// 创建任务组
const taskGroup = new taskpool.TaskGroup()
// 添加任务到组
taskGroup.addTask(new taskpool.Task(calculateFibonacci, 20))
taskGroup.addTask(new taskpool.Task(calculateFibonacci, 21))
taskGroup.addTask(new taskpool.Task(calculateFibonacci, 22))
// 执行任务组
const results = await taskpool.execute(taskGroup)
this.result = `任务组结果: ${results.join(', ')}`
console.log('任务组完成:', results)
} catch (error) {
console.error('任务组失败:', error)
this.result = '任务组失败'
} finally {
this.isProcessing = false
}
}
}
3. Promise异步编程
// 异步工具类
export class AsyncUtil {
/**
* 延时执行
*/
static delay(ms: number): Promise<void> {
return new Promise(resolve => {
setTimeout(resolve, ms)
})
}
/**
* 带超时的Promise
*/
static timeout<T>(
promise: Promise<T>,
timeoutMs: number
): Promise<T> {
return Promise.race([
promise,
new Promise<T>((_, reject) => {
setTimeout(() => {
reject(new Error('操作超时'))
}, timeoutMs)
})
])
}
/**
* 重试执行
*/
static async retry<T>(
fn: () => Promise<T>,
maxRetries: number = 3,
delayMs: number = 1000
): Promise<T> {
let lastError: Error | undefined
for (let i = 0; i < maxRetries; i++) {
try {
return await fn()
} catch (error) {
lastError = error as Error
console.log(`第${i + 1}次尝试失败:`, lastError.message)
if (i < maxRetries - 1) {
await this.delay(delayMs)
}
}
}
throw lastError
}
/**
* 批量并发执行(限制并发数)
*/
static async parallelLimit<T, R>(
items: T[],
fn: (item: T) => Promise<R>,
limit: number = 5
): Promise<R[]> {
const results: R[] = []
const executing: Promise<void>[] = []
for (const item of items) {
const promise = fn(item).then(result => {
results.push(result)
})
executing.push(promise)
if (executing.length >= limit) {
await Promise.race(executing)
executing.splice(executing.findIndex(p => p === promise), 1)
}
}
await Promise.all(executing)
return results
}
/**
* 串行执行
*/
static async serial<T, R>(
items: T[],
fn: (item: T) => Promise<R>
): Promise<R[]> {
const results: R[] = []
for (const item of items) {
const result = await fn(item)
results.push(result)
}
return results
}
}
// 使用示例
@Entry
@Component
struct AsyncDemo {
@State status: string = ''
build() {
Column({ space: 16 }) {
Text('异步编程')
.fontSize(20)
.fontWeight(FontWeight.Bold)
Button('带超时的请求')
.width('100%')
.onClick(() => {
this.fetchWithTimeout()
})
Button('重试请求')
.width('100%')
.onClick(() => {
this.retryRequest()
})
Button('限制并发数')
.width('100%')
.onClick(() => {
this.limitedParallel()
})
if (this.status) {
Text(this.status)
.fontSize(14)
.padding(12)
.backgroundColor('#f5f5f5')
.borderRadius(8)
.width('100%')
}
}
.padding(16)
}
private async fetchWithTimeout() {
try {
this.status = '请求中...'
const result = await AsyncUtil.timeout(
this.mockApiCall(),
3000 // 3秒超时
)
this.status = `请求成功: ${result}`
} catch (error) {
this.status = `请求失败: ${error.message}`
}
}
private async retryRequest() {
try {
this.status = '尝试请求...'
const result = await AsyncUtil.retry(
() => this.mockFailableApiCall(),
3, // 最多重试3次
1000 // 间隔1秒
)
this.status = `请求成功: ${result}`
} catch (error) {
this.status = `请求失败: ${error.message}`
}
}
private async limitedParallel() {
try {
this.status = '批量处理中...'
const items = Array.from({ length: 20 }, (_, i) => i)
const results = await AsyncUtil.parallelLimit(
items,
async (item) => {
await AsyncUtil.delay(1000)
return item * 2
},
5 // 最多同时执行5个
)
this.status = `处理完成,结果数量: ${results.length}`
} catch (error) {
this.status = `处理失败: ${error.message}`
}
}
private async mockApiCall(): Promise<string> {
await AsyncUtil.delay(1000)
return '数据加载成功'
}
private async mockFailableApiCall(): Promise<string> {
await AsyncUtil.delay(500)
// 70%概率失败
if (Math.random() > 0.3) {
throw new Error('网络错误')
}
return '请求成功'
}
}
4. 多线程工具类
// utils/ThreadUtil.ets
import worker from '@ohos.worker'
import taskpool from '@ohos.taskpool'
export class ThreadUtil {
private static workerPool: Map<string, worker.ThreadWorker> = new Map()
/**
* 创建Worker
*/
static createWorker(name: string, scriptPath: string): worker.ThreadWorker {
if (this.workerPool.has(name)) {
return this.workerPool.get(name)!
}
const workerInstance = new worker.ThreadWorker(scriptPath)
this.workerPool.set(name, workerInstance)
return workerInstance
}
/**
* 销毁Worker
*/
static destroyWorker(name: string): void {
const workerInstance = this.workerPool.get(name)
if (workerInstance) {
workerInstance.terminate()
this.workerPool.delete(name)
}
}
/**
* 销毁所有Worker
*/
static destroyAllWorkers(): void {
this.workerPool.forEach(worker => {
worker.terminate()
})
this.workerPool.clear()
}
/**
* Worker通信包装
*/
static postToWorker<T>(
workerName: string,
message: any
): Promise<T> {
return new Promise((resolve, reject) => {
const workerInstance = this.workerPool.get(workerName)
if (!workerInstance) {
reject(new Error('Worker不存在'))
return
}
// 设置单次消息监听
const messageHandler = (event: MessageEvent<T>) => {
workerInstance.off('message', messageHandler)
resolve(event.data)
}
const errorHandler = (error: ErrorEvent) => {
workerInstance.off('error', errorHandler)
reject(error)
}
workerInstance.on('message', messageHandler)
workerInstance.on('error', errorHandler)
workerInstance.postMessage(message)
})
}
/**
* TaskPool执行
*/
static async executeTask<T>(
fn: Function,
...args: any[]
): Promise<T> {
const task = new taskpool.Task(fn, ...args)
return await taskpool.execute(task) as T
}
/**
* TaskPool批量执行
*/
static async executeTasks<T>(
tasks: Array<{ fn: Function, args: any[] }>
): Promise<T[]> {
const taskList = tasks.map(({ fn, args }) =>
new taskpool.Task(fn, ...args)
)
const results = await Promise.all(
taskList.map(task => taskpool.execute(task))
)
return results as T[]
}
}
关键要点
- Worker线程: 适合长时间运行的耗时任务
- TaskPool: 适合短时间的并发任务
- @Concurrent: 标记可并发执行的函数
- Promise: 异步编程的基础
- 避免阻塞: 耗时操作必须在子线程执行
最佳实践
- 合理选择: Worker用于持久任务,TaskPool用于短任务
- 资源管理: 及时销毁Worker,释放资源
- 错误处理: 捕获Worker和Task的错误
- 数据传递: 避免传递大对象,使用序列化数据
- 线程安全: 避免共享可变状态
更多关于HarmonyOS鸿蒙Next应用中如何实现多线程处理?的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html
在HarmonyOS鸿蒙Next应用中,多线程处理主要通过TaskPool和Worker模块实现。
TaskPool适用于轻量级并发任务,支持任务优先级管理和取消操作。Worker用于长时间运行的后台任务,通过消息传递与主线程通信。
使用TaskPool时,通过execute()方法提交任务。Worker需在配置文件中声明,通过postMessage()和onmessage()进行线程间通信。
这两种方式都遵循Actor模型,避免共享内存,确保线程安全。
在HarmonyOS Next中,实现多线程处理以提升性能并避免UI卡顿,主要有以下几种核心方式:
1. Worker
用于执行长时间运行的独立任务,拥有独立的线程和上下文。
- 适用场景:计算密集型、长时间运行且与UI无关的任务。
- 基本步骤:
- 主线程通过
new worker.ThreadWorker()创建Worker实例。 - 通过
postMessage()发送任务数据。 - 在Worker脚本(例如
entry/src/main/ets/workers/MyWorker.ts)中通过onmessage接收并处理。 - 处理完成后,通过
postMessage()将结果返回主线程。 - 主线程通过
onmessage接收结果,任务完成后调用terminate()销毁Worker。
- 主线程通过
- 特点:线程独立,通信通过序列化消息进行,开销相对较大。
2. TaskPool
轻量级的任务池,适用于短时、可并发的任务。
- 适用场景:大量、短生命周期、无依赖的异步任务。
- 基本使用:
- 将任务函数封装为
task: taskpool.Task。 - 使用
taskpool.execute(task)执行任务,返回一个Promise。 - 通过
then或await获取结果。
- 将任务函数封装为
- 特点:由系统管理线程池,自动调度和复用线程,开销小,效率高。任务函数及其参数需可序列化。
3. 异步编程(Async/Await)
用于处理单线程内的异步操作,避免阻塞UI线程。
- 适用场景:I/O操作、网络请求等非CPU密集型异步任务。
- 使用方式:
- 使用
async关键字声明异步函数。 - 在函数内使用
await等待Promise完成。 - 结合系统API(如网络、文件操作)的异步接口使用。
- 使用
- 作用:让UI线程在等待异步结果时不被阻塞,保持响应。
选择建议与避免UI卡顿
- 避免在UI线程进行耗时操作:将计算、I/O等耗时操作移至Worker或TaskPool。
- 任务类型选择:
- 长时间、复杂计算 → Worker。
- 大量短任务、并行处理 → TaskPool。
- 异步I/O、网络请求 → 异步编程。
- 线程间通信:Worker通信有序列化开销,应减少频繁的大数据传递。TaskPool任务参数也需可序列化。
- 资源管理:及时销毁不再使用的Worker实例。TaskPool由系统管理,通常无需手动释放。
通过合理组合使用这些机制,可以有效将耗时任务分流,确保UI线程的流畅响应。

