golang超高性能分布式Actor模型开发插件库ProtoActor的使用

Golang超高性能分布式Actor模型开发插件库ProtoActor的使用

ProtoActor是一个跨平台的Actor模型实现,支持Go和C#之间的交互。以下是关于ProtoActor的详细介绍和使用示例。

设计原则

  • 最小化API:API小巧易用,避免企业级JVM式的容器和配置
  • 基于现有技术:利用gRPC流进行网络传输,Protobuf进行序列化
  • 传递数据而非对象:显式处理序列化问题
  • 高性能:不因API魔法而牺牲性能

ProtoActor的远程通信性能极佳,在两个节点之间每秒可传递超过200万条消息,同时保持消息顺序。

基础使用示例

Hello World示例

type Hello struct{ Who string }
type HelloActor struct{}

func (state *HelloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
    pid, err := context.Spawn(props)
    if err != nil {
        panic(err)
    }
    context.Send(pid, Hello{Who: "Roger"})
    console.ReadLine()
}

状态机示例

type Hello struct{ Who string }
type SetBehaviorActor struct{}

func (state *SetBehaviorActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
        context.SetBehavior(state.Other)
    }
}

func (state *SetBehaviorActor) Other(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("%v, ey we are now handling messages in another behavior", msg.Who)
    }
}

func NewSetBehaviorActor() actor.Actor {
    return &SetBehaviorActor{}
}

func main() {
    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(NewSetBehaviorActor)
    pid, err := context.Spawn(props)
    if err != nil {
        panic(err)
    }
    context.Send(pid, Hello{Who: "Roger"})
    context.Send(pid, Hello{Who: "Roger"})
    console.ReadLine()
}

生命周期事件

type Hello struct{ Who string }
type HelloActor struct{}

func (state *HelloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
        fmt.Println("Started, initialize actor here")
    case *actor.Stopping:
        fmt.Println("Stopping, actor is about shut down")
    case *actor.Stopped:
        fmt.Println("Stopped, actor and its children are stopped")
    case *actor.Restarting:
        fmt.Println("Restarting, actor is about restart")
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
    pid, err := context.Spawn(props)
    if err != nil {
        panic(err)
    }
    context.Send(pid, Hello{Who: "Roger"})

    // 等待1秒以展示正确的生命周期事件顺序
    time.Sleep(1 * time.Second)
    context.Stop(pid)

    console.ReadLine()
}

监督策略示例

type Hello struct{ Who string }
type ParentActor struct{}

func (state *ParentActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        props := actor.PropsFromProducer(NewChildActor)
        child := context.Spawn(props)
        context.Send(child, msg)
    }
}

func NewParentActor() actor.Actor {
    return &ParentActor{}
}

type ChildActor struct{}

func (state *ChildActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
        fmt.Println("Starting, initialize actor here")
    case *actor.Stopping:
        fmt.Println("Stopping, actor is about shut down")
    case *actor.Stopped:
        fmt.Println("Stopped, actor and its children are stopped")
    case *actor.Restarting:
        fmt.Println("Restarting, actor is about restart")
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
        panic("Ouch")
    }
}

func NewChildActor() actor.Actor {
    return &ChildActor{}
}

func main() {
    decider := func(reason interface{}) actor.Directive {
        log.Printf("handling failure for child. reason:%v", reason)
        return actor.RestartDirective
    }
    supervisor := actor.NewOneForOneStrategy(10, 1000, decider)

    ctx := actor.NewActorSystem().Root
    props := actor.PropsFromProducer(NewParentActor).WithSupervisor(supervisor)

    pid := ctx.Spawn(props)
    ctx.Send(pid, Hello{Who: "Roger"})

    console.ReadLine()
}

网络/远程通信示例

节点1代码

type MyActor struct {
    count int
}

func (state *MyActor) Receive(context actor.Context) {
    switch context.Message().(type) {
    case *messages.Response:
        state.count++
        fmt.Println(state.count)
    }
}

func main() {
    remote.Start("localhost:8090")

    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(func() actor.Actor { return &MyActor{} })
    pid, _ := context.Spawn(props)
    message := &messages.Echo{Message: "hej", Sender: pid}

    // 创建远程actor
    spawnResponse, _ := remote.SpawnNamed("localhost:8091", "myactor", "hello", time.Second)

    // 获取创建的PID
    spawnedPID := spawnResponse.Pid
    for i := 0; i < 10; i++ {
        context.Send(spawnedPID, message)
    }

    console.ReadLine()
}

节点2代码

type MyActor struct{}

func (*MyActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *messages.Echo:
        context.Send(msg.Sender, &messages.Response{
            SomeValue: "result",
        })
    }
}

func main() {
    remote.Start("localhost:8091")

    // 注册本地actor以便远程创建
    remote.Register("hello", actor.PropsFromProducer(func() actor.Actor { return &MyActor{} }))
    console.ReadLine()
}

消息协议定义

syntax = "proto3";
package messages;
import "actor.proto"; // 需要导入actor.proto以便消息可以包含PID

// 节点1发送给节点2的消息
message Echo {
  actor.PID Sender = 1; // 远程actor应回复的PID
  string Message = 2;
}

// 远程actor应回复的消息
message Response {
  string SomeValue = 1;
}

构建和测试

确保$GOPATH变量正确设置后,执行以下命令:

go get github.com/asynkron/protoactor-go/...
cd $GOPATH/src/github.com/asynkron/protoactor-go
go get ./...
make

运行测试:

go test `go list ./... | grep -v "/examples/" | grep -v "/persistence" | grep -v "/scheduler"`

ProtoActor是一个仍在beta阶段的Go实现,但已经有一些生产环境用户。请注意API在1.0版本前可能会发生变化。


更多关于golang超高性能分布式Actor模型开发插件库ProtoActor的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang超高性能分布式Actor模型开发插件库ProtoActor的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


ProtoActor: Go语言高性能分布式Actor模型框架

ProtoActor是一个基于Go语言的高性能分布式Actor模型实现,它借鉴了Erlang和Akka的设计理念,同时针对Go语言的特点进行了优化。下面我将详细介绍ProtoActor的核心概念和使用方法。

核心概念

  1. Actor模型:将并发实体抽象为Actor,每个Actor有自己的状态和行为,通过消息传递进行通信
  2. PID:每个Actor都有一个唯一进程标识符(Process ID)
  3. Mailbox:每个Actor都有一个消息队列,用于接收和处理消息

安装ProtoActor

go get github.com/asynkron/protoactor-go

基本使用示例

1. 创建Actor

package main

import (
	"fmt"
	"time"

	"github.com/asynkron/protoactor-go/actor"
)

// 定义Actor结构体
type HelloActor struct{}

// 实现Receive方法处理消息
func (state *HelloActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case string:
		fmt.Printf("HelloActor received: %v\n", msg)
		context.Respond("Hello " + msg)
	}
}

func main() {
	// 创建Actor系统
	system := actor.NewActorSystem()

	// 定义Actor属性
	props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })

	// 生成Actor并获取PID
	pid := system.Root.Spawn(props)

	// 发送消息并等待响应
	future := system.Root.RequestFuture(pid, "World", 5*time.Second)
	
	// 处理响应
	result, err := future.Result()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	
	fmt.Println("Received response:", result)
}

2. 父子Actor和监管策略

type ParentActor struct{}

func (state *ParentActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		// 创建子Actor
		props := actor.PropsFromProducer(func() actor.Actor { return &ChildActor{} })
		child := context.Spawn(props)
		context.Watch(child) // 监控子Actor
	case string:
		fmt.Printf("Parent received: %v\n", msg)
	}
}

type ChildActor struct{}

func (state *ChildActor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case string:
		fmt.Println("Child received message")
		panic("child failed") // 模拟子Actor失败
	}
}

func main() {
	system := actor.NewActorSystem()
	
	// 定义监管策略
	supervisor := actor.NewOneForOneStrategy(10, time.Second, func(reason interface{}) actor.Directive {
		fmt.Println("Child failed, restarting:", reason)
		return actor.RestartDirective
	})
	
	props := actor.PropsFromProducer(func() actor.Actor { return &ParentActor{} }).
		WithSupervisor(supervisor)
	
	pid := system.Root.Spawn(props)
	system.Root.Send(pid, "test")
	
	time.Sleep(time.Second) // 等待子Actor重启
}

3. 集群功能(分布式Actor)

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/cluster"
	"github.com/asynkron/protoactor-go/cluster/clusterproviders/consul"
	"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
	"github.com/asynkron/protoactor-go/remote"
)

// 定义Grain接口和实现
type HelloGrain interface {
	SayHello(name string) (*HelloResponse, error)
}

type HelloResponse struct {
	Message string
}

type helloGrain struct {
	ctx cluster.GrainContext
}

func (h *helloGrain) SayHello(name string) (*HelloResponse, error) {
	return &HelloResponse{Message: "Hello " + name}, nil
}

func main() {
	// 设置集群
	system := actor.NewActorSystem()
	
	// 使用Consul作为集群提供者
	provider := consul.New()
	lookup := disthash.New()
	config := remote.Configure("localhost", 0)
	
	// 创建集群
	clusterConfig := cluster.Configure("my-cluster", provider, lookup, config)
	c := cluster.New(system, clusterConfig)
	
	// 注册Grain
	clusterKind := cluster.NewKind(
		"HelloGrain",
		actor.PropsFromProducer(func() actor.Actor {
			return &helloGrain{}
		}),
	)
	c.RegisterKind(clusterKind)
	
	// 启动集群
	c.StartMember()
	
	// 获取Grain客户端
	client := cluster.GetCluster(system).Get("HelloGrain", "grain1")
	hello := client.(HelloGrain)
	
	// 调用远程Grain
	resp, err := hello.SayHello("World")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Message)
	
	time.Sleep(time.Minute)
	c.Shutdown(false)
}

性能优化技巧

  1. 减少消息拷贝:使用指针传递消息而非值拷贝
  2. 批量处理:对于高频小消息,考虑批量处理
  3. 合理设置Mailbox大小:避免过大导致内存占用或过小导致阻塞
  4. 使用Future:异步处理耗时操作
  5. 集群分区:合理设计Grain的key分布

适用场景

  1. 高并发消息处理系统
  2. 分布式计算任务
  3. 游戏服务器
  4. IoT设备管理
  5. 实时数据处理

ProtoActor结合了Go语言的高性能和Actor模型的并发优势,是构建分布式系统的强大工具。通过合理设计Actor层级和消息流,可以构建出高吞吐量、低延迟的分布式应用。

更多高级功能和详细文档可以参考ProtoActor官方GitHub仓库

回到顶部