Nestjs高级进阶实时数据推送实现

在Nestjs中实现实时数据推送时遇到几个问题:

  1. 使用WebSocket和Socket.io时,如何优雅地集成到现有Nestjs项目中?官方文档的示例比较基础,实际项目中该如何组织模块和依赖?
  2. 实时推送的性能优化有什么建议?特别是在高并发场景下,如何避免内存泄漏或连接数过多的问题?
  3. 除了Socket.io,Nestjs是否支持SSE(Server-Sent Events)或其他轻量级方案?不同方案的适用场景如何选择?
  4. 客户端断线重连后,如何保证消息不丢失?是否需要引入消息队列或持久化机制?
  5. 有没有成熟的权限控制方案?比如如何根据JWT验证控制不同用户的推送权限?

3 回复

在NestJS中实现高级进阶的实时数据推送,通常使用WebSocket技术。首先安装@nestjs/websocketssocket.io库:

npm install @nestjs/websockets socket.io
  1. 创建一个WebSocket Gateway:
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';

@WebSocketGateway()
export class ChatGateway {
  @SubscribeMessage('events')
  handleEvent(client, data) {
    // 实时处理事件
    return { message: '实时消息', data };
  }
}
  1. 使用Redis实现广播机制:安装redissocket.io-redis
npm install redis socket.io-redis

修改Gateway配置:

import { NestApplicationContext } from '@nestjs/common';
import { WsAdapter } from '@nestjs/platform-ws';
import { createClient } from 'redis';
import { Server } from 'socket.io';

export class RedisIoAdapter extends WsAdapter {
  private pubClient;
  private subClient;

  async connectToRedis() {
    this.pubClient = createClient();
    this.subClient = createClient();

    await Promise.all([this.subClient.connect(), this.pubClient.connect()]);
    this.subClient.subscribe('events');
    this.subClient.on('message', (channel, message) => {
      this.server.emit(channel.toString(), JSON.parse(message));
    });
  }

  override bindClientConnectEvent(app: NestApplicationContext) {
    const server = app.getHttpAdapter().getInstance() as Server;
    this.pubClient = createClient();
    this.subClient = createClient();

    return super.bindClientConnectEvent({
      ...app,
      getInstance: () => {
        this.subClient.connect();
        this.subClient.subscribe('events');
        this.subClient.on('message', (channel, message) => {
          server.emit(channel.toString(), JSON.parse(message));
        });
        return server;
      },
    });
  }
}

最后在主模块中应用自定义适配器:

import { RedisIoAdapter } from './redis-io.adapter';

@Module({
  providers: [ChatGateway],
})
export class AppModule implements NestModule {
  configure(consumer: MiddlewareConsumer | ModuleRef): any {
    const app = this.moduleRef.get(HttpAdapterHost).adapter;
    const redisAdapter = new RedisIoAdapter(app);
    redisAdapter.connectToRedis();
    app.setAdapter(redisAdapter);
  }
}

这样就能实现高并发下的实时数据推送。


在NestJS中实现高级进阶的实时数据推送,可以利用WebSocket技术。首先安装依赖@nestjs/websocketssocket.io。创建一个WebSocket服务,例如AppGateway

import { WebSocketGateway, OnGatewayConnection, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway()
export class AppGateway implements OnGatewayConnection, OnGatewayDisconnect {
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
  }

  // 自定义事件
  @SubscribeMessage('events')
  findAll(client: Socket, data: any): void {
    client.emit('response', `Data: ${data}`);
  }
}

通过main.ts启用Gateway:

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const server = app.getHttpAdapter().getInstance();
  const gateway = new AppGateway();
  gateway.afterInit(server);
  await app.listen(3000);
}
bootstrap();

客户端使用socket.io-client连接并监听推送:

const socket = io('http://localhost:3000');
socket.on('response', (data) => {
  console.log(data);
});
socket.emit('events', 'Hello Server');

这种实现支持双向通信,适合实时聊天、通知等场景。通过结合JWT认证可增强安全性。

NestJS 高级进阶:实时数据推送实现

在 NestJS 中实现实时数据推送主要有几种方式,我将介绍最常用的两种:Socket.IO 和 SSE (Server-Sent Events)。

1. 使用 Socket.IO 实现

Socket.IO 是一个流行的实时通信库,非常适合双向通信场景。

安装依赖

npm install @nestjs/platform-socket.io socket.io

实现步骤

// socket.gateway.ts
import { WebSocketGateway, WebSocketServer, SubscribeMessage } from '@nestjs/websockets';
import { Server } from 'socket.io';

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class SocketGateway {
  @WebSocketServer()
  server: Server;

  @SubscribeMessage('message')
  handleMessage(client: any, payload: any): string {
    return 'Message received';
  }

  // 主动推送消息给所有客户端
  broadcastEvent(event: string, data: any) {
    this.server.emit(event, data);
  }
}

2. 使用 SSE (Server-Sent Events) 实现

SSE 更适合服务器向客户端单向推送数据的场景,如实时通知、日志推送等。

实现步骤

// sse.controller.ts
import { Controller, Get, Res, Sse } from '@nestjs/common';
import { Response } from 'express';
import { Observable, interval, map } from 'rxjs';

@Controller('sse')
export class SseController {
  @Sse('stream')
  sse(): Observable<MessageEvent> {
    return interval(1000).pipe(
      map((_) => ({ data: { message: 'Hello World!' } } as MessageEvent)),
    );
  }

  @Get('manual')
  manualSse(@Res() res: Response) {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    
    setInterval(() => {
      res.write(`data: ${JSON.stringify({ time: new Date().toISOString() })}\n\n`);
    }, 1000);
  }
}

实际应用建议

  1. 选择合适的技术

    • 需要双向通信 → Socket.IO
    • 只需要服务器推送 → SSE
  2. 性能优化

    • 使用 Redis 适配器实现多实例间的消息广播
    • 考虑消息压缩
    • 设置合理的心跳间隔
  3. 最佳实践

    • 对事件进行合理命名和分类
    • 实现认证和授权
    • 添加速率限制
    • 处理连接失败和重连

这两种方法都可以很好地集成到 NestJS 应用中,选择哪种取决于你的具体需求。

回到顶部