Flutter物联网客户端插件pulsar_iot_client的使用

Flutter物联网客户端插件pulsar_iot_client的使用

pulsar_iot_client

一个专注于物联网遥测领域的轻量级Apache Pulsar客户端。该项目旨在通过移除AMQP/MQTT代理并利用消息批处理来提高Pulsar集群的性能或减少运营成本。

核心特性:

  • 完全异步API(生产者、消费者)
  • 透明的主题重新平衡处理(在新代理上重建生产者/消费者)
  • 支持消息批处理。实际上,每个批处理的消息在Pulsar端作为单个账本条目存储,这使得它成为相对较小的周期性数据更新处理的首选方式。
  • OAuth2/令牌基础认证,并支持访问令牌刷新
  • 强制TLS模式以增强应用程序安全性

待办事项:

  • 分区主题支持
  • 负载模式支持

快速开始

生产者

// PulsarClient是入口点
var client = PulsarClient(
  settings: PulsarClientSettings(
    serviceUrl: 'pulsar://localhost:6650',
  ),
);

// 全局错误(不与个别请求完成关联)
client.clientErrorStream.listen((Object error) {
  print("Global error: $error");
});

// 在主题上创建生产者
var producer = await client.newProducer(
  ProducerCreateParams(topic: 'persistent://tenant/namespace/topic')
);

// 发送一条消息。结果包含messageId
var resultSingle = await producer.sendMessage(
  GenericMessage(
    propertyMap: {'key': 'value'},
    payload: Uint8List.fromList('Binary payload'.codeUnits),
  )
);

// 关闭单独的生产者
producer.close();
// 关闭客户端(带有所有相关的生产者、消费者和代理连接)
client.close();

消费者

// 创建具有定义的订阅名称和类型的消费者
var consumer = await client.newConsumer(
  ConsumerCreateParams(
      topic: 'persistent://tenant/namespace/topic',
      subscription: 'subscription-name',
      subType: SubscriptionType.exclusive,
  )
);

// 监听传入的消息
consumer.listen((GenericInputMessage message) {
  // 确认消息(除非它是批处理中的中间消息)
  if (message.storageType != MessageStorageType.batchIntermediate) {
    consumer.ackMessage(message.messageId, false);
  }
});

// 给代理权限推送接下来的100条消息(流控制)
await consumer.getFlow(100);

OAuth2(使用 oauth2 包)

Future<String> _getAuthToken(String? data) {
  var completer = Completer<String>();

  // 另外,如果特定授权类型支持,则可以调用现有oauth2客户端上的refreshCredentials()
  oauth2.clientCredentialsGrant(
    oauth2Endpoint,
    oauth2Identifier,
    oauth2Secret,
  ).then((oauth2client) {
    completer.complete(oauth2client.credentials.accessToken);
  }).catchError((error, trace) { completer.completeError(error, trace); });

  return completer.future;
}

// authTokenProvider 函数将被调用来获取/刷新访问令牌
// 周期性的令牌刷新将由代理触发,无需手动跟踪过期时间
var client = PulsarClient(
  settings: PulsarClientSettings(
    forceTLS: true,
    serviceUrl: 'pulsar+ssl://broker.my-domain.com:6651',
    authTokenProvider: _getAuthToken,
    authTokenRefreshSupported: true,
  ),
);

更多关于Flutter物联网客户端插件pulsar_iot_client的使用的实战教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter物联网客户端插件pulsar_iot_client的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


在Flutter中集成和使用pulsar_iot_client插件来实现物联网客户端功能,可以通过以下步骤和代码示例来完成。假设你已经有一个Flutter项目,并且已经配置好了开发环境。

1. 添加依赖

首先,你需要在pubspec.yaml文件中添加pulsar_iot_client依赖。注意,这里假设pulsar_iot_client是一个假想的包名,实际使用时请替换为实际的包名。

dependencies:
  flutter:
    sdk: flutter
  pulsar_iot_client: ^x.y.z  # 替换为实际的版本号

然后运行flutter pub get来安装依赖。

2. 导入插件

在你的Dart文件中导入插件:

import 'package:pulsar_iot_client/pulsar_iot_client.dart';

3. 初始化客户端

