Golang中多个WebSocket客户端冲突问题探讨

Golang中多个WebSocket客户端冲突问题探讨 我用Go语言创建了一个Web服务器应用程序。它的功能之一是在浏览器中提供一个屏幕上的行情显示器。我使用gorilla WebSocket包来获取价格,这在股票市场行业中被称为“tick”。每个tick包含股票代码、价格、交易量和其他一些信息。这些tick会被累积起来。浏览器大约每5秒进行一次SSE调用,我发送最新的tick。这个功能运行得相当好,过去一个月我一直在测试它。然而,在日志中,我偶尔会看到错误。调用WebSocketConnection.ReadJSON会返回一个错误:websocket: close 1006 (abnormal closure): unexpected EOF。当这个错误发生时,我关闭连接并尝试创建一个新的。有时成功,有时不成功。如果不成功,错误是“bad handshake”。因此,我实现了重试机制来创建WebSocket连接。最终它通常会成功并继续从套接字读取数据。如果我激活两个访问WebSocket的客户端(goroutine),情况会变得更糟。错误更多,而且似乎在任何给定时间,只有一个能成功读取,另一个失败。然后第一个失败,第二个成功。

所以,我的第一个问题是:我是否正确使用了WebSocket?我应该忽略错误并继续吗?

我提取了一些代码来示例说明这个问题:

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/gorilla/websocket"
	"log"
	"time"
)

func main() {
	stockSymbol1 := "AAPL"
	ch := make(chan struct{})
	ctx, cancelFunc := context.WithCancel(context.Background())
	go webSocket1(ctx, stockSymbol1)

	stockSymbol2 := "TXN"
	go webSocket2(ctx, stockSymbol2)

	<-ch
	cancelFunc()
}

func webSocket1(ctx context.Context, symbol string) {

	readTicksForSymbol(ctx, symbol)
}

func webSocket2(ctx context.Context, symbol string) {

	readTicksForSymbol(ctx, symbol)
}

func readTicksForSymbol(ctx context.Context, symbol string) {
	reading := true

	go func() {
		select {
		case <-ctx.Done():
			reading = false
		}
	}()

	w, err := getFinnHubWebSocketConnection(symbol)
	if err != nil {
		log.Printf("error from getFinnHubWebSocketConnection: %s", err)
	}

	for reading {
		var msg interface{}
		err := w.ReadJSON(&msg)
		if err != nil {
			log.Println(fmt.Sprintf("error from finnHubRealTime.WebSocketConnection.ReadJSON: %s", err))
			w.Close()
			w, err = getFinnHubWebSocketConnection(symbol)
			if err != nil {
				log.Printf("%s error from getFinnHubWebSocketConnection: %s", symbol, err)
			}
		} else {
			msgMap, ok := msg.(map[string]interface{})
			if ok {
				msgType := msgMap["type"].(string)
				log.Printf("msgType: %s", msgType)
				// we need to process pings as well, since realtime.html needs to check when a new day occurs
				if msgType == "trade" {
					msgDataSlice := msgMap["data"].([]interface{})
					log.Printf("received %d dataElementInterface's", len(msgDataSlice))
					for _, dataElementInterface := range msgDataSlice {
						interfaceMap := dataElementInterface.(map[string]interface{})
						log.Printf("interfaceMap: %v", interfaceMap)
					}
				}
			}

		}

	}
}

func getFinnHubWebSocketConnection(symbol string) (*websocket.Conn, error) {

	var w *websocket.Conn
	var err error

	for retries := 0; retries < 100; retries++ {

		log.Println("getFinnHubWebSocketConnection")
		dialer := websocket.Dialer{
			NetDial:           nil,
			NetDialContext:    nil,
			Proxy:             nil,
			TLSClientConfig:   nil,
			HandshakeTimeout:  0,
			ReadBufferSize:    0,
			WriteBufferSize:   0,
			WriteBufferPool:   nil,
			Subprotocols:      nil,
			EnableCompression: false,
			Jar:               nil,
		}

		w, _, err = dialer.Dial("wss://ws.finnhub.io?token=xxxxxxxxxxxxxxxxx", nil)
		if err == nil {
			log.Println(fmt.Sprintf("successful websocket connection after try #: %v", retries))
			break
		} else {
			log.Println(fmt.Sprintf("%s wait 2 seconds after error in getFinnHubWebSocketConnection from dialer.Dial try #: %v error: %v", symbol, retries, err))
			time.Sleep(2 * time.Second)
		}
	}

	msg, _ := json.Marshal(map[string]interface{}{"type": "subscribe", "symbol": symbol})
	msgErr := w.WriteMessage(websocket.TextMessage, msg)
	if msgErr != nil {
		log.Printf("error submitting msg to websocket: %v", msgErr)
	}

	var errorMessage string
	if err != nil {
		errorMessage = fmt.Sprintf("error in getFinnHubWebSocketConnection from dialer.Dial: %v", err)
		return w, errors.New(errorMessage)
	} else {
		return w, nil
	}
}

