Nodejs 如何使用mongoose对一个100万+的mongodb的表进行遍历操作

Nodejs 如何使用mongoose对一个100万+的mongodb的表进行遍历操作

这个问题困扰我很久了,一致没有想明白。 这个表有100万的document,我需要遍历他,每次拿出来10条文档,对其进行1个比较耗时的处理,会利用callback返回,进行下10条文档的处理。 直接使用如下代码,把所有100万数据都放入内存,然后利用async处理,我知道比较蠢,测试了一下,内存猛涨

Blockquote MyModel.find({}, function (err, docs) { dosomething(docs,callback)}); Blockquote

这里请教一下大侠们,可否指点一下方法,使得可以遍历完成100万数据的处理,并且不会引起内存骤增,谢谢了!


17 回复

要在Node.js中使用Mongoose遍历一个包含超过100万个文档的大集合,同时避免一次性将所有文档加载到内存中导致内存溢出的问题,可以采用分批查询的方式。这样可以在处理一批文档的同时,从数据库中获取下一批文档,从而保持较低的内存占用。以下是一个示例代码,展示了如何实现这一目标:

const mongoose = require('mongoose');
const MyModel = mongoose.model('MyModel', new mongoose.Schema({}));

// 连接到MongoDB
mongoose.connect('mongodb://localhost:27017/mydatabase', {
    useNewUrlParser: true,
    useUnifiedTopology: true
}).then(() => console.log('Connected to MongoDB'))
  .catch(err => console.error('Failed to connect to MongoDB', err));

// 定义每次处理的文档数量
const batchSize = 10;

// 递归函数用于处理一批文档后请求下一批
function processBatch(skip) {
    MyModel.find({})
        .skip(skip)
        .limit(batchSize)
        .exec((err, docs) => {
            if (err) {
                console.error('Error fetching documents:', err);
                return;
            }
            
            // 对这批文档进行处理
            for (let doc of docs) {
                doSomethingWithDoc(doc, () => {
                    // 当前文档处理完成后,递归调用处理下一批
                    if (docs.length === batchSize) {
                        processBatch(skip + batchSize);
                    }
                });
            }
        });
}

// 开始处理
processBatch(0);

// 假设的耗时处理函数
function doSomethingWithDoc(doc, callback) {
    // 这里执行你的耗时操作
    setTimeout(() => {
        console.log(`Processed document ${doc._id}`);
        callback();
    }, 1000); // 模拟耗时操作
}

在这个例子中,我们定义了一个processBatch函数,它接受一个skip参数来指定从哪个位置开始跳过文档。每次处理完一批文档(在这个例子中是10个),我们就递归地调用processBatch函数,传递下一个需要跳过的文档数量,从而继续处理下一批文档。

这种方法确保了我们不会一次性将大量文档加载到内存中,而是以小批量的方式逐步处理整个集合,有效地管理了内存使用。


加个定时器试试,个人理解,高手勿喷

定时器是无法搞定的!

100W应该有索引吧例如_id,那每次读10条,callback自己再继续下个10条?

callback 会导致堆栈溢出 有个类似的问题:http://stackoverflow.com/questions/10546193/how-to-return-large-amount-of-rows-from-mongodb-using-node-js-http-server

答案4不明白如何实现的

如果每次读取10条,需要等待这10条处理完毕callback返回后,再去拿下10条去处理。这么考虑是为了不让处理数据的服务负载太大。

如果每次拿10条记录,不等待处理服务处理完毕,就又去领10条记录处理,会让处理数据的服务压力太大!

因此,想采取单线一次只处理10条记录,处理完毕再处理下10条,直至所有100万记录处理完毕。

是不是mongodb输出数据时,是不等待的,无法组塞住?

I guess it means

var MongoClient = require('mongodb').MongoClient;
var url='mongodb://localhost/sample';
MongoClient.connect(url, function(err, db) {
    if(err) { return console.log("connection failed: %s",err); }
    console.log('connected to mongo, ready for duty work');
    var collection=db.collection('files');
    //setInterval(function(){ console.log('memroy usage : %j',process.memoryUsage());},10000);
    /** fetch all records **/
    var stream=collection.find().stream(),cache=[];
    stream.on('data',function(item){
        cache.push(item);
        if(cache.length==10){
            /** signal mongo to pause reading **/
            stream.pause();
            process.nextTick(function(){
                doLotsWork(cache,function(){
                    cache=[];
                    /** signal mongo to continue, fetch next record **/
                    stream.resume();
                });
            });
        }
    });
    stream.on('end',function(){ console.log('query ended'); });
    stream.on('close',function(){ console.log('query closed'); });
});

