Nestjs教程使用Kafka实现事件驱动架构

在Nestjs项目中集成Kafka实现事件驱动架构时遇到几个问题想请教:

  1. 如何正确配置Nestjs与Kafka的连接?官方文档提到的KafkaOptions参数不太清楚具体用法。
  2. 事件生产者发送消息后,消费者有时收不到消息,排查发现Kafka集群是正常的,可能是什么原因?
  3. Nestjs的Kafka装饰器(@EventPattern)和手动订阅有什么区别?哪种方式更适合微服务场景?
  4. 在分布式事务中如何保证Kafka消息的可靠性?比如订单服务和库存服务之间的最终一致性实现。
  5. 有没有成熟的Nestjs+Kafka项目结构最佳实践?感觉自己的项目分层有点混乱。

本地开发时用Kafka单节点没问题,但部署到K8s集群后经常出现连接超时,该如何配置网络?


3 回复

以下是一个简单的NestJS + Kafka实现事件驱动架构的步骤:

  1. 安装依赖

    npm install [@nestjs](/user/nestjs)/microservices kafka-node
    
  2. 配置Kafka模块: 在app.module.ts中引入[@nestjs](/user/nestjs)/microservices模块,并配置Kafka:

    import { Module } from '[@nestjs](/user/nestjs)/common';
    import { ClientsModule, Transport } from '[@nestjs](/user/nestjs)/microservices';
    
    [@Module](/user/Module)({
      imports: [
        ClientsModule.register([
          {
            name: 'KAFKA_SERVICE',
            transport: Transport.KAFKA,
            options: {
              client: {
                brokers: ['localhost:9092'], // Kafka地址
              },
              consumer: {
                groupId: 'nestjs-consumer', // 消费者组
              },
            },
          },
        ]),
      ],
      controllers: [],
      providers: [],
    })
    export class AppModule {}
    
  3. 创建生产者服务: 创建一个服务来发送消息到Kafka:

    import { Injectable } from '[@nestjs](/user/nestjs)/common';
    import { ClientKafka } from '[@nestjs](/user/nestjs)/microservices';
    
    [@Injectable](/user/Injectable)()
    export class ProducerService {
      constructor(private readonly client: ClientKafka) {}
    
      sendEvent(pattern: string, message: any) {
        return this.client.send(pattern, message).toPromise();
      }
    }
    
  4. 创建消费者服务: 创建一个服务来处理Kafka消息:

    import { Controller, OnModuleInit } from '[@nestjs](/user/nestjs)/common';
    import { ClientKafka } from '[@nestjs](/user/nestjs)/microservices';
    
    [@Controller](/user/Controller)()
    export class ConsumerController implements OnModuleInit {
      constructor(private readonly client: ClientKafka) {}
    
      onModuleInit() {
        this.client.subscribeToResponseOf('event.pattern').subscribe((data) => {
          console.log('Received:', data);
        });
      }
    }
    
  5. 运行应用: 启动NestJS应用,它会自动连接到Kafka并开始监听消息。

这样就完成了一个简单的事件驱动架构。实际项目中可能需要更复杂的错误处理和消息确认机制。


要在NestJS中使用Kafka实现事件驱动架构,首先安装必要的依赖:

npm install @nestjs/microservices kafka-node

创建一个Kafka模块:

import { Module } from '@nestjs/common';
import { KafkaModule } from './kafka/kafka.module';

@Module({
  imports: [KafkaModule],
})
export class AppModule {}

创建Kafka服务来处理生产者和消费者:

import { Controller, Get } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly client: ClientKafka) {}

  @Get('produce')
  produceMessage() {
    this.client.emit('event_topic', { data: 'Hello Kafka!' });
  }

  @Get('consume')
  consumeMessage() {
    this.client.subscribeToResponseOf('event_topic').subscribe((data) => {
      console.log('Consumed:', data);
    });
  }
}

配置Kafka客户端:

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'test-consumer-group',
          },
        },
      },
    ]),
  ],
})
export class KafkaModule {}

这样就完成了基本的事件驱动架构搭建。通过Kafka的Topic进行消息传递,生产者发送消息到Topic,消费者订阅并处理消息。

使用NestJS和Kafka实现事件驱动架构

NestJS与Kafka结合非常适合构建事件驱动架构(EDA)。以下是基本实现步骤:

1. 安装必要依赖

npm install kafkajs @nestjs/microservices

2. 配置Kafka模块

// kafka.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'my-group-id',
          },
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class KafkaModule {}

3. 创建生产者服务

// kafka.producer.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class KafkaProducerService {
  constructor(
    @Inject('KAFKA_SERVICE') private readonly client: ClientKafka
  ) {}

  async emitEvent(topic: string, message: any) {
    await this.client.emit(topic, message).toPromise();
  }
}

4. 创建消费者控制器

// kafka.consumer.controller.ts
import { Controller, Post } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class KafkaConsumerController {
  @EventPattern('my-topic')
  async handleEvent(@Payload() message: any) {
    console.log('Received message:', message);
    // 处理业务逻辑
  }
}

5. 在主模块中集成

// app.module.ts
import { Module } from '@nestjs/common';
import { KafkaModule } from './kafka.module';
import { KafkaProducerService } from './kafka.producer.service';
import { KafkaConsumerController } from './kafka.consumer.controller';

@Module({
  imports: [KafkaModule],
  controllers: [KafkaConsumerController],
  providers: [KafkaProducerService],
})
export class AppModule {}

使用示例

// 在需要的地方发送消息
await this.kafkaProducerService.emitEvent('my-topic', {
  event: 'user.created',
  data: { userId: 123 },
});

高级配置

  1. 消息序列化:可以配置自定义的序列化器
  2. 错误处理:实现重试机制和死信队列
  3. 分区策略:根据业务需求选择合适的分区策略
  4. 性能调优:调整批处理大小和延迟等参数

这种架构适用于需要解耦服务、异步处理和高吞吐量的场景。

回到顶部