Flutter Kafka客户端插件kafkabr的使用

发布于 1周前 作者 songsunli 来自 Flutter

Flutter Kafka客户端插件kafkabr的使用

安装

目前没有Pub包,但API稳定后会发布。 可以通过git依赖在pubspec.yaml中添加:

dependencies:
  kafka:
    git: https://github.com/armando-couto/kafkabr.git

然后像平常一样导入:

import 'package:kafkabr/kafka.dart';

特性

此库提供了几个高阶API对象来与Kafka交互:

  • KafkaSession - 负责管理到Kafka代理的连接并协调所有请求。也提供元数据信息访问。
  • Producer - 发布消息到Kafka主题
  • Consumer - 从Kafka主题消费消息并存储其状态(当前偏移量)。利用ConsumerMetadata API通过ConsumerGroup。
  • Fetcher - 消费消息而不存储状态。
  • OffsetMaster - 提供方便的接口允许轻松检索特定topic-partitions的最早和最新偏移量。
  • ConsumerGroup - 提供方便的接口利用Consumer Metadata API轻松获取或提交消费者偏移量。

生产者

简单的实现的Kafka生产者。支持自动检测leader,并为每个broker创建单独的ProduceRequest。请求是并发发送的,所有响应都聚集在特殊ProduceResult对象中。

// file:produce.dart
import 'dart:io';
import 'package:kafkabr/kafka.dart';

main(List<String> arguments) async {
  var host = new ContactPoint('127.0.0.1', 9092);
  var session = new KafkaSession([host]);

  var producer = new Producer(session, 1, 1000);
  var result = await producer.produce([
    new ProduceEnvelope('topicName', 0, [new Message('msgForPartition0'.codeUnits)]),
    new ProduceEnvelope('topicName', 1, [new Message('msgForPartition1'.codeUnits)])
  ]);
  print(result.hasErrors);
  print(result.offsets);
  session.close(); // 确保工作完成后关闭session。
}

结果:

$ dart produce.dart
$ false
$ {dartKafkaTest: {0: 213075, 1: 201680}}

消费者

高层次的Kafka消费者实现,使用Kafka的ConsumerMetadata API存储其状态。

如果不需要保存消费偏移量的状态,请查看Fetcher,它专门为此用例设计。

消费者返回消息作为Stream,因此所有标准流操作都适用。但是Kafka主题是有序的消息流,带有连续的偏移量。消费者实现了保持从服务器接收的消息顺序的功能。为此目的,所有消息都被包裹在特殊的MessageEnvelope对象中,具有以下方法:

/// 信号给消费者,表示消息已被处理且可以提交偏移量。
void commit(String metadata);

/// 信号消息已被处理且我们已准备好下一个。此消息的偏移量将不会被提交。
void ack();

/// 信号给消费者取消任何进一步的交付并关闭流。
void cancel();

必须调用commit()ack()以处理每条消息,否则消费者将不会发送下一条消息到流。

最简单的的消费者示例:

import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';

void main(List<String> arguments) async {
  var host = new ContactPoint('127.0.0.1', 9092);
  var session = new KafkaSession([host]);
  var group = new ConsumerGroup(session, 'consumerGroupName');
  var topics = {
    'topicName': [0, 1] // 列出要消费的主题的分区。
  };

  var consumer = new Consumer(session, group, topics, 100, 1);
  await for (MessageEnvelope envelope in consumer.consume(limit: 3)) {
    // 假设消息由上一个示例中的Producer产生。
    var value = new String.fromCharCodes(envelope.message.value);
    print('Got message: ${envelope.offset}, ${value}');
    envelope.commit('metadata'); // 重要。
  }
  session.close(); // 确保工作完成后关闭session。
}

也可以批量消费消息以提高效率:

import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';

void main(List<String> arguments) async {
  var host = new ContactPoint('127.0.0.1', 9092);
  var session = new KafkaSession([host]);
  var group = new ConsumerGroup(session, 'consumerGroupName');
  var topics = {
    'topicName': [0, 1] // 列出要消费的主题的分区。
  };

  var consumer = new Consumer(session, group, topic, 100, 1);
  await for (BatchEnvelope batch in consumer.batchConsume(20)) {
    batch.items.forEach((MessageEnvelope envelope) {
      // 使用envelope如常
    });
    batch.commit('metadata'); // 使用批量控制方法而不是单个消息。
  }
  session.close(); // 确保工作完成后关闭session。
}

消费者偏移重置策略

由于Kafka主题可以配置为定期删除旧消息,因此消费者的偏移量可能变得无效(因为Kafka主题中已经没有这样的消息/偏移量了)。