接下来,你需要初始化Pulsar IoT客户端。这通常包括设置服务器地址、端口、认证信息等。

class PulsarIoTService {
  late PulsarIoTClient _client;

  Future<void> initialize() async {
    final config = PulsarIoTClientConfig(
      serviceUrl: 'pulsar://your-pulsar-service-url:6650',  // 替换为你的Pulsar服务URL和端口
      auth: PulsarAuthData(
        // 如果需要认证,请在这里添加认证信息,例如Token、用户名密码等
        // token: 'your-auth-token',
      ),
    );

    _client = await PulsarIoTClient.connect(config);
  }

  // 其他方法,如发送消息、订阅主题等
}

4. 发送消息

一旦客户端初始化成功,你可以发送消息到指定的主题。

Future<void> sendMessage(String topic, String message) async {
    final producer = await _client.createProducer(topic: topic);
    await producer.send(Utf8Encoder().convert(message));
    await producer.close();
}

5. 订阅主题

你还可以订阅主题以接收消息。

StreamSubscription<String>? _subscription;

Future<void> subscribeToTopic(String topic) async {
    final consumer = await _client.subscribe(topic: topic, subscriptionType: SubscriptionType.Exclusive);

    _subscription = consumer.messages.listen((msg) {
      final messageContent = String.fromCharCodes(msg.data);
      print('Received message: $messageContent');

      // 确认消息已接收
      consumer.acknowledge(msg.messageId);
    });
  }

  // 记得在不再需要订阅时取消订阅
  Future<void> unsubscribe() async {
    await _subscription?.cancel();
    _subscription = null;
  }

6. 关闭客户端

最后,别忘了在不使用客户端时关闭连接。

Future<void> closeClient() async {
    await _client.close();
  }

完整示例

下面是一个完整的示例,将上述步骤整合在一起:

import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:pulsar_iot_client/pulsar_iot_client.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Pulsar IoT Client Demo'),
        ),
        body: PulsarIoTDemo(),
      ),
    );
  }
}

class PulsarIoTDemo extends StatefulWidget {
  @override
  _PulsarIoTDemoState createState() => _PulsarIoTDemoState();
}

class _PulsarIoTDemoState extends State<PulsarIoTDemo> {
  late PulsarIoTService pulsarService;

  @override
  void initState() {
    super.initState();
    pulsarService = PulsarIoTService();
    pulsarService.initialize().then((_) {
      // 初始化成功后可以发送消息或订阅主题
      pulsarService.sendMessage('my-topic', 'Hello, Pulsar!');
      pulsarService.subscribeToTopic('my-topic');
    });
  }

  @override
  void dispose() {
    pulsarService.unsubscribe();
    pulsarService.closeClient();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Center(
      child: Text('Check console for received messages.'),
    );
  }
}

class PulsarIoTService {
  late PulsarIoTClient _client;

  Future<void> initialize() async {
    final config = PulsarIoTClientConfig(
      serviceUrl: 'pulsar://your-pulsar-service-url:6650',
      // auth: PulsarAuthData(token: 'your-auth-token'), // 如果需要认证,请取消注释并设置
    );

    _client = await PulsarIoTClient.connect(config);
  }

  Future<void> sendMessage(String topic, String message) async {
    final producer = await _client.createProducer(topic: topic);
    await producer.send(Utf8Encoder().convert(message));
    await producer.close();
  }

  StreamSubscription<String>? _subscription;

  Future<void> subscribeToTopic(String topic) async {
    final consumer = await _client.subscribe(topic: topic, subscriptionType: SubscriptionType.Exclusive);

    _subscription = consumer.messages.listen((msg) {
      final messageContent = String.fromCharCodes(msg.data);
      print('Received message: $messageContent');
      consumer.acknowledge(msg.messageId);
    });
  }

  Future<void> unsubscribe() async {
    await _subscription?.cancel();
    _subscription = null;
  }

  Future<void> closeClient() async {
    await _client.close();
  }
}

请注意,由于pulsar_iot_client是一个假想的包名,实际使用时你需要替换为真实存在的Pulsar客户端Flutter插件,并根据该插件的文档进行相应的调整。此外,上述代码示例未处理异常和错误处理,实际开发中应添加适当的错误处理逻辑。

回到顶部