Golang实现Websocket监听MySQL数据库变更事件

Golang实现Websocket监听MySQL数据库变更事件 我正在使用一个名为Gorilla的WebSocket库。我可以用这个库实现一些简单的功能,但现在我需要获取MySQL表中发生变更(插入)的事件,并找到其中client_id等于WebSocket连接客户端ID的记录,然后将该记录的数据发送给相应用户。有没有办法实现这个功能?即使需要使用其他WebSocket库也可以…有人有好的思路吗?

谢谢。

1 回复

更多关于Golang实现Websocket监听MySQL数据库变更事件的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


要实现通过WebSocket监听MySQL数据库变更并推送特定数据给相应用户,可以使用MySQL的binlog监听机制结合WebSocket连接管理。以下是使用gorilla/websocketgo-mysql库的完整实现方案:

package main

import (
    "database/sql"
    "encoding/json"
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
    "github.com/siddontang/go-mysql/canal"
    "github.com/siddontang/go-mysql/mysql"
    _ "github.com/go-sql-driver/mysql"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

// 客户端连接管理
type ClientManager struct {
    clients    map[string]*websocket.Conn
    clientsMux sync.RWMutex
}

var manager = ClientManager{
    clients: make(map[string]*websocket.Conn),
}

func (cm *ClientManager) addClient(clientID string, conn *websocket.Conn) {
    cm.clientsMux.Lock()
    defer cm.clientsMux.Unlock()
    cm.clients[clientID] = conn
}

func (cm *ClientManager) removeClient(clientID string) {
    cm.clientsMux.Lock()
    defer cm.clientsMux.Unlock()
    delete(cm.clients, clientID)
}

func (cm *ClientManager) sendToClient(clientID string, data []byte) {
    cm.clientsMux.RLock()
    defer cm.clientsMux.RUnlock()
    if conn, exists := cm.clients[clientID]; exists {
        err := conn.WriteMessage(websocket.TextMessage, data)
        if err != nil {
            log.Printf("发送消息失败: %v", err)
            conn.Close()
            cm.removeClient(clientID)
        }
    }
}

// WebSocket处理
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    clientID := r.URL.Query().Get("client_id")
    if clientID == "" {
        http.Error(w, "缺少client_id参数", http.StatusBadRequest)
        return
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("WebSocket升级失败: %v", err)
        return
    }
    defer conn.Close()

    manager.addClient(clientID, conn)
    log.Printf("客户端连接: %s", clientID)

    // 保持连接
    for {
        if _, _, err := conn.ReadMessage(); err != nil {
            manager.removeClient(clientID)
            log.Printf("客户端断开: %s", clientID)
            break
        }
    }
}

// MySQL binlog监听
type MyEventHandler struct {
    canal.DummyEventHandler
}

func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
    if e.Table.Name == "your_table_name" && e.Action == "insert" {
        for _, row := range e.Rows {
            // 假设client_id在第二列(根据实际表结构调整)
            clientID, ok := row[1].(string)
            if !ok {
                continue
            }

            // 构建要发送的数据
            data := map[string]interface{}{
                "action":   e.Action,
                "table":    e.Table.Name,
                "client_id": clientID,
                "data":     row,
            }

            jsonData, err := json.Marshal(data)
            if err != nil {
                log.Printf("JSON编码失败: %v", err)
                continue
            }

            // 发送给对应客户端
            manager.sendToClient(clientID, jsonData)
        }
    }
    return nil
}

func (h *MyEventHandler) String() string {
    return "MyEventHandler"
}

func startBinlogListener() {
    cfg := canal.NewDefaultConfig()
    cfg.Addr = "127.0.0.1:3306"
    cfg.User = "your_username"
    cfg.Password = "your_password"
    cfg.Dump.ExecutionPath = ""

    c, err := canal.NewCanal(cfg)
    if err != nil {
        log.Fatal(err)
    }

    c.SetEventHandler(&MyEventHandler{})
    err = c.Run()
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // 启动binlog监听
    go startBinlogListener()

    // 启动WebSocket服务器
    http.HandleFunc("/ws", handleWebSocket)
    log.Println("WebSocket服务器启动 :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

需要安装的依赖:

go get github.com/gorilla/websocket
go get github.com/siddontang/go-mysql
go get github.com/go-sql-driver/mysql

关键实现说明:

  1. WebSocket连接管理:使用ClientManager维护client_id与WebSocket连接的映射关系
  2. MySQL binlog监听:使用go-mysql库监听数据库变更事件
  3. 数据过滤:在OnRow方法中检查表名、操作类型和client_id
  4. 消息推送:通过WebSocket向特定客户端发送JSON格式的数据

使用示例:

// 客户端连接
const ws = new WebSocket('ws://localhost:8080/ws?client_id=user123');
ws.onmessage = function(event) {
    console.log('收到数据:', JSON.parse(event.data));
};

注意:需要确保MySQL已开启binlog并设置正确的权限。实际使用时需要根据表结构调整字段索引位置。

回到顶部