Nodejs Socket.io使用Redis Pub/Sub 疑问
Nodejs Socket.io使用Redis Pub/Sub 疑问
Nodejs使用cluster,fork了4个instance出来
原先的代码
socket.emit("chat", {message:'aa'},callback)
现在使用了redis pub/sub
this.pub.publish('chat', JSON.stringify(message))
this.sub.on(‘message’, function(channel, message) {
//这里我怎么获取到之前的callback呢?
socket.emit(channel, message);
});
问题如上,我怎么在publish是把原本客户端传过来的callback一并带过去呢?
在使用 Node.js 和 Socket.io 结合 Redis 的 Pub/Sub 模式时,你可能会遇到一些挑战,尤其是在处理回调函数(callback)传递的问题。由于 Pub/Sub 是一种发布/订阅模式,它并不直接支持回调机制。因此,你需要找到一种方法来模拟这种行为。
示例代码
首先,让我们看看如何设置 Redis Pub/Sub 以及如何处理回调。
安装必要的依赖
确保你已经安装了 socket.io
、ioredis
(或 redis
)等依赖包:
npm install socket.io ioredis
主服务器代码
const http = require('http');
const io = require('socket.io');
const Redis = require('ioredis');
const server = http.createServer();
const socketIo = io(server);
// 创建 Redis 客户端
const redis = new Redis();
// 处理连接
socketIo.on('connection', (socket) => {
console.log('a user connected');
// 订阅频道
redis.subscribe('chat');
// 监听消息
redis.on('message', (channel, message) => {
const data = JSON.parse(message);
if (data.callbackId) {
// 假设我们有一个全局的回调存储对象
const callback = callbacks[data.callbackId];
if (callback) {
callback(data);
}
} else {
socket.emit(channel, data.message);
}
});
// 发送消息
socket.on('chat', (data, callback) => {
const callbackId = Math.random().toString(36).substring(2, 15);
// 存储回调
callbacks[callbackId] = callback;
// 发布消息
redis.publish('chat', JSON.stringify({ message: data.message, callbackId }));
});
});
server.listen(3000, () => {
console.log('listening on *:3000');
});
解释
- Redis 客户端:创建一个 Redis 客户端实例。
- 订阅频道:当有新的客户端连接时,订阅
chat
频道。 - 监听消息:当收到消息时,检查是否有
callbackId
。如果有,则从全局的回调存储对象中取出对应的回调函数并调用。 - 发送消息:当客户端发送消息时,生成一个唯一的
callbackId
,将其与消息一起发布到 Redis,并将回调函数存储起来。
注意事项
- 全局的回调存储对象
callbacks
可以是一个简单的 JavaScript 对象。 - 这种方法依赖于生成的
callbackId
来唯一标识每个回调函数,确保它们不会冲突。 - 为了简化,上述代码假设所有客户端共享同一个回调存储对象。在生产环境中,可能需要更复杂的机制来管理不同客户端的回调。
通过这种方式,你可以模拟出类似于回调的行为,即使在使用 Pub/Sub 模式时也能处理客户端的响应。
在使用 Redis Pub/Sub 模式时,你无法直接将客户端的回调函数传递给发布者(publisher)。这是因为 Pub/Sub 模式是一种消息广播机制,发布者并不关心具体的订阅者,而订阅者也并不知道具体哪个客户端发布了消息。因此,无法直接将客户端的回调函数传递过去。
但是,你可以通过其他方式实现类似的功能。例如,客户端可以将需要执行的回调逻辑发送到服务器端,并存储在一个全局变量或数据库中。当订阅者收到消息后,可以根据消息中的某些标识符找到对应的回调函数并执行。
示例代码如下:
// 服务器端
const redis = require('redis');
const io = require('socket.io')(server);
const pub = redis.createClient();
const sub = redis.createClient();
sub.on('message', (channel, message) => {
const msg = JSON.parse(message);
const callbackId = msg.callbackId;
const callbackFn = callbacks[callbackId];
if (callbackFn) {
callbackFn(msg.data);
}
});
io.on('connection', (socket) => {
socket.on('chat', (data, callback) => {
const callbackId = Math.random().toString(36).substr(2, 9);
callbacks[callbackId] = callback;
pub.publish('chat', JSON.stringify({ message: data.message, callbackId }));
// 删除已执行的回调函数
setTimeout(() => {
delete callbacks[callbackId];
}, 5000); // 假设最多等待5秒执行回调
});
});
const callbacks = {};
客户端代码:
socket.emit('chat', { message: 'aa' }, (response) => {
console.log('Callback received:', response);
});
在这个例子中,服务器端会为每个回调生成一个唯一的ID,并将其与回调函数一起存储在一个对象中。当订阅者收到消息后,它可以根据消息中的回调ID找到对应的回调函数并执行。同时,在5秒内未执行的回调将被自动删除。请注意,这只是一个简单的示例,实际应用中可能需要考虑更多的细节,例如错误处理、超时等。