golang实现消息/事件发布订阅模式的插件库hub的使用

Golang 实现消息/事件发布订阅模式的插件库 hub 的使用

简介

Hub 是一个快速的 Event Hub 库,用于 Go 应用程序实现发布/订阅模式,支持类似 rabbitMQ 交换机的主题模式。

安装

你可以使用 go get 安装这个库,但建议使用版本标签或管理依赖:

go get -u github.com/leandro-lugaresi/hub

或者使用 dep:

dep ensure --add github.com/leandro-lugaresi/hub

使用

订阅者

Hub 提供两种类型的订阅者:

  1. Subscriber:默认的订阅者,是阻塞式的。如果通道已满,发送操作会阻塞,直到订阅者消费消息。
  2. NonBlockingSubscriber:非阻塞订阅者,发布端永远不会阻塞,但如果通道容量已满,发布操作会丢失并触发警报。适用于可以接受数据丢失的场景,如指标、日志等。

主题

库使用类似 rabbitMQ 主题交换机的概念,消息名称用于匹配所有符合主题的订阅者。主题必须是由点(.)分隔的单词列表,支持以下特殊字符:

  • * (星号) 可以替代一个单词

示例代码

下面是一个完整的示例 demo:

package main

import (
	"fmt"
	"sync"

	"github.com/leandro-lugaresi/hub"
)

func main() {
	h := hub.New()
	var wg sync.WaitGroup
	
	// 创建一个容量为10的缓冲通道
	// 如果要创建无缓冲通道,使用0作为容量
	sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")
	wg.Add(1)
	
	go func(s hub.Subscription) {
		for msg := range s.Receiver {
			fmt.Printf("收到主题为 %s 的消息,ID 为 %d\n", msg.Name, msg.Fields["id"])
		}
		wg.Done()
	}(sub)

	// 发布消息
	h.Publish(hub.Message{
		Name:   "account.login.failed",
		Fields: hub.Fields{"id": 123},
	})
	h.Publish(hub.Message{
		Name:   "account.changepassword.failed",
		Fields: hub.Fields{"id": 456},
	})
	h.Publish(hub.Message{
		Name:   "account.login.success",
		Fields: hub.Fields{"id": 123},
	})
	
	// 这条消息不会被路由到订阅者
	h.Publish(hub.Message{
		Name:   "account.foo.failed",
		Fields: hub.Fields{"id": 789},
	})

	// 关闭所有订阅者
	h.Close()
	// 等待缓冲区中的所有消息处理完成
	wg.Wait()

	// 输出:
	// 收到主题为 account.login.failed 的消息,ID 为 123
	// 收到主题为 account.changepassword.failed 的消息,ID 为 456
	// 收到主题为 account.login.success 的消息,ID 为 123
}

性能

吞吐量测试

在 Intel® Core™ i5-4460 CPU @ 3.20GHz x4 上测试结果:

1317530.091292 msg/sec

CSTrie

该项目内部使用了由 @tylertreat 实现的并发订阅树(CSTrie)。


更多关于golang实现消息/事件发布订阅模式的插件库hub的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现消息/事件发布订阅模式的插件库hub的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 消息/事件发布订阅模式实现 - Hub 库使用指南

消息/事件发布订阅(Pub/Sub)模式是一种常见的解耦设计模式,在Go中可以通过多种方式实现。下面我将介绍一个简单高效的Hub库实现及使用方法。

基本概念

发布订阅模式包含三个核心组件:

  1. 发布者(Publisher):负责发布消息/事件
  2. 订阅者(Subscriber):注册对特定消息/事件的兴趣
  3. 消息中心(Hub):负责接收消息并分发给订阅者

简单Hub实现

以下是一个线程安全的简单Hub实现:

package hub

import (
	"sync"
)

type Message interface{}

type Hub struct {
	subscribers map[string][]chan Message
	mu          sync.RWMutex
}

