Flutter物联网客户端插件pulsar_iot_client的使用
Flutter物联网客户端插件pulsar_iot_client的使用
pulsar_iot_client
一个专注于物联网遥测领域的轻量级Apache Pulsar客户端。该项目旨在通过移除AMQP/MQTT代理并利用消息批处理来提高Pulsar集群的性能或减少运营成本。
核心特性:
- 完全异步API(生产者、消费者)
- 透明的主题重新平衡处理(在新代理上重建生产者/消费者)
- 支持消息批处理。实际上,每个批处理的消息在Pulsar端作为单个账本条目存储,这使得它成为相对较小的周期性数据更新处理的首选方式。
- OAuth2/令牌基础认证,并支持访问令牌刷新
- 强制TLS模式以增强应用程序安全性
待办事项:
- 分区主题支持
- 负载模式支持
快速开始
生产者
// PulsarClient是入口点
var client = PulsarClient(
settings: PulsarClientSettings(
serviceUrl: 'pulsar://localhost:6650',
),
);
// 全局错误(不与个别请求完成关联)
client.clientErrorStream.listen((Object error) {
print("Global error: $error");
});
// 在主题上创建生产者
var producer = await client.newProducer(
ProducerCreateParams(topic: 'persistent://tenant/namespace/topic')
);
// 发送一条消息。结果包含messageId
var resultSingle = await producer.sendMessage(
GenericMessage(
propertyMap: {'key': 'value'},
payload: Uint8List.fromList('Binary payload'.codeUnits),
)
);
// 关闭单独的生产者
producer.close();
// 关闭客户端(带有所有相关的生产者、消费者和代理连接)
client.close();
消费者
// 创建具有定义的订阅名称和类型的消费者
var consumer = await client.newConsumer(
ConsumerCreateParams(
topic: 'persistent://tenant/namespace/topic',
subscription: 'subscription-name',
subType: SubscriptionType.exclusive,
)
);
// 监听传入的消息
consumer.listen((GenericInputMessage message) {
// 确认消息(除非它是批处理中的中间消息)
if (message.storageType != MessageStorageType.batchIntermediate) {
consumer.ackMessage(message.messageId, false);
}
});
// 给代理权限推送接下来的100条消息(流控制)
await consumer.getFlow(100);
OAuth2(使用 oauth2
包)
Future<String> _getAuthToken(String? data) {
var completer = Completer<String>();
// 另外,如果特定授权类型支持,则可以调用现有oauth2客户端上的refreshCredentials()
oauth2.clientCredentialsGrant(
oauth2Endpoint,
oauth2Identifier,
oauth2Secret,
).then((oauth2client) {
completer.complete(oauth2client.credentials.accessToken);
}).catchError((error, trace) { completer.completeError(error, trace); });
return completer.future;
}
// authTokenProvider 函数将被调用来获取/刷新访问令牌
// 周期性的令牌刷新将由代理触发,无需手动跟踪过期时间
var client = PulsarClient(
settings: PulsarClientSettings(
forceTLS: true,
serviceUrl: 'pulsar+ssl://broker.my-domain.com:6651',
authTokenProvider: _getAuthToken,
authTokenRefreshSupported: true,
),
);
更多关于Flutter物联网客户端插件pulsar_iot_client的使用的实战教程也可以访问 https://www.itying.com/category-92-b0.html
更多关于Flutter物联网客户端插件pulsar_iot_client的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html
在Flutter中集成和使用pulsar_iot_client
插件来实现物联网客户端功能,可以通过以下步骤和代码示例来完成。假设你已经有一个Flutter项目,并且已经配置好了开发环境。
1. 添加依赖
首先,你需要在pubspec.yaml
文件中添加pulsar_iot_client
依赖。注意,这里假设pulsar_iot_client
是一个假想的包名,实际使用时请替换为实际的包名。
dependencies:
flutter:
sdk: flutter
pulsar_iot_client: ^x.y.z # 替换为实际的版本号
然后运行flutter pub get
来安装依赖。
2. 导入插件
在你的Dart文件中导入插件:
import 'package:pulsar_iot_client/pulsar_iot_client.dart';
3. 初始化客户端
接下来,你需要初始化Pulsar IoT客户端。这通常包括设置服务器地址、端口、认证信息等。
class PulsarIoTService {
late PulsarIoTClient _client;
Future<void> initialize() async {
final config = PulsarIoTClientConfig(
serviceUrl: 'pulsar://your-pulsar-service-url:6650', // 替换为你的Pulsar服务URL和端口
auth: PulsarAuthData(
// 如果需要认证,请在这里添加认证信息,例如Token、用户名密码等
// token: 'your-auth-token',
),
);
_client = await PulsarIoTClient.connect(config);
}
// 其他方法,如发送消息、订阅主题等
}
4. 发送消息
一旦客户端初始化成功,你可以发送消息到指定的主题。
Future<void> sendMessage(String topic, String message) async {
final producer = await _client.createProducer(topic: topic);
await producer.send(Utf8Encoder().convert(message));
await producer.close();
}
5. 订阅主题
你还可以订阅主题以接收消息。
StreamSubscription<String>? _subscription;
Future<void> subscribeToTopic(String topic) async {
final consumer = await _client.subscribe(topic: topic, subscriptionType: SubscriptionType.Exclusive);
_subscription = consumer.messages.listen((msg) {
final messageContent = String.fromCharCodes(msg.data);
print('Received message: $messageContent');
// 确认消息已接收
consumer.acknowledge(msg.messageId);
});
}
// 记得在不再需要订阅时取消订阅
Future<void> unsubscribe() async {
await _subscription?.cancel();
_subscription = null;
}
6. 关闭客户端
最后,别忘了在不使用客户端时关闭连接。
Future<void> closeClient() async {
await _client.close();
}
完整示例
下面是一个完整的示例,将上述步骤整合在一起:
import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:pulsar_iot_client/pulsar_iot_client.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: Text('Pulsar IoT Client Demo'),
),
body: PulsarIoTDemo(),
),
);
}
}
class PulsarIoTDemo extends StatefulWidget {
@override
_PulsarIoTDemoState createState() => _PulsarIoTDemoState();
}
class _PulsarIoTDemoState extends State<PulsarIoTDemo> {
late PulsarIoTService pulsarService;
@override
void initState() {
super.initState();
pulsarService = PulsarIoTService();
pulsarService.initialize().then((_) {
// 初始化成功后可以发送消息或订阅主题
pulsarService.sendMessage('my-topic', 'Hello, Pulsar!');
pulsarService.subscribeToTopic('my-topic');
});
}
@override
void dispose() {
pulsarService.unsubscribe();
pulsarService.closeClient();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Center(
child: Text('Check console for received messages.'),
);
}
}
class PulsarIoTService {
late PulsarIoTClient _client;
Future<void> initialize() async {
final config = PulsarIoTClientConfig(
serviceUrl: 'pulsar://your-pulsar-service-url:6650',
// auth: PulsarAuthData(token: 'your-auth-token'), // 如果需要认证,请取消注释并设置
);
_client = await PulsarIoTClient.connect(config);
}
Future<void> sendMessage(String topic, String message) async {
final producer = await _client.createProducer(topic: topic);
await producer.send(Utf8Encoder().convert(message));
await producer.close();
}
StreamSubscription<String>? _subscription;
Future<void> subscribeToTopic(String topic) async {
final consumer = await _client.subscribe(topic: topic, subscriptionType: SubscriptionType.Exclusive);
_subscription = consumer.messages.listen((msg) {
final messageContent = String.fromCharCodes(msg.data);
print('Received message: $messageContent');
consumer.acknowledge(msg.messageId);
});
}
Future<void> unsubscribe() async {
await _subscription?.cancel();
_subscription = null;
}
Future<void> closeClient() async {
await _client.close();
}
}
请注意,由于pulsar_iot_client
是一个假想的包名,实际使用时你需要替换为真实存在的Pulsar客户端Flutter插件,并根据该插件的文档进行相应的调整。此外,上述代码示例未处理异常和错误处理,实际开发中应添加适当的错误处理逻辑。