Node.js中实现MongoDB的两阶段提交(上)

Node.js中实现MongoDB的两阶段提交(上)

简介

Node.js是一个使用Javascript语言,Chrome V8引擎作为其解释器的Web应用开发平台,其特点 是提供了非阻塞I/O,基于事件循环的异步处理,可用于高并发的服务器端应用开发。MongoDB是最为流行的NoSQL文档型数据库,其特点是无模式,高可扩展性。MongoDB内部使用JSON格式存储,存储过程也是用Javascript编写。这听上去很好,如果你开发一个典型的Web应用,那么从”浏览器->应用服务器->数据库“全部都是同一种编程语言、使用同一种格式来传递数据,无需转换。你是不是觉得有一种很轻松的感觉。

MongoDB是被设计用来解决大数据而引发的可扩展性问题的,因此不可避免的,它没有提供关系数据库所必须的特性:数据一致性。在MongoDB中只有对同一个文档进行的操作才是原子的。即便是在同一台数据库服务器上,MongoDB也不能保证同时更新两个文档的操作时数据一致性。 之所以MongoDB这样设计,我想理由应该有两个:一,很难在多台数据库服务器之间进行高效的分布式事务控制。二,事务控制对性能有很大影响。所以对事务要求高的情况下最好不要用NoSQL数据库,但是如果非要用 , 那么怎么解决数据一致性问题呢? MongoDB把这一任务留给了程序员自己,为了获得最终的一致性,我们可以采用 :

两阶段提交

顾名思义也就是把事务分成两个阶段:第一个阶段尝试进行提交,第二个阶段正式提交。这样,即使更新数据时发生故障,我们也能知道数据都处于什么状态,总是能够把数据恢复到更新之前的状态。

那么我们来看看在Node.js中怎么实现MongoDB的两阶段提交。

首先假定我们有这样的应用场景:我们需要把一个用户 10分 的积分转移到另一个用户的账户中。在MongoDB中,这个操作需要分成两步,第一步扣除用户A的积分s,第二步增加用户B的积分s。由于影响到了两个文档,但是必须保证操作都完成或者都失败(否则A可能被扣了分,但是B却没得到分),因此我们应用两阶段提交。

为了记录事务的状态和相关联的数据信息,我们需要建立一个事务集合(transaction),用于提交、回滚事务和故障恢复。这个集合中的文档包含了一次事务需要改变的所有数据(包括insert,update,delete的所有数据)。

没有事务控制的转移积分的操作在Node.js中大概可以写成这样:

Js代码

// 扣除A的积分10分  
dbskin.collection('user').findAndModify({name:'allenny.iteye.com'}, {$inc:{score:-10}}, function(err, result) {  
  if(err || !result) {  
    console.log('ERROR');  
  }  
  else {  
      // 增加B的积分10分  
      dbskin.collection('user').findAndModify({name:'B'}, {$inc:{score:10}}, function(err, result) {  
      if(err || !result) {  
        console.log('ERROR');        
      }  
      else {  
         // 转移成功  
      }  
  });  
  }  
});  

(注意:示例代码依赖于mongodb和mongoskin模块)

实现步骤:

(注意:为了方便描述而不影响理解,示例代码省略了部分可能在真实开发中所必要的内容,比如:更新文档的参数等;示例代码仅为演示,真实场景需考虑更多情况)

为了实现两阶段提交,我们需要在业务代码中穿插下面的事务逻辑调用:

  1. 在整个事务开始之前,首先需要在transaction集合中创建一条事务记录:

Js代码

dbskin.collection('transaction').insert(  
  {from:'allenny.iteye.com', to:'B', score:10, state:'initial'}, function(err, trans) {  
    // 创建成功获得trans ID后,就可以开始事务了  
    beginTransaction(trans._id);  
  }  
);  
  1. 开始事务,并执行更新操作:

Js代码

