Golang中Gorilla Websocket并发使用时的故障排查

Golang中Gorilla Websocket并发使用时的故障排查 我有一个使用Gorilla WebSocket的应用程序,地址是:“GitHub - gorilla/websocket: gorilla/websocket包是一个快速、经过良好测试且广泛使用的Go语言WebSocket实现。” 我的应用程序会打开连接,发送一个股票代码列表,然后在一个循环中读取通过该连接返回的消息,直到连接被取消或出错。 使用Go协程,我希望同时监控多个股票列表。当打开第二个连接且第一个连接正在读取消息时,会发生错误。在我的代码中,我通过迭代编号来标识协程,由于Go协程的工作方式,无法保证哪个连接会先被尝试。运行此代码时出现错误,因此我提取了必要的代码来演示该问题。我对WebSocket还比较陌生,所以很可能是我操作有误。我将发送示例代码和结果:

package main

import (
	"fmt"
	"github.com/gorilla/websocket"
	"net/url"
	"sync"
)

type Stock struct {
	Symbol string `json:"symbol"`
	Type   string `json:"type"`
}

func main() {
	// WebSocket连接的URL和令牌

	var wg *sync.WaitGroup = &sync.WaitGroup{}

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go processStocks(i, wg)
	}

	wg.Wait()
}

func processStocks(interationNumber int, wg *sync.WaitGroup) {
	iterationStr := fmt.Sprintf("%d", interationNumber)

	u := url.URL{Scheme: "wss", Host: "ws.finnhub.io", RawQuery: "token=cbbb00iad3ibhoa1vbcg"}

	fmt.Printf("begin processStocks() for iteration: %s\n", iterationStr)

	// 股票代码列表
	stocks := []Stock{
		{"IBM", "subscribe"},
		// 在此处添加更多股票
	}

	defer wg.Done()

	// 创建新的WebSocket连接
	fmt.Printf("begin Dial() for iteration: %s\n", iterationStr)
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		//cAddress := fmt.Sprintf("%p", c)
		fmt.Printf("failed dial error for iteration:%s address:%p: %v\n", iterationStr, c, err)
	} else {
		fmt.Printf("successful connection at address: %p for iteration: %s \n", c, iterationStr)
	}
	defer func(c *websocket.Conn) {
		err := c.Close()
		if err != nil {
			fmt.Printf("iteration %s Close error: %v\n", iterationStr, err)
		}
	}(c)

	// 发送股票代码列表
	for _, stock := range stocks {
		fmt.Printf("begin WriteJSON() for iteration: %s address: %p\n", iterationStr, c)
		err := c.WriteJSON(stock)
		if err != nil {
			fmt.Printf("iteration %s write error: %v\n", iterationStr, err)
			return
		} else {
			fmt.Printf("successful write at address: %p for iteration: %s \n", c, iterationStr)
		}

	}

	// 无限循环读取结果
	for {
		fmt.Printf("begin ReadMessage() for iteration:%s address:%p\n", iterationStr, c)
		_, message, err := c.ReadMessage()
		if err != nil {
			fmt.Printf("iteration %s read error: %v address:%p: \n", iterationStr, err, c)
			return
		} else {
			fmt.Printf("iteration %s successful read: [%s] address:%p: \n", iterationStr, message, c)
		}
	}
}

输出:

begin processStocks() for iteration: 1
begin Dial() for iteration: 1
begin processStocks() for iteration: 0
begin Dial() for iteration: 0
successful connection at address: 0x14000012580 for iteration: 0 
begin WriteJSON() for iteration: 0 address: 0x14000012580
successful write at address: 0x14000012580 for iteration: 0 
begin ReadMessage() for iteration:0 address:0x14000012580

*iteration 0 read error: websocket: close 1006 (abnormal closure): unexpected EOF address:0x14000012580:* 

successful connection at address: 0x140000126e0 for iteration: 1 
begin WriteJSON() for iteration: 1 address: 0x140000126e0
successful write at address: 0x140000126e0 for iteration: 1 
begin ReadMessage() for iteration:1 address:0x140000126e0
iteration 1 successful read: [{"data":[{"c":["1","12"],"p":144.995,"s":"IBM","t":1691083530560,"v":1}],"type":"trade"}] address:0x140000126e0: 
begin ReadMessage() for iteration:1 address:0x140000126e0

顺便提一下,这里面包含我的密钥,你可以用它来重现问题,不过我很快就会更改它,反正它是免费的。


