Nestjs教程使用RabbitMQ实现消息队列

在Nestjs中集成RabbitMQ时遇到了一些问题:

  1. 官方文档的示例比较基础,想了解如何在实际项目中配置多个消息队列?比如不同业务模块需要独立的队列和交换机
  2. 消息持久化应该在哪里配置更合理?是在模块定义时设置,还是在每次发布消息时单独声明?
  3. 消费端出现异常时,RabbitMQ会自动重试吗?如果需要自定义重试策略(比如延迟重试),在Nestjs中该如何实现?
  4. 有没有优雅的监控方案?想实时查看消息堆积情况,并能在消息处理失败时触发告警
  5. 测试时发现本地开发环境连接云端的RabbitMQ经常超时,是否有推荐的连接池配置或重连机制?
3 回复

NestJS 是一个用于构建高效、可扩展的 Node.js 服务端应用的框架。使用 RabbitMQ 实现消息队列可以实现异步通信。

首先安装依赖:

npm install @nestjs/microservices amqp

配置 app.module.ts:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RMQ_CLIENT',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'], // RabbitMQ 地址
          queue: 'test_queue',
          queueOptions: { durable: false },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

创建服务发送消息:

import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Injectable()
export class AppService {
  constructor(private readonly client: ClientProxy) {}

  sendMessage(message: string) {
    this.client.emit('message', message);
  }
}

创建控制器接收消息:

import { Controller, Get } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @MessagePattern('message')
  handleMessage(data: any) {
    console.log(data); // 接收消息
    return `Received: ${data}`;
  }
}

运行后,调用 sendMessage 方法即可向 RabbitMQ 发送消息。


在NestJS中使用RabbitMQ实现消息队列非常实用。首先,通过npm安装依赖:npm install @nestjs/microservices amqplib

  1. 配置模块
    app.module.ts中引入@nestjs/microservices模块,并定义RabbitMQ的连接配置:
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RMQ_CLIENT',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'], // RabbitMQ地址
          queue: 'test_queue',
          queueOptions: { durable: false },
        },
      },
    ]),
  ],
})
export class AppModule {}
  1. 创建服务
    在服务中注入RabbitMQ客户端并发送或接收消息:
@Injectable()
export class AppService {
  constructor(@Inject('RMQ_CLIENT') private readonly client: ClientRMQ) {}

  sendMessage(message: string) {
    this.client.emit('message', message);
  }

  listenForMessage() {
    this.client.subscribe('message', (data) => {
      console.log(data.toString());
    });
  }
}
  1. 启动服务
    在主应用中调用方法即可:
async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const appService = app.get(AppService);

  appService.listenForMessage(); // 监听消息
  appService.sendMessage('Hello RabbitMQ'); // 发送消息

  await app.listen(3000);
}
bootstrap();

这样,你就完成了基于NestJS与RabbitMQ的消息队列功能实现。

NestJS中使用RabbitMQ实现消息队列

NestJS提供了优雅的集成方式使用RabbitMQ作为消息队列。以下是完整实现步骤:

1. 安装必要依赖

npm install --save @nestjs/microservices amqplib amqp-connection-manager

2. 配置RabbitMQ模块

创建rabbitmq.module.ts:

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'main_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class RabbitMQModule {}

3. 消息生产者示例

import { Controller, Post } from '@nestjs/common';
import { ClientProxy, MessagePattern } from '@nestjs/microservices';
import { Inject } from '@nestjs/common';

@Controller('messages')
export class MessageController {
  constructor(
    @Inject('RABBITMQ_SERVICE') private readonly client: ClientProxy,
  ) {}

  @Post()
  async sendMessage() {
    await this.client.emit('message_event', {
      text: 'Hello RabbitMQ!',
      date: new Date(),
    }).toPromise();
    return { status: 'Message sent' };
  }
}

4. 消息消费者示例

import { Injectable } from '@nestjs/common';
import {
  MessagePattern,
  Payload,
  Ctx,
  RmqContext,
} from '@nestjs/microservices';

@Injectable()
export class MessageConsumer {
  @MessagePattern('message_event')
  async handleMessage(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log('Received message:', data);
    
    // 手动确认消息
    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    channel.ack(originalMsg);
  }
}

5. 在AppModule中注册

import { Module } from '@nestjs/common';
import { RabbitMQModule } from './rabbitmq.module';
import { MessageController } from './message.controller';
import { MessageConsumer } from './message.consumer';

@Module({
  imports: [RabbitMQModule],
  controllers: [MessageController],
  providers: [MessageConsumer],
})
export class AppModule {}

关键点说明

  1. emit()用于发送事件(无需响应)
  2. send()用于发送请求(期待响应)
  3. 可以通过@Ctx()获取RabbitMQ原生上下文
  4. 建议使用amqp-connection-manager处理连接重连

记得先运行RabbitMQ服务器并正确配置连接URL。

回到顶部