function beginTransaction(transId) {  
  // 为简化代码此处_id的值直接写为transId  
  dbskin.collection('transatcion').findAndModify({_id:transId}, {$set:{state:'pending'}},  function(err, result) {  
  // 扣除用户A的分数10,并与事务记录关联,表示此记录已更新但可能会被回滚。注意将事务ID作为更新记录的条件,避免重复更新,用于故障恢复时找到恢复点。  
  var cond_a = {name:'allenny.iteye.com', pendingTransactions:{$ne:transId}};  
  dbskin.collection('user').findAndModify(cond_a, {$inc:{score:-10}, $push:{pendingTransaction:transId}}, {safe:true}, function(err, result) {  

      // 增加用户B的分数10,其余同上。  
      var cond_b = {name:'B', pendingTransactions:{$ne:transId}};  
      dbskin.collection('user').findAndModify(cond_b, {$inc:{score:10}, $push:{pendingTransaction:transId}}, {safe:true}, function(err, result) {  

        // 如果全部更新成功,则可以直接提交该事务。  
        commit(transId);  
      });  
  });  
}  

});
}

在这一步中,更新用户记录执行中途时,系统有可能发生故障当机,导致B被扣减了积分,但是A却未得到积分,或者干脆没有更新成功。故障恢复程序执行时应当寻找处于’pending’状态的事务记录,然后重新尝试执行业务逻辑。注意两次更新积分时的执行条件:pendingTransactions中不能包含当前事务ID,这是用来避免恢复时重复修改数据的。换句话说,通过pendingTransactions的记录我们的恢复程序才知道故障发生时数据更新到什么状态了。

  1. 用户积分转移成功完成后,就该将事务的状态置为‘committed’,并清除AB两用户文档和事务记录的关联(从pendingTransactions中删除当前事务ID)。

Js代码

function commit(transId) {  
  dbskin.collection('transatcion').findAndModify({_id:transId},  {$set:{state:'committed'}}, function(err, result) {  
   dbskin.collection('user').findAndModify({name:'allenny.iteye.com'}, {$pull:{pendingTransactions:transId}}, function(err, result) {  

      dbskin.collection('user').findAndModify({name:'B'}, {$pull:{pendingTransactions:transId}}, function(err, result) {  

         // 取消关联后,可以直接完成该事务。  
         endTransaction(transId, function() {  
            console.log(' Transaction done');  
         });  
  });  
});  

});
}

如果此时系统发生故障当机,那么恢复程序应当从事务表中搜索处于‘committed’状态的记录,然后尝试重新清除与事务的关联。

  1. 完成后,结束整个事务(即将事务状态改为‘done’):

Js代码

function endTransaction(transId, fnCallback) {  
  dbskin.collection('transatcion').findAndModify({_id:transId}, {$set:{state:'done'}}, function(err, result) {  
       fnCallback();  
 });  
}  

