Nestjs项目实战 结合WebSocket实现实时数据推送

在Nestjs项目中结合WebSocket实现实时数据推送时,如何优雅地处理客户端连接的生命周期?比如客户端异常断开后,如何自动清理残留的订阅关系?另外,在微服务架构下,多个Nestjs实例之间如何同步WebSocket消息,避免因负载均衡导致客户端遗漏推送?能否分享具体的网关配置和消息广播的最佳实践?

3 回复

在NestJS中实现WebSocket实时数据推送非常简单。首先安装依赖:@nestjs/websocketssocket.io

创建一个WebSocket服务:

import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

@WebSocketGateway()
export class ChatGateway {
  @WebSocketServer() server: Server;

  // 监听客户端连接事件
  handleConnection(client: any) {
    console.log(`Client connected: ${client.id}`);
  }

  // 实时推送数据
  async sendRealTimeData(data: any) {
    this.server.emit('newMessage', data);
  }
}

在控制器中调用推送方法:

import { Controller } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';

@Controller()
export class AppController {
  constructor(private readonly chatGateway: ChatGateway) {}

  @Get('push')
  pushData() {
    const data = { message: 'Hello, real-time!' };
    this.chatGateway.sendRealTimeData(data);
    return { status: 'success' };
  }
}

客户端接收数据:

<script src="/socket.io/socket.io.js"></script>
<script>
  const socket = io();
  socket.on('newMessage', (data) => {
    console.log(data); // 显示推送的数据
  });
</script>

这样,当访问/push接口时,服务器会通过WebSocket向所有连接的客户端推送消息。


在NestJS项目中实现WebSocket实时数据推送,首先安装依赖@nestjs/websocketssocket.io。创建一个WebSocket服务类,使用@WebSocketGateway()装饰器定义入口。

  1. 初始化项目并安装依赖:
npm install @nestjs/websockets socket.io
  1. 创建WebSocket模块:
import { Module } from '@nestjs/common';
import { WebSocketGateway } from '@nestjs/websockets';

@WebSocketGateway()
export class ChatGateway {}
  1. 在服务中监听连接与消息:
@WebSocketGateway()
export class ChatGateway {
  handleConnection(client: any) {
    console.log('客户端已连接', client.id);
  }

  handleDisconnect(client: any) {
    console.log('客户端已断开', client.id);
  }

  handleMessage(message: string, client: any) {
    console.log('收到消息:', message);
    this.server.emit('response', `服务器收到: ${message}`);
  }
}
  1. 实现实时推送:通过this.server.emit方法向所有客户端发送数据。

  2. 配置路由,访问ws://localhost:3000即可连接。结合业务逻辑,可定时推送或响应特定事件。

记得在main.ts启用WebSocket支持:

const app = await NestFactory.create(AppModule);
app.useWebSocketAdapter(new WsAdapter(app));
await app.listen(3000);

此方案适用于聊天、在线监控等场景,代码简洁高效。

NestJS 实战:结合 WebSocket 实现实时数据推送

WebSocket 是实现实时应用的关键技术,下面展示如何在 NestJS 项目中集成 WebSocket 实现实时数据推送。

1. 安装必要依赖

npm install @nestjs/websockets @nestjs/platform-socket.io socket.io

2. 创建 WebSocket 网关

// src/websocket/events.gateway.ts
import { WebSocketGateway, WebSocketServer, SubscribeMessage, OnGatewayConnection, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer() server: Server;

  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
  }

  @SubscribeMessage('subscribeToData')
  handleSubscribe(client: Socket, payload: any) {
    // 这里可以验证订阅权限等
    client.join(payload.room);
    return { event: 'subscribed', data: `Subscribed to ${payload.room}` };
  }

  // 广播消息给所有客户端
  broadcastMessage(event: string, data: any) {
    this.server.emit(event, data);
  }

  // 发送消息给特定房间
  sendToRoom(room: string, event: string, data: any) {
    this.server.to(room).emit(event, data);
  }
}

3. 在模块中注册

// src/websocket/events.module.ts
import { Module } from '@nestjs/common';
import { EventsGateway } from './events.gateway';

@Module({
  providers: [EventsGateway],
})
export class EventsModule {}

4. 在服务中使用 WebSocket 推送

// src/some.service.ts
import { Injectable } from '@nestjs/common';
import { EventsGateway } from './websocket/events.gateway';

@Injectable()
export class SomeService {
  constructor(private readonly eventsGateway: EventsGateway) {}

  async updateData() {
    // 业务逻辑...
    const newData = { /* 新数据 */ };
    
    // 推送实时更新
    this.eventsGateway.broadcastMessage('dataUpdated', newData);
    // 或者推送到特定房间
    this.eventsGateway.sendToRoom('room1', 'dataUpdated', newData);
  }
}

5. 客户端连接示例 (前端)

import { io } from 'socket.io-client';

const socket = io('http://your-nestjs-server-url', {
  transports: ['websocket']
});

// 订阅特定房间
socket.emit('subscribeToData', { room: 'room1' });

// 监听数据更新
socket.on('dataUpdated', (data) => {
  console.log('Received update:', data);
  // 更新UI
});

进阶优化

  1. 认证与授权:可以在 WebSocket 连接建立时进行 JWT 验证
  2. 限流:防止客户端频繁发送消息
  3. 错误处理:完善错误处理和重连机制
  4. Redis 适配器:在集群模式下使用 Redis 适配器保持多实例间同步

这样你就实现了基本的 WebSocket 实时数据推送功能,可用于聊天应用、实时通知、数据监控等场景。

回到顶部