Golang SSE客户端实现方案

在Golang中实现SSE客户端时,如何正确处理长连接和事件流的解析?目前遇到连接不稳定或消息解析不全的问题,想了解最佳实践方案,比如是否需要自定义重连机制、如何高效处理分块数据,以及有哪些成熟的第三方库推荐?

2 回复

Golang实现SSE客户端主要有两种方式:

  1. 标准库net/http方案
resp, err := http.NewRequest("GET", url, nil)
resp.Header.Set("Accept", "text/event-stream")
// 处理响应流
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
    line := scanner.Text()
    // 解析SSE事件
}
  1. 使用eventsource包(推荐):
import "github.com/antage/eventsource"
es := eventsource.New(nil, nil)
defer es.Close()

// 监听事件
es.EventSource("/events", func(ev eventsource.Event) {
    fmt.Printf("Event: %s\n", ev.Data())
})

关键点:

  • 设置正确Header:Accept: text/event-stream
  • 保持长连接,处理断线重连
  • 解析事件格式(data/id/event字段)
  • 注意连接超时处理

建议使用现成的事件源库,简化连接管理和消息解析。

更多关于Golang SSE客户端实现方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中实现SSE(Server-Sent Events)客户端,可以通过标准库net/http实现。以下是核心方案:

1. 基础实现

package main

import (
    "bufio"
    "fmt"
    "net/http"
    "strings"
)

func main() {
    req, _ := http.NewRequest("GET", "http://example.com/events", nil)
    req.Header.Set("Accept", "text/event-stream")
    req.Header.Set("Cache-Control", "no-cache")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        line := scanner.Text()
        
        if strings.HasPrefix(line, "data:") {
            data := strings.TrimPrefix(line, "data: ")
            fmt.Printf("Received: %s\n", data)
        }
        
        if line == "" {
            // 空行表示事件结束
            fmt.Println("Event end")
        }
    }
}

2. 增强功能版本

type SSEClient struct {
    URL    string
    Events chan Event
}

type Event struct {
    Data  string
    Event string
    ID    string
}

func (c *SSEClient) Connect() error {
    req, _ := http.NewRequest("GET", c.URL, nil)
    req.Header.Set("Accept", "text/event-stream")
    req.Header.Set("Cache-Control", "no-cache")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }

    go c.readStream(resp)
    return nil
}

func (c *SSEClient) readStream(resp *http.Response) {
    defer resp.Body.Close()
    scanner := bufio.NewScanner(resp.Body)
    
    var currentEvent Event
    for scanner.Scan() {
        line := scanner.Text()
        
        switch {
        case strings.HasPrefix(line, "data:"):
            currentEvent.Data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
        case strings.HasPrefix(line, "event:"):
            currentEvent.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
        case strings.HasPrefix(line, "id:"):
            currentEvent.ID = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
        case line == "":
            if currentEvent.Data != "" {
                c.Events <- currentEvent
                currentEvent = Event{}
            }
        }
    }
}

// 使用示例
func main() {
    client := &SSEClient{
        URL:    "http://localhost:8080/events",
        Events: make(chan Event),
    }
    
    go client.Connect()
    
    for event := range client.Events {
        fmt.Printf("Event: %s, Data: %s\n", event.Event, event.Data)
    }
}

关键点:

  1. 设置正确的HTTP头:Accept: text/event-stream
  2. 处理多行数据和支持的事件字段
  3. 空行作为事件分隔符
  4. 保持长连接处理流式数据

可选增强:

  • 添加重连机制
  • 支持身份验证
  • 处理不同的编码格式
  • 添加超时控制

这个实现提供了基本的SSE客户端功能,可以根据具体需求进行扩展。

回到顶部