golang轻量级事件总线实现插件库go-eventbus的使用
Golang轻量级事件总线实现插件库go-eventbus的使用
简介
这是一个简单的Golang事件总线实现,支持以下模式:
- 发布/订阅模式
- 请求/响应模式
快速开始
在你的项目中使用go-eventbus,可以运行以下命令:
go get github.com/stanipetrosyan/go-eventbus
然后导入:
import (
goeventbus "github.com/stanipetrosyan/go-eventbus"
)
发布/订阅模式
这是一个简单的发布/订阅模式示例:
// 创建事件总线实例
eventbus := goeventbus.NewEventBus()
// 定义主题地址
address := "topic"
// 创建消息头选项
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()
// 订阅主题并设置处理函数
eventbus.Channel(address).Subscriber().Listen(func(dc goeventbus.Context) {
fmt.Printf("Message %s\n", dc.Result().Data)
})
// 发布消息
eventbus.Channel(address).Publisher().Publish(message)
请求/响应模式
这是一个简单的请求/响应模式示例:
// 创建事件总线实例
eventbus := goeventbus.NewEventBus()
// 定义主题地址
address := "topic"
// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").Build()
// 订阅主题并设置处理函数
eventbus.Channel(address).Subscriber().Listen(func(context goeventbus.Context) {
fmt.Printf("Message %s\n", context.Result().Extract())
context.Reply("Hello from subscriber") // 响应消息
})
// 发送请求并设置响应处理函数
eventbus.Channel(address).Publisher().Request(message, func(context goeventbus.Context) {
fmt.Printf("Message %s\n", context.Result().Extract()) // 处理响应
})
消息构造
要发布消息,你需要使用MessageBuilder创建消息对象:
// 创建消息头选项
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()
// 发布消息
eventBus.Channel("address").Publisher().Publish(message)
处理器(Processor)
处理器类似于中间件,只有在调用context.Next()方法时才会转发消息。处理器会拦截从发布者到订阅者的消息。
eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
// 检查消息头中是否包含特定header
if context.Result().ExtractHeaders().Contains("header") {
context.Next() // 只有满足条件才转发消息
}
})
在处理器中,可以使用Map方法修改要转发的消息:
eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
if context.Result().ExtractHeaders().Contains("header") {
// 创建新消息
newMessage := NewMessageBuilder().SetPayload("new message").Build()
// 修改消息并转发
context.Map(newMessage).Next()
}
})
网络总线(Network Bus)
网络总线可以在不同服务之间建立TCP连接。NetworkBus是本地事件总线的包装器。
一个简单的服务器/客户端示例可以在examples/networkbus
目录中找到。
更多关于golang轻量级事件总线实现插件库go-eventbus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级事件总线实现插件库go-eventbus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-EventBus: 轻量级事件总线实现
Go-EventBus 是一个轻量级的 Golang 事件总线库,它实现了发布-订阅模式,非常适合在应用程序中实现组件间的松耦合通信。下面我将详细介绍它的使用方法和示例代码。
基本概念
Go-EventBus 的核心概念包括:
- 事件总线(EventBus): 负责管理订阅者和事件的中心枢纽
- 主题(Topic): 事件的分类标识
- 订阅者(Subscriber): 监听特定主题事件的函数
- 发布者(Publisher): 向特定主题发布事件的组件
安装
go get github.com/asaskevich/EventBus
基本使用示例
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
func main() {
// 创建事件总线实例
bus := EventBus.New()
// 定义事件处理函数
handler := func(topic string, data interface{}) {
fmt.Printf("收到事件: 主题=%s, 数据=%v\n", topic, data)
}
// 订阅主题
if err := bus.Subscribe("user:created", handler); err != nil {
fmt.Println("订阅失败:", err)
return
}
// 发布事件
bus.Publish("user:created", map[string]interface{}{
"id": 123,
"name": "张三",
"email": "zhangsan@example.com",
})
// 取消订阅
bus.Unsubscribe("user:created", handler)
}
高级特性
1. 异步事件处理
func main() {
bus := EventBus.New()
// 异步订阅
bus.SubscribeAsync("order:completed", func(topic string, data interface{}) {
order := data.(map[string]interface{})
fmt.Printf("处理订单 #%d (异步)\n", order["id"].(int))
time.Sleep(1 * time.Second)
}, false) // false表示不保证顺序
// 发布多个事件
for i := 1; i <= 5; i++ {
bus.Publish("order:completed", map[string]interface{}{
"id": i,
"total": 100 * i,
})
}
// 等待所有异步处理完成
bus.WaitAsync()
fmt.Println("所有订单处理完成")
}
2. 错误处理
func main() {
bus := EventBus.New()
// 带有错误处理的事件订阅
bus.Subscribe("payment:failed", func(topic string, data interface{}) {
payment := data.(map[string]interface{})
if payment["amount"].(float64) > 1000 {
panic("大额支付失败!")
}
fmt.Printf("处理失败支付: %v\n", payment)
})
// 设置全局错误处理器
bus.SetErrorHandler(func(err error) {
fmt.Printf("事件处理出错: %v\n", err)
})
// 触发会panic的事件
bus.Publish("payment:failed", map[string]interface{}{
"id": "txn123",
"amount": 1500.00,
})
}
3. 通配符订阅
func main() {
bus := EventBus.New()
// 订阅所有用户相关事件
bus.Subscribe("user.*", func(topic string, data interface{}) {
fmt.Printf("用户事件[%s]: %v\n", topic, data)
})
// 发布不同类型的事件
bus.Publish("user.created", map[string]interface{}{"id": 1, "name": "Alice"})
bus.Publish("user.updated", map[string]interface{}{"id": 1, "name": "Alice Smith"})
bus.Publish("user.deleted", map[string]interface{}{"id": 1})
}
实际应用场景示例
微服务间通信
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
"time"
)
type User struct {
ID int
Name string
Email string
}
func main() {
bus := EventBus.New()
// 用户服务
bus.Subscribe("user.create", func(topic string, data interface{}) {
user := data.(User)
fmt.Printf("创建用户: %+v\n", user)
// 模拟处理延迟
time.Sleep(500 * time.Millisecond)
// 发布用户创建完成事件
bus.Publish("user.created", user)
})
// 邮件服务
bus.Subscribe("user.created", func(topic string, data interface{}) {
user := data.(User)
fmt.Printf("发送欢迎邮件给: %s <%s>\n", user.Name, user.Email)
})
// 日志服务
bus.Subscribe("user.*", func(topic string, data interface{}) {
user := data.(User)
fmt.Printf("[日志] 事件: %s, 用户ID: %d\n", topic, user.ID)
})
// 创建新用户
newUser := User{
ID: 1,
Name: "李四",
Email: "lisi@example.com",
}
bus.Publish("user.create", newUser)
// 等待异步处理完成
time.Sleep(1 * time.Second)
}
性能考虑
Go-EventBus 是非常轻量级的,但在高并发场景下仍需注意:
- 对于高频事件,考虑使用异步处理
- 避免在事件处理函数中执行耗时操作
- 对于需要顺序处理的事件,设置
SubscribeAsync
的第三个参数为true
总结
Go-EventBus 提供了简单而强大的事件总线功能,适合以下场景:
- 微服务内部组件通信
- 插件系统的事件通知
- 需要解耦的模块间通信
- 需要广播通知的多接收者场景
它的轻量级特性使得它非常适合嵌入到各种规模的 Go 应用程序中,而不会带来显著的性能开销。