Nodejs 队列的多进程问题---求指导

Nodejs 队列的多进程问题---求指导

       大致描述一下问题的情况:客户端会产生大量的“消息”存到mongo数据库作为一个“消息池”,然后程序会每次读取一定量(上百个)的“消息”进行处理,会有异步处理,所以会用到async.quene来做并发控制(每次并发执行5个左右)。现在问题出现了:如果多进程的话,mongo“消息池”中的“消息”有可能被读出来多次的,一开始的思路是从mongo“消息池”读取“消息”的时候直接把已经读出来的标记一下状态,但是mongoose里面貌似只有findOneAndUpdate这样一次读一个的操作。不知道把问题说清楚了没,求指导。


19 回复

Node.js 队列的多进程问题 —— 求指导

问题背景

客户端会产生大量的“消息”并存储到MongoDB数据库中,形成一个“消息池”。程序需要从这个“消息池”中按需读取消息,并进行异步处理。为了控制并发量,使用了async.queue模块,每次并发执行5个左右的消息处理任务。

然而,当使用多进程时,可能会出现重复读取同一消息的问题。最初的想法是在从“消息池”读取消息时,将已读取的消息做标记(例如更改状态),但发现Mongoose中没有提供批量更新的方法,只能通过findOneAndUpdate逐条更新。

解决方案

为了解决这个问题,可以考虑以下几种方法:

  1. 使用唯一标识符

    • 在插入消息时,给每条消息分配一个唯一的标识符(如UUID)。
    • 当读取消息时,使用该唯一标识符作为查询条件,确保不会重复读取。
  2. 使用事务

    • 使用MongoDB的事务功能,确保在读取和更新消息时的原子性操作。
    • 这样可以在一个事务中完成读取和更新,避免并发问题。
  3. 使用乐观锁

    • 在消息文档中添加一个版本号字段。
    • 在读取消息时,同时检查版本号,更新时验证版本号是否一致。
    • 如果版本号不一致,则表示有其他进程已经更新了该消息,需要重新读取。

示例代码

const mongoose = require('mongoose');
const async = require('async');

// 定义消息模型
const MessageSchema = new mongoose.Schema({
    content: String,
    status: { type: String, default: 'pending' },
    version: { type: Number, default: 0 }
});

const Message = mongoose.model('Message', MessageSchema);

// 初始化数据库连接
mongoose.connect('mongodb://localhost:27017/testdb', { useNewUrlParser: true, useUnifiedTopology: true });

// 创建队列
const queue = async.queue((task, callback) => {
    const message = task.message;
    console.log(`Processing message: ${message.content}`);
    
    // 更新消息状态
    Message.findOneAndUpdate(
        { _id: message._id, version: message.version },
        { $set: { status: 'processed', version: message.version + 1 } },
        { new: true }
    ).then(updatedMessage => {
        if (!updatedMessage) {
            // 版本号不匹配,重新获取消息
            return Message.findById(message._id).then(newMessage => {
                task.message = newMessage;
                callback();
            });
        }
        console.log(`Processed message: ${updatedMessage.content}`);
        callback();
    }).catch(err => {
        console.error(err);
        callback(err);
    });
}, 5); // 并发数设置为5

// 读取消息并放入队列
Message.find({ status: 'pending' }, (err, messages) => {
    if (err) throw err;

    messages.forEach(message => {
        queue.push({ message });
    });
});

总结

通过上述方法,可以有效地解决多进程环境下消息重复读取的问题。使用唯一标识符、事务或乐观锁机制,都可以保证消息处理的正确性和一致性。


貌似MongoDB后面的版本提供了基于文档的锁,是否可以考虑对于读出来的消息进行加锁处理,然后处理完了之后再更新状态标记,然后再回写,释放锁。。。

貌似MongoDB后面的版本提供了基于文档的锁

这一句是对 mongo 2.8 的更新,出现了理解错误。

现在的锁是数据库级别的,2.8 只是锁的级别到了文档级别,但是锁是不可被操作的。这种文档级别的锁的性质,只能被想办法利用。

