HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程

HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程 手机开发的软件作为服务端,现在需要多个客户端进行连接服务端,并且当每个客户端连接的时候,服务端会创建一个线程和客户端进行通信。

我是把TCP Socket的代码全部放到 workPort.onmessage 中,但还是没有实现多线程。

Index.ets 和 MyWork.ets的代码如下:

MyWork.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'
import { BusinessError } from '@kit.BasicServicesKit'

const TAG = "Worker"
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

workerPort.onmessage = (event: MessageEvents) => {
  // 创建一个TCPSocketServer连接,返回一个TCPSocketServer对象。
  let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance()

  // 绑定本地IP地址和端口,进行监听。
  let ipAddress: socket.NetAddress = {} as socket.NetAddress
  ipAddress.address = "127.0.0.1"
  ipAddress.port = 8888
  tcpServer.listen(ipAddress).then(() => {
    console.log(`listen success`)
  }).catch((err: BusinessError) => {
    console.log(`listen fail`)
  })

  class SocketInfo {
    message: ArrayBuffer = new ArrayBuffer(1)
    remoteInfo: socket.SocketRemoteInfo = {} as socket.SocketRemoteInfo
  }

  // 订阅TCPSocketServer的connect事件,客户端与服务端建立连接后,返回一个TCPSocketConnection对象,用于与客户端通信。
  tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
    console.log(`connect success`)

    // client即为建立连接后获取到的连接对象,可以通过该对象订阅TCPSocketConnection相关的事件。
    client.on("close", () => {
      console.log(`on close success`)
    })
    client.on("message", (value: SocketInfo) => {
      let buffer = value.message
      let dataView = new DataView(buffer)
      let line = ""
      for (let i = 0; i < dataView.byteLength - 1; i++) {
        line += String.fromCharCode(dataView.getUint8(i))
      }
      console.log(`received message--: ${line}`)
      console.log(`received address--: ${value.remoteInfo.address}`)
      console.log(`received family--: ${value.remoteInfo.family}`)
      console.log(`received port--: ${value.remoteInfo.port}`)
      console.log(`received size--: ${value.remoteInfo.size}`)
    })
  })

};
Index.ets
import { ErrorEvent, MessageEvents, worker } from '@kit.ArkTS'

@Entry
@Component
struct Index {
  aboutToAppear(): void {
    console.log("aboutToAppear enter")
    let workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');

    workerInstance.onmessage = (e: MessageEvents) => {
      let data: string = e.data
      console.log(`workerInstance onmessage is: ${data}`)
    }

    workerInstance.postMessage('start worker')
  }

  build() {

  }
}

更多关于HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程的实战教程也可以访问 https://www.itying.com/category-93-b0.html

9 回复

问题分析

您希望作为服务端的手机软件能够处理多个客户端连接,并为每个客户端连接创建一个线程进行通信。您当前将TCPSocketServer代码放在worker线程的onmessage回调中,但认为没有实现多线程。实际上,从鸿蒙文档可知,TCPSocketServer的listen方法内部已经使用了多线程并发处理客户端连接(参考《js-apis-socket.md》中listen方法的说明:“监听并接受与此套接字建立的TCPSocket连接。该接口使用多线程并发处理客户端的数据”)。因此,您不需要手动为每个客户端创建单独的worker线程。

正确使用方式

  1. TCPSocketServer内置多线程:当您调用listen方法后,TCPSocketServer会自动使用多线程接受和处理客户端连接。每个客户端连接都会由服务器内部的线程池并发处理,无需您额外创建worker线程。
  2. 将TCPSocketServer放在worker线程中:为了避免网络操作阻塞主线程(UI线程),您可以将TCPSocketServer放置在worker线程中运行(如您当前所做)。这是推荐的做法,但请注意:
    • 在worker线程中,TCPSocketServer的connect事件回调(用于处理每个客户端连接)会在TCPSocketServer的内部线程中执行,而不是在worker线程的主线程中执行。因此,您不需要为每个客户端连接创建新的worker线程。
    • 如果客户端连接后的数据处理逻辑非常耗时(例如大量计算或IO操作),为了避免阻塞TCPSocketServer的内部线程,您可以在connect事件回调中使用taskpool或嵌套worker来异步处理(参考《faqs-arkts-utils.md》中的taskpool示例)。

代码改进建议

您的当前代码在worker的onmessage中创建TCPSocketServer,这可能会导致每次收到消息时都创建新的服务器实例。建议在worker线程的全局作用域中创建TCPSocketServer(只创建一次),而不是在onmessage回调中。以下是修改后的示例:

MyWorker.ets(优化后)

import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit';
import { BusinessError } from '@kit.BasicServicesKit';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 在全局作用域中创建TCPSocketServer实例,确保只创建一次
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();

// 绑定和监听操作只需执行一次,因此放在onmessage外部或使用标志位控制
let isListening = false;

