Golang实现Server-Sent Events的完整指南

Golang实现Server-Sent Events的完整指南 如何在 Go 中实现服务器发送事件

有没有人在他们的项目中使用服务器发送事件?如果有,用于哪些用例?

packagemain #30: How to Implement Server-Sent Events in Go

2 回复

很高兴知道 Package Main 有一个 YouTube 频道。

更多关于Golang实现Server-Sent Events的完整指南的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现Server-Sent Events(SSE)非常直接,以下是一个完整的实现示例:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

type Message struct {
    ID    int    `json:"id"`
    Event string `json:"event"`
    Data  string `json:"data"`
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
    // 设置SSE所需的响应头
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // 创建flusher以支持流式响应
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    // 发送初始连接消息
    fmt.Fprintf(w, "event: connected\ndata: Connection established\n\n")
    flusher.Flush()

    // 模拟实时数据推送
    messageID := 1
    for {
        select {
        case <-r.Context().Done():
            // 客户端断开连接
            log.Println("Client disconnected")
            return
        case <-time.After(2 * time.Second):
            // 每2秒发送一条消息
            msg := Message{
                ID:    messageID,
                Event: "update",
                Data:  fmt.Sprintf("Current time: %s", time.Now().Format(time.RFC3339)),
            }

            // 将消息转换为JSON
            jsonData, err := json.Marshal(msg)
            if err != nil {
                log.Printf("Error marshaling message: %v", err)
                continue
            }

            // 发送SSE格式的消息
            fmt.Fprintf(w, "id: %d\n", messageID)
            fmt.Fprintf(w, "event: %s\n", msg.Event)
            fmt.Fprintf(w, "data: %s\n\n", jsonData)
            flusher.Flush()

            messageID++
        }
    }
}

func main() {
    http.HandleFunc("/events", sseHandler)
    
    // 提供HTML客户端页面
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/html")
        fmt.Fprint(w, `
<!DOCTYPE html>
<html>
<head>
    <title>SSE Client</title>
</head>
<body>
    <h1>Server-Sent Events Demo</h1>
    <div id="messages"></div>
    
    <script>
        const eventSource = new EventSource('/events');
        
        eventSource.addEventListener('connected', function(e) {
            console.log('Connected:', e.data);
        });
        
        eventSource.addEventListener('update', function(e) {
            const msg = JSON.parse(e.data);
            const div = document.getElementById('messages');
            div.innerHTML += `<p>ID: ${msg.id}, Event: ${msg.event}, Data: ${msg.data}</p>`;
        });
        
        eventSource.onerror = function(e) {
            console.error('EventSource failed:', e);
        };
    </script>
</body>
</html>
        `)
    })

    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

关于SSE的常见用例:

  1. 实时通知系统
// 通知推送示例
func notificationHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    flusher, _ := w.(http.Flusher)
    
    // 模拟数据库轮询或消息队列监听
    for {
        // 检查新通知
        notifications := checkNewNotifications()
        for _, notif := range notifications {
            fmt.Fprintf(w, "event: notification\n")
            fmt.Fprintf(w, "data: %s\n\n", notif)
            flusher.Flush()
        }
        time.Sleep(1 * time.Second)
    }
}
  1. 实时数据监控仪表盘
// 系统监控数据推送
func metricsHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    flusher, _ := w.(http.Flusher)
    
    for {
        metrics := collectSystemMetrics()
        jsonMetrics, _ := json.Marshal(metrics)
        
        fmt.Fprintf(w, "event: metrics\n")
        fmt.Fprintf(w, "data: %s\n\n", jsonMetrics)
        flusher.Flush()
        
        time.Sleep(5 * time.Second)
    }
}
  1. 实时聊天应用
// 聊天消息广播
type ChatServer struct {
    clients map[chan string]bool
    mu      sync.RWMutex
}

func (cs *ChatServer) broadcastHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    flusher, _ := w.(http.Flusher)
    
    messageChan := make(chan string, 10)
    
    cs.mu.Lock()
    cs.clients[messageChan] = true
    cs.mu.Unlock()
    
    defer func() {
        cs.mu.Lock()
        delete(cs.clients, messageChan)
        cs.mu.Unlock()
        close(messageChan)
    }()
    
    for msg := range messageChan {
        fmt.Fprintf(w, "data: %s\n\n", msg)
        flusher.Flush()
    }
}

SSE在Go项目中的优势包括:

  • 自动重连机制
  • 简单的协议格式
  • 内置的消息ID跟踪
  • 与HTTP/1.1和HTTP/2兼容
  • 比WebSocket更轻量级,适用于单向数据流场景

这个实现包含了完整的服务器端和客户端示例,可以直接运行测试SSE功能。

回到顶部