Nodejs 利用进程通信实现Cluster共享内存
Nodejs 利用进程通信实现Cluster共享内存
Node.js的标准API没有提供进程共享内存,然而通过IPC接口的send方法和对message事件的监听,就可以实现一个多进程之间的协同机制,通过通信来操作共享内存。 ##IPC的基本用法:
// worker进程 发送消息
process.send(‘读取共享内存’);
// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
// 有worker进程建立,即开始监听message事件
worker.on(‘message’, function(data) {
// 处理来自worker的请求
// 回传结果
worker.send(‘result')
});
});
在Node.js中,通过send和on(‘message’, callback)实现的IPC通信有几个特点。首先,master和worker之间可以互相通信,而各个worker之间不能直接通信,但是worker之间可以通过master转发实现间接通信。另外,通过send方法传递的数据,会先被JSON.stringify处理后再传递,接收后会再用JSON.parse解析。所以Buffer对象传递后会变成数组,而function则无法直接传递。反过来说,就是可以直接传递除了buffer和function之外的所有数据类型(已经很强大了,而且buffer和function也可以用变通的方法实现传递)。
基于以上特点,我们可以设计一个通过IPC来共享内存的方案:
1、worker进程作为共享内存的使用者,并不直接操作共享内存,而是通过send方法通知master进程进行写入(set)或者读取(get)操作。
2、master进程初始化一个Object对象作为共享内存,并根据worker发来的message,对Object的键值进行读写。
3、由于要使用跨进程通信,所以worker发起的set和get都是异步操作,master根据请求进行实际读写操作,然后将结果返回给worker(即把结果数据send给worker)。
##数据格式
为了实现进程间异步的读写功能,需要对通信数据的格式做一点规范。
首先是worker的请求数据:
requestMessage = {
isSharedMemoryMessage: true, // 表示这是一次共享内存的操作通信
method: ‘set’, // or ‘get’ 操作的方法
id: cluster.worker.id, // 发起操作的进程(在一些特殊场景下,用于保证master可以回信)
uuid: uuid, // 此次操作的(用于注册/调用回调函数)
key: key, // 要操作的键
value: value // 键对应的值(写入)
}
master在接到数据后,会根据method执行相应操作,然后根据requestMessage.id将结果数据发给对应的worker,数据格式如下:
responseMessage = {
isSharedMemoryMessage: true, // 标记这是一次共享内存通信
uuid: requestMessage.uuid, // 此次操作的唯一标示
value: value // 返回值。get操作为key对应的值,set操作为成功或失败
}
规范数据格式的意义在于,master在接收到请求后,能够将处理结果发送给对应的worker,而worker在接到回传的结果后,能够调用此次通信对应的callback,从而实现协同。
规范数据格式后,接下来要做的就是设计两套代码,分别用于master进程和worker进程,监听通信并处理通信数据,实现共享内存的功能。
##User类
User类的实例在worker进程中工作,负责发送操作共享内存的请求,并监听master的回信。
var User = function() {
var self = this;
self.__uuid__ = 0;
// 缓存回调函数
self.__getCallbacks__ = {};
// 接收每次操作请求的回信
process.on('message', function(data) {
if (!data.isSharedMemoryMessage) return;
// 通过uuid找到相应的回调函数
var cb = self.__getCallbacks__[data.uuid];
if (cb && typeof cb == 'function') {
cb(data.value)
}
// 卸载回调函数
self.__getCallbacks__[data.uuid] = undefined;
});
};
// 处理操作
User.prototype.handle = function(method, key, value, callback) {
var self = this;
var uuid = self.__uuid__++;
process.send({
isSharedMemoryMessage: true,
method: method,
id: cluster.worker.id,
uuid: uuid,
key: key,
value: value
});
// 注册回调函数
self.__getCallbacks__[uuid] = callback;
};
User.prototype.set = function(key, value, callback) {
this.handle(‘set’, key, value, callback);
};
User.prototype.get = function(key, callback) {
this.handle(‘get’, key, null, callback);
};
##Manager类
Manager类的实例在master进程中工作,用于初始化一个Object作为共享内存,并根据User实例的请求,在共享内存中增加键值对,或者读取键值,然后将结果发送回去。
var Manager = function() {
var self = this;
// 初始化共享内存
self.__sharedMemory__ = {};
// 监听并处理来自worker的请求
cluster.on('online', function(worker) {
worker.on('message', function(data) {
// isSharedMemoryMessage是操作共享内存的通信标记
if (!data.isSharedMemoryMessage) return;
self.handle(data);
});
});
};
Manager.prototype.handle = function(data) {
var self = this;
var value = thisdata.method;
var msg = {
// 标记这是一次共享内存通信
isSharedMemoryMessage: true,
// 此次操作的唯一标示
uuid: data.uuid,
// 返回值
value: value
};
cluster.workers[data.id].send(msg);
};
// set操作返回ok表示成功
Manager.prototype.set = function(data) {
this.sharedMemory[data.key] = data.value;
return ‘OK’;
};
// get操作返回key对应的值
Manager.prototype.get = function(data) {
return this.sharedMemory[data.key];
};
##使用方法
if (cluster.isMaster) {
// 初始化Manager的实例
var sharedMemoryManager = new Manager();
// fork第一个worker
cluster.fork();
// 1秒后fork第二个worker
setTimeout(function() {
cluster.fork();
}, 1000);
} else {
// 初始化User类的实例
var sharedMemoryUser = new User();
if (cluster.worker.id == 1) {
// 第一个worker向共享内存写入一组数据,用a标记
sharedMemoryUser.set('a', [0, 1, 2, 3]);
}
if (cluster.worker.id == 2) {
// 第二个worker从共享内存读取a的值
sharedMemoryUser.get('a', function(data) {
console.log(data); // => [0, 1, 2, 3]
});
}
}
以上就是一个通过IPC通信实现的多进程共享内存功能,需要注意的是,这种方法是直接在master进程的内存里缓存数据,必须注意内存的使用情况,这里可以考虑加入一些简单的淘汰策略,优化内存的使用。另外,如果单次读写的数据比较大,IPC通信的耗时也会相应增加。
Node.js 利用进程通信实现Cluster共享内存
Node.js 的标准 API 并未提供直接的进程共享内存支持,但可以通过 IPC(进程间通信)接口的 send
方法和对 message
事件的监听来实现多进程之间的协同机制。这种机制允许我们通过通信来操作共享内存。
IPC的基本用法:
// worker进程 发送消息
process.send('读取共享内存');
// master进程 接收消息 -> 处理 -> 发送回信
cluster.on('online', function (worker) {
// 有worker进程建立,即开始监听message事件
worker.on('message', function(data) {
// 处理来自worker的请求
// 回传结果
worker.send('result');
});
});
在 Node.js 中,通过 send
和 on('message', callback)
实现的 IPC 通信有几个特点:
- Master 和 Worker 进程可以互相通信,但各个 Worker 之间不能直接通信,不过可以通过 Master 转发间接通信。
- 通过
send
方法传递的数据会被 JSON.stringify 处理后再传递,接收后会再用 JSON.parse 解析。因此,Buffer 对象传递后会变成数组,而函数则无法直接传递。
基于以上特点的设计方案:
- Worker 进程 作为共享内存的使用者,并不直接操作共享内存,而是通过
send
方法通知 Master 进程进行写入(set)或读取(get)操作。 - Master 进程 初始化一个 Object 对象作为共享内存,并根据 Worker 发来的消息,对 Object 的键值进行读写。
- 由于要使用跨进程通信,所以 Worker 发起的 set 和 get 都是异步操作,Master 根据请求进行实际读写操作,然后将结果返回给 Worker(即把结果数据 send 给 Worker)。
数据格式
为了实现进程间异步的读写功能,需要对通信数据的格式做一点规范。
Worker 的请求数据:
requestMessage = {
isSharedMemoryMessage: true, // 表示这是一次共享内存的操作通信
method: 'set', // or 'get' 操作的方法
id: cluster.worker.id, // 发起操作的进程(在一些特殊场景下,用于保证 Master 可以回信)
uuid: uuid, // 此次操作的唯一标示
key: key, // 要操作的键
value: value // 键对应的值(写入)
}
Master 的响应数据:
responseMessage = {
isSharedMemoryMessage: true, // 标记这是一次共享内存通信
uuid: requestMessage.uuid, // 此次操作的唯一标示
value: value // 返回值。get 操作为 key 对应的值,set 操作为成功或失败
}
示例代码
User 类
User 类的实例在 Worker 进程中工作,负责发送操作共享内存的请求,并监听 Master 的回信。
var User = function() {
var self = this;
self.__uuid__ = 0;
// 缓存回调函数
self.__getCallbacks__ = {};
// 接收每次操作请求的回信
process.on('message', function(data) {
if (!data.isSharedMemoryMessage) return;
// 通过 uuid 找到相应的回调函数
var cb = self.__getCallbacks__[data.uuid];
if (cb && typeof cb == 'function') {
cb(data.value);
}
// 卸载回调函数
self.__getCallbacks__[data.uuid] = undefined;
});
};
// 处理操作
User.prototype.handle = function(method, key, value, callback) {
var self = this;
var uuid = self.__uuid__++;
process.send({
isSharedMemoryMessage: true,
method: method,
id: cluster.worker.id,
uuid: uuid,
key: key,
value: value
});
// 注册回调函数
self.__getCallbacks__[uuid] = callback;
};
User.prototype.set = function(key, value, callback) {
this.handle('set', key, value, callback);
};
User.prototype.get = function(key, callback) {
this.handle('get', key, null, callback);
};
Manager 类
Manager 类的实例在 Master 进程中工作,用于初始化一个 Object 作为共享内存,并根据 User 实例的请求,在共享内存中增加键值对,或者读取键值,然后将结果发送回去。
var Manager = function() {
var self = this;
// 初始化共享内存
self.__sharedMemory__ = {};
// 监听并处理来自 Worker 的请求
cluster.on('online', function(worker) {
worker.on('message', function(data) {
// isSharedMemoryMessage 是操作共享内存的通信标记
if (!data.isSharedMemoryMessage) return;
self.handle(data);
});
});
};
Manager.prototype.handle = function(data) {
var self = this;
var value = this[data.method](data);
var msg = {
// 标记这是一次共享内存通信
isSharedMemoryMessage: true,
// 此次操作的唯一标示
uuid: data.uuid,
// 返回值
value: value
};
cluster.workers[data.id].send(msg);
};
// set 操作返回 ok 表示成功
Manager.prototype.set = function(data) {
this.__sharedMemory__[data.key] = data.value;
return 'OK';
};
// get 操作返回 key 对应的值
Manager.prototype.get = function(data) {
return this.__sharedMemory__[data.key];
};
使用方法
if (cluster.isMaster) {
// 初始化 Manager 的实例
var sharedMemoryManager = new Manager();
// fork 第一个 Worker
cluster.fork();
// 1 秒后 fork 第二个 Worker
setTimeout(function() {
cluster.fork();
}, 1000);
} else {
// 初始化 User 类的实例
var sharedMemoryUser = new User();
if (cluster.worker.id == 1) {
// 第一个 Worker 向共享内存写入一组数据,用 a 标记
sharedMemoryUser.set('a', [0, 1, 2, 3]);
}
if (cluster.worker.id == 2) {
// 第二个 Worker 从共享内存读取 a 的值
sharedMemoryUser.get('a', function(data) {
console.log(data); // => [0, 1, 2, 3]
});
}
}
以上就是一个通过 IPC 通信实现的多进程共享内存功能。需要注意的是,这种方法是直接在 Master 进程的内存里缓存数据,必须注意内存的使用情况,这里可以考虑加入一些简单的淘汰策略,优化内存的使用。此外,如果单次读写的数据较大,IPC 通信的耗时也会相应增加。
完整代码:GitHub链接
我们服务器是通过socket进行rpc进程通信的
为什么不使用redis或memcache等第三方服务,这样可以使应用更健壮~
你有没有测试并发的? 没有锁机制,你这代码根本用不了 调用 sharedMemoryUser.set(‘a’, [0, 1, 2, 3]); 内面 var uuid = self.uuid++; 这数据根本就乱了,代码根本用不上
真不明白这也能置顶。。。。。
可能你把uuid的作用理解错了。你可以试下能不能正确读写。如果有bug可以提。
var initSharedMemory = require('../lib/sharedmemory').init;
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
var http = require('http');
var sharedMemoryController = initSharedMemory();
if (cluster.isMaster) {
// transfer
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
http.createServer(function(req,res){
sharedMemoryController.set("aaa", "bbbb");
res.end();
}).listen(8000);
}
D:\APMServ5.2.6\Apache\bin>ab -c 100 -n 1000 http://127.0.0.1:8000/
245 ’ uuid++++++++++’
244 ’ uuid++++++++++’
230 ’ uuid++++++++++’
246 ’ uuid++++++++++’
256 ’ uuid++++++++++’
245 ’ uuid++++++++++’
231 ’ uuid++++++++++’
257 ’ uuid++++++++++’
246 ’ uuid++++++++++’
258 ’ uuid++++++++++’
247 ’ uuid++++++++++’
247 ’ uuid++++++++++’
232 ’ uuid++++++++++’
259 ’ uuid++++++++++’
248 ’ uuid++++++++++’
233 ’ uuid++++++++++’
248 ’ uuid++++++++++’
260 ’ uuid++++++++++’
234 ’ uuid++++++++++’
261 ’ uuid++++++++++’
235 ’ uuid++++++++++’
262 ’ uuid++++++++++’
236 ’ uuid++++++++++’
249 ’ uuid++++++++++’
237 ’ uuid++++++++++’
263 ’ uuid++++++++++’
250 ’ uuid++++++++++’
249 ’ uuid++++++++++’
250 ’ uuid++++++++++’
User.prototype.handle = function(method, key, value, cb) {
var self = this;
var uuid = self.uuid();
console.log(uuid," uuid++++++++++");
process.send({
isSharedMemoryMessage: true,
method: method,
id: cluster.worker.id,
uuid: uuid,
key: key,
value: value
});
self.__getCallbacks__[uuid] = cb;
};
我把你的测试代码改了一下,你再用ab试一下,可以看到并发状态下,依然可以根据key读取到正确的值。
var initSharedMemory = require('../lib/sharedmemory').init;
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
var http = require('http');
var sharedMemoryController = initSharedMemory();
if (cluster.isMaster) {
// transfer
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
var key, value;
//先存好一些数据,键和值都是worker的id加上一个数字
for (var i = 0; i < 1000; i++) {
key = value = cluster.worker.id + '-' + i;
sharedMemoryController.set(key, value);
}
//访问计数
var count = 0;
http.createServer(function(req,res){
//这里根据访问次数获取前面存放的值
var key = cluster.worker.id + ‘-’ + count++;
sharedMemoryController.get(key, function(data){
//key能够获取到对应的值,则说明没问题
console.log(key, ’ => ', data);
});
res.end();
}).listen(8000);
}
…哥,并发就是要解决写操作啊。。。。。。
var key = cluster.worker.id + ‘-’ + count++; 你这行代码已经是有问题了
反正js没有提供锁的机制,用并发都会把数据弄成垃圾 除非是只读,一个网站,有可能全是只读操作没有写操作吗?
还是等出E7 标准吧,看看有没有锁,没有也只能玩玩
哥,以你的例子来看
sharedMemoryController.set("aaa", "bbbb");
并发调用这个写入操作,会制造什么样垃圾数据?请稍微详细描述一下,谢谢。
我觉得浪费时间纠结在Cluster上很没必要。 如果是缓存数据, 无论是考虑到scale或者轻node进程, memcached或者redis肯定是最好的选择。 如果是信息通知, 考虑到scale, 一步到位直接redis或者zmq多方便。
要在Node.js中实现多进程间的共享内存,可以利用IPC(进程间通信)机制,通过process.send()
和message
事件来传递数据。下面给出具体示例代码,并简要说明实现步骤。
示例代码
Master 进程代码
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
const Manager = require('./manager');
let sharedMemoryManager = new Manager();
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
const User = require('./user');
const user = new User();
// Worker 1 写入数据
if (cluster.worker.id === 1) {
user.set('key1', 'value1', (result) => {
console.log(`Worker ${cluster.worker.id} wrote:`, result);
});
}
// Worker 2 读取数据
if (cluster.worker.id === 2) {
user.get('key1', (result) => {
console.log(`Worker ${cluster.worker.id} read:`, result);
});
}
}
Manager 类代码
const cluster = require('cluster');
class Manager {
constructor() {
this.sharedMemory = {};
}
handle(data) {
const value = this[data.method](data);
const msg = {
isSharedMemoryMessage: true,
uuid: data.uuid,
value: value
};
cluster.workers[data.id].send(msg);
}
set(data) {
this.sharedMemory[data.key] = data.value;
return 'OK';
}
get(data) {
return this.sharedMemory[data.key];
}
}
module.exports = Manager;
User 类代码
const uuid = require('uuid/v4');
class User {
constructor() {
this.callbacks = {};
this.uuid = 0;
}
handle(method, key, value, callback) {
const uuid = this.uuid++;
this.callbacks[uuid] = callback;
process.send({
isSharedMemoryMessage: true,
method: method,
id: cluster.worker.id,
uuid: uuid,
key: key,
value: value
});
}
set(key, value, callback) {
this.handle('set', key, value, callback);
}
get(key, callback) {
this.handle('get', key, null, callback);
}
onMessage(data) {
if (!data.isSharedMemoryMessage) return;
const cb = this.callbacks[data.uuid];
if (cb && typeof cb === 'function') {
cb(data.value);
}
this.callbacks[data.uuid] = undefined;
}
}
process.on('message', (data) => {
const user = new User();
user.onMessage(data);
});
module.exports = User;
实现说明
- Manager 类:用于初始化共享内存对象,并处理来自Worker进程的请求,通过
set
和get
方法更新共享内存。 - User 类:用于向Master进程发送读写请求,并监听回传的结果。每个Worker进程都有自己的User实例。
- Master 进程:创建Manager实例,并启动多个Worker进程。每个Worker进程都可以向Manager请求共享内存操作。
- Worker 进程:通过User实例发送读写请求,并处理结果。
这种方法利用了IPC实现了多进程间的共享内存功能,但要注意内存使用情况,并且不要频繁地传输大体积的数据,以避免性能问题。