Nodejs 如何使用mongoose对一个100万+的mongodb的表进行遍历操作
Nodejs 如何使用mongoose对一个100万+的mongodb的表进行遍历操作
这个问题困扰我很久了,一致没有想明白。 这个表有100万的document,我需要遍历他,每次拿出来10条文档,对其进行1个比较耗时的处理,会利用callback返回,进行下10条文档的处理。 直接使用如下代码,把所有100万数据都放入内存,然后利用async处理,我知道比较蠢,测试了一下,内存猛涨
Blockquote MyModel.find({}, function (err, docs) { dosomething(docs,callback)}); Blockquote
这里请教一下大侠们,可否指点一下方法,使得可以遍历完成100万数据的处理,并且不会引起内存骤增,谢谢了!
要在Node.js中使用Mongoose遍历一个包含超过100万个文档的大集合,同时避免一次性将所有文档加载到内存中导致内存溢出的问题,可以采用分批查询的方式。这样可以在处理一批文档的同时,从数据库中获取下一批文档,从而保持较低的内存占用。以下是一个示例代码,展示了如何实现这一目标:
const mongoose = require('mongoose');
const MyModel = mongoose.model('MyModel', new mongoose.Schema({}));
// 连接到MongoDB
mongoose.connect('mongodb://localhost:27017/mydatabase', {
useNewUrlParser: true,
useUnifiedTopology: true
}).then(() => console.log('Connected to MongoDB'))
.catch(err => console.error('Failed to connect to MongoDB', err));
// 定义每次处理的文档数量
const batchSize = 10;
// 递归函数用于处理一批文档后请求下一批
function processBatch(skip) {
MyModel.find({})
.skip(skip)
.limit(batchSize)
.exec((err, docs) => {
if (err) {
console.error('Error fetching documents:', err);
return;
}
// 对这批文档进行处理
for (let doc of docs) {
doSomethingWithDoc(doc, () => {
// 当前文档处理完成后,递归调用处理下一批
if (docs.length === batchSize) {
processBatch(skip + batchSize);
}
});
}
});
}
// 开始处理
processBatch(0);
// 假设的耗时处理函数
function doSomethingWithDoc(doc, callback) {
// 这里执行你的耗时操作
setTimeout(() => {
console.log(`Processed document ${doc._id}`);
callback();
}, 1000); // 模拟耗时操作
}
在这个例子中,我们定义了一个processBatch
函数,它接受一个skip
参数来指定从哪个位置开始跳过文档。每次处理完一批文档(在这个例子中是10个),我们就递归地调用processBatch
函数,传递下一个需要跳过的文档数量,从而继续处理下一批文档。
这种方法确保了我们不会一次性将大量文档加载到内存中,而是以小批量的方式逐步处理整个集合,有效地管理了内存使用。
加个定时器试试,个人理解,高手勿喷
定时器是无法搞定的!
100W应该有索引吧例如_id,那每次读10条,callback自己再继续下个10条?
callback 会导致堆栈溢出 有个类似的问题:http://stackoverflow.com/questions/10546193/how-to-return-large-amount-of-rows-from-mongodb-using-node-js-http-server
答案4不明白如何实现的
如果每次读取10条,需要等待这10条处理完毕callback返回后,再去拿下10条去处理。这么考虑是为了不让处理数据的服务负载太大。
如果每次拿10条记录,不等待处理服务处理完毕,就又去领10条记录处理,会让处理数据的服务压力太大!
因此,想采取单线一次只处理10条记录,处理完毕再处理下10条,直至所有100万记录处理完毕。
是不是mongodb输出数据时,是不等待的,无法组塞住?
I guess it means
var MongoClient = require('mongodb').MongoClient;
var url='mongodb://localhost/sample';
MongoClient.connect(url, function(err, db) {
if(err) { return console.log("connection failed: %s",err); }
console.log('connected to mongo, ready for duty work');
var collection=db.collection('files');
//setInterval(function(){ console.log('memroy usage : %j',process.memoryUsage());},10000);
/** fetch all records **/
var stream=collection.find().stream(),cache=[];
stream.on('data',function(item){
cache.push(item);
if(cache.length==10){
/** signal mongo to pause reading **/
stream.pause();
process.nextTick(function(){
doLotsWork(cache,function(){
cache=[];
/** signal mongo to continue, fetch next record **/
stream.resume();
});
});
}
});
stream.on('end',function(){ console.log('query ended'); });
stream.on('close',function(){ console.log('query closed'); });
});
function doLotsWork(records,callback){
//…do lots of work
//…
//all done, ready to deal with next 10 records
process.nextTick(function(){
callback();
});
}
tested with 967615 records, memory usage: 18M --> 110M --> 25M
一次性取100万个太大了,那能不能一次取10个呢?使用分页
function cursor(start,limit){
//1.游标初始化
var start;
if(!start){
start = start || 0;
}
//2.分页初始化
var limit = limit || 10;
//3.分页查询
Model.skip(start).limit(limit).exec(function(err,docs){
//遍历处理
if(!docs || docs.length == 0){
//处理完成
}
for(var i = docs.length - 1; ~i; i--,){
//处理完的判断
if(){
//迭代
cursor(start + docs.length,limit);
}
}
});
}
刚才的没有做过测试,我那自己的项目写了一个完整案例,你看一下对你有没有帮助
//这是我封装后的抽象Model
var Dao = require('./Dao');
//这是我根据抽象的Model查找到实体的Model
var professionDao = Dao('profession');
/**
* 游标函数
* [@param](/user/param) _start 游标的起始位置
* [@param](/user/param) _limit 游标的分页数量
* [@param](/user/param) _callback 游标执行函数
*/
function cursor(_start,_limit,_callback){
//初始化数据定义
var start,limit,flag,len;
//初始化起始位置
start = !_start || _start < 0 ? 0 : _start;
//初始化分页数量
limit = !_limit || _limit < 1 ? 1 : _limit;
//使用Model执行分页查询
professionDao.find().skip(start).limit(limit).exec(function(err,docs){
//缓存长度
len = docs.length;
//如果没有查询到,证明已经查询完毕
if(len === 0){
console.log('遍历结束');
}
//初始化循环结束标记
flag = 0;
//遍历
docs.forEach(function(doc){
//如果有执行函数就执行
if(_callback && toString.call(_callback) === '[object Function]'){
_callback(doc);
}
//如果循环到末尾,则迭代
if(len == ++flag){
cursor(start + docs.length,limit);
}
});
});
}
//执行
cursor(0,10,function(doc){
console.log(doc._id);
});
当然这个是没有经过优化的,如果要优化就要解决变量顺序、默认值等问题,尽量让api简介好用,希望对你有帮助!
skip方式对大数据有问题,不能使用。这个之前遇到过,后来我们采取的是数字索引排序的方式,index大于起始值,取出来100个,把100个中最大的index记录下来下次查询使用。
这样可以保证拿出的数据是少量的,而且减少了skip引起的速度慢的问题。
如果使用skip方式,在shard环境下,如果跨shard获取数据的话,会引起负载爆升!
相当清晰,非常感谢! 如果要使用在大数据下,不能取过大的数据,否则会引起跨shard扫描的问题。
一般单机环境小数据都是非常棒的方案!
感谢大神!
关注这个问题。 MARK一个
学习了
使用Query.prototype.cursor()
- 流方式
Thing.
find({ name: /^hello/ }).
cursor().
on('data', function(doc) { console.log(doc); }).
on('end', function() { console.log('Done!'); });
- 迭代器方式
co(function*() {
const cursor = Thing.find({ name: /^hello/ }).cursor();
for (let doc = yield cursor.next(); doc != null; doc = yield cursor.next()) {
console.log(doc);
}
});
在处理大量数据时,一次性加载所有文档到内存中是不切实际的。更好的方法是使用游标(Cursor)来逐批处理数据。Mongoose 提供了游标功能,允许我们一次获取少量的数据,从而避免内存问题。
以下是一个示例代码,展示如何使用 Mongoose 和游标来逐批处理百万级别的 MongoDB 文档:
const mongoose = require('mongoose');
const MyModel = mongoose.model('MyModel', new mongoose.Schema({}));
mongoose.connect('mongodb://localhost:27017/mydatabase', {
useNewUrlParser: true,
useUnifiedTopology: true
});
function processDocuments(batchSize) {
const cursor = MyModel.find({}).cursor();
function handleBatch(err, docs) {
if (err) return console.error(err);
// 对当前批次的文档进行处理
docs.forEach(doc => {
// 这里是你的处理逻辑
doc.doSomething(() => {
// 处理完成后继续处理下一个批次
nextBatch();
});
});
}
function nextBatch() {
cursor.next(handleBatch);
}
// 开始处理第一批文档
nextBatch();
}
// 设定每次处理的文档数量
processDocuments(10);
解释:
- 连接数据库:首先连接到 MongoDB 数据库。
- 创建游标:使用
find({}).cursor()
创建一个游标对象。 - 处理批量文档:定义
handleBatch
函数来处理一批文档。每个文档处理完毕后调用doc.doSomething(callback)
来执行你的业务逻辑。 - 处理下一批次:使用
cursor.next(callback)
获取并处理下一批文档。 - 启动处理:调用
nextBatch()
开始处理第一批文档。
这种方式通过游标逐批读取和处理数据,能够有效避免内存问题。