Golang中Gorilla Websocket并发使用时的故障排查
Golang中Gorilla Websocket并发使用时的故障排查 我有一个使用Gorilla WebSocket的应用程序,地址是:“GitHub - gorilla/websocket: gorilla/websocket包是一个快速、经过良好测试且广泛使用的Go语言WebSocket实现。” 我的应用程序会打开连接,发送一个股票代码列表,然后在一个循环中读取通过该连接返回的消息,直到连接被取消或出错。 使用Go协程,我希望同时监控多个股票列表。当打开第二个连接且第一个连接正在读取消息时,会发生错误。在我的代码中,我通过迭代编号来标识协程,由于Go协程的工作方式,无法保证哪个连接会先被尝试。运行此代码时出现错误,因此我提取了必要的代码来演示该问题。我对WebSocket还比较陌生,所以很可能是我操作有误。我将发送示例代码和结果:
package main
import (
"fmt"
"github.com/gorilla/websocket"
"net/url"
"sync"
)
type Stock struct {
Symbol string `json:"symbol"`
Type string `json:"type"`
}
func main() {
// WebSocket连接的URL和令牌
var wg *sync.WaitGroup = &sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go processStocks(i, wg)
}
wg.Wait()
}
func processStocks(interationNumber int, wg *sync.WaitGroup) {
iterationStr := fmt.Sprintf("%d", interationNumber)
u := url.URL{Scheme: "wss", Host: "ws.finnhub.io", RawQuery: "token=cbbb00iad3ibhoa1vbcg"}
fmt.Printf("begin processStocks() for iteration: %s\n", iterationStr)
// 股票代码列表
stocks := []Stock{
{"IBM", "subscribe"},
// 在此处添加更多股票
}
defer wg.Done()
// 创建新的WebSocket连接
fmt.Printf("begin Dial() for iteration: %s\n", iterationStr)
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
//cAddress := fmt.Sprintf("%p", c)
fmt.Printf("failed dial error for iteration:%s address:%p: %v\n", iterationStr, c, err)
} else {
fmt.Printf("successful connection at address: %p for iteration: %s \n", c, iterationStr)
}
defer func(c *websocket.Conn) {
err := c.Close()
if err != nil {
fmt.Printf("iteration %s Close error: %v\n", iterationStr, err)
}
}(c)
// 发送股票代码列表
for _, stock := range stocks {
fmt.Printf("begin WriteJSON() for iteration: %s address: %p\n", iterationStr, c)
err := c.WriteJSON(stock)
if err != nil {
fmt.Printf("iteration %s write error: %v\n", iterationStr, err)
return
} else {
fmt.Printf("successful write at address: %p for iteration: %s \n", c, iterationStr)
}
}
// 无限循环读取结果
for {
fmt.Printf("begin ReadMessage() for iteration:%s address:%p\n", iterationStr, c)
_, message, err := c.ReadMessage()
if err != nil {
fmt.Printf("iteration %s read error: %v address:%p: \n", iterationStr, err, c)
return
} else {
fmt.Printf("iteration %s successful read: [%s] address:%p: \n", iterationStr, message, c)
}
}
}
输出:
begin processStocks() for iteration: 1
begin Dial() for iteration: 1
begin processStocks() for iteration: 0
begin Dial() for iteration: 0
successful connection at address: 0x14000012580 for iteration: 0
begin WriteJSON() for iteration: 0 address: 0x14000012580
successful write at address: 0x14000012580 for iteration: 0
begin ReadMessage() for iteration:0 address:0x14000012580
*iteration 0 read error: websocket: close 1006 (abnormal closure): unexpected EOF address:0x14000012580:*
successful connection at address: 0x140000126e0 for iteration: 1
begin WriteJSON() for iteration: 1 address: 0x140000126e0
successful write at address: 0x140000126e0 for iteration: 1
begin ReadMessage() for iteration:1 address:0x140000126e0
iteration 1 successful read: [{"data":[{"c":["1","12"],"p":144.995,"s":"IBM","t":1691083530560,"v":1}],"type":"trade"}] address:0x140000126e0:
begin ReadMessage() for iteration:1 address:0x140000126e0
顺便提一下,这里面包含我的密钥,你可以用它来重现问题,不过我很快就会更改它,反正它是免费的。
更多关于Golang中Gorilla Websocket并发使用时的故障排查的实战教程也可以访问 https://www.itying.com/category-94-b0.html
你可以换个思路,在多个 goroutine 之间共享 WebSocket 连接。
更多关于Golang中Gorilla Websocket并发使用时的故障排查的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
好的,明白了。API密钥仅对单个连接有效,因此每个用户都需要获取自己的密钥,或者我需要与提供商合作,看看能否通过获取多用户密钥之类的方式来实现。至少我现在可以停止尝试修复这个无法修复的问题了。
在并发使用Gorilla WebSocket时,你遇到的问题很可能是由于共享的websocket.DefaultDialer导致的。websocket.DefaultDialer是一个全局变量,多个goroutine同时使用时可能会产生竞争条件。以下是修复后的代码:
package main
import (
"fmt"
"github.com/gorilla/websocket"
"net/url"
"sync"
"time"
)
type Stock struct {
Symbol string `json:"symbol"`
Type string `json:"type"`
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go processStocks(i, &wg)
}
wg.Wait()
}
func processStocks(iterationNumber int, wg *sync.WaitGroup) {
defer wg.Done()
iterationStr := fmt.Sprintf("%d", iterationNumber)
u := url.URL{
Scheme: "wss",
Host: "ws.finnhub.io",
RawQuery: "token=cbbb00iad3ibhoa1vbcg",
}
fmt.Printf("begin processStocks() for iteration: %s\n", iterationStr)
// 为每个goroutine创建独立的Dialer
dialer := &websocket.Dialer{
HandshakeTimeout: 45 * time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// 股票代码列表
stocks := []Stock{
{"IBM", "subscribe"},
}
// 创建WebSocket连接
fmt.Printf("begin Dial() for iteration: %s\n", iterationStr)
c, _, err := dialer.Dial(u.String(), nil)
if err != nil {
fmt.Printf("failed dial error for iteration:%s: %v\n", iterationStr, err)
return
}
defer c.Close()
fmt.Printf("successful connection at address: %p for iteration: %s\n", c, iterationStr)
// 设置读取超时
c.SetReadDeadline(time.Now().Add(60 * time.Second))
c.SetPongHandler(func(string) error {
c.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
// 发送股票代码列表
for _, stock := range stocks {
fmt.Printf("begin WriteJSON() for iteration: %s address: %p\n", iterationStr, c)
err := c.WriteJSON(stock)
if err != nil {
fmt.Printf("iteration %s write error: %v\n", iterationStr, err)
return
}
fmt.Printf("successful write at address: %p for iteration: %s\n", c, iterationStr)
}
// 启用ping/pong保持连接
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}()
// 读取消息
for {
fmt.Printf("begin ReadMessage() for iteration:%s address:%p\n", iterationStr, c)
messageType, message, err := c.ReadMessage()
if err != nil {
fmt.Printf("iteration %s read error: %v address:%p\n", iterationStr, err, c)
return
}
switch messageType {
case websocket.TextMessage:
fmt.Printf("iteration %s successful read: [%s] address:%p\n", iterationStr, message, c)
case websocket.BinaryMessage:
fmt.Printf("iteration %s binary message received address:%p\n", iterationStr, c)
case websocket.CloseMessage:
fmt.Printf("iteration %s connection closed address:%p\n", iterationStr, c)
return
}
}
}
关键修改:
-
使用独立的Dialer:为每个goroutine创建独立的
websocket.Dialer实例,避免共享全局DefaultDialer导致的竞争条件。 -
设置连接超时:配置
HandshakeTimeout确保连接握手不会无限期等待。 -
添加心跳机制:通过ping/pong消息保持WebSocket连接活跃,防止因空闲而断开。
-
设置读取超时:使用
SetReadDeadline避免读取操作永久阻塞。 -
正确处理消息类型:根据
messageType区分不同类型的WebSocket消息。 -
修复WaitGroup使用:将
wg作为指针传递,避免值复制问题。
这些修改应该能解决你遇到的并发连接问题。每个goroutine现在都有独立的Dialer配置和连接管理,避免了资源共享导致的冲突。