(本文地址:http://allenny.iteye.com/admin/blogs/1678233)

此时,整个事务操作顺利完成。除了可能发生的服务器故障之外,业务流程都能顺利完成的。那么,假如事务内的更新逻辑由于更新条件无法完成怎么办呢?比如,给 接受积分的用户B 增加限制,使其必须处于非锁定状态才能转入积分,如果B恰恰处于锁定状态的,而A积分已经扣除了,那我们就只能:

回滚(Rollback)

  1. 回滚是特定于不同业务逻辑的具体操作,因此我们先实现针对以上转移积分的回滚函数:

Js代码

function rollbackScoreTransfer(transId, fnCallback) {  
  dbskin.collection('transaction').findOne({_id:transid}, function(err, trans) {  
    // B用户的操作一定没有完成,无需处理,直接返还积分给A用户,同时需要清除与事务的关联。  
    dbskin.collection('user').update({name:'allenny.iteye.com', pendingTransactions:transId},   
      {$inc:{score: trans.score}, $pull:{pendingTransactions:transId}}, function(err, result) {  
        fnCallback();// 完成rollback  
    });  
  });  
}  
  1. 修改上面第二步的更新操作,使其根据业务更新成功与否调用rollback操作, 并将前面的回滚处理函数rollbackScoreTransfer()作为参数传入通用的rollback()函数:

Js代码

function beginTransaction(transId) {  
  dbskin.collection('transatcion').findAndModify({_id:transId}, {$set:{state:'pending'}}, function(err, result) {  
var cond_a = {name:'allenny.iteye.com', pendingTransactions:{$ne:transId}};   
dbskin.collection('user').findAndModify(cond_a, {$inc:{score:-10}, $push:{pendingTransaction:transId}}, function(err, result) {  

  // 此处改变更新条件,增加用户状态检查  
  var cond_b = {name:'B', state:{$ne:'locked'}, pendingTransactions:{$ne:transId}};  
  dbskin.collection('user').findAndModify(cond_b, {$inc:{score:10}, $push:{pendingTransaction:transId}},  function(err, result) {  
     if(err || !result) {  
       // 如果更新失败,则将回滚积分转移业务的函数传入rollback函数等待执行。  
       rollback(transId, rollbackScoreTransfer);  
     }  
     else {  
        commit(transId);  
     }  
  });  
});  

});
}

  1. 回滚操作函数的实现:

Js代码

function rollback(transId, fnOperation) {  

// 先将事务状态变为’canceling’
dbskin.collection(‘transaction’).update({_id:transId}, {$set:{state:‘canceling’}}, function(err, result) {

// 开始具体的回滚操作  
fnOperation(transId, function() {  

  // 完成事务,将事务状态变为'canceled', 回滚结束  
  endTransaction(transId, function() {  
     console.log('Transaction rollback');  
  });  
});  

});
}

调用此函数时将具体的回滚函数传入,待事务状态变为’canceling’后调用。回滚完成后修改事务状态为’canceled’

  1. 回滚完成后调用的endTransaction()函数要处理commit和rollback两种操作,因此修改前面的endTransaction()函数:

Js代码

function endTransaction(transId, fnCallback) {  
  dbskin.collection('transaction').findOne({_id:transid}, {field:['state']}, function(err, trans) {  
    if(trans.state == 'committed') {  
      dbskin.collection('transatcion').update({_id:transid}, {$set:{state:'done'}}, function(err, result) {  
        // 其他处理  
        fnCallback();  
     });  
    }   
    else if(trans.state == 'canceling') {  
      dbskin.collection('transatcion').update({_id:transid}, {$set:{state:'canceled'}}, function(err, result) {  
        // 其他处理  
        fnCallback();  
      });  
    }  
  });  
}  

至此,一个两阶段提交的控制流程已经完成了,你可以使用它来进行多文档的更新操作了,甚至可以 更新 分布式数据库。在任何时候应用服务器发生故障,事务都会处于未完成状态,可以通过恢复程序来完成事务。万事OK了吗?No,还没完,在《MongoDB权威指南》一书中,我看到这么一句话:MongoDB默认的存储引擎是内存映射引擎,MongoDB不能控制数据写入到磁盘的顺序…坑爹啊,我没理解错的话,这意味我们不能确保新的事务状态更新在业务数据更新写入到磁盘前已经写入了, 如果此时数据库服务器当掉的话, 那我们的两阶段提交就有可能得到错误的事务状态,数据一致性被 破坏了 。不过事情还算太糟,目前你可以通过MongoDB的复制功能来保证数据的完整性(这也符合MongoDB的设计初衷)。新的存储引擎也在开发中,不远的将来我们就可以使用到具有单机持久性的MongoDB数据库了。此外,你还可以祈祷数据库服务器不要当掉,心诚则灵。

参考:

《MongoDB权威指南》O’Reilly Media, Inc

《Perform Two Phase Commits》

http://cookbook.mongodb.org/patterns/perform-two-phase-commits/


13 回复

Node.js中实现MongoDB的两阶段提交(上)

简介

Node.js 是一个使用 JavaScript 语言,Chrome V8 引擎作为其解释器的 Web 应用开发平台。它的特点是提供了非阻塞 I/O 和基于事件循环的异步处理机制,适用于高并发的服务器端应用开发。MongoDB 是最流行的 NoSQL 文档型数据库,以其无模式和高可扩展性著称。MongoDB 使用 JSON 格式存储数据,并且存储过程也是用 JavaScript 编写的。

这种组合听起来非常理想,特别是当你开发一个典型的 Web 应用时,从“浏览器 -> 应用服务器 -> 数据库”全部使用同一种编程语言和格式,无需进行数据转换。然而,MongoDB 没有提供关系数据库所必需的数据一致性保证。MongoDB 只能保证对同一个文档的操作是原子的,而在不同的文档或不同的数据库服务器上进行的更新操作不能保证一致性。

为什么需要两阶段提交?

MongoDB 设计之初就是为了处理大数据带来的可扩展性问题,因此它没有提供关系数据库所必需的事务支持。具体来说,MongoDB 不能保证跨多个文档的同时更新操作的一致性。为了解决这个问题,可以采用两阶段提交的方法。

实现两阶段提交

假设我们有一个应用场景:需要将用户 A 的 10 分积分转移到用户 B 的账户中。在这个过程中,我们需要确保要么两个操作都成功,要么都不执行。因此,我们可以使用两阶段提交的方法来保证数据一致性。

事务记录

首先,我们需要创建一个事务集合(transaction)来记录事务的状态和相关数据。这个集合中的文档包含了事务需要改变的所有数据(包括插入、更新和删除的数据)。

示例代码

// 引入必要的模块
const mongodb = require('mongodb');
const MongoClient = mongodb.MongoClient;
const mongoskin = require('mongoskin');

// 连接到MongoDB数据库
const dbUrl = 'mongodb://localhost:27017/mydb';
MongoClient.connect(dbUrl, { useNewUrlParser: true, useUnifiedTopology: true }, (err, client) => {
  if (err) throw err;
  const db = client.db();
  const dbskin = db.db();

  // 在transaction集合中创建一条事务记录
  dbskin.collection('transaction').insert(
    { from: 'allenny.iteye.com', to: 'B', score: 10, state: 'initial' },
    (err, trans) => {
      if (err) throw err;

      // 创建成功后开始事务
      beginTransaction(trans.insertedIds[0]._id);
    }
  );
});

// 开始事务
function beginTransaction(transId) {
  dbskin.collection('transaction').findAndModify(
    { _id: transId },
    { $set: { state: 'pending' } },
    (err, result) => {
      if (err) throw err;

      // 扣除用户A的积分10分
      const cond_a = { name: 'allenny.iteye.com', pendingTransactions: { $ne: transId } };
      dbskin.collection('user').findAndModify(
        cond_a,
        { $inc: { score: -10 }, $push: { pendingTransactions: transId } },
        { safe: true },
        (err, result) => {
          if (err) throw err;

          // 增加用户B的积分10分
          const cond_b = { name: 'B', pendingTransactions: { $ne: transId } };
          dbskin.collection('user').findAndModify(
            cond_b,
            { $inc: { score: 10 }, $push: { pendingTransactions: transId } },
            { safe: true },
            (err, result) => {
              if (err) throw err;

              // 如果更新成功,提交事务
              commit(transId);
            }
          );
        }
      );
    }
  );
}

// 提交事务
function commit(transId) {
  dbskin.collection('transaction').findAndModify(
    { _id: transId },
    { $set: { state: 'committed' } },
    (err, result) => {
      if (err) throw err;

      // 清除用户A和用户B文档中的事务关联
      dbskin.collection('user').findAndModify(
        { name: 'allenny.iteye.com' },
        { $pull: { pendingTransactions: transId } },
        (err, result) => {
          if (err) throw err;

          dbskin.collection('user').findAndModify(
            { name: 'B' },
            { $pull: { pendingTransactions: transId } },
            (err, result) => {
              if (err) throw err;

              // 结束事务
              endTransaction(transId, () => {
                console.log('Transaction done');
              });
            }
          );
        }
      );
    }
  );
}

// 结束事务
function endTransaction(transId, callback) {
  dbskin.collection('transaction').findAndModify(
    { _id: transId },
    { $set: { state: 'done' } },
    (err, result) => {
      if (err) throw err;
      callback();
    }
  );
}

总结

通过上述代码,我们实现了两阶段提交的过程。在第一阶段,我们创建事务记录并尝试更新数据。如果更新成功,我们在第二阶段提交事务,否则回滚事务。这种方法确保了数据的一致性,即使在系统故障的情况下也可以通过恢复程序来恢复事务状态。


提交后不能再修改吗?万一有错误需要修改怎么办?

嗯,很不错的文章,之前想等我这个项目完成之后来做事务的处理,没想到今天逛出了指导方案,学习学习,真心拜谢!

已经关注,跪求速求真心求全集(期待下)

好文章真心不错的,思路很好,不过我还是等mongodb支持事物吧,目前就用mysql吧

下我还没想好怎么写呢,等酝酿好了再来

确实,如果不支持事物,那么问题可就大了。但如果这样手动做2段提交模拟事物,也太麻烦了点。

回滚也是普通的数据库操作,要是回滚失败了怎么办?

看完这贴,我觉得如果涉及事务的数据操作,还是交给 Mysql 吧,自己写两阶段提交太蛋疼了…

同时并发n个写请求,而不是串行,{ j: true } 开日志保存,然后计数回来的消息,最后callback。

MongoDB会把每次的操作过程预先记录在mongo.log中。中间崩溃,重启恢复日志就可以了。

为了在Node.js中实现MongoDB的两阶段提交,我们需要构建一套机制来确保事务的完整性和一致性。以下是一个简单的实现示例,展示了如何在Node.js中实现两阶段提交,确保用户积分转移的过程中数据的一致性。

实现步骤

  1. 创建事务记录: 在开始事务之前,首先需要在transactions集合中创建一条事务记录,记录此次事务的相关信息。
dbskin.collection('transactions').insert(
  { from: 'allenny.iteye.com', to: 'B', score: 10, state: 'initial' },
  function(err, trans) {
    if (!err && trans) {
      beginTransaction(trans._id);
    }
  }
);
  1. 开始事务并执行更新操作: 在事务记录创建成功后,开始事务,并执行更新操作。
function beginTransaction(transId) {
  dbskin.collection('transactions').findAndModify(
    { _id: transId },
    { $set: { state: 'pending' } },
    function(err, result) {
      if (err || !result) {
        rollback(transId);
      } else {
        var cond_a = { name: 'allenny.iteye.com', pendingTransactions: { $ne: transId } };
        dbskin.collection('users').findAndModify(
          cond_a,
          { $inc: { score: -10 }, $push: { pendingTransactions: transId } },
          { safe: true },
          function(err, result) {
            if (err || !result) {
              rollback(transId);
            } else {
              var cond_b = { name: 'B', pendingTransactions: { $ne: transId } };
              dbskin.collection('users').findAndModify(
                cond_b,
                { $inc: { score: 10 }, $push: { pendingTransactions: transId } },
                { safe: true },
                function(err, result) {
                  if (err || !result) {
                    rollback(transId);
                  } else {
                    commit(transId);
                  }
                }
              );
            }
          }
        );
      }
    }
  );
}
  1. 提交事务: 当所有更新操作成功后,将事务状态设置为committed
function commit(transId) {
  dbskin.collection('transactions').findAndModify(
    { _id: transId },
    { $set: { state: 'committed' } },
    function(err, result) {
      if (err || !result) {
        rollback(transId);
      } else {
        dbskin.collection('users').findAndModify(
          { name: 'allenny.iteye.com' },
          { $pull: { pendingTransactions: transId } },
          function(err, result) {
            if (err || !result) {
              rollback(transId);
            } else {
              dbskin.collection('users').findAndModify(
                { name: 'B' },
                { $pull: { pendingTransactions: transId } },
                function(err, result) {
                  if (err || !result) {
                    rollback(transId);
                  } else {
                    endTransaction(transId, function() {
                      console.log('Transaction committed');
                    });
                  }
                }
              );
            }
          }
        );
      }
    }
  );
}
  1. 结束事务: 在事务提交成功后,将事务状态设置为done
function endTransaction(transId, callback) {
  dbskin.collection('transactions').findAndModify(
    { _id: transId },
    { $set: { state: 'done' } },
    function(err, result) {
      if (err || !result) {
        rollback(transId);
      } else {
        callback();
      }
    }
  );
}
  1. 回滚事务: 如果任何更新操作失败,需要回滚事务,撤销之前所做的更改。
function rollback(transId) {
  dbskin.collection('transactions').findAndModify(
    { _id: transId },
    { $set: { state: 'canceled' } },
    function(err, result) {
      if (err || !result) {
        console.error('Failed to rollback transaction');
      } else {
        dbskin.collection('users').findAndModify(
          { name: 'allenny.iteye.com', pendingTransactions: transId },
          { $inc: { score: 10 }, $pull: { pendingTransactions: transId } },
          function(err, result) {
            if (err || !result) {
              console.error('Failed to rollback user A');
            } else {
              dbskin.collection('users').findAndModify(
                { name: 'B', pendingTransactions: transId },
                { $inc: { score: -10 }, $pull: { pendingTransactions: transId } },
                function(err, result) {
                  if (err || !result) {
                    console.error('Failed to rollback user B');
                  } else {
                    endTransaction(transId, function() {
                      console.log('Transaction rolled back');
                    });
                  }
                }
              );
            }
          }
        );
      }
    }
  );
}

总结

通过上述代码,我们实现了MongoDB的两阶段提交。这种方式可以在事务过程中确保数据的一致性,即使在某些操作失败的情况下也可以进行回滚。需要注意的是,由于MongoDB的存储机制可能导致数据不一致的问题,建议结合其他机制如副本集或未来的持久化改进来进一步保障数据一致性。

回到顶部