在数据库没有锁的情况下,真的想不出该如何解决这种问题。

其实这种并发读写的问题一直是用“锁”这个概念在解决的,我在开发的过程中遇到过类似的场景,我们是把元信息存在 mysql 里面,然后具体的数据放在 nosql 里。这样我们读的时候可以利用 sql 的锁特性,然后再去 nosql 里面取出更多数据。

不过话说我们的消息数据并不大,我们只是为了不重复读才用的 sql,不知你的场景是不是写入太频繁?

谢谢你的回复,能加一下我的qq(568828193)详细咨询一下吗

可以考虑在应用程序这一级提供分布式锁。例如,用Redis + redis-warlock或者redis-lock模块实现分布式锁。其中,redis-warlock实现的是这个分布式锁算法

每个进程:

  • 请求加锁,如果成功则继续,失败则等待一段时间再重新申请
  • 读取数据库未被标记为已读的记录
  • 把读到的记录标记为已读,写回到数据库中
  • 释放锁
  • 每五个一组,处理本次读到的记录

另外,TokuMX是MongDB的一个变种,提供了更快的写操作和文档级别的锁机制。存储同样大小的数据量,TokuMX占用的空间明显要小。

嗯 谢谢你 我们也本来打算用这个思想自己写锁的 既然有这样的模块下来可以看看研究研究 不过现在我们这样做的:我们把收集到的消息放到redis的list里面 然后各个进程来一个一个的pop 目前测的速度能满足需求 不知道各位怎么看

没看明白。 如果是要并发从同一个数据源提出不重复的内容,把要取的内容编号就可以了。 进程1,2,3,4,5,每个读5个。 分别读1~5, 6~10, … 26~30,31~35,… 51~55,56~60,… 每个序列按n×5递增计算。 数据源新的内容编号放在后面。 读数据不需要锁。

谢谢你的回复 你这太有局限性了哈 先不说数据这样编号引发的问题 每个进程要单独配置 而且如果停掉一个进程 其它进程也都要停掉重新配置

用redis的list来实现呗

mongo各种方案都吃力不讨好

嗯 现在是用redis的list来实现的 但是一次只能pop一个还是比较蛋疼的

这种处理方法,相当于把单表拆成五个表,每个进程处理一个表。

Gearman 呢?

谢谢 看了一下哈 这个解决不了多读的问题

嗯 了解一下

在多进程环境下处理队列时,MongoDB的并发控制确实是一个挑战。为了避免重复处理队列中的任务,可以采用一些策略来确保每个任务只被处理一次。

一种常见的方法是使用MongoDB的findAndModifyfindOneAndUpdate来实现原子操作,以避免多个进程同时读取同一任务。虽然Mongoose中没有直接提供这种功能,但可以通过原生的MongoDB驱动来实现。

以下是一个使用findOneAndUpdate的简单示例:

const MongoClient = require('mongodb').MongoClient;

// 连接到MongoDB
MongoClient.connect('mongodb://localhost:27017/yourdb', (err, client) => {
    if (err) throw err;
    const db = client.db();
    const collection = db.collection('messageQueue');

    // 查找并更新任务
    collection.findOneAndUpdate(
        { status: 'pending' }, // 查询条件
        { $set: { status: 'processing', workerId: process.pid } }, // 更新操作
        { returnOriginal: false }, // 返回更新后的文档
        (err, result) => {
            if (err) throw err;
            if (result.value) {
                // 处理结果
                console.log(`Processing message ${result.value._id}`);
                // ... 其他处理逻辑 ...
            } else {
                console.log('No messages available.');
            }
        }
    );
});

这段代码使用findOneAndUpdate来查找一个未处理的任务,并将其状态设置为processing,同时记录处理该任务的进程ID。这样可以确保其他进程不会处理同一个任务。

此外,可以考虑使用一些现有的队列库,如Kue、Bull等,这些库已经解决了多进程环境下的并发控制问题。例如,Bull提供了更高级的功能来管理队列和任务,并支持Redis作为后端存储。

希望这能帮助你解决问题!

回到顶部