使用Golang实现聊天应用中的发布订阅模式

使用Golang实现聊天应用中的发布订阅模式 我已经用Go语言创建了一个基础的聊天应用。我有服务器端代码和客户端代码。现在我想在我已经创建的聊天应用中加入发布-订阅消息模式。我该怎么做?我原来的聊天应用没有使用WebSocket,它是一个非常基础的应用程序。如果需要,我也可以分享我现有的代码。

2 回复

这是一个非常宽泛的问题。我想我会从将你的应用程序切换到使用WebSocket或类似技术开始。举个例子,你可以查看我写的这个概念验证,我相信你可以重新利用其中的一些代码。我使用了github.com/gorilla/websocket,它有很棒的文档和示例。现在我想起来,他们有一个聊天示例,可能对你有用。

更多关于使用Golang实现聊天应用中的发布订阅模式的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现聊天应用的发布订阅模式,可以通过以下方式实现:

1. 核心发布订阅结构

package main

import (
    "sync"
)

// 订阅者接口
type Subscriber interface {
    Send(message string)
    GetID() string
}

// 发布订阅管理器
type PubSub struct {
    subscribers map[string][]Subscriber
    mu          sync.RWMutex
}

func NewPubSub() *PubSub {
    return &PubSub{
        subscribers: make(map[string][]Subscriber),
    }
}

// 订阅主题
func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    
    ps.subscribers[topic] = append(ps.subscribers[topic], subscriber)
}

// 取消订阅
func (ps *PubSub) Unsubscribe(topic string, subscriberID string) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    
    subscribers := ps.subscribers[topic]
    for i, sub := range subscribers {
        if sub.GetID() == subscriberID {
            ps.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
            break
        }
    }
}

// 发布消息到主题
func (ps *PubSub) Publish(topic string, message string) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    
    for _, subscriber := range ps.subscribers[topic] {
        go subscriber.Send(message)
    }
}

2. 聊天客户端实现

type ChatClient struct {
    ID       string
    Username string
    SendChan chan string
}

func NewChatClient(id, username string) *ChatClient {
    return &ChatClient{
        ID:       id,
        Username: username,
        SendChan: make(chan string, 100),
    }
}

func (c *ChatClient) Send(message string) {
    c.SendChan <- message
}

func (c *ChatClient) GetID() string {
    return c.ID
}

// 客户端消息处理循环
func (c *ChatClient) HandleMessages() {
    for msg := range c.SendChan {
        fmt.Printf("Client %s received: %s\n", c.Username, msg)
    }
}

3. 集成到聊天服务器

type ChatServer struct {
    pubsub   *PubSub
    clients  map[string]*ChatClient
    mu       sync.RWMutex
}

func NewChatServer() *ChatServer {
    return &ChatServer{
        pubsub:  NewPubSub(),
        clients: make(map[string]*ChatClient),
    }
}

// 用户加入聊天室
func (cs *ChatServer) JoinChat(client *ChatClient, room string) {
    cs.mu.Lock()
    cs.clients[client.ID] = client
    cs.mu.Unlock()
    
    cs.pubsub.Subscribe(room, client)
    cs.pubsub.Publish(room, fmt.Sprintf("%s joined the chat", client.Username))
    
    go client.HandleMessages()
}

// 发送消息到聊天室
func (cs *ChatServer) SendMessage(senderID, room, message string) {
    cs.mu.RLock()
    sender, exists := cs.clients[senderID]
    cs.mu.RUnlock()
    
    if exists {
        fullMessage := fmt.Sprintf("%s: %s", sender.Username, message)
        cs.pubsub.Publish(room, fullMessage)
    }
}

// 用户离开聊天室
func (cs *ChatServer) LeaveChat(clientID, room string) {
    cs.mu.Lock()
    client, exists := cs.clients[clientID]
    delete(cs.clients, clientID)
    cs.mu.Unlock()
    
    if exists {
        cs.pubsub.Unsubscribe(room, clientID)
        cs.pubsub.Publish(room, fmt.Sprintf("%s left the chat", client.Username))
        close(client.SendChan)
    }
}

4. 使用示例

func main() {
    server := NewChatServer()
    
    // 创建客户端
    client1 := NewChatClient("1", "Alice")
    client2 := NewChatClient("2", "Bob")
    client3 := NewChatClient("3", "Charlie")
    
    // 加入聊天室
    server.JoinChat(client1, "general")
    server.JoinChat(client2, "general")
    server.JoinChat(client3, "general")
    
    // 发送消息
    server.SendMessage("1", "general", "Hello everyone!")
    server.SendMessage("2", "general", "Hi Alice!")
    
    // 创建另一个聊天室
    server.JoinChat(client1, "private")
    server.JoinChat(client2, "private")
    server.SendMessage("1", "private", "Private message to Bob")
    
    // 离开聊天室
    time.Sleep(1 * time.Second)
    server.LeaveChat("3", "general")
    
    // 保持程序运行
    time.Sleep(2 * time.Second)
}

5. 支持多个主题的扩展

// 支持模式匹配的主题订阅
type PatternPubSub struct {
    pubsub *PubSub
}

func (pps *PatternPubSub) SubscribePattern(pattern string, subscriber Subscriber) {
    // 实现基于通配符的模式订阅
    // 例如: "room.*", "user.*.status"
}

func (pps *PatternPubSub) PublishToPattern(topic string, message string) {
    // 发布到匹配模式的所有主题
}

这个实现提供了线程安全的发布订阅机制,可以轻松集成到现有的聊天应用中。每个聊天室对应一个主题,客户端订阅感兴趣的主题来接收消息。

回到顶部