更多关于Golang中Gorilla Websocket并发使用时的故障排查的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

你可以换个思路,在多个 goroutine 之间共享 WebSocket 连接。

更多关于Golang中Gorilla Websocket并发使用时的故障排查的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


好的,明白了。API密钥仅对单个连接有效,因此每个用户都需要获取自己的密钥,或者我需要与提供商合作,看看能否通过获取多用户密钥之类的方式来实现。至少我现在可以停止尝试修复这个无法修复的问题了。

在并发使用Gorilla WebSocket时,你遇到的问题很可能是由于共享的websocket.DefaultDialer导致的。websocket.DefaultDialer是一个全局变量,多个goroutine同时使用时可能会产生竞争条件。以下是修复后的代码:

package main

import (
	"fmt"
	"github.com/gorilla/websocket"
	"net/url"
	"sync"
	"time"
)

type Stock struct {
	Symbol string `json:"symbol"`
	Type   string `json:"type"`
}

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go processStocks(i, &wg)
	}

	wg.Wait()
}

func processStocks(iterationNumber int, wg *sync.WaitGroup) {
	defer wg.Done()
	
	iterationStr := fmt.Sprintf("%d", iterationNumber)
	u := url.URL{
		Scheme:   "wss",
		Host:     "ws.finnhub.io",
		RawQuery: "token=cbbb00iad3ibhoa1vbcg",
	}

	fmt.Printf("begin processStocks() for iteration: %s\n", iterationStr)

	// 为每个goroutine创建独立的Dialer
	dialer := &websocket.Dialer{
		HandshakeTimeout: 45 * time.Second,
		ReadBufferSize:   1024,
		WriteBufferSize:  1024,
	}

	// 股票代码列表
	stocks := []Stock{
		{"IBM", "subscribe"},
	}

	// 创建WebSocket连接
	fmt.Printf("begin Dial() for iteration: %s\n", iterationStr)
	c, _, err := dialer.Dial(u.String(), nil)
	if err != nil {
		fmt.Printf("failed dial error for iteration:%s: %v\n", iterationStr, err)
		return
	}
	defer c.Close()
	
	fmt.Printf("successful connection at address: %p for iteration: %s\n", c, iterationStr)

	// 设置读取超时
	c.SetReadDeadline(time.Now().Add(60 * time.Second))
	c.SetPongHandler(func(string) error {
		c.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	// 发送股票代码列表
	for _, stock := range stocks {
		fmt.Printf("begin WriteJSON() for iteration: %s address: %p\n", iterationStr, c)
		err := c.WriteJSON(stock)
		if err != nil {
			fmt.Printf("iteration %s write error: %v\n", iterationStr, err)
			return
		}
		fmt.Printf("successful write at address: %p for iteration: %s\n", c, iterationStr)
	}

	// 启用ping/pong保持连接
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		
		for {
			select {
			case <-ticker.C:
				if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
					return
				}
			}
		}
	}()

	// 读取消息
	for {
		fmt.Printf("begin ReadMessage() for iteration:%s address:%p\n", iterationStr, c)
		messageType, message, err := c.ReadMessage()
		if err != nil {
			fmt.Printf("iteration %s read error: %v address:%p\n", iterationStr, err, c)
			return
		}
		
		switch messageType {
		case websocket.TextMessage:
			fmt.Printf("iteration %s successful read: [%s] address:%p\n", iterationStr, message, c)
		case websocket.BinaryMessage:
			fmt.Printf("iteration %s binary message received address:%p\n", iterationStr, c)
		case websocket.CloseMessage:
			fmt.Printf("iteration %s connection closed address:%p\n", iterationStr, c)
			return
		}
	}
}

关键修改:

  1. 使用独立的Dialer:为每个goroutine创建独立的websocket.Dialer实例,避免共享全局DefaultDialer导致的竞争条件。

  2. 设置连接超时:配置HandshakeTimeout确保连接握手不会无限期等待。

  3. 添加心跳机制:通过ping/pong消息保持WebSocket连接活跃,防止因空闲而断开。

  4. 设置读取超时:使用SetReadDeadline避免读取操作永久阻塞。

  5. 正确处理消息类型:根据messageType区分不同类型的WebSocket消息。

  6. 修复WaitGroup使用:将wg作为指针传递,避免值复制问题。

这些修改应该能解决你遇到的并发连接问题。每个goroutine现在都有独立的Dialer配置和连接管理,避免了资源共享导致的冲突。

回到顶部