NodeJs 异步队列(AsyncQueue)

NodeJs 异步队列(AsyncQueue)

异步队列(AsyncQueue)

NodeJs 程序并没有锁概念, 可能是单线程程序的缘故吧. 但是存在异步回调, 也就造成并发执行统一代码的可能性, 当然这里并发不是真正意义上的并发. 是同一线程在不同时间点执行统一代码. 事故类似代码如下:

// 阻塞函数
const sleep = async (ms = 0) => {
	return new Promise((resolve, reject) => {
		return setTimeout(resolve(true), ms);
	})
}

let total = 0; const demoFunc = async () => { let count = total; await sleep(2000) total = count + 1; }

demoFunc(); demoFunc(); sleep(4000); console.log(total); // 输出 1

示例是一个很简单的自增行为, 很多人可能会说直接在 demoFunc()前加 await 不是就是期望结果了, 然后事实可能是真的无法直接这样做, demoFunc 函数代表其实是一个 web 请求的处理. total 是数据读存(db 或 file). 我期望的结果是 2, 实际结果却为 1. 具体情况不同可能有不同解决方案, 本文提供一种异步队列机制去顺序执行需要互斥的的代码.

最简单的互斥执行

只用使用 Promise 链执行顺序执行互斥代码.

let queue = Promise.resolve(true);

const queue_exec = async (fn) => { queue = queue.then(() => { try { return Promise.resolve(fn()); } catch(err) { return Promise.reject(err); } }); return queue; }

// 上述示例换成如下调用即可 queue_exec(demoFunc); // 在实际的请求中可以用 await queue_exec(demoFunc) 或去执行结果.

完善包装成 NPM 包

上述列表实现过于简单, 可以做很多优化, 如队列大小控制, 互斥代码执行超时, 执行优先级等... 既然实现原理已表明, 锦上添花的功能就不在细化描述, 具体实现参考: AsyncQueue.

假定代码已经编写完善, 目录为 AsyncQueue. 下面记录发布 NPM 包步骤:

  1. cd AsyncQueue # 进入代码目录
  2. npm init # npm 初始化 进入交互模式, 可以一路按回车, 最终生成 package.json 文件
  3. npm login # 登录 NPM 官网, 交互输入 npmjs.com 网站的用户名密码
  4. 编辑 package.json 修改相关字段
    • name 包名, 不支持大写, 而且极易与存在的包名冲突, 可以通过 @用户名 /包名 格式做为包名, 避免冲突, 缺点就是包名很长.
    • main 入口文件名, 指定 require(包名) 需要加载到文件名
    • version 包版本
    • 其它字段按需修改即可
  5. 编写 README.md 文件, 如包安装 npm install 包名 --save 包介绍, 用法等, 可选.
  6. npm publish --access=public # 发布包, --access=public 显示发布公开包
  7. 变更完成, 更新 package.json 中 version 字段, 重新发布 npm publish --access=public 即可.

原文地址


2 回复

大哥, 你搞清楚了没, 代码都是错误的怎么说明你的想法? nodejs 所有的代码,包括回掉都是内部队列一个一个处理的不可能出现你说的问题, 你有没有搞明白 异步原理, 来给你一个正确的代码, 还是你写的东西, (原代码 sleep 函数写错的还 sleep 个屁,resolve 是一个函数, 你放在 setTimeout 里面用 resolve(true), 相当于给 直接执行后的结果放在 timeout 里面了 根本就没有用了)
// 阻塞函数
const sleep = async (ms = 0) => {
return new Promise((resolve, reject) => {
return setTimeout(resolve, ms);
})
}

let total = 0;
const demoFunc = async () => {
let count = total;
await sleep(2000)
total = count + 1;
}
async function main() {
await demoFunc();
await demoFunc();
await sleep(4000);
console.log(total); // 输出 1
}

main()


在Node.js中,处理异步任务时,使用异步队列(AsyncQueue)可以帮助我们更好地管理任务顺序和执行效率。虽然Node.js本身没有内置的AsyncQueue类,但我们可以利用现有的库如async或自行实现一个简单的异步队列。

下面是一个简单的自定义异步队列实现示例:

class AsyncQueue {
  constructor() {
    this.queue = [];
    this.processing = false;
  }

  enqueue(task) {
    this.queue.push(task);
    this.processQueue();
  }

  async processQueue() {
    if (this.processing || this.queue.length === 0) {
      return;
    }
    this.processing = true;
    const task = this.queue.shift();
    try {
      await task();
    } catch (error) {
      console.error('Task failed:', error);
    } finally {
      this.processing = false;
      this.processQueue();
    }
  }
}

// 使用示例
const queue = new AsyncQueue();
queue.enqueue(async () => {
  console.log('Task 1 started');
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log('Task 1 finished');
});

queue.enqueue(async () => {
  console.log('Task 2 started');
  await new Promise(resolve => setTimeout(resolve, 500));
  console.log('Task 2 finished');
});

在这个示例中,AsyncQueue类管理一个任务队列,并逐个处理队列中的任务。每个任务都是一个返回Promise的异步函数。enqueue方法将任务添加到队列并尝试处理队列。processQueue方法确保只有一个任务在处理,并在任务完成后继续处理下一个任务。

回到顶部