关于Nodejs cluster fork多线程任务的疑问
关于Nodejs cluster fork多线程任务的疑问
我的代码原型是这样的: if(cluster.isMaster){ for(var i=0;i<numCPUs;i++){ cluster.fork(); }
}else{ // fs.appendFile(‘node_build.log’,cluster.worker.id+’ ',function(err){}); logic();
}
在 logic当中创建了一个socket.io服务器。 我现在使用http以及websocket 循环大量连接。 log日志得到的结果基本上都是id为4的worker。在系统监视器中,该进程cpu消耗达到30%。 其他进程无动静。求解?
关于Node.js Cluster Fork 多线程任务的疑问
在Node.js中,cluster
模块允许你在单个Node.js进程中利用多核CPU的能力。通过cluster.fork()
方法,你可以创建多个工作进程(workers),每个工作进程都会运行相同的主进程代码。
示例代码
假设你有一个简单的Node.js应用,它使用socket.io
来处理WebSocket连接。你希望使用cluster
模块来分发这些连接到不同的工作进程。以下是一个基本的示例:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
const io = require('socket.io');
if (cluster.isMaster) {
// 主进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
// 工作进程
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end('Hello World\n');
});
server.listen(8000);
const socketServer = io(server);
socketServer.on('connection', (socket) => {
console.log(`Worker ${cluster.worker.id} got a connection`);
socket.emit('message', `Hello from worker ${cluster.worker.id}`);
// 业务逻辑
logic(socket);
});
}
function logic(socket) {
// 在这里实现你的业务逻辑
}
解释
-
主进程:
- 使用
cluster.fork()
方法创建与CPU核心数量相等的工作进程。 - 每个工作进程都会运行相同的代码,但它们会在不同的进程中运行。
- 使用
-
工作进程:
- 创建一个HTTP服务器并监听端口8000。
- 使用
socket.io
创建WebSocket服务器。 - 监听连接事件,并打印日志信息。
- 调用
logic(socket)
函数来处理具体的业务逻辑。
问题分析
根据你的描述,只有ID为4的工作进程在处理所有的连接。这可能是因为socket.io
默认情况下将所有连接都路由到了同一个工作进程。为了公平地分配连接,你可以使用sticky-session
库来确保连接被均匀地分发到各个工作进程。
解决方案
可以考虑使用sticky-session
库来解决这个问题。sticky-session
会确保WebSocket连接被正确地路由到不同的工作进程。以下是如何使用sticky-session
的示例:
const sticky = require('sticky-session');
const http = require('http');
const io = require('socket.io');
if (!sticky.listen(http.createServer())) {
const socketServer = io();
socketServer.on('connection', (socket) => {
console.log(`Worker ${cluster.worker.id} got a connection`);
socket.emit('message', `Hello from worker ${cluster.worker.id}`);
// 业务逻辑
logic(socket);
});
function logic(socket) {
// 在这里实现你的业务逻辑
}
return;
}
console.log('Sticky session running on master process.');
通过这种方式,sticky-session
会确保WebSocket连接被均匀地分配到各个工作进程,从而避免某个进程负载过重的问题。
在 cluster 并非真正的”负载均衡“ … 测试 socket.io 时~ 同时打开多个浏览器(例如 3) .ctrl+c 同时干掉, 再进去可能会达到你想测试的效果… 也不是100%
我现在基本放弃了多进程的打算。而是开多端口的方式。这样就是传统的负载均衡了
果断使用多端口+nginx
从你的描述来看,问题可能出在负载均衡上。Node.js 的 cluster
模块默认使用 round-robin 策略分配连接,但实际效果可能并不理想,尤其是在处理大量的 WebSocket 连接时。
你可以通过自定义负载均衡策略来改进这个问题。以下是一个示例代码,展示如何根据每个 worker 的负载情况动态分配连接:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
let workerCount = numCPUs;
let workers = {};
// 监听所有 worker 的事件
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
workerCount--;
});
// 动态分配连接
const io = require('socket.io')(http.createServer().listen(3000));
io.on('connection', (socket) => {
let minWorker = null;
let minLoad = Infinity;
// 找到当前负载最小的 worker
Object.keys(workers).forEach(id => {
if (workers[id].load < minLoad) {
minLoad = workers[id].load;
minWorker = id;
}
});
// 将连接分配给负载最小的 worker
socket.join(minWorker);
workers[minWorker].sockets.push(socket);
// 更新 worker 负载信息
workers[minWorker].load++;
});
} else {
const workerId = cluster.worker.id;
const workers = require('express')();
workers.get('/', (req, res) => {
res.send(`Hello from Worker ${workerId}`);
});
workers.listen(3000);
}
在这个示例中,我们创建了一个自定义的负载均衡逻辑。每当一个新的 WebSocket 连接建立时,我们会找到当前负载最小的 worker,并将连接分配给它。这样可以确保各个 worker 的负载更加均匀,避免某个 worker 过度繁忙。
如果你希望更详细地监控每个 worker 的状态,可以进一步扩展代码以记录更多的指标(如 CPU 使用率、内存使用情况等),并根据这些指标动态调整负载均衡策略。