Nestjs教程使用RabbitMQ实现消息队列
在Nestjs中集成RabbitMQ时遇到了一些问题:
- 官方文档的示例比较基础,想了解如何在实际项目中配置多个消息队列?比如不同业务模块需要独立的队列和交换机
- 消息持久化应该在哪里配置更合理?是在模块定义时设置,还是在每次发布消息时单独声明?
- 消费端出现异常时,RabbitMQ会自动重试吗?如果需要自定义重试策略(比如延迟重试),在Nestjs中该如何实现?
- 有没有优雅的监控方案?想实时查看消息堆积情况,并能在消息处理失败时触发告警
- 测试时发现本地开发环境连接云端的RabbitMQ经常超时,是否有推荐的连接池配置或重连机制?
3 回复
NestJS 是一个用于构建高效、可扩展的 Node.js 服务端应用的框架。使用 RabbitMQ 实现消息队列可以实现异步通信。
首先安装依赖:
npm install @nestjs/microservices amqp
配置 app.module.ts:
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'RMQ_CLIENT',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'], // RabbitMQ 地址
queue: 'test_queue',
queueOptions: { durable: false },
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
创建服务发送消息:
import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Injectable()
export class AppService {
constructor(private readonly client: ClientProxy) {}
sendMessage(message: string) {
this.client.emit('message', message);
}
}
创建控制器接收消息:
import { Controller, Get } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class AppController {
@MessagePattern('message')
handleMessage(data: any) {
console.log(data); // 接收消息
return `Received: ${data}`;
}
}
运行后,调用 sendMessage
方法即可向 RabbitMQ 发送消息。
在NestJS中使用RabbitMQ实现消息队列非常实用。首先,通过npm安装依赖:npm install @nestjs/microservices amqplib
。
- 配置模块
在app.module.ts
中引入@nestjs/microservices
模块,并定义RabbitMQ的连接配置:
@Module({
imports: [
ClientsModule.register([
{
name: 'RMQ_CLIENT',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'], // RabbitMQ地址
queue: 'test_queue',
queueOptions: { durable: false },
},
},
]),
],
})
export class AppModule {}
- 创建服务
在服务中注入RabbitMQ客户端并发送或接收消息:
@Injectable()
export class AppService {
constructor(@Inject('RMQ_CLIENT') private readonly client: ClientRMQ) {}
sendMessage(message: string) {
this.client.emit('message', message);
}
listenForMessage() {
this.client.subscribe('message', (data) => {
console.log(data.toString());
});
}
}
- 启动服务
在主应用中调用方法即可:
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const appService = app.get(AppService);
appService.listenForMessage(); // 监听消息
appService.sendMessage('Hello RabbitMQ'); // 发送消息
await app.listen(3000);
}
bootstrap();
这样,你就完成了基于NestJS与RabbitMQ的消息队列功能实现。
NestJS中使用RabbitMQ实现消息队列
NestJS提供了优雅的集成方式使用RabbitMQ作为消息队列。以下是完整实现步骤:
1. 安装必要依赖
npm install --save @nestjs/microservices amqplib amqp-connection-manager
2. 配置RabbitMQ模块
创建rabbitmq.module.ts
:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'RABBITMQ_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'main_queue',
queueOptions: {
durable: false
},
},
},
]),
],
exports: [ClientsModule],
})
export class RabbitMQModule {}
3. 消息生产者示例
import { Controller, Post } from '@nestjs/common';
import { ClientProxy, MessagePattern } from '@nestjs/microservices';
import { Inject } from '@nestjs/common';
@Controller('messages')
export class MessageController {
constructor(
@Inject('RABBITMQ_SERVICE') private readonly client: ClientProxy,
) {}
@Post()
async sendMessage() {
await this.client.emit('message_event', {
text: 'Hello RabbitMQ!',
date: new Date(),
}).toPromise();
return { status: 'Message sent' };
}
}
4. 消息消费者示例
import { Injectable } from '@nestjs/common';
import {
MessagePattern,
Payload,
Ctx,
RmqContext,
} from '@nestjs/microservices';
@Injectable()
export class MessageConsumer {
@MessagePattern('message_event')
async handleMessage(@Payload() data: any, @Ctx() context: RmqContext) {
console.log('Received message:', data);
// 手动确认消息
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
}
5. 在AppModule中注册
import { Module } from '@nestjs/common';
import { RabbitMQModule } from './rabbitmq.module';
import { MessageController } from './message.controller';
import { MessageConsumer } from './message.consumer';
@Module({
imports: [RabbitMQModule],
controllers: [MessageController],
providers: [MessageConsumer],
})
export class AppModule {}
关键点说明
emit()
用于发送事件(无需响应)send()
用于发送请求(期待响应)- 可以通过
@Ctx()
获取RabbitMQ原生上下文 - 建议使用
amqp-connection-manager
处理连接重连
记得先运行RabbitMQ服务器并正确配置连接URL。