golang实现消息/事件发布订阅模式的插件库hub的使用
Golang 实现消息/事件发布订阅模式的插件库 hub 的使用
简介
Hub 是一个快速的 Event Hub 库,用于 Go 应用程序实现发布/订阅模式,支持类似 rabbitMQ 交换机的主题模式。
安装
你可以使用 go get
安装这个库,但建议使用版本标签或管理依赖:
go get -u github.com/leandro-lugaresi/hub
或者使用 dep:
dep ensure --add github.com/leandro-lugaresi/hub
使用
订阅者
Hub 提供两种类型的订阅者:
- Subscriber:默认的订阅者,是阻塞式的。如果通道已满,发送操作会阻塞,直到订阅者消费消息。
- NonBlockingSubscriber:非阻塞订阅者,发布端永远不会阻塞,但如果通道容量已满,发布操作会丢失并触发警报。适用于可以接受数据丢失的场景,如指标、日志等。
主题
库使用类似 rabbitMQ 主题交换机的概念,消息名称用于匹配所有符合主题的订阅者。主题必须是由点(.
)分隔的单词列表,支持以下特殊字符:
*
(星号) 可以替代一个单词
示例代码
下面是一个完整的示例 demo:
package main
import (
"fmt"
"sync"
"github.com/leandro-lugaresi/hub"
)
func main() {
h := hub.New()
var wg sync.WaitGroup
// 创建一个容量为10的缓冲通道
// 如果要创建无缓冲通道,使用0作为容量
sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")
wg.Add(1)
go func(s hub.Subscription) {
for msg := range s.Receiver {
fmt.Printf("收到主题为 %s 的消息,ID 为 %d\n", msg.Name, msg.Fields["id"])
}
wg.Done()
}(sub)
// 发布消息
h.Publish(hub.Message{
Name: "account.login.failed",
Fields: hub.Fields{"id": 123},
})
h.Publish(hub.Message{
Name: "account.changepassword.failed",
Fields: hub.Fields{"id": 456},
})
h.Publish(hub.Message{
Name: "account.login.success",
Fields: hub.Fields{"id": 123},
})
// 这条消息不会被路由到订阅者
h.Publish(hub.Message{
Name: "account.foo.failed",
Fields: hub.Fields{"id": 789},
})
// 关闭所有订阅者
h.Close()
// 等待缓冲区中的所有消息处理完成
wg.Wait()
// 输出:
// 收到主题为 account.login.failed 的消息,ID 为 123
// 收到主题为 account.changepassword.failed 的消息,ID 为 456
// 收到主题为 account.login.success 的消息,ID 为 123
}
性能
吞吐量测试
在 Intel® Core™ i5-4460 CPU @ 3.20GHz x4 上测试结果:
1317530.091292 msg/sec
CSTrie
该项目内部使用了由 @tylertreat 实现的并发订阅树(CSTrie)。
更多关于golang实现消息/事件发布订阅模式的插件库hub的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现消息/事件发布订阅模式的插件库hub的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 消息/事件发布订阅模式实现 - Hub 库使用指南
消息/事件发布订阅(Pub/Sub)模式是一种常见的解耦设计模式,在Go中可以通过多种方式实现。下面我将介绍一个简单高效的Hub库实现及使用方法。
基本概念
发布订阅模式包含三个核心组件:
- 发布者(Publisher):负责发布消息/事件
- 订阅者(Subscriber):注册对特定消息/事件的兴趣
- 消息中心(Hub):负责接收消息并分发给订阅者
简单Hub实现
以下是一个线程安全的简单Hub实现:
package hub
import (
"sync"
)
type Message interface{}
type Hub struct {
subscribers map[string][]chan Message
mu sync.RWMutex
}
func NewHub() *Hub {
return &Hub{
subscribers: make(map[string][]chan Message),
}
}
// 订阅主题
func (h *Hub) Subscribe(topic string) chan Message {
h.mu.Lock()
defer h.mu.Unlock()
ch := make(chan Message, 100) // 缓冲通道防止阻塞
h.subscribers[topic] = append(h.subscribers[topic], ch)
return ch
}
// 发布消息
func (h *Hub) Publish(topic string, msg Message) {
h.mu.RLock()
defer h.mu.RUnlock()
if subs, ok := h.subscribers[topic]; ok {
for _, ch := range subs {
ch <- msg
}
}
}
// 取消订阅
func (h *Hub) Unsubscribe(topic string, ch chan Message) {
h.mu.Lock()
defer h.mu.Unlock()
if subs, ok := h.subscribers[topic]; ok {
for i, sub := range subs {
if sub == ch {
h.subscribers[topic] = append(subs[:i], subs[i+1:]...)
close(ch)
break
}
}
}
}
使用示例
package main
import (
"fmt"
"time"
)
func main() {
hub := NewHub()
// 订阅者1订阅"news"主题
sub1 := hub.Subscribe("news")
go func() {
for msg := range sub1 {
fmt.Printf("Subscriber 1 received: %v\n", msg)
}
fmt.Println("Subscriber 1 closed")
}()
// 订阅者2订阅"news"主题
sub2 := hub.Subscribe("news")
go func() {
for msg := range sub2 {
fmt.Printf("Subscriber 2 received: %v\n", msg)
}
fmt.Println("Subscriber 2 closed")
}()
// 订阅者3订阅"weather"主题
sub3 := hub.Subscribe("weather")
go func() {
for msg := range sub3 {
fmt.Printf("Subscriber 3 received: %v\n", msg)
}
fmt.Println("Subscriber 3 closed")
}()
// 发布消息
hub.Publish("news", "Breaking news: Go 1.20 released!")
hub.Publish("weather", "Today's weather: Sunny, 25°C")
time.Sleep(time.Second) // 等待消息处理
// 取消订阅
hub.Unsubscribe("news", sub1)
hub.Publish("news", "Another news: AI makes progress")
time.Sleep(time.Second) // 等待消息处理
}
高级特性扩展
如果需要更强大的功能,可以考虑以下扩展:
- 通配符订阅:支持类似"news.*"的通配符模式
- 持久化:将消息持久化到数据库
- 消息过滤:订阅时增加过滤条件
- QoS保证:实现不同级别的消息传递保证
现有成熟库推荐
如果不想自己实现,可以考虑以下成熟的开源库:
-
go-eventbus:轻量级事件总线
import "github.com/asaskevich/EventBus" bus := EventBus.New() bus.Subscribe("topic", func(msg string) { /* handler */ }) bus.Publish("topic", "message")
-
emitter:高性能事件发射器
import "github.com/olebedev/emitter" e := &emitter.Emitter{} e.On("event", func(event *emitter.Event) { /* handler */ }) e.Emit("event", "data")
-
watermill:用于构建事件驱动应用的库
import "github.com/ThreeDotsLabs/watermill" pubSub := watermill.NewStdPubSub(false, watermill.NopLogger{}) messages, _ := pubSub.Subscribe(context.Background(), "topic") pubSub.Publish("topic", watermill.NewMessage("1", []byte("payload")))
最佳实践
- 合理设置通道缓冲区大小
- 确保订阅者及时处理消息,避免阻塞
- 使用context实现优雅的取消订阅
- 考虑消息序列化格式(JSON, Protobuf等)
- 在高并发场景下注意性能优化
这个简单的Hub实现足以满足大多数基本发布订阅需求,对于更复杂的场景可以考虑使用上述成熟的第三方库。