NodeJs 异步队列(AsyncQueue)
NodeJs 异步队列(AsyncQueue)
异步队列(AsyncQueue)
NodeJs 程序并没有锁概念, 可能是单线程程序的缘故吧. 但是存在异步回调, 也就造成并发执行统一代码的可能性, 当然这里并发不是真正意义上的并发. 是同一线程在不同时间点执行统一代码. 事故类似代码如下:
// 阻塞函数
const sleep = async (ms = 0) => {
return new Promise((resolve, reject) => {
return setTimeout(resolve(true), ms);
})
}
let total = 0;
const demoFunc = async () => {
let count = total;
await sleep(2000)
total = count + 1;
}
demoFunc();
demoFunc();
sleep(4000);
console.log(total); // 输出 1
示例是一个很简单的自增行为, 很多人可能会说直接在 demoFunc()前加 await 不是就是期望结果了, 然后事实可能是真的无法直接这样做, demoFunc 函数代表其实是一个 web 请求的处理. total 是数据读存(db 或 file). 我期望的结果是 2, 实际结果却为 1. 具体情况不同可能有不同解决方案, 本文提供一种异步队列机制去顺序执行需要互斥的的代码.
最简单的互斥执行
只用使用 Promise 链执行顺序执行互斥代码.
let queue = Promise.resolve(true);
const queue_exec = async (fn) => {
queue = queue.then(() => {
try {
return Promise.resolve(fn());
} catch(err) {
return Promise.reject(err);
}
});
return queue;
}
// 上述示例换成如下调用即可
queue_exec(demoFunc); // 在实际的请求中可以用 await queue_exec(demoFunc) 或去执行结果.
完善包装成 NPM 包
上述列表实现过于简单, 可以做很多优化, 如队列大小控制, 互斥代码执行超时, 执行优先级等... 既然实现原理已表明, 锦上添花的功能就不在细化描述, 具体实现参考: AsyncQueue.
假定代码已经编写完善, 目录为 AsyncQueue. 下面记录发布 NPM 包步骤:
- cd AsyncQueue # 进入代码目录
- npm init # npm 初始化 进入交互模式, 可以一路按回车, 最终生成 package.json 文件
- npm login # 登录 NPM 官网, 交互输入 npmjs.com 网站的用户名密码
- 编辑 package.json 修改相关字段
- name 包名, 不支持大写, 而且极易与存在的包名冲突, 可以通过 @用户名 /包名 格式做为包名, 避免冲突, 缺点就是包名很长.
- main 入口文件名, 指定 require(包名) 需要加载到文件名
- version 包版本
- 其它字段按需修改即可
- 编写 README.md 文件, 如包安装
npm install 包名 --save
包介绍, 用法等, 可选. - npm publish --access=public # 发布包, --access=public 显示发布公开包
- 变更完成, 更新 package.json 中 version 字段, 重新发布
npm publish --access=public
即可.
大哥, 你搞清楚了没, 代码都是错误的怎么说明你的想法? nodejs 所有的代码,包括回掉都是内部队列一个一个处理的不可能出现你说的问题, 你有没有搞明白 异步原理, 来给你一个正确的代码, 还是你写的东西, (原代码 sleep 函数写错的还 sleep 个屁,resolve 是一个函数, 你放在 setTimeout 里面用 resolve(true), 相当于给 直接执行后的结果放在 timeout 里面了 根本就没有用了)
// 阻塞函数
const sleep = async (ms = 0) => {
return new Promise((resolve, reject) => {
return setTimeout(resolve, ms);
})
}
let total = 0;
const demoFunc = async () => {
let count = total;
await sleep(2000)
total = count + 1;
}
async function main() {
await demoFunc();
await demoFunc();
await sleep(4000);
console.log(total); // 输出 1
}
main()
在Node.js中,处理异步任务时,使用异步队列(AsyncQueue)可以帮助我们更好地管理任务顺序和执行效率。虽然Node.js本身没有内置的AsyncQueue类,但我们可以利用现有的库如async
或自行实现一个简单的异步队列。
下面是一个简单的自定义异步队列实现示例:
class AsyncQueue {
constructor() {
this.queue = [];
this.processing = false;
}
enqueue(task) {
this.queue.push(task);
this.processQueue();
}
async processQueue() {
if (this.processing || this.queue.length === 0) {
return;
}
this.processing = true;
const task = this.queue.shift();
try {
await task();
} catch (error) {
console.error('Task failed:', error);
} finally {
this.processing = false;
this.processQueue();
}
}
}
// 使用示例
const queue = new AsyncQueue();
queue.enqueue(async () => {
console.log('Task 1 started');
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('Task 1 finished');
});
queue.enqueue(async () => {
console.log('Task 2 started');
await new Promise(resolve => setTimeout(resolve, 500));
console.log('Task 2 finished');
});
在这个示例中,AsyncQueue
类管理一个任务队列,并逐个处理队列中的任务。每个任务都是一个返回Promise的异步函数。enqueue
方法将任务添加到队列并尝试处理队列。processQueue
方法确保只有一个任务在处理,并在任务完成后继续处理下一个任务。