Flutter Kafka客户端插件kafkabr的使用
Flutter Kafka客户端插件kafkabr的使用
安装
目前没有Pub包,但API稳定后会发布。 可以通过git依赖在pubspec.yaml
中添加:
dependencies:
kafka:
git: https://github.com/armando-couto/kafkabr.git
然后像平常一样导入:
import 'package:kafkabr/kafka.dart';
特性
此库提供了几个高阶API对象来与Kafka交互:
- KafkaSession - 负责管理到Kafka代理的连接并协调所有请求。也提供元数据信息访问。
- Producer - 发布消息到Kafka主题
- Consumer - 从Kafka主题消费消息并存储其状态(当前偏移量)。利用ConsumerMetadata API通过ConsumerGroup。
- Fetcher - 消费消息而不存储状态。
- OffsetMaster - 提供方便的接口允许轻松检索特定topic-partitions的最早和最新偏移量。
- ConsumerGroup - 提供方便的接口利用Consumer Metadata API轻松获取或提交消费者偏移量。
生产者
简单的实现的Kafka生产者。支持自动检测leader,并为每个broker创建单独的ProduceRequest
。请求是并发发送的,所有响应都聚集在特殊ProduceResult
对象中。
// file:produce.dart
import 'dart:io';
import 'package:kafkabr/kafka.dart';
main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var producer = new Producer(session, 1, 1000);
var result = await producer.produce([
new ProduceEnvelope('topicName', 0, [new Message('msgForPartition0'.codeUnits)]),
new ProduceEnvelope('topicName', 1, [new Message('msgForPartition1'.codeUnits)])
]);
print(result.hasErrors);
print(result.offsets);
session.close(); // 确保工作完成后关闭session。
}
结果:
$ dart produce.dart
$ false
$ {dartKafkaTest: {0: 213075, 1: 201680}}
消费者
高层次的Kafka消费者实现,使用Kafka的ConsumerMetadata API存储其状态。
如果不需要保存消费偏移量的状态,请查看Fetcher
,它专门为此用例设计。
消费者返回消息作为Stream
,因此所有标准流操作都适用。但是Kafka主题是有序的消息流,带有连续的偏移量。消费者实现了保持从服务器接收的消息顺序的功能。为此目的,所有消息都被包裹在特殊的MessageEnvelope
对象中,具有以下方法:
/// 信号给消费者,表示消息已被处理且可以提交偏移量。
void commit(String metadata);
/// 信号消息已被处理且我们已准备好下一个。此消息的偏移量将不会被提交。
void ack();
/// 信号给消费者取消任何进一步的交付并关闭流。
void cancel();
必须调用commit()
或ack()
以处理每条消息,否则消费者将不会发送下一条消息到流。
最简单的的消费者示例:
import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';
void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // 列出要消费的主题的分区。
};
var consumer = new Consumer(session, group, topics, 100, 1);
await for (MessageEnvelope envelope in consumer.consume(limit: 3)) {
// 假设消息由上一个示例中的Producer产生。
var value = new String.fromCharCodes(envelope.message.value);
print('Got message: ${envelope.offset}, ${value}');
envelope.commit('metadata'); // 重要。
}
session.close(); // 确保工作完成后关闭session。
}
也可以批量消费消息以提高效率:
import 'dart:io';
import 'dart:async';
import 'package:kafkabr/kafka.dart';
void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // 列出要消费的主题的分区。
};
var consumer = new Consumer(session, group, topic, 100, 1);
await for (BatchEnvelope batch in consumer.batchConsume(20)) {
batch.items.forEach((MessageEnvelope envelope) {
// 使用envelope如常
});
batch.commit('metadata'); // 使用批量控制方法而不是单个消息。
}
session.close(); // 确保工作完成后关闭session。
}
消费者偏移重置策略
由于Kafka主题可以配置为定期删除旧消息,因此消费者的偏移量可能变得无效(因为Kafka主题中已经没有这样的消息/偏移量了)。
在这种情况下,Consumer
提供了可配置的策略,有以下选项:
OffsetOutOfRangeBehavior.throwError
OffsetOutOfRangeBehavior.resetToEarliest
(默认)OffsetOutOfRangeBehavior.resetToLatest
默认情况下,如果收到OffsetOutOfRange
服务器错误,它将重置其偏移量到所消耗的主题和分区的最早可用偏移量,这实际上意味着从头开始消费所有可用消息。
要修改此行为,请将onOffsetOutOfRange
属性设置为上述之一:
var consumer = new Consumer(session, group, topics, 100, 1);
consumer.onOffsetOutOfRange = OffsetOutOfRangeBehavior.throwError;
更多关于Flutter Kafka客户端插件kafkabr的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html
更多关于Flutter Kafka客户端插件kafkabr的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html
当然,以下是一个关于如何在Flutter应用中使用kafkabr
插件与Kafka进行交互的示例代码。kafkabr
是一个Flutter插件,用于与Apache Kafka进行通信。请注意,这个示例假设你已经有一个Kafka集群在运行,并且你已经知道了Kafka的broker地址、topic名称等配置信息。
首先,你需要在你的pubspec.yaml
文件中添加kafkabr
依赖:
dependencies:
flutter:
sdk: flutter
kafkabr: ^最新版本号 # 请替换为当前最新版本号
然后,运行flutter pub get
来安装依赖。
接下来是一个简单的Flutter应用示例,展示了如何使用kafkabr
插件来生产消息和消费消息。
import 'package:flutter/material.dart';
import 'package:kafkabr/kafkabr.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatefulWidget {
@override
_MyAppState createState() => _MyAppState();
}
class _MyAppState extends State<MyApp> {
KafkaClient? _kafkaClient;
String _status = 'Not Connected';
TextEditingController _messageController = TextEditingController();
@override
void initState() {
super.initState();
_connectToKafka();
}
@override
void dispose() {
_kafkaClient?.close();
_messageController.dispose();
super.dispose();
}
Future<void> _connectToKafka() async {
// 配置Kafka客户端
final config = KafkaConfig(
brokers: ['localhost:9092'], // 替换为你的Kafka broker地址
clientId: 'flutter_client',
);
// 创建Kafka客户端
_kafkaClient = await KafkaClient.create(config);
// 监听连接状态变化
_kafkaClient!.onConnectionOpened.listen((_) {
setState(() {
_status = 'Connected';
});
});
_kafkaClient!.onConnectionClosed.listen((_) {
setState(() {
_status = 'Not Connected';
});
});
// 开始消费消息
_consumeMessages();
}
Future<void> _consumeMessages() async {
final consumerConfig = ConsumerConfig(
groupId: 'flutter_group',
topic: 'your_topic', // 替换为你的topic名称
autoCommit: true,
);
final consumer = _kafkaClient!.createConsumer(consumerConfig);
consumer.subscribe();
consumer.stream.listen((record) {
print('Consumed message: ${record.value.toString()}');
});
}
Future<void> _produceMessage() async {
final producerConfig = ProducerConfig(acks: Acks.all);
final producer = _kafkaClient!.createProducer(producerConfig);
final record = ProducerRecord<String, String>(
topic: 'your_topic', // 替换为你的topic名称
key: 'key',
value: _messageController.text,
);
await producer.send(record);
print('Produced message: ${_messageController.text}');
// 清空文本框
_messageController.clear();
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('Flutter Kafka Client'),
subtitle: Text(_status),
),
body: Padding(
padding: const EdgeInsets.all(16.0),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: <Widget>[
TextField(
controller: _messageController,
decoration: InputDecoration(labelText: 'Message'),
),
SizedBox(height: 16),
ElevatedButton(
onPressed: _produceMessage,
child: Text('Produce Message'),
),
],
),
),
),
);
}
}
在这个示例中,我们做了以下几件事:
- 在
pubspec.yaml
文件中添加了kafkabr
依赖。 - 创建了一个Flutter应用,并在
initState
方法中连接到了Kafka集群。 - 配置并开始消费指定的Kafka topic中的消息。
- 提供了一个UI界面,允许用户输入消息并通过点击按钮将消息生产到Kafka中。
请注意,你需要根据你的Kafka集群配置(如broker地址、topic名称等)来修改代码中的相应部分。此外,确保你的Kafka集群允许从Flutter应用所在的网络地址进行连接。