Nodejs 队列的多进程问题---求指导
Nodejs 队列的多进程问题---求指导
大致描述一下问题的情况:客户端会产生大量的“消息”存到mongo数据库作为一个“消息池”,然后程序会每次读取一定量(上百个)的“消息”进行处理,会有异步处理,所以会用到async.quene来做并发控制(每次并发执行5个左右)。现在问题出现了:如果多进程的话,mongo“消息池”中的“消息”有可能被读出来多次的,一开始的思路是从mongo“消息池”读取“消息”的时候直接把已经读出来的标记一下状态,但是mongoose里面貌似只有findOneAndUpdate这样一次读一个的操作。不知道把问题说清楚了没,求指导。
貌似MongoDB后面的版本提供了基于文档的锁,是否可以考虑对于读出来的消息进行加锁处理,然后处理完了之后再更新状态标记,然后再回写,释放锁。。。
貌似MongoDB后面的版本提供了基于文档的锁
这一句是对 mongo 2.8 的更新,出现了理解错误。
现在的锁是数据库级别的,2.8 只是锁的级别到了文档级别,但是锁是不可被操作的。这种文档级别的锁的性质,只能被想办法利用。
在数据库没有锁的情况下,真的想不出该如何解决这种问题。
其实这种并发读写的问题一直是用“锁”这个概念在解决的,我在开发的过程中遇到过类似的场景,我们是把元信息存在 mysql 里面,然后具体的数据放在 nosql 里。这样我们读的时候可以利用 sql 的锁特性,然后再去 nosql 里面取出更多数据。
不过话说我们的消息数据并不大,我们只是为了不重复读才用的 sql,不知你的场景是不是写入太频繁?
谢谢你的回复,能加一下我的qq(568828193)详细咨询一下吗
Thanks
可以考虑在应用程序这一级提供分布式锁。例如,用Redis + redis-warlock或者redis-lock模块实现分布式锁。其中,redis-warlock
实现的是这个分布式锁算法。
每个进程:
- 请求加锁,如果成功则继续,失败则等待一段时间再重新申请
- 读取数据库未被标记为已读的记录
- 把读到的记录标记为已读,写回到数据库中
- 释放锁
- 每五个一组,处理本次读到的记录
另外,TokuMX是MongDB的一个变种,提供了更快的写操作和文档级别的锁机制。存储同样大小的数据量,TokuMX占用的空间明显要小。
嗯 谢谢你 我们也本来打算用这个思想自己写锁的 既然有这样的模块下来可以看看研究研究 不过现在我们这样做的:我们把收集到的消息放到redis的list里面 然后各个进程来一个一个的pop 目前测的速度能满足需求 不知道各位怎么看
没看明白。 如果是要并发从同一个数据源提出不重复的内容,把要取的内容编号就可以了。 进程1,2,3,4,5,每个读5个。 分别读1~5, 6~10, … 26~30,31~35,… 51~55,56~60,… 每个序列按n×5递增计算。 数据源新的内容编号放在后面。 读数据不需要锁。
谢谢你的回复 你这太有局限性了哈 先不说数据这样编号引发的问题 每个进程要单独配置 而且如果停掉一个进程 其它进程也都要停掉重新配置
用redis的list来实现呗
mongo各种方案都吃力不讨好
嗯 现在是用redis的list来实现的 但是一次只能pop一个还是比较蛋疼的
这种处理方法,相当于把单表拆成五个表,每个进程处理一个表。
Gearman 呢?
谢谢 看了一下哈 这个解决不了多读的问题
嗯 了解一下
在多进程环境下处理队列时,MongoDB的并发控制确实是一个挑战。为了避免重复处理队列中的任务,可以采用一些策略来确保每个任务只被处理一次。
一种常见的方法是使用MongoDB的findAndModify
或findOneAndUpdate
来实现原子操作,以避免多个进程同时读取同一任务。虽然Mongoose中没有直接提供这种功能,但可以通过原生的MongoDB驱动来实现。
以下是一个使用findOneAndUpdate
的简单示例:
const MongoClient = require('mongodb').MongoClient;
// 连接到MongoDB
MongoClient.connect('mongodb://localhost:27017/yourdb', (err, client) => {
if (err) throw err;
const db = client.db();
const collection = db.collection('messageQueue');
// 查找并更新任务
collection.findOneAndUpdate(
{ status: 'pending' }, // 查询条件
{ $set: { status: 'processing', workerId: process.pid } }, // 更新操作
{ returnOriginal: false }, // 返回更新后的文档
(err, result) => {
if (err) throw err;
if (result.value) {
// 处理结果
console.log(`Processing message ${result.value._id}`);
// ... 其他处理逻辑 ...
} else {
console.log('No messages available.');
}
}
);
});
这段代码使用findOneAndUpdate
来查找一个未处理的任务,并将其状态设置为processing
,同时记录处理该任务的进程ID。这样可以确保其他进程不会处理同一个任务。
此外,可以考虑使用一些现有的队列库,如Kue、Bull等,这些库已经解决了多进程环境下的并发控制问题。例如,Bull提供了更高级的功能来管理队列和任务,并支持Redis作为后端存储。
希望这能帮助你解决问题!