Nodejs 一个任务队列的module

Nodejs 一个任务队列的module

事情很曲折,我某天在萌否收音机里面听到了一首歌,很好听,叫 hypnotized,于是红心了。

过了几天我再去听——发现这首歌变了。

最后经过多方面求证,我大概得出结果就是应该有人传错了歌,然后后来有人重新传了一遍,导致我听的不是原来那首歌了。那我那天听的那首歌到底叫什么名字呢?

然后大致看了一下,虽然歌被重新传了,但是这里显示的这首歌的时间没变!还是11分钟,目测是数据库没更新。

于是我就想了个笨办法,去爬收音机里面所有 tag 为 東方project 的专辑,然后跑到专辑页看歌曲的长度。

问题来了,如果我直接爬,然后爬完 callback 之后又直接爬,没有任何间隔,就相当于我在 DDOS 它的站子。或者即使没那么严重——反正最后到一定程度并发太大我就访问不了了。

于是我就想到了做一个任务队列的 module。该 module 的作用就是把一堆任务扔到队列中,完成一个才开始下一个。

然后如果同时执行一个也太慢,module 还允许你开多几个子队列同时执行。

模块的 repo 在 GitHub 上面。名字叫 Scarlet Task 的原因一是我本身就喜欢二小姐,二是为了纪念这次事件我是为了找有关二小姐的歌。

要安装也很简单:

$ npm install scarlet-task

然后 repo 的 README.md 里面有使用方法的——大致就是实例化一个对象,然后定义好某个任务的任务标识(可以是字符串,可以是 json 对象,可以是任何类型的数据),然后再定义好处理这个任务的函数,将这个数据推倒队列中即可。然后在处理函数中任务处理完的时候执行以下任务完成的函数即可。


9 回复

Node.js 一个任务队列的module

在开发过程中,我们经常需要处理大量的异步任务,并且希望这些任务能够按照一定的顺序执行,以避免因并发过多而造成的性能问题。这时,我们可以创建一个任务队列来管理这些任务。本文将介绍如何创建一个简单的任务队列模块 Scarlet Task,并展示其基本用法。

模块概述

Scarlet Task 是一个用于管理任务队列的模块,它可以帮助我们在Node.js环境中有效地管理和执行一系列异步任务。通过该模块,我们可以设置多个子队列来并行处理任务,从而提高效率。

安装

首先,你需要安装这个模块。你可以通过npm来安装:

$ npm install scarlet-task

使用方法

接下来,我们将介绍如何使用 Scarlet Task 来创建和管理任务队列。

示例代码
const ScarletTask = require('scarlet-task');

// 创建一个任务队列实例
const taskQueue = new ScarletTask();

// 定义一个任务
const task = {
    id: "task-1",
    data: "This is some data for the task."
};

// 定义一个处理任务的函数
const processTask = (task, done) => {
    console.log(`Processing task ${task.id}`);
    // 模拟异步操作
    setTimeout(() => {
        console.log(`Completed task ${task.id}`);
        done(); // 任务完成
    }, 2000);
};

// 将任务添加到队列中
taskQueue.add(task, processTask);

// 启动队列
taskQueue.start();

// 如果需要开启多个子队列同时处理任务
taskQueue.setConcurrency(3); // 允许同时处理3个任务

解释

  1. 创建任务队列实例:首先,我们需要引入 Scarlet Task 模块并创建一个实例。

  2. 定义任务:任务可以是一个包含唯一标识符和其他相关信息的对象。在这个例子中,我们定义了一个包含 iddata 属性的任务。

  3. 处理任务的函数:这是一个异步函数,用于处理具体的任务逻辑。在这个例子中,我们模拟了一个耗时2秒的操作,并在完成后调用 done() 函数来通知任务已完成。

  4. 添加任务到队列:我们使用 add 方法将任务及其处理函数添加到队列中。

  5. 启动队列:调用 start 方法启动任务队列。

  6. 设置并发数量:如果需要,我们可以设置并发处理的任务数量,以优化性能。

通过上述步骤,我们可以创建一个功能强大的任务队列模块,用于管理大量异步任务,确保它们有序、高效地执行。


async 不是有类似功能吗?

楼主,我看了代码,如果任务队列里没任务了,流程是不是就结束了?async的queue好像是轮询,队列空了也不会停,不过它是并发的太耗资源,我的建议是在push的时候,判断一下流程有没有结束,如果结束就重新启动流程,不用手动执行taskQueue.taskDone(taskObject, true);

新加任务的时候有一个判断的——如果队列停了,那么新加任务的时候会自动开始;如果队列没停,那么只是把任务新加进去。

所以没任务是会结束流程的,但是一旦有新任务进来会重新开始。

async 整个库比我的大多了,而且基本上的项目都会在用。我这个只是一个小小的 module,所谓重复造轮子。

而且有时候并不需要 async 里面的 queue 那么多样化的定制,只需要一个简简单单的任务队列就够了。所以有时候重复造轮子也是有其意义在的。

你这不如用pythone ruby, 用node纯粹是浪费javascript这门语言。

先找出所有专辑的数量, 然后设定一个请求数量比如10个, 同时异步请求10个, 10个全部完成之后,启动下面的10个。

你这是同步编程的思维,你无法估计收集所有专辑要多少时间,还不是要一个线程收集链接,一个线程发请求,node用流程控制完全可以让这个在一个进程里同时处理

针对你的需求,我可以为你提供一个简单的任务队列模块实现,并解释其核心功能。这个模块会支持添加任务、并行执行多个任务以及任务之间的串行执行。

示例代码

class TaskQueue {
    constructor(concurrency = 1) {
        this.concurrency = concurrency;
        this.queue = [];
        this.pendingTasks = 0;
        this.isProcessing = false;
    }

    enqueue(taskData, taskFn) {
        this.queue.push({ taskData, taskFn });
        this.processQueue();
    }

    async processQueue() {
        while (this.queue.length > 0 && this.pendingTasks < this.concurrency) {
            const { taskData, taskFn } = this.queue.shift();
            this.pendingTasks++;
            try {
                await taskFn(taskData);
            } catch (err) {
                console.error("Task failed:", err);
            }
            this.pendingTasks--;
            this.processQueue();
        }
    }
}

// 使用示例
const queue = new TaskQueue(3); // 允许同时执行3个任务

queue.enqueue("task1", async (data) => {
    console.log(`Starting ${data}`);
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`Finished ${data}`);
});

queue.enqueue("task2", async (data) => {
    console.log(`Starting ${data}`);
    await new Promise(resolve => setTimeout(resolve, 2000));
    console.log(`Finished ${data}`);
});

queue.enqueue("task3", async (data) => {
    console.log(`Starting ${data}`);
    await new Promise(resolve => setTimeout(resolve, 1500));
    console.log(`Finished ${data}`);
});

解释

  • enqueue 方法用于向队列中添加任务,每个任务包含任务数据和处理函数。
  • processQueue 方法负责从队列中取出任务并执行,直到达到设定的并发数。
  • 每次任务完成后,会递归调用 processQueue 来继续处理队列中的下一个任务。
  • 这样可以确保在没有空闲线程的情况下不会启动新的任务,从而避免对目标服务器的过大压力。

通过这种方式,你可以创建一个灵活且可配置的任务队列,来管理和控制你的任务处理流程。

回到顶部