golang轻量级异步兼容事件总线插件库EventBus的使用

Golang轻量级异步兼容事件总线插件库EventBus的使用

EventBus是一个轻量级的、支持异步兼容的事件总线库,适用于Go语言。

安装

确保你的电脑已安装Go,然后在终端输入以下命令:

go get github.com/asaskevich/EventBus

安装完成后即可使用该包。

导入包

在你的Go文件中添加以下导入语句:

import "github.com/asaskevich/EventBus"

如果你觉得包名太长,可以使用别名:

import (
    evbus "github.com/asaskevich/EventBus"
)

基础示例

// 定义一个计算器函数
func calculator(a int, b int) {
    fmt.Printf("%d\n", a + b)
}

func main() {
    // 创建新的事件总线
    bus := EventBus.New()
    
    // 订阅主题
    bus.Subscribe("main:calculator", calculator)
    
    // 发布事件
    bus.Publish("main:calculator", 20, 40)
    
    // 取消订阅
    bus.Unsubscribe("main:calculator", calculator)
}

实现的方法

EventBus提供了以下方法:

  • New() - 创建新的事件总线实例
  • Subscribe() - 订阅主题
  • SubscribeOnce() - 一次性订阅
  • HasCallback() - 检查是否有回调
  • Unsubscribe() - 取消订阅
  • Publish() - 发布事件
  • SubscribeAsync() - 异步订阅
  • SubscribeOnceAsync() - 一次性异步订阅
  • WaitAsync() - 等待异步回调完成

详细方法说明

New()

创建一个新的事件总线实例:

bus := EventBus.New()

Subscribe(topic string, fn interface{}) error

订阅一个主题,如果fn不是函数会返回错误:

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)

SubscribeOnce(topic string, fn interface{}) error

一次性订阅,执行后会自动取消订阅:

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)

Unsubscribe(topic string, fn interface{}) error

取消订阅指定主题的回调:

bus.Unsubscribe("topic:handler", HelloWord)

HasCallback(topic string) bool

检查指定主题是否有回调函数。

Publish(topic string, args …interface{})

发布事件,执行对应主题的回调函数:

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!")

异步操作

SubscribeAsync(topic string, fn interface{}, transactional bool)

异步订阅一个主题:

func slowCalculator(a, b int) {
    time.Sleep(3 * time.Second)
    fmt.Printf("%d\n", a + b)
}

bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

bus.Publish("main:slow_calculator", 20, 60)

fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")

bus.WaitAsync() // 等待所有异步回调完成

fmt.Println("do some stuff after waiting for result")

transactional参数决定后续回调是串行(true)还是并行(false)执行。

SubscribeOnceAsync(topic string, args …interface{})

一次性异步订阅,与SubscribeOnce类似,但回调是异步执行的。

WaitAsync()

等待所有异步回调完成。

跨进程事件

EventBus支持跨进程事件通信,需要两个RPC服务:

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}

注意事项

  1. 确保回调函数的参数类型和数量与发布事件时提供的参数匹配
  2. 异步操作时注意资源竞争问题
  3. 跨进程通信需要确保网络连接正常

更多关于golang轻量级异步兼容事件总线插件库EventBus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级异步兼容事件总线插件库EventBus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang轻量级事件总线EventBus使用指南

EventBus是一个轻量级的异步事件总线库,用于在Golang应用中实现发布-订阅模式。下面我将详细介绍其使用方法并提供示例代码。

安装EventBus

go get github.com/asaskevich/EventBus

基本使用

1. 创建事件总线实例

import "github.com/asaskevich/EventBus"

func main() {
    bus := EventBus.New() // 创建新的事件总线实例
    // ...
}

2. 订阅事件

func handler1(data string) {
    fmt.Println("Handler1 received:", data)
}

func handler2(data string) {
    fmt.Println("Handler2 received:", data)
}

func main() {
    bus := EventBus.New()
    
    // 订阅事件
    bus.Subscribe("topic1", handler1)
    bus.Subscribe("topic1", handler2)
    
    // ...
}

3. 发布事件

func main() {
    // ... 前面的订阅代码
    
    // 发布事件
    bus.Publish("topic1", "Hello EventBus!")
    
    // 输出:
    // Handler1 received: Hello EventBus!
    // Handler2 received: Hello EventBus!
}

高级特性

1. 异步事件处理

func slowHandler(data string) {
    time.Sleep(2 * time.Second)
    fmt.Println("Slow handler completed:", data)
}

func main() {
    bus := EventBus.New()
    bus.SubscribeAsync("async_topic", slowHandler, false)
    
    fmt.Println("Publishing async event...")
    bus.Publish("async_topic", "Async message")
    fmt.Println("Event published, handler running in background")
    
    time.Sleep(3 * time.Second) // 等待异步处理完成
}

2. 取消订阅

func main() {
    bus := EventBus.New()
    bus.Subscribe("topic2", handler1)
    
    // 发布事件
    bus.Publish("topic2", "First message") // 会被handler1处理
    
    // 取消订阅
    bus.Unsubscribe("topic2", handler1)
    
    bus.Publish("topic2", "Second message") // 不会被处理
}

3. 等待异步处理完成

func main() {
    bus := EventBus.New()
    bus.SubscribeAsync("wait_topic", slowHandler, false)
    
    bus.Publish("wait_topic", "Message with wait")
    
    // 等待所有异步处理完成
    bus.WaitAsync()
    fmt.Println("All async handlers completed")
}

实际应用示例

下面是一个更完整的示例,模拟用户注册后发送欢迎邮件和记录日志的场景:

package main

import (
	"fmt"
	"time"
	
	"github.com/asaskevich/EventBus"
)

type User struct {
	ID    int
	Name  string
	Email string
}

func sendWelcomeEmail(user User) {
	time.Sleep(1 * time.Second) // 模拟发送邮件耗时
	fmt.Printf("Email sent to %s <%s>\n", user.Name, user.Email)
}

func logUserRegistration(user User) {
	fmt.Printf("Log: User %d (%s) registered at %v\n", 
		user.ID, user.Name, time.Now())
}

func registerUser(bus EventBus.Bus, name, email string) {
	// 模拟用户注册过程
	user := User{
		ID:    time.Now().Nanosecond(),
		Name:  name,
		Email: email,
	}
	
	// 发布用户注册事件
	bus.Publish("user_registered", user)
}

func main() {
	bus := EventBus.New()
	
	// 订阅用户注册事件
	bus.SubscribeAsync("user_registered", sendWelcomeEmail, false)
	bus.Subscribe("user_registered", logUserRegistration)
	
	// 注册新用户
	fmt.Println("Registering new user...")
	registerUser(bus, "Alice", "alice@example.com")
	
	// 确保异步操作完成
	bus.WaitAsync()
	fmt.Println("User registration process completed")
}

注意事项

  1. 线程安全:EventBus是线程安全的,可以在多个goroutine中使用
  2. 错误处理:如果处理器函数返回错误,可以使用SubscribeOnceSubscribeOnceAsync来确保只执行一次
  3. 性能:对于高性能场景,考虑使用带缓冲的通道或专门的消息队列系统
  4. 内存泄漏:长期运行的应用中,注意及时取消不再需要的事件订阅

EventBus是一个简单但功能强大的库,非常适合在中小型Golang应用中实现组件间的松耦合通信。

回到顶部