Golang中如何实现发布变更与订阅变更的设计模式
Golang中如何实现发布变更与订阅变更的设计模式 我阅读了许多文章,观看了许多教程视频,但大多数资料的共同点是向所有用户发布变更。而不是订阅并获取仅与登录用户相关的信息。
另一个共同点是Websocket协议。似乎还有许多其他方法可以解决这个问题(马克·吐温),我有些不确定该选择哪一种。
我正在寻找一种方式,让用户登录并连接到服务器(Websocket服务器?),然后说类似 “嘿,我是Joe,请随时更新关于我的东西的信息”。
我已经成功设置了一个websocket服务器,但我对如何进行下去感到更加困惑。应该如何思考?我找到的最接近的资料是这篇和这个。但这超出了我的能力范围。我完全无法理解。
在这个阶段,我主要对“如何思考”感兴趣。不针对特定语言。
欢迎任何意见或方向!
更多关于Golang中如何实现发布变更与订阅变更的设计模式的实战教程也可以访问 https://www.itying.com/category-94-b0.html
你是否研究过使用现有的消息系统,比如 NATS?
更多关于Golang中如何实现发布变更与订阅变更的设计模式的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Sibert:
"listen tcp 127.0.0.1:2222: bind: address already in use"
看起来是另一个进程正在使用 2222 端口。 要排查此问题:
- 尝试使用另一个端口
- 检查正在运行的进程列表,看看是否有另一个服务器进程仍在运行(可能是之前启动的)
- 尝试运行
./server --help来查看服务器是否有用于增加日志输出详细程度的标志
从未听说过。与 Gorilla/Websocket 相比,主要区别是什么?
我这样理解对吗:你可以通过这种方式模拟 Socket.io 中的“房间”?ClientID = “房间”?
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(url),
那么,我如何获取“Joes stuff”?向 PostgreSQL 发送查询吗?是一次性查询还是每 5 分钟查询一次?
Sibert:
与 Gorilla/Websocket 相比,主要区别是什么?
我们这里讨论的是不同的层级。Websocket 是一种网络通信协议,与 HTTP 处于相同的协议层级,而 NATS 是一个消息传递系统,运行在 TCP 或 Websocket 这类网络协议之上。
Sibert:
我这样理解是否正确:你可以用这种方式模拟 Socket.io 中的“房间”?
我不熟悉 Socket.io,无法告诉你其文档之外的内容。看起来 Socket.io 确实有一种使用所谓的 rooms 的广播机制,允许向加入该房间的客户端发送消息。这听起来很像你正在寻找的——一种向已订阅客户端发布消息的解决方案。
这听起来很像你正在寻找的——一个向订阅客户端发布消息的解决方案。
是的!我了解过NATS,它似乎值得一试。我可以在Debian 10上启动NATS服务器,但看起来服务器并没有在监听。
[3265] 2022/10/04 11:28:20.779634 [INF] Starting nats-server
[3265] 2022/10/04 11:28:20.779813 [INF] Version: 2.9.2
[3265] 2022/10/04 11:28:20.779846 [INF] Git: [6d81dde]
[3265] 2022/10/04 11:28:20.779883 [INF] Name: NDQU3V6PO7F63NIRX7AC65QZKB3DM73OIJADYBIO2JJMQOEE5HZSYE4Q
[3265] 2022/10/04 11:28:20.779911 [INF] ID: NDQU3V6PO7F63NIRX7AC65QZKB3DM73OIJADYBIO2JJMQOEE5HZSYE4Q
[3265] 2022/10/04 11:28:20.780183 [FTL] Error listening on port: 127.0.0.1:2222, "listen tcp 127.0.0.1:2222: bind: address already in use"
我在这里找到了可执行文件 nats-server-v2.9.2-linux-amd64。我使用以下命令启动服务器:
cd /home/nats/
./server --a 127.0.0.1 --port 2222
没有使用Docker。基本上,我大致遵循了这个教程。
有什么线索知道我哪里做错了吗?
在Go中实现用户专属的发布-订阅模式,核心是建立用户ID与连接之间的映射关系。以下是具体实现思路和示例代码:
核心架构
package main
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// 用户连接管理器
type ClientManager struct {
clients map[string]*websocket.Conn // userID -> connection
broadcast chan UserMessage
register chan Client
unregister chan string
mu sync.RWMutex
}
type Client struct {
UserID string
Conn *websocket.Conn
}
type UserMessage struct {
UserID string
Message []byte
}
连接注册与用户绑定
func (manager *ClientManager) Start() {
for {
select {
case client := <-manager.register:
manager.mu.Lock()
manager.clients[client.UserID] = client.Conn
manager.mu.Unlock()
log.Printf("User %s registered", client.UserID)
case userID := <-manager.unregister:
manager.mu.Lock()
if conn, ok := manager.clients[userID]; ok {
conn.Close()
delete(manager.clients, userID)
}
manager.mu.Unlock()
case msg := <-manager.broadcast:
manager.mu.RLock()
if conn, ok := manager.clients[msg.UserID]; ok {
if err := conn.WriteMessage(websocket.TextMessage, msg.Message); err != nil {
conn.Close()
delete(manager.clients, msg.UserID)
}
}
manager.mu.RUnlock()
}
}
}
WebSocket处理器
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
func (manager *ClientManager) ServeWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 从认证token或请求参数中获取用户ID
userID := r.URL.Query().Get("user_id")
if userID == "" {
conn.Close()
return
}
client := Client{
UserID: userID,
Conn: conn,
}
manager.register <- client
defer func() {
manager.unregister <- userID
}()
// 保持连接活跃
for {
_, _, err := conn.ReadMessage()
if err != nil {
break
}
}
}
发布用户专属消息
func (manager *ClientManager) SendToUser(userID string, message []byte) {
msg := UserMessage{
UserID: userID,
Message: message,
}
manager.broadcast <- msg
}
// 使用示例
func main() {
manager := &ClientManager{
clients: make(map[string]*websocket.Conn),
broadcast: make(chan UserMessage),
register: make(chan Client),
unregister: make(chan string),
}
go manager.Start()
http.HandleFunc("/ws", manager.ServeWS)
// 模拟业务逻辑:向特定用户发送更新
go func() {
// 当Joe的数据变更时
manager.SendToUser("joe123", []byte(`{"type":"order_update","data":{...}}`))
}()
log.Fatal(http.ListenAndServe(":8080", nil))
}
消息路由扩展
对于更复杂的路由需求,可以使用多级映射:
type SubscriptionManager struct {
userSubscriptions map[string]map[string]bool // userID -> [channel]bool
channelSubscribers map[string]map[string]bool // channel -> [userID]bool
mu sync.RWMutex
}
func (sm *SubscriptionManager) Subscribe(userID, channel string) {
sm.mu.Lock()
defer sm.mu.Unlock()
if sm.userSubscriptions[userID] == nil {
sm.userSubscriptions[userID] = make(map[string]bool)
}
sm.userSubscriptions[userID][channel] = true
if sm.channelSubscribers[channel] == nil {
sm.channelSubscribers[channel] = make(map[string]bool)
}
sm.channelSubscribers[channel][userID] = true
}
func (sm *SubscriptionManager) Publish(channel string, message []byte) {
sm.mu.RLock()
defer sm.mu.RUnlock()
if subscribers, ok := sm.channelSubscribers[channel]; ok {
for userID := range subscribers {
// 通过ClientManager发送给具体用户
clientManager.SendToUser(userID, message)
}
}
}
连接心跳保持
func (c *Client) KeepAlive() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.Conn.WriteControl(
websocket.PingMessage,
[]byte{},
time.Now().Add(10*time.Second),
); err != nil {
return
}
}
}
}
这种设计模式的关键点:
- 每个WebSocket连接与用户ID强绑定
- 通过用户ID到连接映射实现精准消息投递
- 支持用户订阅多个频道,实现细粒度控制
- 使用读写锁保证并发安全
实际部署时需要考虑连接重连、消息持久化、集群扩展等问题,但以上代码提供了用户专属发布-订阅的核心实现框架。

