使用Nodejs(net+cluster模块)创建的tcp服务端如何使用redis共享Socket?

使用Nodejs(net+cluster模块)创建的tcp服务端如何使用redis共享Socket?

现在有一个问题,我要做A到B和B到A一对一的长连接通讯,使用net模块创建了服务器,又使用cluster做负载,现在不知道如何在进程间共享socket,想到了redis存储连接,但是在序列化socket的时候报错,请问有谁做过类似的功能啊??

3 回复

使用Node.js(net + cluster 模块)创建的TCP服务端如何使用Redis共享Socket?

在使用Node.js的net模块创建TCP服务器并结合cluster模块实现负载均衡时,如果需要在多个工作进程中共享Socket连接,可以借助Redis来实现。然而,直接将Socket对象存入Redis是不可行的,因为Socket对象是不能被序列化的。但我们可以使用一些策略来间接地实现Socket的共享。

示例代码

  1. 安装必要的依赖

    npm install redis net cluster
    
  2. 创建主进程

    在主进程中启动多个工作进程,并设置一个机制来共享Socket连接信息。

    const cluster = require('cluster');
    const net = require('net');
    const redis = require('redis');
    
    if (cluster.isMaster) {
      const numWorkers = require('os').cpus().length;
      console.log(`Master ${process.pid} is running`);
    
      // Fork workers.
      for (let i = 0; i < numWorkers; i++) {
        cluster.fork();
      }
    
      cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died`);
      });
    
    } else {
      // Worker processes
      console.log(`Worker ${process.pid} started`);
    
      const server = net.createServer((socket) => {
        const socketId = `${socket.remoteAddress}:${socket.remotePort}`;
        const client = redis.createClient();
    
        client.set(socketId, JSON.stringify({
          address: socket.remoteAddress,
          port: socket.remotePort,
          data: ''
        }));
    
        socket.on('data', (data) => {
          client.get(socketId, (err, reply) => {
            if (err) throw err;
            const socketInfo = JSON.parse(reply);
            socketInfo.data += data.toString();
            client.set(socketId, JSON.stringify(socketInfo));
          });
        });
    
        socket.on('end', () => {
          client.del(socketId);
        });
      });
    
      server.listen(8080, () => {
        console.log(`Server listening on port 8080`);
      });
    }
    
  3. 在工作进程中处理数据

    工作进程负责处理来自客户端的数据,并将这些数据存储在Redis中。

    const redis = require('redis');
    
    const client = redis.createClient();
    
    client.on('error', (err) => {
      console.log(`Error ${err}`);
    });
    
    client.keys('*', (err, replies) => {
      if (err) throw err;
    
      replies.forEach((reply, index) => {
        client.get(reply, (err, reply) => {
          if (err) throw err;
          const socketInfo = JSON.parse(reply);
          console.log(`Socket info: ${JSON.stringify(socketInfo)}`);
        });
      });
    });
    

解释

  • 主进程:使用cluster模块创建多个工作进程,并监听客户端的连接。
  • 工作进程:每个工作进程都会创建一个net.Server实例,并处理客户端的连接。它使用Redis来存储和检索Socket的信息。
  • Redis:用于存储Socket连接信息,包括客户端的地址和端口以及接收到的数据。通过这种方式,不同的工作进程可以通过查询Redis来获取其他进程的Socket状态。

这种方法虽然不是直接共享Socket,但可以通过共享连接信息来间接实现多进程间的通信。


JSON.parse 中 function是不能被共享的。 单进程对应单SOCKET即可。跨进程分享SOCKET是画蛇添足吧?

如果一定要实现进程间通信 process模块和child_process模块你去看一看

process.on('message', function(m) {
  console.log('CHILD got message:', m);
});

process.send({ foo: ‘bar’ });

要解决这个问题,你需要实现一个方案来在多进程之间共享TCP连接。由于直接共享socket对象是不可能的,你可以考虑将socket的ID或相关的上下文信息存储在一个共享存储中,例如Redis。然后,在不同的worker进程中,你可以根据这些上下文信息重新建立连接或获取连接状态。

示例方案

  1. 存储Socket ID: 当一个新的客户端连接时,为该连接分配一个唯一的ID,并将其与对应的socket对象存储在Redis中。
  2. 获取Socket ID: 在另一个worker进程中需要与特定客户端通信时,从Redis获取相应的socket ID,进而可以找到并操作该socket。

示例代码

const net = require('net');
const cluster = require('cluster');
const Redis = require("ioredis");
const redisClient = new Redis();

if (cluster.isMaster) {
    // 创建一个子进程
    const numWorkers = require('os').cpus().length;
    console.log(`Master cluster setting up ${numWorkers} workers...`);

    for (let i = 0; i < numWorkers; i++) {
        cluster.fork();
    }

    cluster.on('exit', worker => {
        console.log(`Worker ${worker.process.pid} died`);
        cluster.fork();
    });
} else {
    // Worker进程,处理客户端连接
    const server = net.createServer((socket) => {
        const clientId = socket.remoteAddress + ':' + socket.remotePort;

        // 存储socket id到redis
        redisClient.set(clientId, socket.id);

        socket.on('data', data => {
            // 发送数据给客户端
            socket.write(data);
        });

        socket.on('end', () => {
            // 客户端断开连接
            redisClient.del(clientId);
        });
    });

    server.listen(3000, () => {
        console.log(`Server listening on port 3000`);
    });
}

请注意,上述代码简化了实际应用中的许多细节,如错误处理、重连机制等。实际项目中需要考虑更多的边界情况和异常处理。

回到顶部