Nodejs 使用cluster后,redis的sub服务在pub时会根据cpu数量多次执行,如何只执行一次

Nodejs 使用cluster后,redis的sub服务在pub时会根据cpu数量多次执行,如何只执行一次

有办法解决吗,还是redis的pub/sub这种方式本来就不能这样使用

2 回复

Node.js 使用 cluster 后,Redis 的 sub 服务在 pub 时会根据 CPU 数量多次执行,如何只执行一次

在 Node.js 中使用 cluster 模块来利用多核 CPU 是一个常见的做法。然而,这也会带来一些挑战,比如在处理 Redis 的 Pub/Sub 时可能会遇到一个问题:每个 worker 都会接收到发布到频道的消息,导致消息被多次处理。

问题描述

假设你有一个主进程和多个工作进程(worker),每个工作进程都订阅了同一个 Redis 频道。当你在一个进程中发布消息时,所有的工作进程都会收到这条消息并进行处理。如果 CPU 核心数较多,那么每条消息就会被处理多次,这显然不是我们想要的结果。

解决方案

一种有效的解决方案是让所有 worker 都监听同一个事件,但只有主进程或特定的一个 worker 来实际处理这些事件。以下是具体步骤:

  1. 让所有 worker 订阅频道:确保所有 worker 都订阅相同的频道。
  2. 指定一个 worker 处理消息:通过某种机制(例如,发送一个特殊的消息给主进程)来决定哪个 worker 应该处理消息。

示例代码

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const redis = require('ioredis');

if (cluster.isMaster) {
    // 主进程创建子进程
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // 接收所有 worker 的消息
    cluster.on('message', (worker, message) => {
        if (message.type === 'process-message') {
            // 如果是处理消息的请求,则让 worker 处理
            worker.send({ type: 'process-message' });
        }
    });
} else {
    // 子进程逻辑
    const redisClient = new redis();

    // 订阅频道
    redisClient.subscribe('my-channel');

    // 监听频道消息
    redisClient.on('message', (channel, message) => {
        console.log(`Worker ${process.pid} received message on channel ${channel}: ${message}`);

        // 发送消息给主进程,询问是否需要处理消息
        process.send({ type: 'request-process' });
    });

    // 接收主进程的消息
    process.on('message', (msg) => {
        if (msg.type === 'process-message') {
            // 实际处理消息
            console.log(`Worker ${process.pid} is processing the message`);
        }
    });
}

解释

  • 在主进程中,我们创建了与 CPU 核心数相同数量的 worker 进程。
  • 每个 worker 进程都订阅了一个 Redis 频道,并监听频道上的消息。
  • 当 worker 收到消息时,它会向主进程发送一个消息,询问是否应该处理这条消息。
  • 主进程可以决定让哪个 worker 处理消息,或者自己处理消息。

通过这种方式,我们可以确保消息只会被处理一次,即使系统中有多个 CPU 核心。


当使用 Node.js 的 cluster 模块来利用多核 CPU 时,每个工作进程都会启动自己的 Redis 客户端。这可能导致订阅相同的频道并接收重复的消息。为了解决这个问题,你可以让主进程中负责所有的 Redis 操作,并将操作委托给工作进程。

下面是一个简单的实现方法:

  1. 在主进程中创建一个 Redis 客户端用于发布消息。
  2. 在工作进程中仅订阅消息。
  3. 使用 process.send()process.on('message') 在主进程和工作进程之间进行通信。

以下是示例代码:

// 主进程
const cluster = require('cluster');
const os = require('os');

if (cluster.isMaster) {
  const redis = require('redis').createClient();

  // 创建多个工作进程
  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  // 监听子进程消息
  cluster.on('message', (worker, message) => {
    if (message.type === 'subscribe') {
      console.log(`Worker ${worker.id} subscribed to channel ${message.channel}`);
    }
  });

  // 发布消息
  setTimeout(() => {
    redis.publish('test-channel', 'Hello from master');
  }, 5000);
} else {
  // 子进程
  const redis = require('redis').createClient();
  const workerId = cluster.worker.id;

  // 订阅消息
  redis.subscribe('test-channel', (err) => {
    if (err) throw err;
    process.send({ type: 'subscribe', channel: 'test-channel' });
  });

  // 接收消息
  redis.on('message', (channel, message) => {
    console.log(`Worker ${workerId} received message "${message}" on channel "${channel}"`);
  });
}

这段代码将所有 Redis 发布/订阅操作都集中在主进程中,避免了由于每个工作进程都尝试订阅同一频道而造成的重复问题。

回到顶部