Golang中多个WebSocket客户端冲突问题探讨
Golang中多个WebSocket客户端冲突问题探讨
我用Go语言创建了一个Web服务器应用程序。它的功能之一是在浏览器中提供一个屏幕上的行情显示器。我使用gorilla WebSocket包来获取价格,这在股票市场行业中被称为“tick”。每个tick包含股票代码、价格、交易量和其他一些信息。这些tick会被累积起来。浏览器大约每5秒进行一次SSE调用,我发送最新的tick。这个功能运行得相当好,过去一个月我一直在测试它。然而,在日志中,我偶尔会看到错误。调用WebSocketConnection.ReadJSON会返回一个错误:websocket: close 1006 (abnormal closure): unexpected EOF。当这个错误发生时,我关闭连接并尝试创建一个新的。有时成功,有时不成功。如果不成功,错误是“bad handshake”。因此,我实现了重试机制来创建WebSocket连接。最终它通常会成功并继续从套接字读取数据。如果我激活两个访问WebSocket的客户端(goroutine),情况会变得更糟。错误更多,而且似乎在任何给定时间,只有一个能成功读取,另一个失败。然后第一个失败,第二个成功。
所以,我的第一个问题是:我是否正确使用了WebSocket?我应该忽略错误并继续吗?
我提取了一些代码来示例说明这个问题:
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"log"
"time"
)
func main() {
stockSymbol1 := "AAPL"
ch := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())
go webSocket1(ctx, stockSymbol1)
stockSymbol2 := "TXN"
go webSocket2(ctx, stockSymbol2)
<-ch
cancelFunc()
}
func webSocket1(ctx context.Context, symbol string) {
readTicksForSymbol(ctx, symbol)
}
func webSocket2(ctx context.Context, symbol string) {
readTicksForSymbol(ctx, symbol)
}
func readTicksForSymbol(ctx context.Context, symbol string) {
reading := true
go func() {
select {
case <-ctx.Done():
reading = false
}
}()
w, err := getFinnHubWebSocketConnection(symbol)
if err != nil {
log.Printf("error from getFinnHubWebSocketConnection: %s", err)
}
for reading {
var msg interface{}
err := w.ReadJSON(&msg)
if err != nil {
log.Println(fmt.Sprintf("error from finnHubRealTime.WebSocketConnection.ReadJSON: %s", err))
w.Close()
w, err = getFinnHubWebSocketConnection(symbol)
if err != nil {
log.Printf("%s error from getFinnHubWebSocketConnection: %s", symbol, err)
}
} else {
msgMap, ok := msg.(map[string]interface{})
if ok {
msgType := msgMap["type"].(string)
log.Printf("msgType: %s", msgType)
// we need to process pings as well, since realtime.html needs to check when a new day occurs
if msgType == "trade" {
msgDataSlice := msgMap["data"].([]interface{})
log.Printf("received %d dataElementInterface's", len(msgDataSlice))
for _, dataElementInterface := range msgDataSlice {
interfaceMap := dataElementInterface.(map[string]interface{})
log.Printf("interfaceMap: %v", interfaceMap)
}
}
}
}
}
}
func getFinnHubWebSocketConnection(symbol string) (*websocket.Conn, error) {
var w *websocket.Conn
var err error
for retries := 0; retries < 100; retries++ {
log.Println("getFinnHubWebSocketConnection")
dialer := websocket.Dialer{
NetDial: nil,
NetDialContext: nil,
Proxy: nil,
TLSClientConfig: nil,
HandshakeTimeout: 0,
ReadBufferSize: 0,
WriteBufferSize: 0,
WriteBufferPool: nil,
Subprotocols: nil,
EnableCompression: false,
Jar: nil,
}
w, _, err = dialer.Dial("wss://ws.finnhub.io?token=xxxxxxxxxxxxxxxxx", nil)
if err == nil {
log.Println(fmt.Sprintf("successful websocket connection after try #: %v", retries))
break
} else {
log.Println(fmt.Sprintf("%s wait 2 seconds after error in getFinnHubWebSocketConnection from dialer.Dial try #: %v error: %v", symbol, retries, err))
time.Sleep(2 * time.Second)
}
}
msg, _ := json.Marshal(map[string]interface{}{"type": "subscribe", "symbol": symbol})
msgErr := w.WriteMessage(websocket.TextMessage, msg)
if msgErr != nil {
log.Printf("error submitting msg to websocket: %v", msgErr)
}
var errorMessage string
if err != nil {
errorMessage = fmt.Sprintf("error in getFinnHubWebSocketConnection from dialer.Dial: %v", err)
return w, errors.New(errorMessage)
} else {
return w, nil
}
}
任何帮助都将不胜感激。
更多关于Golang中多个WebSocket客户端冲突问题探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html
尝试将 ReadJSON 替换为 ReadMessage
更多关于Golang中多个WebSocket客户端冲突问题探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
当然,这符合预期。但对于交易/市场服务器而言,多个客户端通常需要来自多个设备/IP。实际上,许多金融API服务都提供自定义用户方案,以满足您的此类需求。
根据文档(重点是我加的):
流式传输美国股票、外汇和加密货币的实时交易。某些外汇和加密货币交易所可能无法提供交易数据。在这种情况下,将发送一条交易量为0的价格更新消息。一条消息可以包含多笔交易。一个API密钥一次只能打开一个连接。
或许可以复用连接?
感谢您的回复。我会研究一下服务器API。
好的,我已经查阅了服务器相关的可用文档。这并没有提供答案。
应用程序在单个连接下通常运行良好。然而,如果我在一个goroutine中创建第二个连接,连接在大多数情况下会开始失败,尽管两个线程上偶尔也会有一些成功。我认为这会是一个常见的使用场景。一个Web服务器需要支持多个客户端。而多个客户端可能需要从Web套接字获取数据。
你好!我不熟悉finnhub的API,但我认为这个问题与服务器限制有关。这种情况在我使用过的许多交易所和市场平台中都很常见:你的连接由于超时而被关闭(具体来说,是关于工作套接字的,99%的平台都有持续时间限制,以防止独占服务器资源……这就是EOF的原因)。而“握手失败”则源于另一个限制:当你的前一个连接被关闭后,立即从同一IP地址建立新连接时,在那个瞬间你的IP地址处于“黑名单”中,该黑名单会将你的令牌-IP组合锁定几秒或几分钟。通常,同一个令牌/账户在同一时间不能有超过一个连接。
我建议你留意平台的限制(有时这些限制解释得不够充分),并尝试延迟你的下一次连接尝试。注意:如果需要保持连接始终活跃,可以在特定时间关闭连接,然后开启一个新的连接,但要避免触发黑名单机制。不要使用多个连接。
根据你提供的代码和描述,问题主要出现在WebSocket连接的并发管理和错误处理上。以下是具体问题和解决方案:
问题分析
- 连接共享冲突:多个goroutine使用相同的WebSocket连接,导致读取冲突
- 错误处理不当:
1006错误是WebSocket的异常关闭,需要正确处理重连 - 上下文管理问题:
reading变量的并发访问存在竞态条件
解决方案
1. 每个goroutine独立连接
type WebSocketClient struct {
symbol string
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
reconnect chan struct{}
mu sync.RWMutex
}
func NewWebSocketClient(symbol string) *WebSocketClient {
ctx, cancel := context.WithCancel(context.Background())
return &WebSocketClient{
symbol: symbol,
ctx: ctx,
cancel: cancel,
reconnect: make(chan struct{}, 1),
}
}
func (c *WebSocketClient) Start() {
go c.run()
}
func (c *WebSocketClient) run() {
defer c.cleanup()
for {
select {
case <-c.ctx.Done():
return
default:
if err := c.connectAndRead(); err != nil {
log.Printf("%s connection error: %v", c.symbol, err)
c.scheduleReconnect()
}
}
}
}
func (c *WebSocketClient) connectAndRead() error {
conn, err := c.dialWithRetry()
if err != nil {
return err
}
c.mu.Lock()
c.conn = conn
c.mu.Unlock()
defer func() {
c.mu.Lock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.mu.Unlock()
}()
return c.readMessages()
}
2. 改进的连接和重试机制
func (c *WebSocketClient) dialWithRetry() (*websocket.Conn, error) {
var conn *websocket.Conn
var err error
for i := 0; i < 10; i++ {
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, _, err = dialer.Dial("wss://ws.finnhub.io?token=xxxxxxxxxxxxxxxxx", nil)
if err == nil {
if err := c.subscribe(conn); err != nil {
conn.Close()
continue
}
return conn, nil
}
if i < 9 {
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
time.Sleep(backoff)
}
}
return nil, fmt.Errorf("failed to connect after retries: %v", err)
}
func (c *WebSocketClient) subscribe(conn *websocket.Conn) error {
msg, _ := json.Marshal(map[string]interface{}{
"type": "subscribe",
"symbol": c.symbol,
})
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return fmt.Errorf("subscribe failed: %v", err)
}
conn.SetReadDeadline(time.Time{})
return nil
}
3. 改进的消息读取处理
func (c *WebSocketClient) readMessages() error {
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
if conn == nil {
return errors.New("no connection")
}
conn.SetReadDeadline(time.Time{})
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
go func() {
for range ticker.C {
c.mu.RLock()
if c.conn != nil {
c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second))
}
c.mu.RUnlock()
}
}()
for {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
var msg map[string]interface{}
if err := conn.ReadJSON(&msg); err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
websocket.CloseNormalClosure) {
return fmt.Errorf("websocket closed: %v", err)
}
return err
}
if err := c.handleMessage(msg); err != nil {
log.Printf("%s message handling error: %v", c.symbol, err)
}
}
}
func (c *WebSocketClient) handleMessage(msg map[string]interface{}) error {
msgType, ok := msg["type"].(string)
if !ok {
return errors.New("invalid message format")
}
switch msgType {
case "trade":
data, ok := msg["data"].([]interface{})
if !ok {
return errors.New("invalid trade data")
}
for _, item := range data {
if trade, ok := item.(map[string]interface{}); ok {
log.Printf("%s trade: %v", c.symbol, trade)
}
}
case "ping":
c.mu.RLock()
if c.conn != nil {
c.conn.WriteJSON(map[string]string{"type": "pong"})
}
c.mu.RUnlock()
}
return nil
}
4. 使用示例
func main() {
clients := []*WebSocketClient{
NewWebSocketClient("AAPL"),
NewWebSocketClient("TXN"),
NewWebSocketClient("MSFT"),
}
for _, client := range clients {
client.Start()
}
// 等待信号
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
for _, client := range clients {
client.Stop()
}
}
关键改进点
- 连接隔离:每个股票代码使用独立的WebSocket连接和goroutine
- 优雅重连:指数退避重试机制,避免频繁重连
- 心跳保持:实现Ping/Pong机制保持连接活跃
- 并发安全:使用互斥锁保护共享状态
- 资源清理:确保连接正确关闭,避免资源泄漏
这种设计可以避免多个客户端之间的冲突,同时提供更稳定的连接管理。1006错误通常表示连接异常断开,通过上述重连机制可以自动恢复。