任何帮助都将不胜感激。


更多关于Golang中多个WebSocket客户端冲突问题探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

尝试将 ReadJSON 替换为 ReadMessage

更多关于Golang中多个WebSocket客户端冲突问题探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


当然,这符合预期。但对于交易/市场服务器而言,多个客户端通常需要来自多个设备/IP。实际上,许多金融API服务都提供自定义用户方案,以满足您的此类需求。

根据文档(重点是我加的):

流式传输美国股票、外汇和加密货币的实时交易。某些外汇和加密货币交易所可能无法提供交易数据。在这种情况下,将发送一条交易量为0的价格更新消息。一条消息可以包含多笔交易。一个API密钥一次只能打开一个连接。

或许可以复用连接?

感谢您的回复。我会研究一下服务器API。

好的,我已经查阅了服务器相关的可用文档。这并没有提供答案。

应用程序在单个连接下通常运行良好。然而,如果我在一个goroutine中创建第二个连接,连接在大多数情况下会开始失败,尽管两个线程上偶尔也会有一些成功。我认为这会是一个常见的使用场景。一个Web服务器需要支持多个客户端。而多个客户端可能需要从Web套接字获取数据。

你好!我不熟悉finnhub的API,但我认为这个问题与服务器限制有关。这种情况在我使用过的许多交易所和市场平台中都很常见:你的连接由于超时而被关闭(具体来说,是关于工作套接字的,99%的平台都有持续时间限制,以防止独占服务器资源……这就是EOF的原因)。而“握手失败”则源于另一个限制:当你的前一个连接被关闭后,立即从同一IP地址建立新连接时,在那个瞬间你的IP地址处于“黑名单”中,该黑名单会将你的令牌-IP组合锁定几秒或几分钟。通常,同一个令牌/账户在同一时间不能有超过一个连接。

我建议你留意平台的限制(有时这些限制解释得不够充分),并尝试延迟你的下一次连接尝试。注意:如果需要保持连接始终活跃,可以在特定时间关闭连接,然后开启一个新的连接,但要避免触发黑名单机制。不要使用多个连接。

根据你提供的代码和描述,问题主要出现在WebSocket连接的并发管理和错误处理上。以下是具体问题和解决方案:

问题分析

  1. 连接共享冲突:多个goroutine使用相同的WebSocket连接,导致读取冲突
  2. 错误处理不当1006错误是WebSocket的异常关闭,需要正确处理重连
  3. 上下文管理问题reading变量的并发访问存在竞态条件

解决方案

1. 每个goroutine独立连接

type WebSocketClient struct {
    symbol     string
    conn       *websocket.Conn
    ctx        context.Context
    cancel     context.CancelFunc
    reconnect  chan struct{}
    mu         sync.RWMutex
}

func NewWebSocketClient(symbol string) *WebSocketClient {
    ctx, cancel := context.WithCancel(context.Background())
    return &WebSocketClient{
        symbol:    symbol,
        ctx:       ctx,
        cancel:    cancel,
        reconnect: make(chan struct{}, 1),
    }
}

func (c *WebSocketClient) Start() {
    go c.run()
}

func (c *WebSocketClient) run() {
    defer c.cleanup()
    
    for {
        select {
        case <-c.ctx.Done():
            return
        default:
            if err := c.connectAndRead(); err != nil {
                log.Printf("%s connection error: %v", c.symbol, err)
                c.scheduleReconnect()
            }
        }
    }
}

func (c *WebSocketClient) connectAndRead() error {
    conn, err := c.dialWithRetry()
    if err != nil {
        return err
    }
    
    c.mu.Lock()
    c.conn = conn
    c.mu.Unlock()
    
    defer func() {
        c.mu.Lock()
        if c.conn != nil {
            c.conn.Close()
            c.conn = nil
        }
        c.mu.Unlock()
    }()
    
    return c.readMessages()
}

