Nodejs Socket.io使用Redis Pub/Sub 疑问

Nodejs Socket.io使用Redis Pub/Sub 疑问

Nodejs使用cluster,fork了4个instance出来

原先的代码

  socket.emit("chat", {message:'aa'},callback)

现在使用了redis pub/sub

this.pub.publish('chat', JSON.stringify(message))

this.sub.on(‘message’, function(channel, message) { //这里我怎么获取到之前的callback呢? socket.emit(channel, message); });

问题如上,我怎么在publish是把原本客户端传过来的callback一并带过去呢?


2 回复

在使用 Node.js 和 Socket.io 结合 Redis 的 Pub/Sub 模式时,你可能会遇到一些挑战,尤其是在处理回调函数(callback)传递的问题。由于 Pub/Sub 是一种发布/订阅模式,它并不直接支持回调机制。因此,你需要找到一种方法来模拟这种行为。

示例代码

首先,让我们看看如何设置 Redis Pub/Sub 以及如何处理回调。

安装必要的依赖

确保你已经安装了 socket.ioioredis(或 redis)等依赖包:

npm install socket.io ioredis

主服务器代码

const http = require('http');
const io = require('socket.io');
const Redis = require('ioredis');
const server = http.createServer();
const socketIo = io(server);

// 创建 Redis 客户端
const redis = new Redis();

// 处理连接
socketIo.on('connection', (socket) => {
    console.log('a user connected');

    // 订阅频道
    redis.subscribe('chat');

    // 监听消息
    redis.on('message', (channel, message) => {
        const data = JSON.parse(message);
        if (data.callbackId) {
            // 假设我们有一个全局的回调存储对象
            const callback = callbacks[data.callbackId];
            if (callback) {
                callback(data);
            }
        } else {
            socket.emit(channel, data.message);
        }
    });

    // 发送消息
    socket.on('chat', (data, callback) => {
        const callbackId = Math.random().toString(36).substring(2, 15);
        // 存储回调
        callbacks[callbackId] = callback;
        // 发布消息
        redis.publish('chat', JSON.stringify({ message: data.message, callbackId }));
    });
});

server.listen(3000, () => {
    console.log('listening on *:3000');
});

解释

  1. Redis 客户端:创建一个 Redis 客户端实例。
  2. 订阅频道:当有新的客户端连接时,订阅 chat 频道。
  3. 监听消息:当收到消息时,检查是否有 callbackId。如果有,则从全局的回调存储对象中取出对应的回调函数并调用。
  4. 发送消息:当客户端发送消息时,生成一个唯一的 callbackId,将其与消息一起发布到 Redis,并将回调函数存储起来。

注意事项

  • 全局的回调存储对象 callbacks 可以是一个简单的 JavaScript 对象。
  • 这种方法依赖于生成的 callbackId 来唯一标识每个回调函数,确保它们不会冲突。
  • 为了简化,上述代码假设所有客户端共享同一个回调存储对象。在生产环境中,可能需要更复杂的机制来管理不同客户端的回调。

通过这种方式,你可以模拟出类似于回调的行为,即使在使用 Pub/Sub 模式时也能处理客户端的响应。


在使用 Redis Pub/Sub 模式时,你无法直接将客户端的回调函数传递给发布者(publisher)。这是因为 Pub/Sub 模式是一种消息广播机制,发布者并不关心具体的订阅者,而订阅者也并不知道具体哪个客户端发布了消息。因此,无法直接将客户端的回调函数传递过去。

但是,你可以通过其他方式实现类似的功能。例如,客户端可以将需要执行的回调逻辑发送到服务器端,并存储在一个全局变量或数据库中。当订阅者收到消息后,可以根据消息中的某些标识符找到对应的回调函数并执行。

示例代码如下:

// 服务器端
const redis = require('redis');
const io = require('socket.io')(server);

const pub = redis.createClient();
const sub = redis.createClient();

sub.on('message', (channel, message) => {
    const msg = JSON.parse(message);
    const callbackId = msg.callbackId;
    const callbackFn = callbacks[callbackId];

    if (callbackFn) {
        callbackFn(msg.data);
    }
});

io.on('connection', (socket) => {
    socket.on('chat', (data, callback) => {
        const callbackId = Math.random().toString(36).substr(2, 9);
        callbacks[callbackId] = callback;

        pub.publish('chat', JSON.stringify({ message: data.message, callbackId }));

        // 删除已执行的回调函数
        setTimeout(() => {
            delete callbacks[callbackId];
        }, 5000); // 假设最多等待5秒执行回调
    });
});

const callbacks = {};

客户端代码:

socket.emit('chat', { message: 'aa' }, (response) => {
    console.log('Callback received:', response);
});

在这个例子中,服务器端会为每个回调生成一个唯一的ID,并将其与回调函数一起存储在一个对象中。当订阅者收到消息后,它可以根据消息中的回调ID找到对应的回调函数并执行。同时,在5秒内未执行的回调将被自动删除。请注意,这只是一个简单的示例,实际应用中可能需要考虑更多的细节,例如错误处理、超时等。

回到顶部