HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程
HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程 手机开发的软件作为服务端,现在需要多个客户端进行连接服务端,并且当每个客户端连接的时候,服务端会创建一个线程和客户端进行通信。
我是把TCP Socket的代码全部放到 workPort.onmessage 中,但还是没有实现多线程。
Index.ets 和 MyWork.ets的代码如下:
MyWork.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'
import { BusinessError } from '@kit.BasicServicesKit'
const TAG = "Worker"
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
workerPort.onmessage = (event: MessageEvents) => {
// 创建一个TCPSocketServer连接,返回一个TCPSocketServer对象。
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance()
// 绑定本地IP地址和端口,进行监听。
let ipAddress: socket.NetAddress = {} as socket.NetAddress
ipAddress.address = "127.0.0.1"
ipAddress.port = 8888
tcpServer.listen(ipAddress).then(() => {
console.log(`listen success`)
}).catch((err: BusinessError) => {
console.log(`listen fail`)
})
class SocketInfo {
message: ArrayBuffer = new ArrayBuffer(1)
remoteInfo: socket.SocketRemoteInfo = {} as socket.SocketRemoteInfo
}
// 订阅TCPSocketServer的connect事件,客户端与服务端建立连接后,返回一个TCPSocketConnection对象,用于与客户端通信。
tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
console.log(`connect success`)
// client即为建立连接后获取到的连接对象,可以通过该对象订阅TCPSocketConnection相关的事件。
client.on("close", () => {
console.log(`on close success`)
})
client.on("message", (value: SocketInfo) => {
let buffer = value.message
let dataView = new DataView(buffer)
let line = ""
for (let i = 0; i < dataView.byteLength - 1; i++) {
line += String.fromCharCode(dataView.getUint8(i))
}
console.log(`received message--: ${line}`)
console.log(`received address--: ${value.remoteInfo.address}`)
console.log(`received family--: ${value.remoteInfo.family}`)
console.log(`received port--: ${value.remoteInfo.port}`)
console.log(`received size--: ${value.remoteInfo.size}`)
})
})
};
Index.ets
import { ErrorEvent, MessageEvents, worker } from '@kit.ArkTS'
@Entry
@Component
struct Index {
aboutToAppear(): void {
console.log("aboutToAppear enter")
let workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
workerInstance.onmessage = (e: MessageEvents) => {
let data: string = e.data
console.log(`workerInstance onmessage is: ${data}`)
}
workerInstance.postMessage('start worker')
}
build() {
}
}
更多关于HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程的实战教程也可以访问 https://www.itying.com/category-93-b0.html
问题分析
您希望作为服务端的手机软件能够处理多个客户端连接,并为每个客户端连接创建一个线程进行通信。您当前将TCPSocketServer代码放在worker线程的onmessage
回调中,但认为没有实现多线程。实际上,从鸿蒙文档可知,TCPSocketServer的listen
方法内部已经使用了多线程并发处理客户端连接(参考《js-apis-socket.md》中listen
方法的说明:“监听并接受与此套接字建立的TCPSocket连接。该接口使用多线程并发处理客户端的数据”)。因此,您不需要手动为每个客户端创建单独的worker线程。
正确使用方式
- TCPSocketServer内置多线程:当您调用
listen
方法后,TCPSocketServer会自动使用多线程接受和处理客户端连接。每个客户端连接都会由服务器内部的线程池并发处理,无需您额外创建worker线程。 - 将TCPSocketServer放在worker线程中:为了避免网络操作阻塞主线程(UI线程),您可以将TCPSocketServer放置在worker线程中运行(如您当前所做)。这是推荐的做法,但请注意:
- 在worker线程中,TCPSocketServer的
connect
事件回调(用于处理每个客户端连接)会在TCPSocketServer的内部线程中执行,而不是在worker线程的主线程中执行。因此,您不需要为每个客户端连接创建新的worker线程。 - 如果客户端连接后的数据处理逻辑非常耗时(例如大量计算或IO操作),为了避免阻塞TCPSocketServer的内部线程,您可以在
connect
事件回调中使用taskpool
或嵌套worker来异步处理(参考《faqs-arkts-utils.md》中的taskpool示例)。
- 在worker线程中,TCPSocketServer的
代码改进建议
您的当前代码在worker的onmessage
中创建TCPSocketServer,这可能会导致每次收到消息时都创建新的服务器实例。建议在worker线程的全局作用域中创建TCPSocketServer(只创建一次),而不是在onmessage
回调中。以下是修改后的示例:
MyWorker.ets(优化后)
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit';
import { BusinessError } from '@kit.BasicServicesKit';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 在全局作用域中创建TCPSocketServer实例,确保只创建一次
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();
// 绑定和监听操作只需执行一次,因此放在onmessage外部或使用标志位控制
let isListening = false;
workerPort.onmessage = (event: MessageEvents) => {
if (event.data === 'start' && !isListening) {
let ipAddress: socket.NetAddress = {
address: "127.0.0.1",
port: 8888
};
tcpServer.listen(ipAddress).then(() => {
console.log('listen success');
isListening = true;
}).catch((err: BusinessError) => {
console.error('listen fail: ' + JSON.stringify(err));
});
class SocketInfo {
message: ArrayBuffer = new ArrayBuffer(1);
remoteInfo: socket.SocketRemoteInfo = {} as socket.SocketRemoteInfo;
}
tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
console.log('client connected');
client.on("close", () => {
console.log('client closed');
});
client.on("message", (value: SocketInfo) => {
// 处理客户端消息(这里会在TCPSocketServer的内部线程中执行)
let buffer = value.message;
let dataView = new DataView(buffer);
let line = "";
for (let i = 0; i < dataView.byteLength; ++i) {
line += String.fromCharCode(dataView.getUint8(i));
}
console.log(`received message: ${line}`);
// 如果处理逻辑耗时,建议使用taskpool异步处理(参考文档中的taskpool示例)
});
});
}
};
Index.ets(保持不变)
重要说明
- 多线程实现:TCPSocketServer的
listen
方法内部已使用多线程,客户端连接会自动由系统线程池处理。您无需手动创建多个worker线程。 - 耗时操作处理:如果在
connect
事件回调中的数据处理逻辑耗时(如大量计算),建议使用taskpool
异步执行(参考《faqs-arkts-utils.md》),以避免阻塞TCPSocketServer的内部线程。 - 错误处理:确保添加适当的错误处理,如监听
error
事件(参考文档中的错误码部分)。
更多关于HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html
我把client通过worker传递过来之后,再调用 client.on 有报错TypeError: is not callable
worker发送的代码如下:
tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
console.log(`TestTest connect success`)
let workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWork.ets');
workerInstance.onmessage = (e: MessageEvents) => {
let data: string = e.data
console.log(`TestTest workerInstance onmessage is: ${data}`)
}
workerInstance.postMessage(client)
})
1.关于worker的使用可以参考下这个仓库 [ConcurrentModule: 本示例通过@ohos.taskpool和@ohos.worker接口,展示了如何启动worker线程和taskpool线程。](https://gitee.com/harmonyos_samples/concurrent-module)
2.部分参考代码:
// Index.ets
import { worker, MessageEvents, ErrorEvent } from '@kit.ArkTS';
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
// 宿主线程中创建Worker对象
const workerInstance = new worker.ThreadWorker("entry/ets/workers/Worker.ets");
// 宿主线程向worker线程传递信息
const buffer = new ArrayBuffer(8);
workerInstance.postMessage(buffer);
// 宿主线程接收worker线程信息
workerInstance.onmessage = (e: MessageEvents): void => {
// data:worker线程发送的信息
let data: Int8Array = e.data;
console.info("main thread data is " + data);
// 销毁Worker对象
workerInstance.terminate();
}
// 在调用terminate后,执行onexit
workerInstance.onexit = (code) => {
console.info("main thread terminate");
}
workerInstance.onAllErrors = (err: ErrorEvent) => {
console.error("main error message " + err.message);
}
})
}
.width('100%')
.height('100%')
}
}
}
参考文档:[@ohos.worker (启动一个Worker)-ArkTS API-ArkTS(方舟编程语言)-应用框架 - 华为HarmonyOS开发者](https://developer.huawei.com/consumer/cn/doc/harmonyos-references/js-apis-worker#stage模型)
Worker线程核心特性:每个Worker实例对应一个独立线程,但默认情况下单个Worker内部逻辑是单线程执行的(所有onmessage回调都在同一个线程)
TCP连接处理误区:当前代码将整个TCP服务端监听逻辑都放在Worker的onmessage回调中,会导致每次主线程postMessage时重复创建服务端实例
重构服务端逻辑(MyWork.ets)
import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'
import { BusinessError } from '@kit.BasicServicesKit'
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 创建全局TCP服务端实例
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();
// 服务端初始化(只执行一次)
const initServer = () => {
const ipAddress: socket.NetAddress = {
address: "127.0.0.1",
port: 8888
} as socket.NetAddress;
tcpServer.listen(ipAddress).then(() => {
workerPort.postMessage("SERVER_READY");
}).catch((err: BusinessError) => {
workerPort.postMessage(`SERVER_ERROR:${err.message}`);
});
// 监听客户端连接
tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
// 为每个客户端创建新的Worker线程
const clientWorker = new worker.ThreadWorker('entry/ets/workers/ClientHandler.ets');
// 将客户端连接对象传递给新Worker
clientWorker.postMessage({
type: "NEW_CLIENT",
connection: client
});
});
}
// 主线程消息处理
workerPort.onmessage = (event: MessageEvents) => {
if (event.data === 'START_SERVER') {
initServer();
}
};
创建客户端处理线程(ClientHandler.ets)
import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
workerPort.onmessage = (event: MessageEvents) => {
if (event.data.type === "NEW_CLIENT") {
const client: socket.TCPSocketConnection = event.data.connection;
// 设置客户端消息回调
client.on("message", (value: SocketInfo) => {
const buffer = value.message;
// 业务处理逻辑...
// 向主线程发送处理结果
workerPort.postMessage({
type: "CLIENT_MESSAGE",
data: processMessage(buffer)
});
});
client.on("close", () => {
workerPort.postMessage({
type: "CLIENT_DISCONNECTED",
address: client.remoteAddress
});
});
}
};
修改主线程调用(Index.ets)
@Entry
@Component
struct Index {
private workerInstance: worker.ThreadWorker;
aboutToAppear(): void {
this.workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
this.workerInstance.onmessage = (e: MessageEvents) => {
if (e.data === 'SERVER_READY') {
console.log("TCP Server started successfully");
}
};
this.workerInstance.postMessage('START_SERVER');
}
build() {
// UI构建...
}
}
Worker生命周期错误:将TCPSocketServer初始化逻辑放在onmessage回调中,导致每次主线程发送消息都会重复创建服务器实例,另外所有客户端连接处理集中在单个Worker线程,未实现真正的多线程并发。Worker线程本身是独立线程,但每个Worker只能运行单实例任务
解决方案:
1/ 重构Worker架构
// MyWork.ets
import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit';
import { BusinessError } from '@kit.BasicServicesKit';
import { taskpool } from '@kit.ArkTS';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 初始化TCP服务器
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();
let ipAddress: socket.NetAddress = { address: "127.0.0.1", port: 8888 };
// 启动监听
tcpServer.listen(ipAddress).then(() => {
console.log(`Server listening on ${ipAddress.address}:${ipAddress.port}`);
tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
// 使用TaskPool处理客户端连接
const task = new taskpool.Task(clientHandler, client);
taskpool.execute(task);
});
}).catch((err: BusinessError) => {
console.error(`Listen failed: ${err.code}`);
});
// 客户端处理函数
function clientHandler(client: socket.TCPSocketConnection) {
client.on("message", (value: SocketInfo) => {
// 处理消息逻辑
const buffer = value.message;
// ... 数据解析逻辑
});
client.on("close", () => {
console.log(`Client ${client.remoteInfo?.address} disconnected`);
});
}
workerPort.onmessage = (e: MessageEvents) => {
// 接收主线程控制命令
};
2/ 主线程优化
// Index.ets
@Entry
@Component
struct Index {
private workerInstance?: worker.ThreadWorker;
aboutToAppear() {
this.workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
this.workerInstance.onerror = (e: ErrorEvent) => {
console.error(`Worker error: ${e.message}`);
};
}
build() {
Button('启动服务端').onClick(() => {
this.workerInstance?.postMessage('START');
});
}
}
找HarmonyOS工作还需要会Flutter技术的哦,有需要Flutter教程的可以学学大地老师的教程,很不错,B站免费学的哦:https://www.bilibili.com/video/BV1S4411E7LY/?p=17
我用2个客户端去连接,日志显示,进程号和线程号都是同一个
在HarmonyOS Next中,使用Worker创建多线程TCP Socket Server需通过Worker线程处理连接。在主线程中初始化TCPSocket,绑定端口并监听。收到连接请求后,将连接的socket句柄传递给Worker线程,由Worker线程处理具体的数据收发及业务逻辑。通过postMessage与主线程通信,Worker内使用TCPSocket相关API操作连接。
在HarmonyOS Next中,你的代码没有实现多线程的原因是将TCP Socket Server的监听和客户端处理逻辑都放在了同一个worker的onmessage回调中。这样每次收到消息都会重新创建TCPSocketServer实例,而不是为每个客户端连接创建独立的线程。
正确的做法是:在主线程中创建TCP服务器,当accept到新连接时,通过workerPool为每个客户端连接创建独立的worker线程来处理通信。以下是修改建议:
- 在主线程(Index.ets)中创建TCPSocketServer并监听
- 在connect事件回调中,使用worker.ThreadWorker为每个客户端连接创建新的worker实例
- 将client对象传递给worker线程进行处理
这样每个客户端连接都会在独立的worker线程中处理消息,实现真正的多线程通信。
注意:需要合理管理worker线程的生命周期,在连接关闭时及时终止对应的worker线程。