golang轻量级发布订阅模式实现插件库pubsub的使用

golang轻量级发布订阅模式实现插件库pubsub的使用

PkgGoDev

pubsub是一个简单的多主题发布订阅库。

安装

使用以下命令安装pubsub:

go get github.com/cskr/pubsub/v2

使用示例

下面是一个完整的pubsub使用示例:

package main

import (
	"fmt"
	"github.com/cskr/pubsub"
	"time"
)

func main() {
	// 创建一个新的pubsub实例,缓冲区大小为10
	ps := pubsub.New(10)

	// 订阅主题"topic1"和"topic2"
	sub1 := ps.Sub("topic1", "topic2")
	
	// 订阅主题"topic2"
	sub2 := ps.Sub("topic2")

	// 发布消息到主题"topic1"
	ps.Pub("hello topic1", "topic1")
	
	// 发布消息到主题"topic2"
	ps.Pub("hello topic2", "topic2")

	// 从sub1接收消息
	go func() {
		for msg := range sub1 {
			fmt.Printf("sub1 received: %v\n", msg)
		}
	}()

	// 从sub2接收消息
	go func() {
		for msg := range sub2 {
			fmt.Printf("sub2 received: %v\n", msg)
		}
	}()

	// 等待goroutines处理消息
	time.Sleep(1 * time.Second)

	// 取消订阅
	ps.Unsub(sub1, "topic1", "topic2")
	ps.Unsub(sub2, "topic2")

	// 关闭所有订阅
	ps.Shutdown()
}

代码说明

  1. 首先创建一个pubsub实例,可以指定缓冲区大小
  2. 使用Sub()方法订阅一个或多个主题,返回一个channel用于接收消息
  3. 使用Pub()方法向指定主题发布消息
  4. 使用Unsub()方法取消订阅指定主题
  5. 使用Shutdown()方法关闭所有订阅

高级用法

// 带缓冲区的订阅
sub := ps.SubBuffered("topic1", 100) // 缓冲区大小为100

// 尝试发送消息,如果订阅者不存在则丢弃
ps.TryPub("message", "topic1")

// 添加订阅到现有channel
ps.AddSub(sub, "new_topic")

// 获取订阅数
count := ps.SubCount("topic1")

该模块遵循BSD风格的许可证。


更多关于golang轻量级发布订阅模式实现插件库pubsub的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级发布订阅模式实现插件库pubsub的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang轻量级发布订阅模式实现 - pubsub库使用指南

发布订阅(Pub/Sub)模式是一种消息通信模式,发送者(发布者)将消息发送到特定的频道,而不需要知道哪些接收者(订阅者)存在。订阅者可以监听一个或多个频道,只接收感兴趣的消息。

pubsub库简介

pubsub是一个轻量级的Golang发布订阅库,具有以下特点:

  • 简单易用
  • 类型安全
  • 支持异步/同步发布
  • 无外部依赖
  • 高性能

安装

go get github.com/cskr/pubsub

基础使用

1. 创建PubSub实例

package main

import (
	"fmt"
	"github.com/cskr/pubsub"
)

func main() {
	// 创建一个新的PubSub实例
	ps := pubsub.New(10) // 参数是缓冲区大小
	
	// 使用完记得关闭
	defer ps.Close()
}

2. 订阅和发布消息

// 订阅频道
ch := ps.Sub("news") 

// 发布消息到news频道
ps.Pub("Breaking news: Go 1.20 released!", "news")

// 接收消息
msg := <-ch
fmt.Println("Received:", msg)

3. 取消订阅

// 取消订阅
ps.Unsub(ch, "news")

高级用法

1. 多频道订阅

// 订阅多个频道
ch := ps.Sub("news", "sports")

go func() {
    for msg := range ch {
        fmt.Printf("Received message: %v\n", msg)
    }
}()

// 发布到不同频道
ps.Pub("Football match result", "sports")
ps.Pub("New tech product launched", "news")

2. 类型安全的消息

// 定义消息类型
type News struct {
    Title   string
    Content string
}

type Score struct {
    Game    string
    Result  string
}

// 创建带缓冲的PubSub
ps := pubsub.New(10)

// 订阅特定类型的频道
newsCh := ps.Sub("news")
sportsCh := ps.Sub("sports")

// 发布类型安全的消息
ps.Pub(News{"Go Release", "Go 1.20 is out!"}, "news")
ps.Pub(Score{"Football", "3-2"}, "sports")

// 接收并类型断言
select {
case msg := <-newsCh:
    if news, ok := msg.(News); ok {
        fmt.Printf("News: %s - %s\n", news.Title, news.Content)
    }
case msg := <-sportsCh:
    if score, ok := msg.(Score); ok {
        fmt.Printf("Score: %s %s\n", score.Game, score.Result)
    }
}

3. 同步发布

// 同步发布会阻塞直到所有订阅者都接收到消息
ps.PubWait("Important sync message", "important")

4. 获取订阅数

count := ps.SubCount("news")
fmt.Printf("Number of subscribers to 'news': %d\n", count)

完整示例

package main

import (
	"fmt"
	"time"
	
	"github.com/cskr/pubsub"
)

type Event struct {
	Type    string
	Message string
}

func subscriber(name string, ps *pubsub.PubSub, topics ...string) {
	ch := ps.Sub(topics...)
	
	for msg := range ch {
		if evt, ok := msg.(Event); ok {
			fmt.Printf("%s received %s event: %s\n", name, evt.Type, evt.Message)
		} else {
			fmt.Printf("%s received: %v\n", name, msg)
		}
	}
}

func main() {
	ps := pubsub.New(10)
	defer ps.Close()
	
	// 启动订阅者
	go subscriber("Subscriber1", ps, "news", "weather")
	go subscriber("Subscriber2", ps, "news")
	go subscriber("Subscriber3", ps, "weather")
	
	// 发布一些消息
	ps.Pub(Event{"news", "Go 1.20 released!"}, "news")
	ps.Pub(Event{"weather", "Sunny with 25°C"}, "weather")
	
	// 等待消息处理
	time.Sleep(100 * time.Millisecond)
	
	// 输出订阅数
	fmt.Printf("News subscribers: %d\n", ps.SubCount("news"))
	fmt.Printf("Weather subscribers: %d\n", ps.SubCount("weather"))
	
	// 同步发布示例
	ps.PubWait(Event{"news", "Breaking: Important announcement"}, "news")
	fmt.Println("Important news delivered to all subscribers")
}

性能考虑

  1. 缓冲区大小:创建PubSub实例时指定的缓冲区大小会影响性能。太小的缓冲区可能导致发布者阻塞,太大则可能消耗更多内存。

  2. 消息类型:使用具体类型而非interface{}可以提高性能,但会减少灵活性。

  3. 频道数量:过多的频道会增加管理开销。

  4. 订阅者处理速度:慢速的订阅者会拖累整个系统,考虑使用带缓冲的channel或在订阅者中使用单独的goroutine处理消息。

总结

pubsub库提供了简单高效的发布订阅实现,适合大多数Golang应用中的事件通知需求。它的轻量级特性使其成为微服务内部通信或组件间解耦的理想选择。

回到顶部