func NewHub() *Hub {
	return &Hub{
		subscribers: make(map[string][]chan Message),
	}
}

// 订阅主题
func (h *Hub) Subscribe(topic string) chan Message {
	h.mu.Lock()
	defer h.mu.Unlock()

	ch := make(chan Message, 100) // 缓冲通道防止阻塞
	h.subscribers[topic] = append(h.subscribers[topic], ch)
	return ch
}

// 发布消息
func (h *Hub) Publish(topic string, msg Message) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	if subs, ok := h.subscribers[topic]; ok {
		for _, ch := range subs {
			ch <- msg
		}
	}
}

// 取消订阅
func (h *Hub) Unsubscribe(topic string, ch chan Message) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if subs, ok := h.subscribers[topic]; ok {
		for i, sub := range subs {
			if sub == ch {
				h.subscribers[topic] = append(subs[:i], subs[i+1:]...)
				close(ch)
				break
			}
		}
	}
}

使用示例

package main

import (
	"fmt"
	"time"
)

func main() {
	hub := NewHub()

	// 订阅者1订阅"news"主题
	sub1 := hub.Subscribe("news")
	go func() {
		for msg := range sub1 {
			fmt.Printf("Subscriber 1 received: %v\n", msg)
		}
		fmt.Println("Subscriber 1 closed")
	}()

	// 订阅者2订阅"news"主题
	sub2 := hub.Subscribe("news")
	go func() {
		for msg := range sub2 {
			fmt.Printf("Subscriber 2 received: %v\n", msg)
		}
		fmt.Println("Subscriber 2 closed")
	}()

	// 订阅者3订阅"weather"主题
	sub3 := hub.Subscribe("weather")
	go func() {
		for msg := range sub3 {
			fmt.Printf("Subscriber 3 received: %v\n", msg)
		}
		fmt.Println("Subscriber 3 closed")
	}()

	// 发布消息
	hub.Publish("news", "Breaking news: Go 1.20 released!")
	hub.Publish("weather", "Today's weather: Sunny, 25°C")
	
	time.Sleep(time.Second) // 等待消息处理
	
	// 取消订阅
	hub.Unsubscribe("news", sub1)
	
	hub.Publish("news", "Another news: AI makes progress")
	
	time.Sleep(time.Second) // 等待消息处理
}

高级特性扩展

如果需要更强大的功能,可以考虑以下扩展:

  1. 通配符订阅:支持类似"news.*"的通配符模式
  2. 持久化:将消息持久化到数据库
  3. 消息过滤:订阅时增加过滤条件
  4. QoS保证:实现不同级别的消息传递保证

现有成熟库推荐

如果不想自己实现,可以考虑以下成熟的开源库:

  1. go-eventbus:轻量级事件总线

    import "github.com/asaskevich/EventBus"
    
    bus := EventBus.New()
    bus.Subscribe("topic", func(msg string) { /* handler */ })
    bus.Publish("topic", "message")
    
  2. emitter:高性能事件发射器

    import "github.com/olebedev/emitter"
    
    e := &emitter.Emitter{}
    e.On("event", func(event *emitter.Event) { /* handler */ })
    e.Emit("event", "data")
    
  3. watermill:用于构建事件驱动应用的库

    import "github.com/ThreeDotsLabs/watermill"
    
    pubSub := watermill.NewStdPubSub(false, watermill.NopLogger{})
    messages, _ := pubSub.Subscribe(context.Background(), "topic")
    pubSub.Publish("topic", watermill.NewMessage("1", []byte("payload")))
    

最佳实践

  1. 合理设置通道缓冲区大小
  2. 确保订阅者及时处理消息,避免阻塞
  3. 使用context实现优雅的取消订阅
  4. 考虑消息序列化格式(JSON, Protobuf等)
  5. 在高并发场景下注意性能优化

这个简单的Hub实现足以满足大多数基本发布订阅需求,对于更复杂的场景可以考虑使用上述成熟的第三方库。

回到顶部