Nodejs 异步批量操作数据库的处理

Nodejs 异步批量操作数据库的处理

最近写数据库批量插入数据的时候遇到了一个异步编程之痛的问题,就是数据库操作都是异步回调的,那么就没有办法用for循环来操作了,思考了一下,看来只能利用迭代循环了,举例分享一下,希望高手提提意见,小白小心一下。

exports.saveBatch = function saveBatch(basicInfos, callback) {
    executeFunc(basicInfos, 0, [], callback);
}
function executeFunc(basicInfos, count, errs, callback) {//errs是个错误信息收集器,最后统一返回错误信息集合
    if (count == basicInfos.length) {
        callback(errs);
    } else {
        basicInfos[count].save(function (err) {
            count++;
            err && errs.push(err.message);
            executeFunc(basicInfos, count, errs, callback);
        });
    }
}

12 回复

Nodejs 异步批量操作数据库的处理

最近在编写一个需要批量插入数据到数据库的应用时,遇到了异步编程中的常见问题。由于数据库操作都是异步的,传统的 for 循环无法直接使用,因此需要采用迭代的方式来处理。

这里提供一种解决方案,通过递归函数来实现异步批量操作数据库。以下是一个简单的示例代码:

// 假设我们有一个模型(例如 Mongoose 模型)用来表示数据
const BasicInfo = require('./models/BasicInfo');

exports.saveBatch = function saveBatch(basicInfos, callback) {
    // 初始化执行函数,并传入基本参数
    executeFunc(basicInfos, 0, [], callback);
}

function executeFunc(basicInfos, count, errs, callback) {
    // 如果所有数据都已处理完毕,则调用回调函数并传递错误信息
    if (count === basicInfos.length) {
        return callback(errs);
    }

    // 对当前数据项进行保存操作
    basicInfos[count].save((err) => {
        // 计数器加一
        count++;

        // 如果有错误发生,则将其添加到错误信息集合中
        if (err) {
            errs.push(err.message);
        }

        // 递归调用执行函数,处理下一个数据项
        executeFunc(basicInfos, count, errs, callback);
    });
}

解释

  1. saveBatch 函数

    • 接收两个参数:basicInfos(待插入的数据数组)和 callback(完成后的回调函数)。
    • 调用 executeFunc 函数,并传入初始参数:basicInfos、计数器 count 初始值为 0、错误信息集合 errs 初始为空数组、以及回调函数 callback
  2. executeFunc 函数

    • 接收四个参数:basicInfos(待插入的数据数组)、count(当前处理的数据索引)、errs(错误信息集合)和 callback(完成后的回调函数)。
    • 如果 count 等于 basicInfos 的长度,说明所有数据已经处理完毕,此时调用 callback 并传入错误信息集合 errs
    • 否则,对当前数据项 basicInfos[count] 进行保存操作。如果保存过程中发生错误,则将错误信息添加到 errs 中。
    • 无论是否发生错误,递归调用 executeFunc 来处理下一个数据项,直到所有数据都被处理完毕。

这种递归方法可以有效地处理异步批量操作数据库的问题,避免了使用复杂的异步控制流库,如 asyncPromise,同时也便于理解和维护。


你可以考虑用async或Jscex

使用外围标记,在for循环中使用代码段标记判断即可,形如:

    var flag = 0;
    var datas = [.........................];
    for(var i =0; i < datas.length; i++){
        save(datas[i],function(){
            if(i == ++flag){
                callback();
            }
        });
    }

这个不错

这个也是有问题的,如果save的 callback被立即执行,callback将不被执行

数据都不依赖,这样串行执行没意义啊。

有异步流控问题,找EventProxy

https://github.com/JacksonTian/eventproxy

请关注 async.eachForSeries. 不知道有没拼写错误。

这样的批量操作肯定对性能有很大影响,弄不好网络还给堵死了。最好是运用数据库提供的批量操作,比如说pipe,或者合并sql,批量执行。

能详细介绍下如何使用pipe吗?

理论上我是做了一个Pool,然后利用 process.nextTick,但是我处理的只是在数据库异步初始化时候把它变成同步。假如是针对每次操作,能就要用的event的操作。其实这个一个很大的问题。异步操作时nodejs 的特性,不好的地方就是在大架构逻辑操作的时候,逻辑的耦合会很不容易处理。

在Node.js中进行异步批量操作数据库时,通常会遇到如何有效地管理异步回调的问题。上述代码通过递归的方式逐个执行数据库插入操作,并收集错误信息。这种方式可以确保所有数据都被尝试插入,同时在最终回调中报告任何错误。

示例代码改进

我们可以使用现代的异步处理方法(如async/await)来简化代码逻辑,使其更易读和维护。以下是一个使用async/await的改进版本:

const async = require('async'); // 引入async库

exports.saveBatch = async function saveBatch(basicInfos) {
    const errors = [];
    
    await async.eachOfLimit(basicInfos, 10, async (info, index) => {
        try {
            await info.save();
        } catch (error) {
            errors.push(error.message);
        }
    });

    return errors;
};

在这个例子中,我们使用了async库中的eachOfLimit函数来限制并发操作的数量,这有助于避免数据库连接池过载。每个数据项的保存操作都包裹在一个try-catch块中,这样可以单独捕获每个操作的错误。

解释

  • async.eachOfLimit: 控制并发数量,以防止过多的数据库请求同时发生。
  • try-catch: 确保每个数据库操作独立地捕获异常,并将错误信息收集起来。
  • await: 使异步函数看起来像同步代码,简化了异步逻辑的理解和调试。

这种方法不仅提高了代码的可读性,还增强了对错误处理的灵活性。你可以根据具体需求调整并发限制,或进一步优化错误处理逻辑。

回到顶部