Nodejs 简化代码学习everyauth的异步流程控制

Nodejs 简化代码学习everyauth的异步流程控制

看了everyauth源码,第一次看解决异步流程问题的实现方法,感到很惊讶,为了更好地学习,实现了这个流程的简化版。

例子

先看看这段代码,功能是从本地文件读到一个url—请求这个url—把结果写入另一个文件。

  var fs = require('fs'),
  http = require('http');

fs.readFile(’./url’, ‘utf8’, function (err,data) { console.log(data);
http.get(data, function(res){ var body = ‘’; res.on(‘data’, function©{ body += c; }).on(‘end’, function(){ fs.writeFile(’./steptest’, data + body, function(e) { if (e) console.log(‘error’, e); else console.log(‘done’); }); }); }).on(‘error’, function(e){ console.log(e);
}); });

这段代码包括了三个步骤三个功能,但耦合在一起,可读性差,难以修改,对任意一部分修改或增加都要看完整坨代码才能找到,即时把每个callback都抽成一个变量,这一整个流程也是无法分离的。

改进版

对这种情况,everyauth使用了一种方法,可以把整个流程的实现代码写成这样:

engine
  .do('fetchHtml')
    .step('readUrl')
      .accepts('')
      .promises('url')
    .step('getHtml')
      .accepts('url')
      .promises('html')
    .step('saveHtml')
      .accepts('url html')
      .promises(null)

