Nodejs 简化代码学习everyauth的异步流程控制
Nodejs 简化代码学习everyauth的异步流程控制
看了everyauth源码,第一次看解决异步流程问题的实现方法,感到很惊讶,为了更好地学习,实现了这个流程的简化版。
例子
先看看这段代码,功能是从本地文件读到一个url—请求这个url—把结果写入另一个文件。
var fs = require('fs'),
http = require('http');
fs.readFile(’./url’, ‘utf8’, function (err,data) {
console.log(data);
http.get(data, function(res){
var body = ‘’;
res.on(‘data’, function©{
body += c;
}).on(‘end’, function(){
fs.writeFile(’./steptest’, data + body, function(e) {
if (e) console.log(‘error’, e);
else console.log(‘done’);
});
});
}).on(‘error’, function(e){
console.log(e);
});
});
这段代码包括了三个步骤三个功能,但耦合在一起,可读性差,难以修改,对任意一部分修改或增加都要看完整坨代码才能找到,即时把每个callback都抽成一个变量,这一整个流程也是无法分离的。
改进版
对这种情况,everyauth使用了一种方法,可以把整个流程的实现代码写成这样:
engine
.do('fetchHtml')
.step('readUrl')
.accepts('')
.promises('url')
.step('getHtml')
.accepts('url')
.promises('html')
.step('saveHtml')
.accepts('url html')
.promises(null)
.readUrl(function(){
//read url from file
…
})
.getHtml(function(url){
//send http request
…
})
.saveHtml(function(url, html){
//save to file
…
})
.start(‘fetchHtml’)
do是一串流水方法的开始,step指定每一个步骤对应的方法名,promises表示此步骤返回的变量名,accepts表示此步骤接受的参数(由前面step的方法提供的变量)。接下来是链式地实现每一个step的方法。
整个过程很清晰,程序的自我描述很好,把一段异步的流程按同步的方式写出来了。若要修改其中某个步骤,直接定位到某个步骤对应的方法就行,无需把整个流程的代码通读。若要增加步骤,也只需要在那些step流程上插入新的step然后实现具体方法,可以获取前面step提供的任何参数。
how it works
实现它用到四个对象:promise/step/sequence/engine
promise是基础,相信很多人熟悉它的概念,简单来说就是把多个callback放到一个promise对象里,在适当的地方通过这个对象调用这些callback。在这里的作用是:当step执行结束时,通知队列执行下一个step。具体地说就是把下一个step的函数保存到前一个step的promise里,前一个step完成任务时,带着数据回调下一个step进入执行。
step负责执行一个步骤,传入对应参数,并把执行结果(return值)按指定的promises名保存起来,以供下一个step使用。
sequence管理step链,让注册的step可以一步步往下执行。
engine是提供对外接口的对象,管理保存每一个do请求里的step和sequence,通过configurable配置自身的可动态添加的方法。
具体看代码:https://gist.github.com/3930621
var fs = require('fs'),
http = require('http');
var Promise = function(values) {
this._callbacks = [];
this._errbacks = [];
if (arguments.length > 0) {
this.fulfill.apply(this, values);
}
}
Promise.prototype = {
callback: function(fn, scope) {
//已有values表示promise已fulfill,立即执行
if (this.values) {
fn.apply(scope, this.values);
return this;
}
this._callbacks.push([fn, scope]);
return this;
}
, errback: function(fn, scope) {
if (this.err) {
fn.call(scope, this.err);
return this;
}
this._errbacks.push([fn, scope]);
return this;
}
, fulfill: function () {
if (this.isFulfilled || this.err) return;
this.isFulfilled = true;
var callbacks = this._callbacks;
this.values = arguments;
for (var i = 0, l = callbacks.length; i < l; i++) {
callbacks[i][0].apply(callbacks[i][1], arguments);
}
return this;
}
, fail: function (err) {
var errbacks = this._errbacks;
for (var i = 0, l = errbacks.length; i < l; i++) {
errbacks[i][0].call(errbacks[i][1], err);
}
return this;
}
}
var Step = function(name, _engine) {
this.name = name;
this.engine = _engine;
}
Step.prototype = {
exec : function(seq) {
var args = this._unwrapArgs(seq)
, promises = this.promises
var ret = this.engine[this.name]().apply(this.engine, args);
//若函数返回不是Promise,即是函数里直接返回值无异步操作。为流程一致,包装一个立即执行的promise
ret = (ret instanceof Promise)
? ret
: this.engine.Promise([ret]);
ret.callback(function() {
var returned = arguments
, vals = seq.values;
//step执行结束后把返回值写入seq.values供下一个step使用
if (promises !== null) promises.forEach( function (valName, i) {
vals[valName] = returned[i];
});
})
//加上默认的错误回调方法
ret.errback(this.engine.errback(), this.engine);
return ret;
}
, _unwrapArgs: function (seq) {
if (!this.accepts) return [];
return this.accepts.reduce( function (args, accept) {
//根据accept名取出对应变量
args.push(seq.values[accept]);
return args;
}, []);
}
}
var Sequence = function(name, engine) {
this.name = name;
this.engine = engine;
this.stepNames = [];
this.values = {};
}
Sequence.prototype = {
_bind : function(priorPromise, nextStep) {
var nextPromise = new Promise()
, seq = this;
priorPromise.callback( function () {
var resultPromise = nextStep.exec(seq);
resultPromise.callback(function () {
nextPromise.fulfill();
});
});
return nextPromise;
}
, start : function() {
var steps = this.steps;
var priorStepPromise = steps[0].exec(this);
for (var i = 1, l = steps.length; i < l; i++) {
//绑定step链
priorStepPromise = this._bind(priorStepPromise, steps[i]);
}
return priorStepPromise;
}
}
Object.defineProperty(Sequence.prototype, ‘steps’, {
get: function () {
var allSteps = this.engine._steps;
return this.stepNames.map(function (stepName) {
return allSteps[stepName];
})
}
});
var engine = {
configurable : function(name) {
this[name] = function(setTo) {
var k = ‘_’ + name;
if (arguments.length) {
this[k] = setTo;
return this;
}
return this[k];
}
return this;
}
, step : function(name) {
var steps = this._steps
, sequence = this._currSeq;
sequence.stepNames.push(name);
this._currentStep =
steps[name] || (steps[name] = new Step(name, this));
this.configurable(name);
return this;
}
, accepts : function(input) {
this._currentStep.accepts = input && input.split(/\s+/) || null;
return this;
}
, promises : function(output) {
this._currentStep.promises = output && output.split(/\s+/) || null;
return this;
}
, do : function(name) {
this._currSeq =
this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this));
return this;
}
, Promise : function(values) {
return values ? new Promise(values) : new Promise();
}
, _stepSequences: {}
, _steps: {}
, start : function(seqName) {
var seq = this._stepSequences[seqName];
seq.start();
}
}
engine
.configurable(‘errback’)
.errback(function(err) {
console.log(‘errback’, err);
});
engine
.do(‘fetchHtml’)
.step(‘readUrl’)
.accepts(’’)
.promises(‘url’)
.step(‘getHtml’)
.accepts(‘url’)
.promises(‘html’)
.step(‘saveHtml’)
.accepts(‘url html’)
.promises(null)
.readUrl(function(){
var p = this.Promise();
//url.txt保存了一个网址
fs.readFile(’./url.txt’, ‘utf8’, function (err,data) {
if (err) p.fail(err);
else p.fulfill(data);
});
return p;
})
.getHtml(function(url){
var p = this.Promise();
http.get(url, function(res){
var body = ‘’;
res.on(‘data’, function©{
body += c;
}).on(‘end’, function(){
p.fulfill(body);
});
}).on(‘error’, function(err){
p.fail(err)
});
return p;
})
.saveHtml(function(url, html){
fs.writeFile(’./fetchResult’, url + html, function(e) {
if (e) console.log(‘error’, e);
else console.log(‘done’);
});
})
.start(‘fetchHtml’)
Node.js 简化代码学习 everyauth 的异步流程控制
引言
在处理复杂的异步流程时,Node.js 的回调地狱常常使代码难以阅读和维护。为了更好地理解和学习如何处理这种复杂性,我们可以通过简化 everyauth
中的异步流程控制来改善这一点。
示例代码
首先,让我们来看一下原始的代码,它从本地文件读取一个 URL,请求该 URL 并将结果写入另一个文件。
var fs = require('fs'),
http = require('http');
fs.readFile('./url', 'utf8', function (err, data) {
console.log(data);
http.get(data, function(res){
var body = '';
res.on('data', function(chunk){
body += chunk;
}).on('end', function(){
fs.writeFile('./steptest', data + body, function(e) {
if (e) console.log('error', e);
else console.log('done');
});
});
}).on('error', function(e){
console.log(e);
});
});
这段代码虽然实现了所需的功能,但它耦合严重,可读性和可维护性较差。如果需要修改或增加功能,则需要通读整个代码块。
改进版
为了解决上述问题,我们可以借鉴 everyauth
的方法,将整个流程分解成多个独立的步骤。以下是改进后的代码:
var fs = require('fs'),
http = require('http');
// 定义 Promise 类
class Promise {
constructor(values) {
this._callbacks = [];
this._errbacks = [];
if (values) {
this.fulfill.apply(this, values);
}
}
callback(fn, scope) {
if (this.values) {
fn.apply(scope, this.values);
return this;
}
this._callbacks.push([fn, scope]);
return this;
}
errback(fn, scope) {
if (this.err) {
fn.call(scope, this.err);
return this;
}
this._errbacks.push([fn, scope]);
return this;
}
fulfill(...values) {
if (this.isFulfilled || this.err) return;
this.isFulfilled = true;
this.values = values;
this._callbacks.forEach(([fn, scope]) => fn.apply(scope, values));
return this;
}
fail(err) {
this.err = err;
this._errbacks.forEach(([fn, scope]) => fn.call(scope, err));
return this;
}
}
// 定义 Step 类
class Step {
constructor(name, engine) {
this.name = name;
this.engine = engine;
}
exec(seq) {
const args = this._unwrapArgs(seq);
const promises = this.promises;
const ret = this.engine[this.name](...args);
ret = ret instanceof Promise ? ret : new Promise([ret]);
ret.callback(() => {
const returned = arguments;
if (promises !== null) {
promises.forEach((valName, i) => seq.values[valName] = returned[i]);
}
});
ret.errback(this.engine.errback(), this.engine);
return ret;
}
_unwrapArgs(seq) {
if (!this.accepts) return [];
return this.accepts.reduce((args, accept) => {
args.push(seq.values[accept]);
return args;
}, []);
}
}
// 定义 Sequence 类
class Sequence {
constructor(name, engine) {
this.name = name;
this.engine = engine;
this.stepNames = [];
this.values = {};
}
_bind(priorPromise, nextStep) {
const nextPromise = new Promise();
const seq = this;
priorPromise.callback(() => {
const resultPromise = nextStep.exec(seq);
resultPromise.callback(() => nextPromise.fulfill());
});
return nextPromise;
}
start() {
const steps = this.steps;
let priorStepPromise = steps[0].exec(this);
for (let i = 1, l = steps.length; i < l; i++) {
priorStepPromise = this._bind(priorStepPromise, steps[i]);
}
return priorStepPromise;
}
}
// 定义 Engine 类
const engine = {
configurable(name) {
this[name] = function(setTo) {
const k = '_' + name;
if (arguments.length) {
this[k] = setTo;
return this;
}
return this[k];
};
return this;
},
step(name) {
const steps = this._steps,
sequence = this._currSeq;
sequence.stepNames.push(name);
this._currentStep = steps[name] || (steps[name] = new Step(name, this));
this.configurable(name);
return this;
},
accepts(input) {
this._currentStep.accepts = input && input.split(/\s+/) || null;
return this;
},
promises(output) {
this._currentStep.promises = output && output.split(/\s+/) || null;
return this;
},
do(name) {
this._currSeq = this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this));
return this;
},
Promise(values) {
return new Promise(values);
},
_stepSequences: {},
_steps: {},
start(seqName) {
const seq = this._stepSequences[seqName];
seq.start();
}
};
engine.configurable('errback').errback(function(err) {
console.log('errback', err);
});
engine
.do('fetchHtml')
.step('readUrl')
.accepts('')
.promises('url')
.step('getHtml')
.accepts('url')
.promises('html')
.step('saveHtml')
.accepts('url html')
.promises(null)
.readUrl(function() {
const p = this.Promise();
fs.readFile('./url.txt', 'utf8', (err, data) => {
if (err) p.fail(err);
else p.fulfill(data);
});
return p;
})
.getHtml(function(url) {
const p = this.Promise();
http.get(url, (res) => {
let body = '';
res.on('data', (chunk) => {
body += chunk;
}).on('end', () => {
p.fulfill(body);
});
}).on('error', (err) => {
p.fail(err);
});
return p;
})
.saveHtml(function(url, html) {
fs.writeFile('./fetchResult', url + html, (e) => {
if (e) console.log('error', e);
else console.log('done');
});
})
.start('fetchHtml');
解释
- Promise: 用于封装异步操作的结果,提供
.callback
和.errback
方法来处理成功和失败的情况。 - Step: 每个步骤包含其名称、参数接受方式和返回值命名方式。
exec
方法用于执行步骤并调用下一个步骤。 - Sequence: 管理步骤链,确保步骤按顺序执行。
- Engine: 提供对外接口,配置和启动流程。
这种方式使得代码更加模块化和易于维护,每个步骤都可以独立实现和测试。
@.@用了几个还是Wind好用。
Wind.js比较直观,用这类的模块的话源码不便于其他人阅读,若真要用还不如自己写个简单的函数呢
我就是自己写了一个…反正适合自己用就行了.我觉得.
mixu’s node book[http://book.mixu.net/ch7.html]
可惜老赵放弃了…
<h3>不错</h3>
针对你提到的需求,我们可以简化代码并使用类似everyauth的异步流程控制方法来重构你的代码。这种方法可以让代码更易于理解和维护。以下是一个简化版本的代码示例:
const fs = require('fs');
const http = require('http');
const engine = {
do(name) {
this._currentDo = name;
this._steps = [];
return this;
},
step(name) {
this._steps.push({name});
return this;
},
accepts(...args) {
this._steps[this._steps.length - 1].accepts = args;
return this;
},
promises(...args) {
this._steps[this._steps.length - 1].promises = args;
return this;
},
execStep(step, context, ...args) {
const fn = context[step.name];
if (typeof fn === 'function') {
const result = fn(...args);
if (result && typeof result.then === 'function') {
result.then(values => {
this.handleResult(step.promises, values);
this.executeNextStep(context);
});
} else {
this.handleResult(step.promises, [result]);
this.executeNextStep(context);
}
}
},
handleResult(promises, values) {
if (promises) {
promises.forEach((promise, index) => {
context[promise] = values[index];
});
}
},
executeNextStep(context) {
const nextStep = this._steps.find(step => step.name === context.currentStep + 1);
if (nextStep) {
context.currentStep++;
this.execStep(nextStep, context, ...context[nextStep.accepts]);
}
},
start() {
this._steps.forEach((step, index) => {
if (index === 0) {
context.currentStep = index;
this.execStep(step, this, ...step.accepts.map(key => this[key]));
}
});
}
};
engine
.do('fetchHtml')
.step('readUrl')
.accepts('')
.promises('url')
.step('getHtml')
.accepts('url')
.promises('html')
.step('saveHtml')
.accepts('url html')
.promises(null)
.readUrl(() => {
return new Promise((resolve, reject) => {
fs.readFile('./url.txt', 'utf8', (err, data) => {
if (err) reject(err);
else resolve(data);
});
});
})
.getHtml(url => {
return new Promise((resolve, reject) => {
http.get(url, res => {
let body = '';
res.on('data', chunk => body += chunk);
res.on('end', () => resolve(body));
}).on('error', reject);
});
})
.saveHtml((url, html) => {
return new Promise((resolve, reject) => {
fs.writeFile('./fetchResult', `${url}${html}`, err => {
if (err) reject(err);
else resolve();
});
});
})
.start();
在这个例子中,我们定义了一个engine
对象来管理异步流程。每个步骤都是一个对象,包含名称、接受的参数和返回的值。通过链式调用step()
、accepts()
和promises()
方法来定义流程中的各个步骤。最后,通过start()
方法启动整个流程。
这种模式使得代码更加模块化,每个步骤都可以独立定义和测试,同时保持了良好的可读性和可维护性。