2. 改进的连接和重试机制

func (c *WebSocketClient) dialWithRetry() (*websocket.Conn, error) {
    var conn *websocket.Conn
    var err error
    
    for i := 0; i < 10; i++ {
        select {
        case <-c.ctx.Done():
            return nil, c.ctx.Err()
        default:
        }
        
        dialer := websocket.Dialer{
            HandshakeTimeout: 10 * time.Second,
            ReadBufferSize:   1024,
            WriteBufferSize:  1024,
        }
        
        conn, _, err = dialer.Dial("wss://ws.finnhub.io?token=xxxxxxxxxxxxxxxxx", nil)
        if err == nil {
            if err := c.subscribe(conn); err != nil {
                conn.Close()
                continue
            }
            return conn, nil
        }
        
        if i < 9 {
            backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
            if backoff > 30*time.Second {
                backoff = 30 * time.Second
            }
            time.Sleep(backoff)
        }
    }
    
    return nil, fmt.Errorf("failed to connect after retries: %v", err)
}

func (c *WebSocketClient) subscribe(conn *websocket.Conn) error {
    msg, _ := json.Marshal(map[string]interface{}{
        "type":   "subscribe",
        "symbol": c.symbol,
    })
    
    conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
    if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
        return fmt.Errorf("subscribe failed: %v", err)
    }
    
    conn.SetReadDeadline(time.Time{})
    return nil
}

3. 改进的消息读取处理

func (c *WebSocketClient) readMessages() error {
    c.mu.RLock()
    conn := c.conn
    c.mu.RUnlock()
    
    if conn == nil {
        return errors.New("no connection")
    }
    
    conn.SetReadDeadline(time.Time{})
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })
    
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    go func() {
        for range ticker.C {
            c.mu.RLock()
            if c.conn != nil {
                c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second))
            }
            c.mu.RUnlock()
        }
    }()
    
    for {
        select {
        case <-c.ctx.Done():
            return c.ctx.Err()
        default:
        }
        
        var msg map[string]interface{}
        if err := conn.ReadJSON(&msg); err != nil {
            if websocket.IsUnexpectedCloseError(err, 
                websocket.CloseGoingAway,
                websocket.CloseAbnormalClosure,
                websocket.CloseNormalClosure) {
                return fmt.Errorf("websocket closed: %v", err)
            }
            return err
        }
        
        if err := c.handleMessage(msg); err != nil {
            log.Printf("%s message handling error: %v", c.symbol, err)
        }
    }
}

func (c *WebSocketClient) handleMessage(msg map[string]interface{}) error {
    msgType, ok := msg["type"].(string)
    if !ok {
        return errors.New("invalid message format")
    }
    
    switch msgType {
    case "trade":
        data, ok := msg["data"].([]interface{})
        if !ok {
            return errors.New("invalid trade data")
        }
        
        for _, item := range data {
            if trade, ok := item.(map[string]interface{}); ok {
                log.Printf("%s trade: %v", c.symbol, trade)
            }
        }
    case "ping":
        c.mu.RLock()
        if c.conn != nil {
            c.conn.WriteJSON(map[string]string{"type": "pong"})
        }
        c.mu.RUnlock()
    }
    
    return nil
}

4. 使用示例

func main() {
    clients := []*WebSocketClient{
        NewWebSocketClient("AAPL"),
        NewWebSocketClient("TXN"),
        NewWebSocketClient("MSFT"),
    }
    
    for _, client := range clients {
        client.Start()
    }
    
    // 等待信号
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    <-ch
    
    for _, client := range clients {
        client.Stop()
    }
}

关键改进点

  1. 连接隔离:每个股票代码使用独立的WebSocket连接和goroutine
  2. 优雅重连:指数退避重试机制,避免频繁重连
  3. 心跳保持:实现Ping/Pong机制保持连接活跃
  4. 并发安全:使用互斥锁保护共享状态
  5. 资源清理:确保连接正确关闭,避免资源泄漏

这种设计可以避免多个客户端之间的冲突,同时提供更稳定的连接管理。1006错误通常表示连接异常断开,通过上述重连机制可以自动恢复。

回到顶部