.readUrl(function(){ //read url from file … }) .getHtml(function(url){ //send http request … }) .saveHtml(function(url, html){ //save to file … })

.start(‘fetchHtml’)

do是一串流水方法的开始,step指定每一个步骤对应的方法名,promises表示此步骤返回的变量名,accepts表示此步骤接受的参数(由前面step的方法提供的变量)。接下来是链式地实现每一个step的方法。

整个过程很清晰,程序的自我描述很好,把一段异步的流程按同步的方式写出来了。若要修改其中某个步骤,直接定位到某个步骤对应的方法就行,无需把整个流程的代码通读。若要增加步骤,也只需要在那些step流程上插入新的step然后实现具体方法,可以获取前面step提供的任何参数。

how it works

实现它用到四个对象:promise/step/sequence/engine

promise是基础,相信很多人熟悉它的概念,简单来说就是把多个callback放到一个promise对象里,在适当的地方通过这个对象调用这些callback。在这里的作用是:当step执行结束时,通知队列执行下一个step。具体地说就是把下一个step的函数保存到前一个step的promise里,前一个step完成任务时,带着数据回调下一个step进入执行。

step负责执行一个步骤,传入对应参数,并把执行结果(return值)按指定的promises名保存起来,以供下一个step使用。

sequence管理step链,让注册的step可以一步步往下执行。

engine是提供对外接口的对象,管理保存每一个do请求里的step和sequence,通过configurable配置自身的可动态添加的方法。

具体看代码:https://gist.github.com/3930621

var fs = require('fs'),
http = require('http');

var Promise = function(values) { this._callbacks = []; this._errbacks = []; if (arguments.length > 0) { this.fulfill.apply(this, values); } } Promise.prototype = { callback: function(fn, scope) { //已有values表示promise已fulfill,立即执行 if (this.values) { fn.apply(scope, this.values); return this; } this._callbacks.push([fn, scope]); return this; } , errback: function(fn, scope) { if (this.err) { fn.call(scope, this.err); return this; } this._errbacks.push([fn, scope]); return this; } , fulfill: function () { if (this.isFulfilled || this.err) return; this.isFulfilled = true; var callbacks = this._callbacks; this.values = arguments; for (var i = 0, l = callbacks.length; i < l; i++) { callbacks[i][0].apply(callbacks[i][1], arguments); } return this; } , fail: function (err) { var errbacks = this._errbacks; for (var i = 0, l = errbacks.length; i < l; i++) { errbacks[i][0].call(errbacks[i][1], err); } return this; } }

var Step = function(name, _engine) { this.name = name; this.engine = _engine; }

Step.prototype = { exec : function(seq) { var args = this._unwrapArgs(seq) , promises = this.promises

  var ret = this.engine[this.name]().apply(this.engine, args);

  //若函数返回不是Promise,即是函数里直接返回值无异步操作。为流程一致,包装一个立即执行的promise
  ret = (ret instanceof Promise)
      ?  ret
      : this.engine.Promise([ret]);

  ret.callback(function() {
    var returned = arguments
      , vals = seq.values;
    //step执行结束后把返回值写入seq.values供下一个step使用
    if (promises !== null) promises.forEach( function (valName, i) {
      vals[valName] = returned[i];
    });
  })

  //加上默认的错误回调方法
  ret.errback(this.engine.errback(), this.engine);
  return ret;
}

, _unwrapArgs: function (seq) { if (!this.accepts) return []; return this.accepts.reduce( function (args, accept) { //根据accept名取出对应变量 args.push(seq.values[accept]); return args; }, []); } }

var Sequence = function(name, engine) { this.name = name; this.engine = engine; this.stepNames = []; this.values = {}; }

Sequence.prototype = { _bind : function(priorPromise, nextStep) { var nextPromise = new Promise() , seq = this;

  priorPromise.callback( function () {
    var resultPromise = nextStep.exec(seq);
    resultPromise.callback(function () {
      nextPromise.fulfill();
    });
  });
  return nextPromise;
}

, start : function() { var steps = this.steps; var priorStepPromise = steps[0].exec(this);

  for (var i = 1, l = steps.length; i &lt; l; i++) {
    //绑定step链
    priorStepPromise = this._bind(priorStepPromise, steps[i]);
  }
  return priorStepPromise;
}

}

Object.defineProperty(Sequence.prototype, ‘steps’, { get: function () { var allSteps = this.engine._steps; return this.stepNames.map(function (stepName) { return allSteps[stepName]; }) } });

var engine = { configurable : function(name) { this[name] = function(setTo) { var k = ‘_’ + name; if (arguments.length) { this[k] = setTo; return this; } return this[k]; } return this; } , step : function(name) { var steps = this._steps , sequence = this._currSeq;

  sequence.stepNames.push(name);
  this._currentStep = 
    steps[name] || (steps[name] = new Step(name, this));

  this.configurable(name);  
  return this;
}

, accepts : function(input) { this._currentStep.accepts = input && input.split(/\s+/) || null; return this; } , promises : function(output) { this._currentStep.promises = output && output.split(/\s+/) || null; return this; } , do : function(name) { this._currSeq = this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this)); return this; } , Promise : function(values) { return values ? new Promise(values) : new Promise(); } , _stepSequences: {} , _steps: {}

, start : function(seqName) { var seq = this._stepSequences[seqName]; seq.start(); } }

engine .configurable(‘errback’) .errback(function(err) { console.log(‘errback’, err); });

engine .do(‘fetchHtml’) .step(‘readUrl’) .accepts(’’) .promises(‘url’) .step(‘getHtml’) .accepts(‘url’) .promises(‘html’) .step(‘saveHtml’) .accepts(‘url html’) .promises(null)

.readUrl(function(){ var p = this.Promise(); //url.txt保存了一个网址 fs.readFile(’./url.txt’, ‘utf8’, function (err,data) { if (err) p.fail(err); else p.fulfill(data); }); return p; })

.getHtml(function(url){ var p = this.Promise(); http.get(url, function(res){ var body = ‘’; res.on(‘data’, function©{ body += c; }).on(‘end’, function(){ p.fulfill(body); }); }).on(‘error’, function(err){ p.fail(err) }); return p; })

.saveHtml(function(url, html){ fs.writeFile(’./fetchResult’, url + html, function(e) { if (e) console.log(‘error’, e); else console.log(‘done’); }); })

.start(‘fetchHtml’)


9 回复

Node.js 简化代码学习 everyauth 的异步流程控制

引言

在处理复杂的异步流程时,Node.js 的回调地狱常常使代码难以阅读和维护。为了更好地理解和学习如何处理这种复杂性,我们可以通过简化 everyauth 中的异步流程控制来改善这一点。

示例代码

首先,让我们来看一下原始的代码,它从本地文件读取一个 URL,请求该 URL 并将结果写入另一个文件。

var fs = require('fs'),
http = require('http');

fs.readFile('./url', 'utf8', function (err, data) {
  console.log(data);  
  http.get(data, function(res){
    var body = '';
    res.on('data', function(chunk){
      body += chunk;
    }).on('end', function(){
      fs.writeFile('./steptest', data + body, function(e) {
        if (e) console.log('error', e);
        else console.log('done');
      });
    });
  }).on('error', function(e){
    console.log(e);  
  });
});

这段代码虽然实现了所需的功能,但它耦合严重,可读性和可维护性较差。如果需要修改或增加功能,则需要通读整个代码块。

改进版

为了解决上述问题,我们可以借鉴 everyauth 的方法,将整个流程分解成多个独立的步骤。以下是改进后的代码:

var fs = require('fs'),
http = require('http');

// 定义 Promise 类
class Promise {
  constructor(values) {
    this._callbacks = [];
    this._errbacks = [];
    if (values) {
      this.fulfill.apply(this, values);
    }
  }

  callback(fn, scope) {
    if (this.values) {
      fn.apply(scope, this.values);
      return this;
    }
    this._callbacks.push([fn, scope]);
    return this;
  }

  errback(fn, scope) {
    if (this.err) {
      fn.call(scope, this.err);
      return this;
    }
    this._errbacks.push([fn, scope]);
    return this;
  }

  fulfill(...values) {
    if (this.isFulfilled || this.err) return;
    this.isFulfilled = true;
    this.values = values;
    this._callbacks.forEach(([fn, scope]) => fn.apply(scope, values));
    return this;
  }

  fail(err) {
    this.err = err;
    this._errbacks.forEach(([fn, scope]) => fn.call(scope, err));
    return this;
  }
}

// 定义 Step 类
class Step {
  constructor(name, engine) {
    this.name = name;
    this.engine = engine;
  }

  exec(seq) {
    const args = this._unwrapArgs(seq);
    const promises = this.promises;

    const ret = this.engine[this.name](...args);

    ret = ret instanceof Promise ? ret : new Promise([ret]);

    ret.callback(() => {
      const returned = arguments;
      if (promises !== null) {
        promises.forEach((valName, i) => seq.values[valName] = returned[i]);
      }
    });

    ret.errback(this.engine.errback(), this.engine);
    return ret;
  }

  _unwrapArgs(seq) {
    if (!this.accepts) return [];
    return this.accepts.reduce((args, accept) => {
      args.push(seq.values[accept]);
      return args;
    }, []);
  }
}

// 定义 Sequence 类
class Sequence {
  constructor(name, engine) {
    this.name = name;
    this.engine = engine;
    this.stepNames = [];
    this.values = {};
  }

  _bind(priorPromise, nextStep) {
    const nextPromise = new Promise();
    const seq = this;

    priorPromise.callback(() => {
      const resultPromise = nextStep.exec(seq);
      resultPromise.callback(() => nextPromise.fulfill());
    });
    return nextPromise;
  }

  start() {
    const steps = this.steps;
    let priorStepPromise = steps[0].exec(this);

    for (let i = 1, l = steps.length; i < l; i++) {
      priorStepPromise = this._bind(priorStepPromise, steps[i]);
    }
    return priorStepPromise;
  }
}

// 定义 Engine 类
const engine = {
  configurable(name) {
    this[name] = function(setTo) {
      const k = '_' + name;
      if (arguments.length) {
        this[k] = setTo;
        return this;
      }
      return this[k];
    };
    return this;
  },

  step(name) {
    const steps = this._steps,
      sequence = this._currSeq;

    sequence.stepNames.push(name);
    this._currentStep = steps[name] || (steps[name] = new Step(name, this));

    this.configurable(name);
    return this;
  },

  accepts(input) {
    this._currentStep.accepts = input && input.split(/\s+/) || null;
    return this;
  },

  promises(output) {
    this._currentStep.promises = output && output.split(/\s+/) || null;
    return this;
  },

  do(name) {
    this._currSeq = this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this));
    return this;
  },

  Promise(values) {
    return new Promise(values);
  },

  _stepSequences: {},
  _steps: {},

  start(seqName) {
    const seq = this._stepSequences[seqName];
    seq.start();
  }
};

engine.configurable('errback').errback(function(err) {
  console.log('errback', err);
});

engine
  .do('fetchHtml')
    .step('readUrl')
      .accepts('')
      .promises('url')
    .step('getHtml')
      .accepts('url')
      .promises('html')
    .step('saveHtml')
      .accepts('url html')
      .promises(null)

  .readUrl(function() {
    const p = this.Promise();
    fs.readFile('./url.txt', 'utf8', (err, data) => {
      if (err) p.fail(err);
      else p.fulfill(data);
    });
    return p;
  })

  .getHtml(function(url) {
    const p = this.Promise();
    http.get(url, (res) => {
      let body = '';
      res.on('data', (chunk) => {
        body += chunk;
      }).on('end', () => {
        p.fulfill(body);
      });
    }).on('error', (err) => {
      p.fail(err);
    });
    return p;
  })

  .saveHtml(function(url, html) {
    fs.writeFile('./fetchResult', url + html, (e) => {
      if (e) console.log('error', e);
      else console.log('done');
    });
  })

  .start('fetchHtml');

解释

  1. Promise: 用于封装异步操作的结果,提供 .callback.errback 方法来处理成功和失败的情况。
  2. Step: 每个步骤包含其名称、参数接受方式和返回值命名方式。exec 方法用于执行步骤并调用下一个步骤。
  3. Sequence: 管理步骤链,确保步骤按顺序执行。
  4. Engine: 提供对外接口,配置和启动流程。

这种方式使得代码更加模块化和易于维护,每个步骤都可以独立实现和测试。


这种类似的东西遍地开花,但是很难遇到这种写法即能满足异步,同时有能分离代码和业务逻辑使之层次分明易开发维护。

@.@用了几个还是Wind好用。

Wind.js比较直观,用这类的模块的话源码不便于其他人阅读,若真要用还不如自己写个简单的函数呢

我就是自己写了一个…反正适合自己用就行了.我觉得.

可惜老赵放弃了…

<h3>不错</h3>

针对你提到的需求,我们可以简化代码并使用类似everyauth的异步流程控制方法来重构你的代码。这种方法可以让代码更易于理解和维护。以下是一个简化版本的代码示例:

const fs = require('fs');
const http = require('http');

const engine = {
  do(name) {
    this._currentDo = name;
    this._steps = [];
    return this;
  },
  step(name) {
    this._steps.push({name});
    return this;
  },
  accepts(...args) {
    this._steps[this._steps.length - 1].accepts = args;
    return this;
  },
  promises(...args) {
    this._steps[this._steps.length - 1].promises = args;
    return this;
  },
  execStep(step, context, ...args) {
    const fn = context[step.name];
    if (typeof fn === 'function') {
      const result = fn(...args);
      if (result && typeof result.then === 'function') {
        result.then(values => {
          this.handleResult(step.promises, values);
          this.executeNextStep(context);
        });
      } else {
        this.handleResult(step.promises, [result]);
        this.executeNextStep(context);
      }
    }
  },
  handleResult(promises, values) {
    if (promises) {
      promises.forEach((promise, index) => {
        context[promise] = values[index];
      });
    }
  },
  executeNextStep(context) {
    const nextStep = this._steps.find(step => step.name === context.currentStep + 1);
    if (nextStep) {
      context.currentStep++;
      this.execStep(nextStep, context, ...context[nextStep.accepts]);
    }
  },
  start() {
    this._steps.forEach((step, index) => {
      if (index === 0) {
        context.currentStep = index;
        this.execStep(step, this, ...step.accepts.map(key => this[key]));
      }
    });
  }
};

engine
  .do('fetchHtml')
    .step('readUrl')
      .accepts('')
      .promises('url')
    .step('getHtml')
      .accepts('url')
      .promises('html')
    .step('saveHtml')
      .accepts('url html')
      .promises(null)

  .readUrl(() => {
    return new Promise((resolve, reject) => {
      fs.readFile('./url.txt', 'utf8', (err, data) => {
        if (err) reject(err);
        else resolve(data);
      });
    });
  })

  .getHtml(url => {
    return new Promise((resolve, reject) => {
      http.get(url, res => {
        let body = '';
        res.on('data', chunk => body += chunk);
        res.on('end', () => resolve(body));
      }).on('error', reject);
    });
  })

  .saveHtml((url, html) => {
    return new Promise((resolve, reject) => {
      fs.writeFile('./fetchResult', `${url}${html}`, err => {
        if (err) reject(err);
        else resolve();
      });
    });
  })

  .start();

在这个例子中,我们定义了一个engine对象来管理异步流程。每个步骤都是一个对象,包含名称、接受的参数和返回的值。通过链式调用step()accepts()promises()方法来定义流程中的各个步骤。最后,通过start()方法启动整个流程。

这种模式使得代码更加模块化,每个步骤都可以独立定义和测试,同时保持了良好的可读性和可维护性。

回到顶部