在这种情况下,Consumer提供了可配置的策略,有以下选项:

  • OffsetOutOfRangeBehavior.throwError
  • OffsetOutOfRangeBehavior.resetToEarliest (默认)
  • OffsetOutOfRangeBehavior.resetToLatest

默认情况下,如果收到OffsetOutOfRange服务器错误,它将重置其偏移量到所消耗的主题和分区的最早可用偏移量,这实际上意味着从头开始消费所有可用消息。

要修改此行为,请将onOffsetOutOfRange属性设置为上述之一:

var consumer = new Consumer(session, group, topics, 100, 1);
consumer.onOffsetOutOfRange = OffsetOutOfRangeBehavior.throwError;

更多关于Flutter Kafka客户端插件kafkabr的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter Kafka客户端插件kafkabr的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


当然,以下是一个关于如何在Flutter应用中使用kafkabr插件与Kafka进行交互的示例代码。kafkabr是一个Flutter插件,用于与Apache Kafka进行通信。请注意,这个示例假设你已经有一个Kafka集群在运行,并且你已经知道了Kafka的broker地址、topic名称等配置信息。

首先,你需要在你的pubspec.yaml文件中添加kafkabr依赖:

dependencies:
  flutter:
    sdk: flutter
  kafkabr: ^最新版本号  # 请替换为当前最新版本号

然后,运行flutter pub get来安装依赖。

接下来是一个简单的Flutter应用示例,展示了如何使用kafkabr插件来生产消息和消费消息。

import 'package:flutter/material.dart';
import 'package:kafkabr/kafkabr.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatefulWidget {
  @override
  _MyAppState createState() => _MyAppState();
}

class _MyAppState extends State<MyApp> {
  KafkaClient? _kafkaClient;
  String _status = 'Not Connected';
  TextEditingController _messageController = TextEditingController();

  @override
  void initState() {
    super.initState();
    _connectToKafka();
  }

  @override
  void dispose() {
    _kafkaClient?.close();
    _messageController.dispose();
    super.dispose();
  }

  Future<void> _connectToKafka() async {
    // 配置Kafka客户端
    final config = KafkaConfig(
      brokers: ['localhost:9092'], // 替换为你的Kafka broker地址
      clientId: 'flutter_client',
    );

    // 创建Kafka客户端
    _kafkaClient = await KafkaClient.create(config);

    // 监听连接状态变化
    _kafkaClient!.onConnectionOpened.listen((_) {
      setState(() {
        _status = 'Connected';
      });
    });

    _kafkaClient!.onConnectionClosed.listen((_) {
      setState(() {
        _status = 'Not Connected';
      });
    });

    // 开始消费消息
    _consumeMessages();
  }

  Future<void> _consumeMessages() async {
    final consumerConfig = ConsumerConfig(
      groupId: 'flutter_group',
      topic: 'your_topic', // 替换为你的topic名称
      autoCommit: true,
    );

    final consumer = _kafkaClient!.createConsumer(consumerConfig);

    consumer.subscribe();

    consumer.stream.listen((record) {
      print('Consumed message: ${record.value.toString()}');
    });
  }

  Future<void> _produceMessage() async {
    final producerConfig = ProducerConfig(acks: Acks.all);
    final producer = _kafkaClient!.createProducer(producerConfig);

    final record = ProducerRecord<String, String>(
      topic: 'your_topic', // 替换为你的topic名称
      key: 'key',
      value: _messageController.text,
    );

    await producer.send(record);
    print('Produced message: ${_messageController.text}');

    // 清空文本框
    _messageController.clear();
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Flutter Kafka Client'),
          subtitle: Text(_status),
        ),
        body: Padding(
          padding: const EdgeInsets.all(16.0),
          child: Column(
            crossAxisAlignment: CrossAxisAlignment.start,
            children: <Widget>[
              TextField(
                controller: _messageController,
                decoration: InputDecoration(labelText: 'Message'),
              ),
              SizedBox(height: 16),
              ElevatedButton(
                onPressed: _produceMessage,
                child: Text('Produce Message'),
              ),
            ],
          ),
        ),
      ),
    );
  }
}

在这个示例中,我们做了以下几件事:

  1. pubspec.yaml文件中添加了kafkabr依赖。
  2. 创建了一个Flutter应用,并在initState方法中连接到了Kafka集群。
  3. 配置并开始消费指定的Kafka topic中的消息。
  4. 提供了一个UI界面,允许用户输入消息并通过点击按钮将消息生产到Kafka中。

请注意,你需要根据你的Kafka集群配置(如broker地址、topic名称等)来修改代码中的相应部分。此外,确保你的Kafka集群允许从Flutter应用所在的网络地址进行连接。

回到顶部