Golang实现Websocket监听MySQL数据库变更事件
Golang实现Websocket监听MySQL数据库变更事件 我正在使用一个名为Gorilla的WebSocket库。我可以用这个库实现一些简单的功能,但现在我需要获取MySQL表中发生变更(插入)的事件,并找到其中client_id等于WebSocket连接客户端ID的记录,然后将该记录的数据发送给相应用户。有没有办法实现这个功能?即使需要使用其他WebSocket库也可以…有人有好的思路吗?
谢谢。
1 回复
更多关于Golang实现Websocket监听MySQL数据库变更事件的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
要实现通过WebSocket监听MySQL数据库变更并推送特定数据给相应用户,可以使用MySQL的binlog监听机制结合WebSocket连接管理。以下是使用gorilla/websocket和go-mysql库的完整实现方案:
package main
import (
"database/sql"
"encoding/json"
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
"github.com/siddontang/go-mysql/canal"
"github.com/siddontang/go-mysql/mysql"
_ "github.com/go-sql-driver/mysql"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// 客户端连接管理
type ClientManager struct {
clients map[string]*websocket.Conn
clientsMux sync.RWMutex
}
var manager = ClientManager{
clients: make(map[string]*websocket.Conn),
}
func (cm *ClientManager) addClient(clientID string, conn *websocket.Conn) {
cm.clientsMux.Lock()
defer cm.clientsMux.Unlock()
cm.clients[clientID] = conn
}
func (cm *ClientManager) removeClient(clientID string) {
cm.clientsMux.Lock()
defer cm.clientsMux.Unlock()
delete(cm.clients, clientID)
}
func (cm *ClientManager) sendToClient(clientID string, data []byte) {
cm.clientsMux.RLock()
defer cm.clientsMux.RUnlock()
if conn, exists := cm.clients[clientID]; exists {
err := conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Printf("发送消息失败: %v", err)
conn.Close()
cm.removeClient(clientID)
}
}
}
// WebSocket处理
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
clientID := r.URL.Query().Get("client_id")
if clientID == "" {
http.Error(w, "缺少client_id参数", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket升级失败: %v", err)
return
}
defer conn.Close()
manager.addClient(clientID, conn)
log.Printf("客户端连接: %s", clientID)
// 保持连接
for {
if _, _, err := conn.ReadMessage(); err != nil {
manager.removeClient(clientID)
log.Printf("客户端断开: %s", clientID)
break
}
}
}
// MySQL binlog监听
type MyEventHandler struct {
canal.DummyEventHandler
}
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
if e.Table.Name == "your_table_name" && e.Action == "insert" {
for _, row := range e.Rows {
// 假设client_id在第二列(根据实际表结构调整)
clientID, ok := row[1].(string)
if !ok {
continue
}
// 构建要发送的数据
data := map[string]interface{}{
"action": e.Action,
"table": e.Table.Name,
"client_id": clientID,
"data": row,
}
jsonData, err := json.Marshal(data)
if err != nil {
log.Printf("JSON编码失败: %v", err)
continue
}
// 发送给对应客户端
manager.sendToClient(clientID, jsonData)
}
}
return nil
}
func (h *MyEventHandler) String() string {
return "MyEventHandler"
}
func startBinlogListener() {
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3306"
cfg.User = "your_username"
cfg.Password = "your_password"
cfg.Dump.ExecutionPath = ""
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
c.SetEventHandler(&MyEventHandler{})
err = c.Run()
if err != nil {
log.Fatal(err)
}
}
func main() {
// 启动binlog监听
go startBinlogListener()
// 启动WebSocket服务器
http.HandleFunc("/ws", handleWebSocket)
log.Println("WebSocket服务器启动 :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
需要安装的依赖:
go get github.com/gorilla/websocket
go get github.com/siddontang/go-mysql
go get github.com/go-sql-driver/mysql
关键实现说明:
- WebSocket连接管理:使用
ClientManager维护client_id与WebSocket连接的映射关系 - MySQL binlog监听:使用go-mysql库监听数据库变更事件
- 数据过滤:在
OnRow方法中检查表名、操作类型和client_id - 消息推送:通过WebSocket向特定客户端发送JSON格式的数据
使用示例:
// 客户端连接
const ws = new WebSocket('ws://localhost:8080/ws?client_id=user123');
ws.onmessage = function(event) {
console.log('收到数据:', JSON.parse(event.data));
};
注意:需要确保MySQL已开启binlog并设置正确的权限。实际使用时需要根据表结构调整字段索引位置。