function doLotsWork(records,callback){ //…do lots of work //… //all done, ready to deal with next 10 records process.nextTick(function(){ callback(); }); }

tested with 967615 records, memory usage: 18M --> 110M --> 25M

一次性取100万个太大了,那能不能一次取10个呢?使用分页

function cursor(start,limit){
    //1.游标初始化
    var start;
    if(!start){
        start = start || 0;
    }
    //2.分页初始化
    var limit = limit || 10;
    //3.分页查询
    Model.skip(start).limit(limit).exec(function(err,docs){
        //遍历处理
        if(!docs || docs.length == 0){
            //处理完成
        }
        for(var i = docs.length - 1; ~i; i--,){
            //处理完的判断
            if(){
                //迭代
                cursor(start + docs.length,limit);
            }
        }
    });
}

刚才的没有做过测试,我那自己的项目写了一个完整案例,你看一下对你有没有帮助

//这是我封装后的抽象Model
var Dao = require('./Dao');
//这是我根据抽象的Model查找到实体的Model
var professionDao = Dao('profession');
/**
 * 游标函数
 * [@param](/user/param) _start 游标的起始位置
 * [@param](/user/param) _limit 游标的分页数量
 * [@param](/user/param) _callback 游标执行函数
 */
function cursor(_start,_limit,_callback){
  //初始化数据定义
  var start,limit,flag,len;
  //初始化起始位置
  start = !_start || _start < 0 ? 0 : _start;
  //初始化分页数量
  limit = !_limit || _limit < 1 ? 1 : _limit;
  //使用Model执行分页查询
  professionDao.find().skip(start).limit(limit).exec(function(err,docs){
    //缓存长度
    len = docs.length;
    //如果没有查询到,证明已经查询完毕
    if(len === 0){
      console.log('遍历结束');
    }
    //初始化循环结束标记
    flag = 0;
    //遍历
    docs.forEach(function(doc){
      //如果有执行函数就执行
      if(_callback && toString.call(_callback) === '[object Function]'){
        _callback(doc);
      }
      //如果循环到末尾,则迭代
      if(len == ++flag){
        cursor(start + docs.length,limit);
      }
    });
  });
}
//执行
cursor(0,10,function(doc){
  console.log(doc._id);
});

当然这个是没有经过优化的,如果要优化就要解决变量顺序、默认值等问题,尽量让api简介好用,希望对你有帮助!

skip方式对大数据有问题,不能使用。这个之前遇到过,后来我们采取的是数字索引排序的方式,index大于起始值,取出来100个,把100个中最大的index记录下来下次查询使用。

这样可以保证拿出的数据是少量的,而且减少了skip引起的速度慢的问题。

如果使用skip方式,在shard环境下,如果跨shard获取数据的话,会引起负载爆升!

相当清晰,非常感谢! 如果要使用在大数据下,不能取过大的数据,否则会引起跨shard扫描的问题。

一般单机环境小数据都是非常棒的方案!

感谢大神!

关注这个问题。 MARK一个

使用Query.prototype.cursor()

  • 流方式
Thing.
  find({ name: /^hello/ }).
  cursor().
  on('data', function(doc) { console.log(doc); }).
  on('end', function() { console.log('Done!'); });
  • 迭代器方式
co(function*() {
  const cursor = Thing.find({ name: /^hello/ }).cursor();
  for (let doc = yield cursor.next(); doc != null; doc = yield cursor.next()) {
    console.log(doc);
  }
});

参考:query_Query-cursor

阔以参考这篇博客:如何高效地遍历 MongoDB 超大集合?

function findAllMembersCursor() {
    return Member.find().cursor();
}

async function test() { const membersCursor = await findAllMembersCursor(); let N = 0; await membersCursor.eachAsync(member => { N++; console.log(name of the ${N}th member: ${member.name}); }); console.log(loop all ${N} members success); }

test();

在处理大量数据时,一次性加载所有文档到内存中是不切实际的。更好的方法是使用游标(Cursor)来逐批处理数据。Mongoose 提供了游标功能,允许我们一次获取少量的数据,从而避免内存问题。

以下是一个示例代码,展示如何使用 Mongoose 和游标来逐批处理百万级别的 MongoDB 文档:

const mongoose = require('mongoose');
const MyModel = mongoose.model('MyModel', new mongoose.Schema({}));

mongoose.connect('mongodb://localhost:27017/mydatabase', {
    useNewUrlParser: true,
    useUnifiedTopology: true
});

function processDocuments(batchSize) {
    const cursor = MyModel.find({}).cursor();

    function handleBatch(err, docs) {
        if (err) return console.error(err);

        // 对当前批次的文档进行处理
        docs.forEach(doc => {
            // 这里是你的处理逻辑
            doc.doSomething(() => {
                // 处理完成后继续处理下一个批次
                nextBatch();
            });
        });
    }

    function nextBatch() {
        cursor.next(handleBatch);
    }

    // 开始处理第一批文档
    nextBatch();
}

// 设定每次处理的文档数量
processDocuments(10);

解释:

  1. 连接数据库:首先连接到 MongoDB 数据库。
  2. 创建游标:使用 find({}).cursor() 创建一个游标对象。
  3. 处理批量文档:定义 handleBatch 函数来处理一批文档。每个文档处理完毕后调用 doc.doSomething(callback) 来执行你的业务逻辑。
  4. 处理下一批次:使用 cursor.next(callback) 获取并处理下一批文档。
  5. 启动处理:调用 nextBatch() 开始处理第一批文档。

这种方式通过游标逐批读取和处理数据,能够有效避免内存问题。

回到顶部