Flutter集群管理插件actor_cluster的使用

Flutter集群管理插件actor_cluster的使用

概述

⚠️ ALPHA ⚠️

此插件目前处于开发初期阶段。经常会有破坏性的变更。该插件尚未添加测试。

Actor Cluster

将多个actor系统连接到一个集群。

特性

  • actor系统可以在不同的隔离区(isolate)和/或进程中运行。
  • 使用自定义的序列化器/反序列化器在网络上传输消息。
  • 当种子节点断开连接时,自动重新连接。
  • 集群根据运行时指标决定在哪个节点上创建actor。
  • 或者用户可以使用全限定的actor路径或节点标签来决定。

使用方法

示例代码

import 'dart:convert';
import 'dart:typed_data';

import 'package:actor_cluster/actor_cluster.dart';
import 'package:logging/logging.dart';

// 自定义的序列化器/反序列化器
class StringSerDes implements SerDes {
  [@override](/user/override)
  Object? deserialize(Uint8List data) {
    return utf8.decode(data); // 将字节数组解码为字符串
  }

  [@override](/user/override)
  Uint8List serialize(Object? message) {
    if (message is String) {
      return Uint8List.fromList(utf8.encode(message)); // 将字符串编码为字节数组
    }
    throw ArgumentError.value(message, 'message', 'not of type String');
  }
}

// 初始化日志记录
void initLogging() {
  Logger.root.level = Level.ALL;
  Logger.root.onRecord.listen((record) {
    if (record.loggerName.startsWith('actor://')) {
      print('${record.loggerName} ${record.level} ${record.message}');
    }
  });
}

void main(List<String> args) async {
  // 创建ActorCluster实例
  final clusterNode = ActorCluster(
    await readClusterConfigFromYaml('example/cluster.yaml'), // 从YAML文件读取集群配置
    await readNodeConfigFromYaml('example/${args[0]}.yaml'), // 从YAML文件读取节点配置
    StringSerDes(), // 使用自定义的序列化器
    initWorkerIsolate: (nodeId, workerId) => initLogging(), // 初始化日志记录
  );

  // 初始化集群
  await clusterNode.init(
    addActorFactories: (addActorFactory) {
      // 添加actor工厂
      addActorFactory(patternMatcher('/actor/1'), (path) {
        return (ActorContext context, Object? msg) async {
          final log = Logger(context.current.path.toString());
          final replyTo = context.replyTo;
          log.info('message: $msg'); // 打印消息
          log.info('correlationId: ${context.correlationId}'); // 打印相关ID
          log.info('replyTo: ${replyTo?.path}'); // 打印回复目标
          replyTo?.send(msg); // 发送消息
        };
      });

      addActorFactory(patternMatcher('/actor/2'), (path) {
        return (ActorContext context, Object? msg) async {
          final log = Logger(context.current.path.toString());
          log.info('message: $msg'); // 打印消息
          log.info('correlationId: ${context.correlationId}'); // 打印相关ID
          log.info('sender: ${context.sender?.path}'); // 打印发件人
          final actorRef = await context.lookupActor(actorPath('/actor/3')); // 查找actor
          actorRef?.send(msg); // 发送消息
        };
      });

      addActorFactory(patternMatcher('/actor/3'), (path) {
        return (ActorContext context, Object? msg) {
          final log = Logger(context.current.path.toString());
          log.info('message: $msg'); // 打印消息
          log.info('correlationId: ${context.correlationId}'); // 打印相关ID
          log.info('sender: ${context.sender?.path}'); // 打印发件人
        };
      });

      addActorFactory(patternMatcher('/foo'), (path) {
        return (ActorContext context, Object? msg) {
          final log = Logger(context.current.path.toString());
          log.info('message: $msg'); // 打印消息
        };
      });

      addActorFactory(patternMatcher('/bar'), (path) {
        return (ActorContext context, Object? msg) {
          final log = Logger(context.current.path.toString());
          log.info('message: $msg'); // 打印消息
        };
      });
    },
    initNode: (CreateActor createActor, List<String> tags) async {
      final actor = await createActor(actorPath('/foo'), 1000); // 创建actor
      await actor.send('test message'); // 发送消息
    },
    initCluster: (context) async {
      try {
        await context.createActor(actorPath('/bar', tag: 'foobar')); // 创建actor
        await context.createActor(actorPath('/bar', tag: 'foobar'));
        await context.createActor(actorPath('/bar', tag: 'foobar'));
      } catch (e) {
        print(e); // 打印异常信息
      }

      final actorRef1 = await context.createActor(Uri.parse('//node1/actor/1')); // 创建actor引用
      final actorRef2 = await context.createActor(Uri.parse('//node2/actor/2')); // 创建actor引用
      await context.createActor(Uri.parse('//node1/actor/3')); // 创建actor
      await actorRef1.send('hello cluster actor!', correlationId: '101', replyTo: actorRef2); // 发送消息

      print(await context.lookupActors(actorPath('/'))); // 查找所有actors
    },
  );
}

