golang超高性能分布式Actor模型开发插件库ProtoActor的使用
Golang超高性能分布式Actor模型开发插件库ProtoActor的使用
ProtoActor是一个跨平台的Actor模型实现,支持Go和C#之间的交互。以下是关于ProtoActor的详细介绍和使用示例。
设计原则
- 最小化API:API小巧易用,避免企业级JVM式的容器和配置
- 基于现有技术:利用gRPC流进行网络传输,Protobuf进行序列化
- 传递数据而非对象:显式处理序列化问题
- 高性能:不因API魔法而牺牲性能
ProtoActor的远程通信性能极佳,在两个节点之间每秒可传递超过200万条消息,同时保持消息顺序。
基础使用示例
Hello World示例
type Hello struct{ Who string }
type HelloActor struct{}
func (state *HelloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
context := actor.EmptyRootContext
props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
pid, err := context.Spawn(props)
if err != nil {
panic(err)
}
context.Send(pid, Hello{Who: "Roger"})
console.ReadLine()
}
状态机示例
type Hello struct{ Who string }
type SetBehaviorActor struct{}
func (state *SetBehaviorActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
context.SetBehavior(state.Other)
}
}
func (state *SetBehaviorActor) Other(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("%v, ey we are now handling messages in another behavior", msg.Who)
}
}
func NewSetBehaviorActor() actor.Actor {
return &SetBehaviorActor{}
}
func main() {
context := actor.EmptyRootContext
props := actor.PropsFromProducer(NewSetBehaviorActor)
pid, err := context.Spawn(props)
if err != nil {
panic(err)
}
context.Send(pid, Hello{Who: "Roger"})
context.Send(pid, Hello{Who: "Roger"})
console.ReadLine()
}
生命周期事件
type Hello struct{ Who string }
type HelloActor struct{}
func (state *HelloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Started, initialize actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Stopped:
fmt.Println("Stopped, actor and its children are stopped")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
context := actor.EmptyRootContext
props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
pid, err := context.Spawn(props)
if err != nil {
panic(err)
}
context.Send(pid, Hello{Who: "Roger"})
// 等待1秒以展示正确的生命周期事件顺序
time.Sleep(1 * time.Second)
context.Stop(pid)
console.ReadLine()
}
监督策略示例
type Hello struct{ Who string }
type ParentActor struct{}
func (state *ParentActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
props := actor.PropsFromProducer(NewChildActor)
child := context.Spawn(props)
context.Send(child, msg)
}
}
func NewParentActor() actor.Actor {
return &ParentActor{}
}
type ChildActor struct{}
func (state *ChildActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Starting, initialize actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Stopped:
fmt.Println("Stopped, actor and its children are stopped")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
panic("Ouch")
}
}
func NewChildActor() actor.Actor {
return &ChildActor{}
}
func main() {
decider := func(reason interface{}) actor.Directive {
log.Printf("handling failure for child. reason:%v", reason)
return actor.RestartDirective
}
supervisor := actor.NewOneForOneStrategy(10, 1000, decider)
ctx := actor.NewActorSystem().Root
props := actor.PropsFromProducer(NewParentActor).WithSupervisor(supervisor)
pid := ctx.Spawn(props)
ctx.Send(pid, Hello{Who: "Roger"})
console.ReadLine()
}
网络/远程通信示例
节点1代码
type MyActor struct {
count int
}
func (state *MyActor) Receive(context actor.Context) {
switch context.Message().(type) {
case *messages.Response:
state.count++
fmt.Println(state.count)
}
}
func main() {
remote.Start("localhost:8090")
context := actor.EmptyRootContext
props := actor.PropsFromProducer(func() actor.Actor { return &MyActor{} })
pid, _ := context.Spawn(props)
message := &messages.Echo{Message: "hej", Sender: pid}
// 创建远程actor
spawnResponse, _ := remote.SpawnNamed("localhost:8091", "myactor", "hello", time.Second)
// 获取创建的PID
spawnedPID := spawnResponse.Pid
for i := 0; i < 10; i++ {
context.Send(spawnedPID, message)
}
console.ReadLine()
}
节点2代码
type MyActor struct{}
func (*MyActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *messages.Echo:
context.Send(msg.Sender, &messages.Response{
SomeValue: "result",
})
}
}
func main() {
remote.Start("localhost:8091")
// 注册本地actor以便远程创建
remote.Register("hello", actor.PropsFromProducer(func() actor.Actor { return &MyActor{} }))
console.ReadLine()
}
消息协议定义
syntax = "proto3";
package messages;
import "actor.proto"; // 需要导入actor.proto以便消息可以包含PID
// 节点1发送给节点2的消息
message Echo {
actor.PID Sender = 1; // 远程actor应回复的PID
string Message = 2;
}
// 远程actor应回复的消息
message Response {
string SomeValue = 1;
}
构建和测试
确保$GOPATH
变量正确设置后,执行以下命令:
go get github.com/asynkron/protoactor-go/...
cd $GOPATH/src/github.com/asynkron/protoactor-go
go get ./...
make
运行测试:
go test `go list ./... | grep -v "/examples/" | grep -v "/persistence" | grep -v "/scheduler"`
ProtoActor是一个仍在beta阶段的Go实现,但已经有一些生产环境用户。请注意API在1.0版本前可能会发生变化。
更多关于golang超高性能分布式Actor模型开发插件库ProtoActor的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang超高性能分布式Actor模型开发插件库ProtoActor的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
ProtoActor: Go语言高性能分布式Actor模型框架
ProtoActor是一个基于Go语言的高性能分布式Actor模型实现,它借鉴了Erlang和Akka的设计理念,同时针对Go语言的特点进行了优化。下面我将详细介绍ProtoActor的核心概念和使用方法。
核心概念
- Actor模型:将并发实体抽象为Actor,每个Actor有自己的状态和行为,通过消息传递进行通信
- PID:每个Actor都有一个唯一进程标识符(Process ID)
- Mailbox:每个Actor都有一个消息队列,用于接收和处理消息
安装ProtoActor
go get github.com/asynkron/protoactor-go
基本使用示例
1. 创建Actor
package main
import (
"fmt"
"time"
"github.com/asynkron/protoactor-go/actor"
)
// 定义Actor结构体
type HelloActor struct{}
// 实现Receive方法处理消息
func (state *HelloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case string:
fmt.Printf("HelloActor received: %v\n", msg)
context.Respond("Hello " + msg)
}
}
func main() {
// 创建Actor系统
system := actor.NewActorSystem()
// 定义Actor属性
props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
// 生成Actor并获取PID
pid := system.Root.Spawn(props)
// 发送消息并等待响应
future := system.Root.RequestFuture(pid, "World", 5*time.Second)
// 处理响应
result, err := future.Result()
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Received response:", result)
}
2. 父子Actor和监管策略
type ParentActor struct{}
func (state *ParentActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
// 创建子Actor
props := actor.PropsFromProducer(func() actor.Actor { return &ChildActor{} })
child := context.Spawn(props)
context.Watch(child) // 监控子Actor
case string:
fmt.Printf("Parent received: %v\n", msg)
}
}
type ChildActor struct{}
func (state *ChildActor) Receive(context actor.Context) {
switch context.Message().(type) {
case string:
fmt.Println("Child received message")
panic("child failed") // 模拟子Actor失败
}
}
func main() {
system := actor.NewActorSystem()
// 定义监管策略
supervisor := actor.NewOneForOneStrategy(10, time.Second, func(reason interface{}) actor.Directive {
fmt.Println("Child failed, restarting:", reason)
return actor.RestartDirective
})
props := actor.PropsFromProducer(func() actor.Actor { return &ParentActor{} }).
WithSupervisor(supervisor)
pid := system.Root.Spawn(props)
system.Root.Send(pid, "test")
time.Sleep(time.Second) // 等待子Actor重启
}
3. 集群功能(分布式Actor)
package main
import (
"fmt"
"log"
"time"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/consul"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"github.com/asynkron/protoactor-go/remote"
)
// 定义Grain接口和实现
type HelloGrain interface {
SayHello(name string) (*HelloResponse, error)
}
type HelloResponse struct {
Message string
}
type helloGrain struct {
ctx cluster.GrainContext
}
func (h *helloGrain) SayHello(name string) (*HelloResponse, error) {
return &HelloResponse{Message: "Hello " + name}, nil
}
func main() {
// 设置集群
system := actor.NewActorSystem()
// 使用Consul作为集群提供者
provider := consul.New()
lookup := disthash.New()
config := remote.Configure("localhost", 0)
// 创建集群
clusterConfig := cluster.Configure("my-cluster", provider, lookup, config)
c := cluster.New(system, clusterConfig)
// 注册Grain
clusterKind := cluster.NewKind(
"HelloGrain",
actor.PropsFromProducer(func() actor.Actor {
return &helloGrain{}
}),
)
c.RegisterKind(clusterKind)
// 启动集群
c.StartMember()
// 获取Grain客户端
client := cluster.GetCluster(system).Get("HelloGrain", "grain1")
hello := client.(HelloGrain)
// 调用远程Grain
resp, err := hello.SayHello("World")
if err != nil {
log.Fatal(err)
}
fmt.Println(resp.Message)
time.Sleep(time.Minute)
c.Shutdown(false)
}
性能优化技巧
- 减少消息拷贝:使用指针传递消息而非值拷贝
- 批量处理:对于高频小消息,考虑批量处理
- 合理设置Mailbox大小:避免过大导致内存占用或过小导致阻塞
- 使用Future:异步处理耗时操作
- 集群分区:合理设计Grain的key分布
适用场景
- 高并发消息处理系统
- 分布式计算任务
- 游戏服务器
- IoT设备管理
- 实时数据处理
ProtoActor结合了Go语言的高性能和Actor模型的并发优势,是构建分布式系统的强大工具。通过合理设计Actor层级和消息流,可以构建出高吞吐量、低延迟的分布式应用。
更多高级功能和详细文档可以参考ProtoActor官方GitHub仓库。