workerPort.onmessage = (event: MessageEvents) => {
  if (event.data === 'start' && !isListening) {
    let ipAddress: socket.NetAddress = {
      address: "127.0.0.1",
      port: 8888
    };

    tcpServer.listen(ipAddress).then(() => {
      console.log('listen success');
      isListening = true;
    }).catch((err: BusinessError) => {
      console.error('listen fail: ' + JSON.stringify(err));
    });

    class SocketInfo {
      message: ArrayBuffer = new ArrayBuffer(1);
      remoteInfo: socket.SocketRemoteInfo = {} as socket.SocketRemoteInfo;
    }

    tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
      console.log('client connected');

      client.on("close", () => {
        console.log('client closed');
      });

      client.on("message", (value: SocketInfo) => {
        // 处理客户端消息(这里会在TCPSocketServer的内部线程中执行)
        let buffer = value.message;
        let dataView = new DataView(buffer);
        let line = "";
        for (let i = 0; i < dataView.byteLength; ++i) {
          line += String.fromCharCode(dataView.getUint8(i));
        }
        console.log(`received message: ${line}`);
        // 如果处理逻辑耗时,建议使用taskpool异步处理(参考文档中的taskpool示例)
      });
    });
  }
};

Index.ets(保持不变)

重要说明

  • 多线程实现:TCPSocketServer的listen方法内部已使用多线程,客户端连接会自动由系统线程池处理。您无需手动创建多个worker线程。
  • 耗时操作处理:如果在connect事件回调中的数据处理逻辑耗时(如大量计算),建议使用taskpool异步执行(参考《faqs-arkts-utils.md》),以避免阻塞TCPSocketServer的内部线程。
  • 错误处理:确保添加适当的错误处理,如监听error事件(参考文档中的错误码部分)。

更多关于HarmonyOS鸿蒙Next中TCP Socket Server怎么使用worker多线程的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


cke_133.png

我把client通过worker传递过来之后,再调用 client.on 有报错TypeError: is not callable

worker发送的代码如下:

tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
  console.log(`TestTest connect success`)

  let workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWork.ets');

  workerInstance.onmessage = (e: MessageEvents) => {
    let data: string = e.data
    console.log(`TestTest workerInstance onmessage is: ${data}`)
  }

  workerInstance.postMessage(client)
})

1.关于worker的使用可以参考下这个仓库 [ConcurrentModule: 本示例通过@ohos.taskpool和@ohos.worker接口,展示了如何启动worker线程和taskpool线程。](https://gitee.com/harmonyos_samples/concurrent-module)

2.部分参考代码:

// Index.ets
import { worker, MessageEvents, ErrorEvent } from '@kit.ArkTS';

@Entry
@Component
struct Index {
  @State message: string = 'Hello World';
  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            // 宿主线程中创建Worker对象
            const workerInstance = new worker.ThreadWorker("entry/ets/workers/Worker.ets");
            // 宿主线程向worker线程传递信息
            const buffer = new ArrayBuffer(8);
            workerInstance.postMessage(buffer);
            // 宿主线程接收worker线程信息
            workerInstance.onmessage = (e: MessageEvents): void => {
              // data:worker线程发送的信息
              let data: Int8Array = e.data;
              console.info("main thread data is  " + data);
              // 销毁Worker对象
              workerInstance.terminate();
            }
            // 在调用terminate后,执行onexit
            workerInstance.onexit = (code) => {
              console.info("main thread terminate");
            }

            workerInstance.onAllErrors = (err: ErrorEvent) => {
              console.error("main error message " + err.message);
            }
          })
      }
      .width('100%')
      .height('100%')
    }
  }
}