更多关于Flutter集群管理插件actor_cluster的使用的实战教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter集群管理插件actor_cluster的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


actor_cluster 是一个用于 Flutter 的集群管理插件,它基于 Actor 模型,适用于构建分布式、可扩展的应用程序。通过 actor_cluster,你可以在多个节点之间分发任务,并管理这些节点之间的通信。

安装

首先,你需要在 pubspec.yaml 文件中添加 actor_cluster 依赖:

dependencies:
  actor_cluster: ^0.1.0  # 请根据实际情况选择最新版本

然后运行 flutter pub get 来安装依赖。

基本使用

1. 创建 Actor 系统

在使用 actor_cluster 之前,你需要创建一个 Actor 系统。Actor 系统是管理和调度 Actor 的基础设施。

import 'package:actor_cluster/actor_cluster.dart';

void main() {
  final actorSystem = ActorSystem('myActorSystem');
}

2. 创建 Actor

Actor 是系统中的基本执行单元。你可以通过继承 Actor 类来创建自定义的 Actor。

class MyActor extends Actor {
  [@override](/user/override)
  void onReceive(Object message) {
    print('Received message: $message');
  }
}

然后,你可以在 Actor 系统中注册这个 Actor:

final myActorRef = actorSystem.actorOf('myActor', () => MyActor());

3. 发送消息

你可以通过 ActorRef 向 Actor 发送消息:

myActorRef.tell('Hello, Actor!');

4. 集群管理

actor_cluster 支持集群管理,你可以在多个节点之间分发任务。首先,你需要配置集群:

final cluster = Cluster(actorSystem);
cluster.join('127.0.0.1', 2551);  // 加入集群

5. 远程 Actor

你可以在集群中的不同节点之间创建远程 Actor 并发送消息:

final remoteActorRef = actorSystem.actorFor('akka.tcp://myActorSystem@127.0.0.1:2552/user/myActor');
remoteActorRef.tell('Hello from another node!');

示例代码

以下是一个完整的示例,展示了如何创建 Actor 系统、Actor 以及如何在集群中发送消息:

import 'package:actor_cluster/actor_cluster.dart';

class MyActor extends Actor {
  [@override](/user/override)
  void onReceive(Object message) {
    print('Received message: $message');
  }
}

void main() {
  final actorSystem = ActorSystem('myActorSystem');
  final myActorRef = actorSystem.actorOf('myActor', () => MyActor());

  myActorRef.tell('Hello, Actor!');

  final cluster = Cluster(actorSystem);
  cluster.join('127.0.0.1', 2551);

  final remoteActorRef = actorSystem.actorFor('akka.tcp://myActorSystem@127.0.0.1:2552/user/myActor');
  remoteActorRef.tell('Hello from another node!');
}
回到顶部