NestJS任务调度系统

“我正在使用NestJS开发一个需要定时任务调度的项目,但对如何实现最佳方案有些困惑。NestJS自带的@nestjs/schedule模块和第三方库如bull/bullmq哪个更适合处理复杂的分布式任务?在实际项目中应该如何选择?如果使用bullmq,怎样与NestJS优雅集成?另外想了解在集群部署时如何确保任务不会重复执行,以及如何处理任务失败后的重试机制?有没有成熟的实践方案可以分享?”

3 回复

在NestJS中搭建任务调度系统,可以使用node-scheduleagenda等库。首先安装node-schedule

npm install node-schedule

然后创建一个服务来管理定时任务:

import { Injectable } from '@nestjs/common';
import schedule from 'node-schedule';

@Injectable()
export class TaskSchedulerService {
  startTask() {
    const job = schedule.scheduleJob('*/5 * * * *', () => {
      console.log('每5分钟执行一次任务');
    });
  }
}

在模块中注入该服务并启动任务:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TaskSchedulerService } from './task-scheduler/task-scheduler.service';

@Module({
  imports: [],
  controllers: [AppController],
  providers: [AppService, TaskSchedulerService],
})
export class AppModule {
  constructor(private taskSchedulerService: TaskSchedulerService) {
    this.taskSchedulerService.startTask();
  }
}

此示例中,任务每5分钟执行一次。你可以根据需求调整时间表达式和任务逻辑。如果需要更复杂的任务管理(如优先级、队列),可以考虑使用agenda


作为一个屌丝程序员,我建议先从简单的定时任务开始构建你的NestJS任务调度系统。可以使用node-schedule或者NestJS的@nestjs/schedule模块来实现。

首先安装必要的依赖:

npm install --save @nestjs/schedule node-schedule

创建一个调度模块:

nest g module schedule

schedule模块中定义任务:

import { Injectable, OnModuleInit } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';

@Injectable()
export class MyTask implements OnModuleInit {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  onModuleInit() {
    const job = new CronJob('0 * * * *', () => {
      console.log('任务执行了');
    });
    this.schedulerRegistry.addCronJob('myTask', job);
    job.start();
  }
}

记得在主模块里启用调度功能:

import { ScheduleModule } from '@nestjs/schedule';

@Module({
  imports: [ScheduleModule.forRoot()],
})
export class AppModule {}

这样就搭建了一个基本的定时任务调度系统,适合处理周期性的后台任务。当然,生产环境可能还需要考虑日志记录、任务失败重试等功能。

在NestJS中实现任务调度系统,推荐使用官方推荐的[@nestjs](/user/nestjs)/schedule模块或更强大的Bull队列系统。以下是两种方案的简要说明:

  1. 使用@nestjs/schedule (适合简单定时任务)
import { Module } from '[@nestjs](/user/nestjs)/common';
import { ScheduleModule } from '[@nestjs](/user/nestjs)/schedule';

@Module({
  imports: [ScheduleModule.forRoot()],
})
export class AppModule {}

// 定时任务示例
import { Injectable, Logger } from '[@nestjs](/user/nestjs)/common';
import { Cron, Interval, Timeout } from '[@nestjs](/user/nestjs)/schedule';

@Injectable()
export class TasksService {
  private readonly logger = new Logger(TasksService.name);

  @Cron('45 * * * * *') // 每分钟第45秒执行
  handleCron() {
    this.logger.debug('定时任务执行');
  }

  @Interval(10000) // 每10秒执行
  handleInterval() {
    this.logger.debug('间隔任务执行');
  }
}
  1. 使用Bull队列 (适合分布式和复杂任务)
// 安装:npm install [@nestjs](/user/nestjs)/bull bull
import { Module } from '[@nestjs](/user/nestjs)/common';
import { BullModule } from '[@nestjs](/user/nestjs)/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({ name: 'audio' }),
  ],
})
export class AppModule {}

// 生产者服务
@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}

  async addTranscodeJob() {
    await this.audioQueue.add('transcode', { file: 'audio.mp3' });
  }
}

// 消费者处理器
@Processor('audio')
export class AudioProcessor {
  @Process('transcode')
  async handleTranscode(job: Job) {
    console.log('处理任务:', job.data);
  }
}

选择建议:

  • 简单定时任务:使用[@nestjs](/user/nestjs)/schedule
  • 复杂/分布式任务:使用Bull队列系统
  • 记得在生产环境配置Redis持久化(Bull需要)
回到顶部