Nodejs Nestjs任务队列管理插件@nestjs/bull的使用

Nodejs Nestjs任务队列管理插件@nestjs/bull的使用
[@nestjs](/user/nestjs)/bull 是一个基于 Bull 消息队列库的 NestJS 插件,用于处理异步任务。Bull 使用 Redis 作为消息存储,所以你需要先安装 Redis 并启动它。

下面是一个简单的示例,展示如何在 NestJS 应用中使用 [@nestjs](/user/nestjs)/bull 来创建和消费队列。

1. 安装依赖

首先,你需要安装必要的依赖:

npm install [@nestjs](/user/nestjs)/bull bull ioredis

2. 配置 Bull 模块

在你的 NestJS 应用中,创建一个配置文件来初始化 Bull 模块:

// src/bull-queue.module.ts
import { Module } from '[@nestjs](/user/nestjs)/common';
import { BullModule } from '[@nestjs](/user/nestjs)/bull';
import { join } from 'path';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'image-processing',
    }),
  ],
})
export class BullQueueModule {}

3. 创建处理器

接下来,创建一个处理器来处理队列中的任务:

// src/image-processing/image-processing.job.ts
import { Job, Queue } from 'bull';
import { Processor, Process } from '[@nestjs](/user/nestjs)/bull';

@Processor('image-processing')
export class ImageProcessingJob {
  @Process()
  async processImage(job: Job<{ filename: string }>) {
    console.log(`Processing image ${job.data.filename}`);
    // 这里可以添加处理图像的逻辑
    return `Processed ${job.data.filename}`;
  }
}

4. 创建控制器

创建一个控制器来添加任务到队列:

// src/image-processing/image-processing.controller.ts
import { Controller, Post, Body } from '[@nestjs](/user/nestjs)/common';
import { Queue } from '[@nestjs](/user/nestjs)/bull';
import { ImageProcessingJob } from './image-processing.job';

@Controller('images')
export class ImageProcessingController {
  constructor(@Queue('image-processing') private readonly imageProcessingQueue: Queue) {}

  @Post('process')
  async processImage(@Body('filename') filename: string) {
    const job = await this.imageProcessingQueue.add('process-image', { filename });
    return { jobId: job.id };
  }
}

5. 启动应用

确保你的应用模块导入了 BullQueueModule

// src/app.module.ts
import { Module } from '[@nestjs](/user/nestjs)/common';
import { BullQueueModule } from './bull-queue.module';
import { ImageProcessingController } from './image-processing/image-processing.controller';

@Module({
  imports: [BullQueueModule],
  controllers: [ImageProcessingController],
})
export class AppModule {}

现在你可以运行你的 NestJS 应用,并通过发送 POST 请求到 /images/process 来添加图像处理任务。这些任务将被添加到 image-processing 队列,并由 ImageProcessingJob 处理器处理。

注意事项

  • 确保 Redis 服务正在运行。
  • 你可能需要根据实际情况调整 Redis 的连接配置。
  • 为了更好地组织代码,你可以将 BullModule 和处理器放在不同的模块中。

希望这个示例能帮助你开始使用 [@nestjs](/user/nestjs)/bull


3 回复

当然!@nestjs/bull 是一个基于 Bull(一个用 Node.js 实现的健壮的消息队列)的 NestJS 插件,用于处理任务队列。首先,你需要安装它:

npm install [@nestjs](/user/nestjs)/bull bull

接着,在你的模块中导入 BullModule 并配置所需的队列:

import { Module } from '[@nestjs](/user/nestjs)/common';
import { BullModule } from '[@nestjs](/user/nestjs)/bull';
import { Job } from 'bull';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image-processing',
    }),
  ],
})
export class AppModule {}

然后,你可以创建一个处理器来处理这些任务:

import { Processor, OnQueueActive, OnQueueCompleted } from '[@nestjs](/user/nestjs)/bull';
import { Job } from 'bull';

@Processor('image-processing')
export class ImageProcessingProcessor {
  @OnQueueActive()
  onActive(job: Job) {
    console.log(`Processing job ${job.id}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job, result: any) {
    console.log(`Job ${job.id} completed! Result: ${result}`);
  }
}

这样,你就有了一个基本的任务队列系统了。试试上传一些“图片”(想象一下),看看它们是否被“处理”!


@nestjs/bull 是 NestJS 生态系统中用于处理任务队列的一个强大插件,它基于 Bull(一个基于 Redis 的 Node.js 任务队列库)。下面是如何使用 @nestjs/bull 来创建和管理任务队列的基本步骤。

安装依赖

首先,确保安装了必要的依赖:

npm install @nestjs/bull bull

同时,你需要一个 Redis 服务器来运行 Bull。如果你还没有安装 Redis,可以通过以下命令安装:

sudo apt-get update
sudo apt-get install redis-server

配置 Bull 模块

接下来,在你的 NestJS 应用程序中配置 Bull 模块。假设你有一个名为 ImageProcessor 的服务,它将负责处理图像相关的任务。

创建 BullModule

在你的应用程序中创建一个模块来配置 Bull 模块:

// src/queue/queue.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ImageProcessorConsumer } from './image-processor.consumer';

@Module({
  imports: [
    // 使用 BullModule.registerQueue 方法注册队列
    BullModule.registerQueue({
      name: 'image-processing',
      redis: {
        host: 'localhost', // Redis 主机
        port: 6379,        // Redis 端口
      },
    }),
  ],
  providers: [ImageProcessorConsumer],
})
export class QueueModule {}

创建 Consumer

然后,创建一个消费者来处理来自队列的任务:

// src/queue/image-processor.consumer.ts
import { MessagePattern, Payload } from '@nestjs/microservices';
import { Job, Queue } from 'bull';
import { Injectable } from '@nestjs/common';

@Injectable()
export class ImageProcessorConsumer {
  private queue: Queue;

  constructor() {
    this.queue = new Queue('image-processing', {
      redis: {
        host: 'localhost',
        port: 6379,
      },
    });
  }

  @MessagePattern({ cmd: 'processImage' })
  async processImage(@Payload() imageId: string) {
    const job = await this.queue.add('processImageJob', { imageId });
    return job.id;
  }
}

发布任务到队列

现在,你可以从任何地方发布任务到这个队列:

// 假设在一个服务中
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class ImageService {
  constructor(@InjectQueue('image-processing') private readonly queue: Queue) {}

  async uploadImage(imageId: string) {
    const jobId = await this.queue.add('processImageJob', { imageId });
    console.log(`Task added with job ID: ${jobId}`);
  }
}

以上就是使用 @nestjs/bull 来创建和管理任务队列的基本方法。你可以根据需要调整 Redis 连接配置、队列名称等参数。

@nestjs/bull 是基于 Bull(一个用作 Node.js 中的 Redis 队列)的 NestJS 任务队列管理插件。首先安装依赖:

npm install @nestjs/bull bull

然后配置模块和控制器:

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: configService.get<number>('REDIS_PORT'),
        },
      }),
      inject: [ConfigService],
    }),
    BullModule.registerQueue({ name: 'queue1' }),
  ],
})
export class AppModule {}

创建处理器处理队列中的任务:

import { Processor, Job } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';

@Injectable()
@Processor('queue1')
export class Queue1Processor {
  async handle(job: Job<any>, done: any) {
    // 处理队列中的任务
    console.log(`Processing job ${job.id}`);
    done();
  }
}

通过上述步骤,可以实现基本的队列管理和任务处理。

回到顶部