Golang WebStomp实时通信开发

我在使用Golang开发WebStomp实时通信功能时遇到了一些问题。想请教大家:

  1. 如何正确配置Golang的WebStomp服务器端?目前连接总是失败。
  2. 有没有推荐的前端库可以配合Golang的WebStomp实现实时通信?
  3. 在处理大量并发连接时,需要注意哪些性能优化点?
  4. 有没有完整的示例代码可以参考?

感谢各位大佬指点!

2 回复

使用Golang实现WebStomp实时通信,可以结合gorilla/websocket和go-stomp库。

  1. 建立WebSocket连接:
var upgrader = websocket.Upgrader{}
conn, _ := upgrader.Upgrade(w, r, nil)
  1. 连接STOMP服务器:
stompConn, _ := stomp.Dial("tcp", "localhost:61613")
  1. 订阅消息:
sub, _ := stompConn.Subscribe("/topic/chat", stomp.AckAuto)
  1. 消息处理:
go func() {
    for {
        msg := <-sub.C
        // 处理接收到的消息
        conn.WriteMessage(websocket.TextMessage, msg.Body)
    }
}()
  1. 发送消息:
stompConn.Send("/topic/chat", "text/plain", []byte("Hello"))

关键点:

  • 使用WebSocket作为传输层
  • STOMP协议处理消息路由
  • 支持发布/订阅模式
  • 注意连接管理和错误处理

这种方案适合需要消息路由功能的实时应用,如聊天室、实时数据推送等场景。

更多关于Golang WebStomp实时通信开发的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中实现WebSocket与STOMP协议的实时通信,可以通过以下步骤完成:

1. 安装依赖包

go get github.com/gorilla/websocket
go get github.com/go-stomp/stomp

2. WebSocket服务器实现

package main

import (
    "log"
    "net/http"
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true // 生产环境应严格检查
    },
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("WebSocket升级失败:", err)
        return
    }
    defer conn.Close()
    
    // STOMP连接处理
    handleStompConnection(conn)
}

func handleStompConnection(wsConn *websocket.Conn) {
    for {
        messageType, message, err := wsConn.ReadMessage()
        if err != nil {
            log.Println("读取消息失败:", err)
            break
        }
        
        // 处理STOMP帧
        stompFrame, err := parseStompFrame(string(message))
        if err != nil {
            log.Println("解析STOMP帧失败:", err)
            continue
        }
        
        // 根据STOMP命令处理
        switch stompFrame.Command {
        case "CONNECT":
            handleConnect(wsConn, stompFrame)
        case "SUBSCRIBE":
            handleSubscribe(wsConn, stompFrame)
        case "SEND":
            handleSend(wsConn, stompFrame)
        case "DISCONNECT":
            return
        }
    }
}

3. STOMP帧解析

type StompFrame struct {
    Command string
    Headers map[string]string
    Body    string
}

func parseStompFrame(data string) (*StompFrame, error) {
    lines := strings.Split(data, "\n")
    frame := &StompFrame{
        Command: lines[0],
        Headers: make(map[string]string),
    }
    
    // 解析头部
    for i := 1; i < len(lines); i++ {
        line := lines[i]
        if line == "" {
            // 空行后是消息体
            if i+1 < len(lines) {
                frame.Body = strings.Join(lines[i+1:], "\n")
            }
            break
        }
        
        parts := strings.SplitN(line, ":", 2)
        if len(parts) == 2 {
            frame.Headers[parts[0]] = parts[1]
        }
    }
    
    return frame, nil
}

4. 消息处理函数

var subscriptions = make(map[string][]*websocket.Conn)

func handleConnect(wsConn *websocket.Conn, frame *StompFrame) {
    // 发送CONNECTED帧
    response := "CONNECTED\nversion:1.2\n\n\x00"
    wsConn.WriteMessage(websocket.TextMessage, []byte(response))
}

func handleSubscribe(wsConn *websocket.Conn, frame *StompFrame) {
    destination := frame.Headers["destination"]
    if destination != "" {
        subscriptions[destination] = append(subscriptions[destination], wsConn)
    }
}

func handleSend(wsConn *websocket.Conn, frame *StompFrame) {
    destination := frame.Headers["destination"]
    if subscribers, exists := subscriptions[destination]; exists {
        // 构建MESSAGE帧
        messageFrame := buildMessageFrame(frame.Body, destination)
        
        // 广播给所有订阅者
        for _, subscriber := range subscribers {
            if subscriber != wsConn { // 可选:不发送给发送者自己
                subscriber.WriteMessage(websocket.TextMessage, []byte(messageFrame))
            }
        }
    }
}

func buildMessageFrame(body, destination string) string {
    return fmt.Sprintf("MESSAGE\ndestination:%s\ncontent-type:text/plain\n\n%s\x00", 
        destination, body)
}

5. 启动服务器

func main() {
    http.HandleFunc("/ws", handleWebSocket)
    log.Println("WebSocket STOMP服务器启动在 :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

6. 前端使用示例

<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
<script>
const socket = new WebSocket('ws://localhost:8080/ws');
const stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
    // 订阅主题
    stompClient.subscribe('/topic/chat', function(message) {
        console.log('收到消息:', message.body);
    });
    
    // 发送消息
    stompClient.send('/topic/chat', {}, 'Hello, STOMP!');
});
</script>

这个实现提供了基本的WebSocket STOMP功能,包括连接管理、订阅发布和消息广播。可以根据需要添加身份验证、错误处理和更复杂的路由逻辑。

回到顶部