关于Nodejs cluster fork多线程任务的疑问

关于Nodejs cluster fork多线程任务的疑问

我的代码原型是这样的: if(cluster.isMaster){ for(var i=0;i<numCPUs;i++){ cluster.fork(); }

}else{ // fs.appendFile(‘node_build.log’,cluster.worker.id+’ ',function(err){}); logic();

}

在 logic当中创建了一个socket.io服务器。 我现在使用http以及websocket 循环大量连接。 log日志得到的结果基本上都是id为4的worker。在系统监视器中,该进程cpu消耗达到30%。 其他进程无动静。求解?


5 回复

关于Node.js Cluster Fork 多线程任务的疑问

在Node.js中,cluster模块允许你在单个Node.js进程中利用多核CPU的能力。通过cluster.fork()方法,你可以创建多个工作进程(workers),每个工作进程都会运行相同的主进程代码。

示例代码

假设你有一个简单的Node.js应用,它使用socket.io来处理WebSocket连接。你希望使用cluster模块来分发这些连接到不同的工作进程。以下是一个基本的示例:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
const io = require('socket.io');

if (cluster.isMaster) {
    // 主进程
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
} else {
    // 工作进程
    const server = http.createServer((req, res) => {
        res.writeHead(200);
        res.end('Hello World\n');
    });
    server.listen(8000);

    const socketServer = io(server);
    
    socketServer.on('connection', (socket) => {
        console.log(`Worker ${cluster.worker.id} got a connection`);
        socket.emit('message', `Hello from worker ${cluster.worker.id}`);
        
        // 业务逻辑
        logic(socket);
    });
}

function logic(socket) {
    // 在这里实现你的业务逻辑
}

解释

  1. 主进程

    • 使用cluster.fork()方法创建与CPU核心数量相等的工作进程。
    • 每个工作进程都会运行相同的代码,但它们会在不同的进程中运行。
  2. 工作进程

    • 创建一个HTTP服务器并监听端口8000。
    • 使用socket.io创建WebSocket服务器。
    • 监听连接事件,并打印日志信息。
    • 调用logic(socket)函数来处理具体的业务逻辑。

问题分析

根据你的描述,只有ID为4的工作进程在处理所有的连接。这可能是因为socket.io默认情况下将所有连接都路由到了同一个工作进程。为了公平地分配连接,你可以使用sticky-session库来确保连接被均匀地分发到各个工作进程。

解决方案

可以考虑使用sticky-session库来解决这个问题。sticky-session会确保WebSocket连接被正确地路由到不同的工作进程。以下是如何使用sticky-session的示例:

const sticky = require('sticky-session');
const http = require('http');
const io = require('socket.io');

if (!sticky.listen(http.createServer())) {
    const socketServer = io();

    socketServer.on('connection', (socket) => {
        console.log(`Worker ${cluster.worker.id} got a connection`);
        socket.emit('message', `Hello from worker ${cluster.worker.id}`);
        
        // 业务逻辑
        logic(socket);
    });

    function logic(socket) {
        // 在这里实现你的业务逻辑
    }

    return;
}

console.log('Sticky session running on master process.');

通过这种方式,sticky-session会确保WebSocket连接被均匀地分配到各个工作进程,从而避免某个进程负载过重的问题。


在 cluster 并非真正的”负载均衡“ … 测试 socket.io 时~ 同时打开多个浏览器(例如 3) .ctrl+c 同时干掉, 再进去可能会达到你想测试的效果… 也不是100%

我现在基本放弃了多进程的打算。而是开多端口的方式。这样就是传统的负载均衡了

果断使用多端口+nginx

从你的描述来看,问题可能出在负载均衡上。Node.js 的 cluster 模块默认使用 round-robin 策略分配连接,但实际效果可能并不理想,尤其是在处理大量的 WebSocket 连接时。

你可以通过自定义负载均衡策略来改进这个问题。以下是一个示例代码,展示如何根据每个 worker 的负载情况动态分配连接:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    let workerCount = numCPUs;
    let workers = {};

    // 监听所有 worker 的事件
    cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died`);
        workerCount--;
    });

    // 动态分配连接
    const io = require('socket.io')(http.createServer().listen(3000));
    io.on('connection', (socket) => {
        let minWorker = null;
        let minLoad = Infinity;

        // 找到当前负载最小的 worker
        Object.keys(workers).forEach(id => {
            if (workers[id].load < minLoad) {
                minLoad = workers[id].load;
                minWorker = id;
            }
        });

        // 将连接分配给负载最小的 worker
        socket.join(minWorker);
        workers[minWorker].sockets.push(socket);

        // 更新 worker 负载信息
        workers[minWorker].load++;
    });
} else {
    const workerId = cluster.worker.id;
    const workers = require('express')();

    workers.get('/', (req, res) => {
        res.send(`Hello from Worker ${workerId}`);
    });

    workers.listen(3000);
}

在这个示例中,我们创建了一个自定义的负载均衡逻辑。每当一个新的 WebSocket 连接建立时,我们会找到当前负载最小的 worker,并将连接分配给它。这样可以确保各个 worker 的负载更加均匀,避免某个 worker 过度繁忙。

如果你希望更详细地监控每个 worker 的状态,可以进一步扩展代码以记录更多的指标(如 CPU 使用率、内存使用情况等),并根据这些指标动态调整负载均衡策略。

回到顶部