Nodejs Nestjs Kafka插件@nestjs/kafka的使用

Nodejs Nestjs Kafka插件@nestjs/kafka的使用
[@nestjs](/user/nestjs)/kafka 是一个用于 NestJS 应用程序的 Kafka 客户端库。它允许你轻松地将 Kafka 集成到你的应用程序中,以实现消息传递功能。以下是如何在 NestJS 应用程序中使用 [@nestjs](/user/nestjs)/kafka 的基本步骤:

1. 安装依赖

首先,你需要安装 [@nestjs](/user/nestjs)/kafkakafkajs(Kafka 的 JavaScript 客户端):

npm install [@nestjs](/user/nestjs)/kafka kafkajs

2. 配置 Kafka 模块

在你的 NestJS 应用程序中,你可以通过创建一个模块来配置 Kafka。例如,创建一个名为 kafka.module.ts 的文件:

import { Module } from '[@nestjs](/user/nestjs)/common';
import { ClientsModule, Transport } from '[@nestjs](/user/nestjs)/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'], // Kafka 服务器地址
          },
          consumer: {
            groupId: 'nest-consumer-group', // 消费者组 ID
          },
        },
      },
    ]),
  ],
})
export class KafkaModule {}

3. 创建 Kafka 服务

接下来,创建一个服务来处理 Kafka 消息。例如,创建一个名为 kafka.service.ts 的文件:

import { Injectable } from '[@nestjs](/user/nestjs)/common';
import { ClientKafka } from '[@nestjs](/user/nestjs)/microservices';

@Injectable()
export class KafkaService {
  constructor(private readonly clientKafka: ClientKafka) {
    this.clientKafka.connect();
  }

  async onModuleInit() {
    this.clientKafka.subscribeToResponseOf('ping')
      .subscribe(data => console.log(data));
  }

  sendPingMessage(message: string) {
    this.clientKafka.emit('ping', message);
  }
}

4. 使用 Kafka 服务

最后,在你的控制器或其他服务中注入并使用 Kafka 服务:

import { Controller } from '[@nestjs](/user/nestjs)/common';
import { KafkaService } from './kafka.service';

@Controller()
export class AppController {
  constructor(private readonly kafkaService: KafkaService) {}

  @Get('send-ping')
  sendPing() {
    this.kafkaService.sendPingMessage('Hello Kafka!');
    return { message: 'Ping sent' };
  }
}

5. 运行应用

确保你的 Kafka 服务器正在运行,然后启动你的 NestJS 应用程序。访问 /send-ping 路由将触发向 Kafka 主题发送消息。

以上就是如何在 NestJS 应用程序中使用 [@nestjs](/user/nestjs)/kafka 插件的基本步骤。你可以根据需要调整配置和逻辑。


3 回复

当然,没问题!@nestjs/kafka 是 NestJS 提供的一个用于处理 Apache Kafka 的官方库。首先,你需要安装它:

npm install [@nestjs](/user/nestjs)/kafka

然后,在你的模块中导入 KafkaModule 并配置连接信息:

import { Module } from '[@nestjs](/user/nestjs)/common';
import { KafkaModule } from '[@nestjs](/user/nestjs)/kafka';

@Module({
  imports: [
    KafkaModule.forRoot({
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'nest-consumer',
      },
    }),
  ],
})
export class AppModule {}

接下来,你可以创建一个 Kafka 消费者:

import { Controller, Payload, MessagePattern } from '[@nestjs](/user/nestjs)/microservices';

@Controller()
export class AppController {
  @MessagePattern({ cmd: 'hello' })
  getHello(@Payload() data: any) {
    return `Hello ${data.name}`;
  }
}

最后,别忘了启动消费者:

import { NestFactory } from '[@nestjs](/user/nestjs)/core';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.KAFKA,
  });
  await app.listen();
}
bootstrap();

现在,你可以发送一条消息到主题,并得到 “Hello [name]” 的回复了!希望这能帮到你,祝你编程愉快!


NestJS 提供了一个强大的 Kafka 插件 @nestjs/kafka 用于处理异步事件驱动的应用。以下是如何在 NestJS 应用中使用该插件的基本步骤和示例代码。

1. 安装依赖

首先,你需要安装 @nestjs/kafkakafkajs(Kafka 的一个 JavaScript 客户端):

npm install @nestjs/kafka kafkajs

2. 配置 Kafka 模块

在你的 NestJS 应用中,创建一个 Kafka 模块并配置 Kafka 客户端。通常,这会在 app.module.ts 中完成,但为了模块化,我们可以单独创建一个模块。

// kafka.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9092'], // 替换为你的 Kafka 服务器地址
          },
          consumer: {
            groupId: 'nest-consumer-group', // 消费者组ID
          },
        },
      },
    ]),
  ],
})
export class KafkaModule {}

3. 创建 Kafka 服务

接下来,我们创建一个服务来处理 Kafka 消息。

// kafka.service.ts
import { Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class KafkaService {
  constructor(private readonly clientKafka: ClientKafka) {
    this.clientKafka.connect();
  }

  async onModuleInit() {
    await this.clientKafka.subscribeToResponseOf('test.topic')
      .subscribe(data => console.log('Received message:', data));
  }
}

4. 控制器

现在,我们可以创建一个控制器来发送消息到 Kafka 主题。

// app.controller.ts
import { Controller, Get } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly clientKafka: ClientKafka) {}

  @Get('send-message')
  async sendMessage() {
    this.clientKafka
      .emit('test.topic', { value: 'Hello, Kafka!' });
  }
}

5. 注册模块和控制器

确保在你的主应用模块中导入了 Kafka 模块,并注册了你的控制器。

// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from './kafka.module';
import { KafkaService } from './kafka.service';

@Module({
  imports: [KafkaModule],
  controllers: [AppController],
  providers: [AppService, KafkaService],
})
export class AppModule {}

这样,你就有了一个基本的 NestJS 应用,它可以发送和接收 Kafka 消息。你可以根据需要调整 Kafka 的配置和逻辑。

@nestjs/kafka 是 NestJS 提供的一个用于处理 Apache Kafka 的官方库。首先安装它:

npm install @nestjs/kafka --save

配置模块:

import { Module } from '@nestjs/common';
import { KafkaModule } from '@nestjs/kafka';

@Module({
  imports: [
    KafkaModule.register({
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'test-consumer-group',
      },
    }),
  ],
})
export class AppModule {}

创建一个 Kafka 服务:

import { Controller, Get, KafkaContext, MessagePattern } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly client: ClientKafka) {}

  @MessagePattern({ cmd: 'hello' })
  getHello(context: KafkaContext) {
    this.client.send({ topic: 'hello', partition: 0 }, 'world').subscribe();
    return 'Hello World!';
  }
}

这样就完成了基本的 Kafka 消息发送和接收配置。

回到顶部