参考文档:[@ohos.worker (启动一个Worker)-ArkTS API-ArkTS(方舟编程语言)-应用框架 - 华为HarmonyOS开发者](https://developer.huawei.com/consumer/cn/doc/harmonyos-references/js-apis-worker#stage模型)

Worker线程核心特性:每个Worker实例对应一个独立线程,但默认情况下单个Worker内部逻辑是单线程执行的(所有onmessage回调都在同一个线程)

TCP连接处理误区:当前代码将整个TCP服务端监听逻辑都放在Worker的onmessage回调中,会导致每次主线程postMessage时重复创建服务端实例

重构服务端逻辑(MyWork.ets)

import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'
import { BusinessError } from '@kit.BasicServicesKit'

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 创建全局TCP服务端实例
let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();

// 服务端初始化(只执行一次)
const initServer = () => {
    const ipAddress: socket.NetAddress = {
        address: "127.0.0.1",
        port: 8888
    } as socket.NetAddress;

    tcpServer.listen(ipAddress).then(() => {
        workerPort.postMessage("SERVER_READY");
    }).catch((err: BusinessError) => {
        workerPort.postMessage(`SERVER_ERROR:${err.message}`);
    });

    // 监听客户端连接
    tcpServer.on("connect", (client: socket.TCPSocketConnection) => {
        // 为每个客户端创建新的Worker线程
        const clientWorker = new worker.ThreadWorker('entry/ets/workers/ClientHandler.ets');
        
        // 将客户端连接对象传递给新Worker
        clientWorker.postMessage({ 
            type: "NEW_CLIENT", 
            connection: client 
        });
    });
}

// 主线程消息处理
workerPort.onmessage = (event: MessageEvents) => {
    if (event.data === 'START_SERVER') {
        initServer();
    }
};

创建客户端处理线程(ClientHandler.ets)

import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { socket } from '@kit.NetworkKit'

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

workerPort.onmessage = (event: MessageEvents) => {
    if (event.data.type === "NEW_CLIENT") {
        const client: socket.TCPSocketConnection = event.data.connection;

        // 设置客户端消息回调
        client.on("message", (value: SocketInfo) => {
            const buffer = value.message;
            // 业务处理逻辑...
            
            // 向主线程发送处理结果
            workerPort.postMessage({
                type: "CLIENT_MESSAGE",
                data: processMessage(buffer)
            });
        });

        client.on("close", () => {
            workerPort.postMessage({ 
                type: "CLIENT_DISCONNECTED",
                address: client.remoteAddress 
            });
        });
    }
};

修改主线程调用(Index.ets)

@Entry
@Component
struct Index {
    private workerInstance: worker.ThreadWorker;

    aboutToAppear(): void {
        this.workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
        
        this.workerInstance.onmessage = (e: MessageEvents) => {
            if (e.data === 'SERVER_READY') {
                console.log("TCP Server started successfully");
            }
        };

        this.workerInstance.postMessage('START_SERVER');
    }

    build() {
        // UI构建...
    }
}

Worker生命周期错误:将TCPSocketServer初始化逻辑放在onmessage回调中,导致每次主线程发送消息都会重复创建服务器实例,另外所有客户端连接处理集中在单个Worker线程,未实现真正的多线程并发。Worker线程本身是独立线程,但每个Worker只能运行单实例任务

解决方案:

1/ 重构Worker架构

// MyWork.ets

import { ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';

import { socket } from '@kit.NetworkKit';

import { BusinessError } from '@kit.BasicServicesKit';

import { taskpool } from '@kit.ArkTS';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 初始化TCP服务器

let tcpServer: socket.TCPSocketServer = socket.constructTCPSocketServerInstance();

let ipAddress: socket.NetAddress = { address: "127.0.0.1", port: 8888 };

// 启动监听

tcpServer.listen(ipAddress).then(() => {

  console.log(`Server listening on ${ipAddress.address}:${ipAddress.port}`);

  tcpServer.on("connect", (client: socket.TCPSocketConnection) => {

    // 使用TaskPool处理客户端连接

    const task = new taskpool.Task(clientHandler, client);

    taskpool.execute(task);

  });

}).catch((err: BusinessError) => {

  console.error(`Listen failed: ${err.code}`);

});

// 客户端处理函数

function clientHandler(client: socket.TCPSocketConnection) {

  client.on("message", (value: SocketInfo) => {

    // 处理消息逻辑

    const buffer = value.message;

    // ... 数据解析逻辑

  });

  client.on("close", () => {

    console.log(`Client ${client.remoteInfo?.address} disconnected`);

  });

}

workerPort.onmessage = (e: MessageEvents) => {

  // 接收主线程控制命令

};

2/ 主线程优化

// Index.ets

@Entry

@Component

struct Index {

  private workerInstance?: worker.ThreadWorker;

  aboutToAppear() {

    this.workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');

    this.workerInstance.onerror = (e: ErrorEvent) => {

      console.error(`Worker error: ${e.message}`);

    };

  }

  build() {

    Button('启动服务端').onClick(() => {

      this.workerInstance?.postMessage('START');

    });

  }

}

找HarmonyOS工作还需要会Flutter技术的哦,有需要Flutter教程的可以学学大地老师的教程,很不错,B站免费学的哦:https://www.bilibili.com/video/BV1S4411E7LY/?p=17

我用2个客户端去连接,日志显示,进程号和线程号都是同一个

cke_713.png

在HarmonyOS Next中,使用Worker创建多线程TCP Socket Server需通过Worker线程处理连接。在主线程中初始化TCPSocket,绑定端口并监听。收到连接请求后,将连接的socket句柄传递给Worker线程,由Worker线程处理具体的数据收发及业务逻辑。通过postMessage与主线程通信,Worker内使用TCPSocket相关API操作连接。

在HarmonyOS Next中,你的代码没有实现多线程的原因是将TCP Socket Server的监听和客户端处理逻辑都放在了同一个worker的onmessage回调中。这样每次收到消息都会重新创建TCPSocketServer实例,而不是为每个客户端连接创建独立的线程。

正确的做法是:在主线程中创建TCP服务器,当accept到新连接时,通过workerPool为每个客户端连接创建独立的worker线程来处理通信。以下是修改建议:

  1. 在主线程(Index.ets)中创建TCPSocketServer并监听
  2. 在connect事件回调中,使用worker.ThreadWorker为每个客户端连接创建新的worker实例
  3. 将client对象传递给worker线程进行处理

这样每个客户端连接都会在独立的worker线程中处理消息,实现真正的多线程通信。

注意:需要合理管理worker线程的生命周期,在连接关闭时及时终止对应的worker线程。

回到顶部