Golang中Server-sent事件为何无法传递到所有客户端

Golang中Server-sent事件为何无法传递到所有客户端 希望我为这类问题选择了正确的分类……

我找到了这个示例,它展示了如何从 Golang(使用 Gin)发送服务器发送事件(SSE)并在 Angular 应用中消费它们。

服务器每秒发送一个 SSE。事件包含一个时间戳,以便更好地分析结果。

当只有一个客户端时,它的工作方式符合我的预期——每秒收到一个事件。

然而我注意到,当有三个客户端时(我在同一台机器上使用不同的浏览器来模拟一些真实场景),事件不会发送到所有客户端。每个客户端将每三秒收到一个事件(或者如果有两个客户端在监听,则每两秒收到一个事件)。

(注意每个客户端窗口时间戳中 3 秒的间隔)

pic2

这是 SSE 技术的正常行为,还是该示例实现特有的问题?

你会建议使用 SSE 来构建一个“用户活动实时流”,用于显示其他人当前正在查看的内容吗?当我扩展这个示例时,它基本上可以工作,只是存在这个“奇怪”的现象,即并非所有事件都能在所有客户端上看到。


更多关于Golang中Server-sent事件为何无法传递到所有客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中Server-sent事件为何无法传递到所有客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个典型的并发处理问题,不是SSE技术本身的限制。问题在于示例代码使用了全局变量和简单的循环广播方式,没有正确处理多个客户端的并发连接。

以下是问题分析和解决方案:

问题根源: 示例代码可能使用了类似这样的广播方式:

var clients = make(map[chan string]bool)

func broadcast(msg string) {
    for client := range clients {
        client <- msg  // 这里会阻塞
    }
}

当向一个阻塞的channel发送数据时,会等待直到数据被接收。如果有多个客户端,每个客户端都会阻塞整个广播循环。

正确实现方案:

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
    "github.com/gin-gonic/gin"
)

type Client struct {
    messages chan string
}

type Server struct {
    clients map[*Client]bool
    mu      sync.RWMutex
}

func NewServer() *Server {
    return &Server{
        clients: make(map[*Client]bool),
    }
}

func (s *Server) AddClient(c *gin.Context) {
    client := &Client{
        messages: make(chan string, 100), // 缓冲channel避免阻塞
    }
    
    s.mu.Lock()
    s.clients[client] = true
    s.mu.Unlock()
    
    c.Stream(func(w io.Writer) bool {
        select {
        case msg := <-client.messages:
            c.SSEvent("message", msg)
            return true
        case <-c.Request.Context().Done():
            s.RemoveClient(client)
            return false
        }
    })
}

func (s *Server) RemoveClient(client *Client) {
    s.mu.Lock()
    delete(s.clients, client)
    s.mu.Unlock()
    close(client.messages)
}

func (s *Server) Broadcast(msg string) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    for client := range s.clients {
        select {
        case client.messages <- msg:
            // 消息成功发送
        default:
            // 客户端channel满,跳过或处理背压
            fmt.Println("Client channel full, skipping message")
        }
    }
}

func main() {
    server := NewServer()
    r := gin.Default()
    
    r.GET("/events", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")
        server.AddClient(c)
    })
    
    // 模拟每秒广播消息
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                msg := fmt.Sprintf("Time: %v", time.Now().Unix())
                server.Broadcast(msg)
            }
        }
    }()
    
    r.Run(":8080")
}

关键改进:

  1. 使用带缓冲的channel(make(chan string, 100))避免发送阻塞
  2. 使用互斥锁保护共享的clients map
  3. 每个客户端有独立的channel
  4. 使用select语句避免channel满时的阻塞
  5. 正确处理连接断开时的清理

对于用户活动实时流的建议: SSE完全适合这种场景。上述修复后的实现可以确保所有客户端都能实时接收事件。你还可以考虑以下优化:

// 添加消息类型支持
type Event struct {
    Type    string      `json:"type"`
    UserID  string      `json:"user_id"`
    Action  string      `json:"action"`
    Page    string      `json:"page"`
    Time    time.Time   `json:"time"`
}

// 针对特定用户组广播
func (s *Server) BroadcastToUsers(event Event, userIDs []string) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    for client := range s.clients {
        // 这里可以根据业务逻辑过滤接收者
        select {
        case client.messages <- event:
        default:
            // 处理背压
        }
    }
}

这样每个客户端都能每秒收到事件,不会出现3秒间隔的问题。SSE在HTTP/1.1和HTTP/2下都能良好工作,适合实时用户活动流场景。

回到顶部