Nodejs 使用cluster后,redis的sub服务在pub时会根据cpu数量多次执行,如何只执行一次
Nodejs 使用cluster后,redis的sub服务在pub时会根据cpu数量多次执行,如何只执行一次
有办法解决吗,还是redis的pub/sub这种方式本来就不能这样使用
2 回复
Node.js 使用 cluster 后,Redis 的 sub 服务在 pub 时会根据 CPU 数量多次执行,如何只执行一次
在 Node.js 中使用 cluster
模块来利用多核 CPU 是一个常见的做法。然而,这也会带来一些挑战,比如在处理 Redis 的 Pub/Sub 时可能会遇到一个问题:每个 worker 都会接收到发布到频道的消息,导致消息被多次处理。
问题描述
假设你有一个主进程和多个工作进程(worker),每个工作进程都订阅了同一个 Redis 频道。当你在一个进程中发布消息时,所有的工作进程都会收到这条消息并进行处理。如果 CPU 核心数较多,那么每条消息就会被处理多次,这显然不是我们想要的结果。
解决方案
一种有效的解决方案是让所有 worker 都监听同一个事件,但只有主进程或特定的一个 worker 来实际处理这些事件。以下是具体步骤:
- 让所有 worker 订阅频道:确保所有 worker 都订阅相同的频道。
- 指定一个 worker 处理消息:通过某种机制(例如,发送一个特殊的消息给主进程)来决定哪个 worker 应该处理消息。
示例代码
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const redis = require('ioredis');
if (cluster.isMaster) {
// 主进程创建子进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 接收所有 worker 的消息
cluster.on('message', (worker, message) => {
if (message.type === 'process-message') {
// 如果是处理消息的请求,则让 worker 处理
worker.send({ type: 'process-message' });
}
});
} else {
// 子进程逻辑
const redisClient = new redis();
// 订阅频道
redisClient.subscribe('my-channel');
// 监听频道消息
redisClient.on('message', (channel, message) => {
console.log(`Worker ${process.pid} received message on channel ${channel}: ${message}`);
// 发送消息给主进程,询问是否需要处理消息
process.send({ type: 'request-process' });
});
// 接收主进程的消息
process.on('message', (msg) => {
if (msg.type === 'process-message') {
// 实际处理消息
console.log(`Worker ${process.pid} is processing the message`);
}
});
}
解释
- 在主进程中,我们创建了与 CPU 核心数相同数量的 worker 进程。
- 每个 worker 进程都订阅了一个 Redis 频道,并监听频道上的消息。
- 当 worker 收到消息时,它会向主进程发送一个消息,询问是否应该处理这条消息。
- 主进程可以决定让哪个 worker 处理消息,或者自己处理消息。
通过这种方式,我们可以确保消息只会被处理一次,即使系统中有多个 CPU 核心。
当使用 Node.js 的 cluster
模块来利用多核 CPU 时,每个工作进程都会启动自己的 Redis 客户端。这可能导致订阅相同的频道并接收重复的消息。为了解决这个问题,你可以让主进程中负责所有的 Redis 操作,并将操作委托给工作进程。
下面是一个简单的实现方法:
- 在主进程中创建一个 Redis 客户端用于发布消息。
- 在工作进程中仅订阅消息。
- 使用
process.send()
和process.on('message')
在主进程和工作进程之间进行通信。
以下是示例代码:
// 主进程
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const redis = require('redis').createClient();
// 创建多个工作进程
for (let i = 0; i < os.cpus().length; i++) {
cluster.fork();
}
// 监听子进程消息
cluster.on('message', (worker, message) => {
if (message.type === 'subscribe') {
console.log(`Worker ${worker.id} subscribed to channel ${message.channel}`);
}
});
// 发布消息
setTimeout(() => {
redis.publish('test-channel', 'Hello from master');
}, 5000);
} else {
// 子进程
const redis = require('redis').createClient();
const workerId = cluster.worker.id;
// 订阅消息
redis.subscribe('test-channel', (err) => {
if (err) throw err;
process.send({ type: 'subscribe', channel: 'test-channel' });
});
// 接收消息
redis.on('message', (channel, message) => {
console.log(`Worker ${workerId} received message "${message}" on channel "${channel}"`);
});
}
这段代码将所有 Redis 发布/订阅操作都集中在主进程中,避免了由于每个工作进程都尝试订阅同一频道而造成的重复问题。