golang轻量级异步兼容事件总线插件库EventBus的使用
Golang轻量级异步兼容事件总线插件库EventBus的使用
EventBus是一个轻量级的、支持异步兼容的事件总线库,适用于Go语言。
安装
确保你的电脑已安装Go,然后在终端输入以下命令:
go get github.com/asaskevich/EventBus
安装完成后即可使用该包。
导入包
在你的Go文件中添加以下导入语句:
import "github.com/asaskevich/EventBus"
如果你觉得包名太长,可以使用别名:
import (
evbus "github.com/asaskevich/EventBus"
)
基础示例
// 定义一个计算器函数
func calculator(a int, b int) {
fmt.Printf("%d\n", a + b)
}
func main() {
// 创建新的事件总线
bus := EventBus.New()
// 订阅主题
bus.Subscribe("main:calculator", calculator)
// 发布事件
bus.Publish("main:calculator", 20, 40)
// 取消订阅
bus.Unsubscribe("main:calculator", calculator)
}
实现的方法
EventBus提供了以下方法:
New()
- 创建新的事件总线实例Subscribe()
- 订阅主题SubscribeOnce()
- 一次性订阅HasCallback()
- 检查是否有回调Unsubscribe()
- 取消订阅Publish()
- 发布事件SubscribeAsync()
- 异步订阅SubscribeOnceAsync()
- 一次性异步订阅WaitAsync()
- 等待异步回调完成
详细方法说明
New()
创建一个新的事件总线实例:
bus := EventBus.New()
Subscribe(topic string, fn interface{}) error
订阅一个主题,如果fn
不是函数会返回错误:
func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
SubscribeOnce(topic string, fn interface{}) error
一次性订阅,执行后会自动取消订阅:
func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
Unsubscribe(topic string, fn interface{}) error
取消订阅指定主题的回调:
bus.Unsubscribe("topic:handler", HelloWord)
HasCallback(topic string) bool
检查指定主题是否有回调函数。
Publish(topic string, args …interface{})
发布事件,执行对应主题的回调函数:
func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!")
异步操作
SubscribeAsync(topic string, fn interface{}, transactional bool)
异步订阅一个主题:
func slowCalculator(a, b int) {
time.Sleep(3 * time.Second)
fmt.Printf("%d\n", a + b)
}
bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)
bus.Publish("main:slow_calculator", 20, 60)
fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")
bus.WaitAsync() // 等待所有异步回调完成
fmt.Println("do some stuff after waiting for result")
transactional
参数决定后续回调是串行(true)还是并行(false)执行。
SubscribeOnceAsync(topic string, args …interface{})
一次性异步订阅,与SubscribeOnce类似,但回调是异步执行的。
WaitAsync()
等待所有异步回调完成。
跨进程事件
EventBus支持跨进程事件通信,需要两个RPC服务:
server.go
func main() {
server := NewServer(":2010", "/_server_bus_", New())
server.Start()
// ...
server.EventBus().Publish("main:calculator", 4, 6)
// ...
server.Stop()
}
client.go
func main() {
client := NewClient(":2015", "/_client_bus_", New())
client.Start()
client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
// ...
client.Stop()
}
注意事项
- 确保回调函数的参数类型和数量与发布事件时提供的参数匹配
- 异步操作时注意资源竞争问题
- 跨进程通信需要确保网络连接正常
更多关于golang轻量级异步兼容事件总线插件库EventBus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级异步兼容事件总线插件库EventBus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量级事件总线EventBus使用指南
EventBus是一个轻量级的异步事件总线库,用于在Golang应用中实现发布-订阅模式。下面我将详细介绍其使用方法并提供示例代码。
安装EventBus
go get github.com/asaskevich/EventBus
基本使用
1. 创建事件总线实例
import "github.com/asaskevich/EventBus"
func main() {
bus := EventBus.New() // 创建新的事件总线实例
// ...
}
2. 订阅事件
func handler1(data string) {
fmt.Println("Handler1 received:", data)
}
func handler2(data string) {
fmt.Println("Handler2 received:", data)
}
func main() {
bus := EventBus.New()
// 订阅事件
bus.Subscribe("topic1", handler1)
bus.Subscribe("topic1", handler2)
// ...
}
3. 发布事件
func main() {
// ... 前面的订阅代码
// 发布事件
bus.Publish("topic1", "Hello EventBus!")
// 输出:
// Handler1 received: Hello EventBus!
// Handler2 received: Hello EventBus!
}
高级特性
1. 异步事件处理
func slowHandler(data string) {
time.Sleep(2 * time.Second)
fmt.Println("Slow handler completed:", data)
}
func main() {
bus := EventBus.New()
bus.SubscribeAsync("async_topic", slowHandler, false)
fmt.Println("Publishing async event...")
bus.Publish("async_topic", "Async message")
fmt.Println("Event published, handler running in background")
time.Sleep(3 * time.Second) // 等待异步处理完成
}
2. 取消订阅
func main() {
bus := EventBus.New()
bus.Subscribe("topic2", handler1)
// 发布事件
bus.Publish("topic2", "First message") // 会被handler1处理
// 取消订阅
bus.Unsubscribe("topic2", handler1)
bus.Publish("topic2", "Second message") // 不会被处理
}
3. 等待异步处理完成
func main() {
bus := EventBus.New()
bus.SubscribeAsync("wait_topic", slowHandler, false)
bus.Publish("wait_topic", "Message with wait")
// 等待所有异步处理完成
bus.WaitAsync()
fmt.Println("All async handlers completed")
}
实际应用示例
下面是一个更完整的示例,模拟用户注册后发送欢迎邮件和记录日志的场景:
package main
import (
"fmt"
"time"
"github.com/asaskevich/EventBus"
)
type User struct {
ID int
Name string
Email string
}
func sendWelcomeEmail(user User) {
time.Sleep(1 * time.Second) // 模拟发送邮件耗时
fmt.Printf("Email sent to %s <%s>\n", user.Name, user.Email)
}
func logUserRegistration(user User) {
fmt.Printf("Log: User %d (%s) registered at %v\n",
user.ID, user.Name, time.Now())
}
func registerUser(bus EventBus.Bus, name, email string) {
// 模拟用户注册过程
user := User{
ID: time.Now().Nanosecond(),
Name: name,
Email: email,
}
// 发布用户注册事件
bus.Publish("user_registered", user)
}
func main() {
bus := EventBus.New()
// 订阅用户注册事件
bus.SubscribeAsync("user_registered", sendWelcomeEmail, false)
bus.Subscribe("user_registered", logUserRegistration)
// 注册新用户
fmt.Println("Registering new user...")
registerUser(bus, "Alice", "alice@example.com")
// 确保异步操作完成
bus.WaitAsync()
fmt.Println("User registration process completed")
}
注意事项
- 线程安全:EventBus是线程安全的,可以在多个goroutine中使用
- 错误处理:如果处理器函数返回错误,可以使用
SubscribeOnce
或SubscribeOnceAsync
来确保只执行一次 - 性能:对于高性能场景,考虑使用带缓冲的通道或专门的消息队列系统
- 内存泄漏:长期运行的应用中,注意及时取消不再需要的事件订阅
EventBus是一个简单但功能强大的库,非常适合在中小型Golang应用中实现组件间的松耦合通信。