golang轻量级内部通信消息总线插件库Bus的使用
Golang轻量级内部通信消息总线插件库Bus的使用
Bus是一个用于内部通信的极简事件/消息总线实现,灵感来源于Elixir语言的event_bus包。
安装
通过go包安装:
go get github.com/mustafaturan/bus/v3
完整示例
下面是一个完整的Bus使用示例:
package main
import (
"context"
"fmt"
"time"
"github.com/mustafaturan/bus/v3"
"github.com/mustafaturan/monoton/v2"
"github.com/mustafaturan/monoton/v2/sequencer"
)
func main() {
// 初始化Bus
b := NewBus()
// 注册订单处理handler
handler := bus.Handler{
Handle: func(ctx context.Context, e bus.Event) {
// 异步处理订单事件
go func() {
fmt.Printf("处理订单事件: %+v\n", e)
// 这里可以添加具体的业务逻辑
}()
},
Matcher: "order.*", // 匹配所有order开头的topic
}
b.RegisterHandler("order-handler", handler)
// 模拟订单事件
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "tx-123456")
ctx = context.WithValue(ctx, bus.CtxKeySource, "order-service")
order := map[string]string{
"orderID": "123456",
"orderAmount": "112.20",
"currency": "USD",
}
// 发送订单接收事件
err := b.Emit(ctx, "order.received", order)
if err != nil {
fmt.Println("发送事件失败:", err)
}
// 等待handler处理
time.Sleep(100 * time.Millisecond)
}
func NewBus() *bus.Bus {
// 配置ID生成器
node := uint64(1)
initialTime := uint64(1577865600000) // 设置2020-01-01 PST为初始时间
m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
if err != nil {
panic(err)
}
// 初始化ID生成器
var idGenerator bus.Next = m.Next
// 创建新的Bus实例
b, err := bus.NewBus(idGenerator)
if err != nil {
panic(err)
}
// 注册topic
b.RegisterTopics("order.received", "order.fulfilled")
return b
}
核心功能
1. 配置Bus
Bus需要一个唯一的ID生成器来为事件分配ID:
import (
"github.com/mustafaturan/bus/v3"
"github.com/mustafaturan/monoton/v2"
"github.com/mustafaturan/monoton/v2/sequencer"
)
func NewBus() *bus.Bus {
// 配置ID生成器
node := uint64(1)
initialTime := uint64(1577865600000) // 设置2020-01-01 PST为初始时间
m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
if err != nil {
panic(err)
}
// 初始化ID生成器
var idGenerator bus.Next = m.Next
// 创建新的Bus实例
b, err := bus.NewBus(idGenerator)
if err != nil {
panic(err)
}
// 注册topic
b.RegisterTopics("order.received", "order.fulfilled")
return b
}
2. 注册事件Topic
在发送事件到topic之前,需要先注册topic名称:
// 注册topic
b.RegisterTopics("order.received", "order.fulfilled")
3. 注册事件处理器
要接收topic事件,需要注册处理器:
handler := bus.Handler{
Handle: func(ctx context.Context, e bus.Event) {
// 处理事件
// 注意:强烈建议以异步方式处理事件
},
Matcher: ".*", // 匹配所有topic
}
b.RegisterHandler("handler的唯一key", handler)
4. 发送事件
发送事件的基本方式:
// 如果txID为空,bus会使用ID生成器生成一个
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "some-transaction-id-if-exists")
// 可选来源
ctx = context.WithValue(ctx, bus.CtxKeySource, "source-of-the-event")
// 事件topic名称(必须提前注册)
topic := "order.received"
// 事件数据
order := make(map[string]string)
order["orderID"] = "123456"
order["orderAmount"] = "112.20"
order["currency"] = "USD"
// 发送事件
err := b.Emit(ctx, topic, order)
if err != nil {
// 处理错误
fmt.Println(err)
}
// 使用选项发送事件
err := b.EmitWithOpts(ctx, topic, order, bus.WithTxID("some-tx-id"))
if err != nil {
// 处理错误
fmt.Println(err)
}
5. 事件结构
处理器接收的事件结构如下:
// 事件数据结构
type Event struct {
ID string // 标识符
TxID string // 事务标识符
Topic string // topic名称
Source string // 事件来源
OccurredAt time.Time // 创建时间(纳秒)
Data interface{} // 实际事件数据
}
性能基准
基准测试结果:
goos: darwin
goarch: amd64
pkg: github.com/mustafaturan/bus/v3
cpu: Intel(R) Core(TM) i5-6267U CPU @ 2.90GHz
BenchmarkEmit-4 10000000 180.5 ns/op 8 B/op 0 allocs/op
BenchmarkEmitWithoutTxID-4 10000000 244.6 ns/op 72 B/op 2 allocs/op
BenchmarkEmitWithOpts-4 10000000 280.8 ns/op 112 B/op 4 allocs/op
BenchmarkEmitWithOptsUnspecified-4 10000000 169.4 ns/op 8 B/op 0 allocs/op
PASS
ok github.com/mustafaturan/bus/v3 8.884s
许可证
Apache License 2.0
更多关于golang轻量级内部通信消息总线插件库Bus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang轻量级内部通信消息总线插件库Bus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量级内部通信消息总线插件库Bus使用指南
Bus是一个轻量级的Go语言内部通信消息总线库,它提供了一种简单的方式来实现发布/订阅模式的消息通信。下面我将详细介绍如何使用这个库。
安装
首先安装Bus库:
go get github.com/mustafaturan/bus
基本使用
1. 初始化Bus
package main
import (
"fmt"
"github.com/mustafaturan/bus"
"github.com/mustafaturan/bus/topic"
)
func main() {
// 创建一个新的Bus实例
b, err := bus.NewBus()
if err != nil {
panic(err)
}
// 注册一个主题
userCreatedTopic := topic.Topic{
Name: "user.created",
Description: "Triggered when a new user is created",
Source: "/users",
}
err = b.RegisterTopics(userCreatedTopic)
if err != nil {
panic(err)
}
}
2. 订阅消息
// 定义一个处理器函数
func userCreatedHandler(topic string, data interface{}) {
fmt.Printf("Received message on topic %s: %+v\n", topic, data)
}
// 注册处理器
handlerID, err := b.RegisterHandler("user.created", "user.created.handler", userCreatedHandler)
if err != nil {
panic(err)
}
// 也可以使用闭包
handlerID2, err := b.RegisterHandler("user.created", "user.created.handler2", func(topic string, data interface{}) {
user := data.(map[string]interface{})
fmt.Printf("User %s created!\n", user["name"])
})
3. 发布消息
// 发布消息到主题
err = b.Emit("user.created", map[string]interface{}{
"id": "123",
"name": "John Doe",
"email": "john@example.com",
})
if err != nil {
fmt.Printf("Failed to emit message: %v\n", err)
}
高级功能
1. 取消订阅
// 取消之前注册的处理器
err = b.DeregisterHandler(handlerID)
if err != nil {
fmt.Printf("Failed to deregister handler: %v\n", err)
}
2. 通配符订阅
Bus支持通配符订阅,可以监听多个相关主题:
// 订阅所有user相关事件
handlerID, err := b.RegisterHandler("user.*", "user.all.handler", func(topic string, data interface{}) {
fmt.Printf("User event %s occurred: %+v\n", topic, data)
})
3. 错误处理
// 带错误处理的处理器
errorHandler := func(topic string, data interface{}) error {
user, ok := data.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid user data format")
}
if user["name"] == "" {
return fmt.Errorf("user name cannot be empty")
}
fmt.Printf("Processing user: %s\n", user["name"])
return nil
}
// 注册带错误处理的处理器
handlerID, err := b.RegisterHandler("user.created", "user.error.handler", errorHandler)
实际应用示例
下面是一个完整的示例,展示如何在微服务中使用Bus进行内部通信:
package main
import (
"fmt"
"time"
"github.com/mustafaturan/bus"
"github.com/mustafaturan/bus/topic"
)
type User struct {
ID string
Name string
Email string
}
func main() {
// 初始化Bus
b, err := bus.NewBus()
if err != nil {
panic(err)
}
// 注册主题
topics := []topic.Topic{
{
Name: "user.created",
Description: "User creation event",
Source: "/users",
},
{
Name: "user.deleted",
Description: "User deletion event",
Source: "/users",
},
}
err = b.RegisterTopics(topics...)
if err != nil {
panic(err)
}
// 注册用户创建处理器
_, err = b.RegisterHandler("user.created", "email.service", func(topic string, data interface{}) {
user := data.(User)
fmt.Printf("[Email Service] Sending welcome email to %s (%s)\n", user.Name, user.Email)
})
if err != nil {
panic(err)
}
// 注册用户删除处理器
_, err = b.RegisterHandler("user.deleted", "audit.service", func(topic string, data interface{}) {
user := data.(User)
fmt.Printf("[Audit Service] Recording deletion of user %s\n", user.ID)
})
if err != nil {
panic(err)
}
// 模拟用户服务
go func() {
for i := 1; i <= 5; i++ {
user := User{
ID: fmt.Sprintf("user-%d", i),
Name: fmt.Sprintf("User %d", i),
Email: fmt.Sprintf("user%d@example.com", i),
}
// 发布用户创建事件
err = b.Emit("user.created", user)
if err != nil {
fmt.Printf("Error emitting user.created: %v\n", err)
}
time.Sleep(1 * time.Second)
}
}()
// 模拟删除用户
time.Sleep(3 * time.Second)
userToDelete := User{ID: "user-2", Name: "User 2"}
err = b.Emit("user.deleted", userToDelete)
if err != nil {
fmt.Printf("Error emitting user.deleted: %v\n", err)
}
// 保持程序运行
time.Sleep(5 * time.Second)
}
性能考虑
Bus库设计为轻量级,但在高并发场景下仍需注意:
- 避免在处理器中执行耗时操作
- 考虑使用缓冲通道处理高吞吐量消息
- 对于关键业务,实现重试机制
Bus库非常适合中小型应用的内部通信需求,但对于大规模分布式系统,可能需要考虑更专业的消息队列系统如RabbitMQ或Kafka。
希望这个指南能帮助你快速上手使用Bus库进行Go应用的内部通信!