golang轻量级发布订阅模式实现插件库pubsub的使用
golang轻量级发布订阅模式实现插件库pubsub的使用
pubsub是一个简单的多主题发布订阅库。
安装
使用以下命令安装pubsub:
go get github.com/cskr/pubsub/v2
使用示例
下面是一个完整的pubsub使用示例:
package main
import (
"fmt"
"github.com/cskr/pubsub"
"time"
)
func main() {
// 创建一个新的pubsub实例,缓冲区大小为10
ps := pubsub.New(10)
// 订阅主题"topic1"和"topic2"
sub1 := ps.Sub("topic1", "topic2")
// 订阅主题"topic2"
sub2 := ps.Sub("topic2")
// 发布消息到主题"topic1"
ps.Pub("hello topic1", "topic1")
// 发布消息到主题"topic2"
ps.Pub("hello topic2", "topic2")
// 从sub1接收消息
go func() {
for msg := range sub1 {
fmt.Printf("sub1 received: %v\n", msg)
}
}()
// 从sub2接收消息
go func() {
for msg := range sub2 {
fmt.Printf("sub2 received: %v\n", msg)
}
}()
// 等待goroutines处理消息
time.Sleep(1 * time.Second)
// 取消订阅
ps.Unsub(sub1, "topic1", "topic2")
ps.Unsub(sub2, "topic2")
// 关闭所有订阅
ps.Shutdown()
}
代码说明
- 首先创建一个pubsub实例,可以指定缓冲区大小
- 使用
Sub()
方法订阅一个或多个主题,返回一个channel用于接收消息 - 使用
Pub()
方法向指定主题发布消息 - 使用
Unsub()
方法取消订阅指定主题 - 使用
Shutdown()
方法关闭所有订阅
高级用法
// 带缓冲区的订阅
sub := ps.SubBuffered("topic1", 100) // 缓冲区大小为100
// 尝试发送消息,如果订阅者不存在则丢弃
ps.TryPub("message", "topic1")
// 添加订阅到现有channel
ps.AddSub(sub, "new_topic")
// 获取订阅数
count := ps.SubCount("topic1")
该模块遵循BSD风格的许可证。
更多关于golang轻量级发布订阅模式实现插件库pubsub的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang轻量级发布订阅模式实现插件库pubsub的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量级发布订阅模式实现 - pubsub库使用指南
发布订阅(Pub/Sub)模式是一种消息通信模式,发送者(发布者)将消息发送到特定的频道,而不需要知道哪些接收者(订阅者)存在。订阅者可以监听一个或多个频道,只接收感兴趣的消息。
pubsub库简介
pubsub是一个轻量级的Golang发布订阅库,具有以下特点:
- 简单易用
- 类型安全
- 支持异步/同步发布
- 无外部依赖
- 高性能
安装
go get github.com/cskr/pubsub
基础使用
1. 创建PubSub实例
package main
import (
"fmt"
"github.com/cskr/pubsub"
)
func main() {
// 创建一个新的PubSub实例
ps := pubsub.New(10) // 参数是缓冲区大小
// 使用完记得关闭
defer ps.Close()
}
2. 订阅和发布消息
// 订阅频道
ch := ps.Sub("news")
// 发布消息到news频道
ps.Pub("Breaking news: Go 1.20 released!", "news")
// 接收消息
msg := <-ch
fmt.Println("Received:", msg)
3. 取消订阅
// 取消订阅
ps.Unsub(ch, "news")
高级用法
1. 多频道订阅
// 订阅多个频道
ch := ps.Sub("news", "sports")
go func() {
for msg := range ch {
fmt.Printf("Received message: %v\n", msg)
}
}()
// 发布到不同频道
ps.Pub("Football match result", "sports")
ps.Pub("New tech product launched", "news")
2. 类型安全的消息
// 定义消息类型
type News struct {
Title string
Content string
}
type Score struct {
Game string
Result string
}
// 创建带缓冲的PubSub
ps := pubsub.New(10)
// 订阅特定类型的频道
newsCh := ps.Sub("news")
sportsCh := ps.Sub("sports")
// 发布类型安全的消息
ps.Pub(News{"Go Release", "Go 1.20 is out!"}, "news")
ps.Pub(Score{"Football", "3-2"}, "sports")
// 接收并类型断言
select {
case msg := <-newsCh:
if news, ok := msg.(News); ok {
fmt.Printf("News: %s - %s\n", news.Title, news.Content)
}
case msg := <-sportsCh:
if score, ok := msg.(Score); ok {
fmt.Printf("Score: %s %s\n", score.Game, score.Result)
}
}
3. 同步发布
// 同步发布会阻塞直到所有订阅者都接收到消息
ps.PubWait("Important sync message", "important")
4. 获取订阅数
count := ps.SubCount("news")
fmt.Printf("Number of subscribers to 'news': %d\n", count)
完整示例
package main
import (
"fmt"
"time"
"github.com/cskr/pubsub"
)
type Event struct {
Type string
Message string
}
func subscriber(name string, ps *pubsub.PubSub, topics ...string) {
ch := ps.Sub(topics...)
for msg := range ch {
if evt, ok := msg.(Event); ok {
fmt.Printf("%s received %s event: %s\n", name, evt.Type, evt.Message)
} else {
fmt.Printf("%s received: %v\n", name, msg)
}
}
}
func main() {
ps := pubsub.New(10)
defer ps.Close()
// 启动订阅者
go subscriber("Subscriber1", ps, "news", "weather")
go subscriber("Subscriber2", ps, "news")
go subscriber("Subscriber3", ps, "weather")
// 发布一些消息
ps.Pub(Event{"news", "Go 1.20 released!"}, "news")
ps.Pub(Event{"weather", "Sunny with 25°C"}, "weather")
// 等待消息处理
time.Sleep(100 * time.Millisecond)
// 输出订阅数
fmt.Printf("News subscribers: %d\n", ps.SubCount("news"))
fmt.Printf("Weather subscribers: %d\n", ps.SubCount("weather"))
// 同步发布示例
ps.PubWait(Event{"news", "Breaking: Important announcement"}, "news")
fmt.Println("Important news delivered to all subscribers")
}
性能考虑
-
缓冲区大小:创建PubSub实例时指定的缓冲区大小会影响性能。太小的缓冲区可能导致发布者阻塞,太大则可能消耗更多内存。
-
消息类型:使用具体类型而非interface{}可以提高性能,但会减少灵活性。
-
频道数量:过多的频道会增加管理开销。
-
订阅者处理速度:慢速的订阅者会拖累整个系统,考虑使用带缓冲的channel或在订阅者中使用单独的goroutine处理消息。
总结
pubsub库提供了简单高效的发布订阅实现,适合大多数Golang应用中的事件通知需求。它的轻量级特性使其成为微服务内部通信或组件间解耦的理想选择。