Kafka客户端模块kafka-node的使用

Kafka客户端模块kafka-node的使用
kafka-node 是一个用于 Node.js 的 Kafka 客户端库。它允许你与 Apache Kafka 集群进行交互,无论是生产消息还是消费消息。以下是一些基本的使用示例:

生产者

首先,你需要安装 kafka-node 库。可以通过 npm 来安装:

npm install kafka-node

示例代码:创建一个简单的生产者

const Kafka = require('kafka-node');
const { HighLevelProducer } = Kafka;

// 创建一个客户端实例
const client = new Kafka.KafkaClient({
    kafkaHost: 'localhost:9092'  // Kafka 服务器地址
});

// 创建一个高阶级生产者实例
const producer = new HighLevelProducer(client);

producer.on('ready', () => {
    console.log("Producer is ready");
});

producer.on('error', (err) => {
    console.error("Producer error:", err);
});

// 发送消息
const payloads = [
    {
        topic: 'test-topic',
        messages: 'Hello, Kafka!',
        partition: 0
    }
];

producer.send(payloads, (err, data) => {
    if (err) {
        console.error("Failed to send message:", err);
    } else {
        console.log("Message sent successfully:", data);
    }
});

消费者

示例代码:创建一个简单的消费者

const Kafka = require('kafka-node');
const { ConsumerGroup } = Kafka;

const consumerOptions = {
    kafkaHost: 'localhost:9092',  // Kafka 服务器地址
    groupId: 'test-group',  // 消费者组ID
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'latest',  // 从最新的消息开始消费
};

const consumer = new ConsumerGroup(consumerOptions, ['test-topic']);

consumer.on('ready', function (consumer) {
    console.log("Consumer is ready");
});

consumer.on('message', function (message) {
    console.log("Received message:", message);
});

consumer.on('error', function (err) {
    console.error("Consumer error:", err);
});

注意事项

  1. 配置:确保你的 Kafka 服务器正在运行,并且 kafkaHost 地址正确。
  2. 主题:在发送或接收消息之前,确保 Kafka 主题已经存在。
  3. 错误处理:始终处理可能发生的错误。

以上就是 kafka-node 的一些基本用法。你可以根据具体需求调整配置和逻辑。希望这对你有所帮助!


3 回复

当然,kafka-node是Node.js中用于Apache Kafka的一个强大客户端库。首先,你需要通过npm安装它:npm install kafka-node --save

接下来,让我们来创建一个简单的生产者:

const Kafka = require('kafka-node');
const Producer = Kafka.Producer;
const KeyedMessage = Kafka.KeyedMessage;
const client = new Kafka.KafkaClient();
const producer = new Producer(client);

producer.on('ready', function () {
    console.log("Producer is ready!");
});

producer.send([{
    topic: 'my-topic',
    messages: ['This is a message'],
    partition: 0
}], function (err, data) {
    console.log(data);
});

这段代码创建了一个Kafka生产者,当生产者准备就绪时,它会发送一条消息到名为’my-topic’的主题。

消费者部分也很简单:

const Consumer = Kafka.Consumer;
const consumer = new Consumer(
    client,
    [{ topic: 'my-topic' }],
    { fromOffset: 'earliest' }
);

consumer.on('message', function (message) {
    console.log(message);
});

这将创建一个消费者,监听’my-topic’主题,并打印接收到的消息。希望这些示例能帮助你开始使用kafka-node!


kafka-node 是一个用于 Node.js 的 Kafka 客户端库,可以用来生产和消费消息。下面是一些基本的使用示例。

1. 安装

首先,你需要安装 kafka-node 库。你可以通过 npm 来安装:

npm install kafka-node

2. 生产者(Producer)

以下是一个简单的生产者示例,它发送一条消息到指定的 Kafka 主题。

const Kafka = require('kafka-node');
const Producer = Kafka.Producer;
const KeyedMessage = Kafka.KeyedMessage;
const client = new Kafka.KafkaClient();
const producer = new Producer(client);

producer.on('ready', function () {
    console.log('Producer is ready');
});

producer.on('error', function (err) {
    console.error('Producer error:', err);
});

const messages = [
    new KeyedMessage('key1', 'message 1'),
    new KeyedMessage('key2', 'message 2')
];

const payloads = [
    { topic: 'my-topic', messages: messages }
];

producer.send(payloads, function (err, data) {
    if (err) {
        console.error('Failed to send message:', err);
    } else {
        console.log('Message sent successfully:', data);
    }
});

3. 消费者(Consumer)

以下是如何创建一个消费者来接收来自 Kafka 的消息。

const Consumer = Kafka.Consumer;
const client = new Kafka.KafkaClient();
const consumer = new Consumer(
    client,
    [
        { topic: 'my-topic', partition: 0 }
    ],
    {
        autoCommit: true,
        groupId: 'my-group'
    }
);

consumer.on('message', function (message) {
    console.log('Received message:', message);
});

consumer.on('error', function (err) {
    console.error('Consumer error:', err);
});

4. 配置

你可以根据需要调整配置,例如设置 groupIdautoCommit 等等。

5. 更多功能

kafka-node 还提供了其他高级功能,如分区管理、事务处理等。你可以查阅官方文档获取更多信息。

参考文档

希望这些信息对你有帮助!如果你有更具体的需求或问题,请告诉我。

kafka-node 是一个用于 Apache Kafka 的 Node.js 客户端。你可以使用它来创建 Kafka 生产者和消费者。首先,通过 npm 安装 kafka-node

npm install kafka-node --save

创建生产者示例:

const { KafkaJSNonRetriableError } = require('kafka-node');
const { Producer } = require('kafka-node');

const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
const producer = new Producer(client);

producer.on('ready', function() {
  producer.send([{
    topic: 'test',
    messages: ['message1']
  }], function(err, data) {
    console.log(data);
  });
});

创建消费者示例:

const { Consumer } = require('kafka-node');

const consumer = new Consumer(
  client,
  [{ topic: 'test' }],
  { fromOffset: 'latest' }
);

consumer.on('message', function(message) {
  console.log(message);
});

以上代码展示了如何初始化并使用 kafka-node 进行基本的消息发送和接收。

回到顶部