Golang中的SQL依赖关系概念解析
Golang中的SQL依赖关系概念解析 我正在开发一个基于 MS SQL Server 数据库 的 Golang API。我想在 Go 语言中实现 SQL 依赖 的概念。
这是否可行?如果可行,请提供解决方案。
如果您在 Go 语言中有类似 SQL 依赖的其他解决方案,请告诉我。
我希望能够通过 Go 语言 API 监听 SQL Server 数据库的变更。
请尽快给我解决方案。
您能针对以下需求提供解决方案吗?
我希望通过 Golang API 监听 SQL Server 数据库的变更。
更多关于Golang中的SQL依赖关系概念解析的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我正在寻找 MS SQL Server 数据库 的解决方案 请给我在 Golang 中实现 MS SQL 表依赖的解决方案
这是一个很酷的SQL功能,即你可以“监听”数据库中的任何变化。
我找到了这个论坛帖子:sql - 如何使用Golang捕获新PostgreSQL记录的事件 - Stack Overflow
第二个回复推荐了GitHub - lib/pq: 用于 database/sql 的纯 Go Postgres 驱动,甚至还有一个简单的示例Go Playground - The Go Programming Language
首先,你需要使用 microsoft/go-mssqldb 这个 SQL 驱动程序来连接 MSSQL 数据库。其次:据我所知,SqlDependency 并不是 MSSQL 的一个特性。我相信它是 .NET 框架中的一个抽象概念。你可以深入研究一下源代码,看看他们是怎么实现的:
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Runtime.CompilerServices;
#if NETFRAMEWORK
using System.IO;
using System.Runtime.Remoting;
using System.Runtime.Serialization;
using System.Runtime.Versioning;
using System.Security.Permissions;
#endif
using System.Text;
using System.Threading;
using System.Xml;
using Microsoft.Data.Common;
此文件已被截断。显示原文
简而言之:这是可能的,但不会很容易。根据你的需求,使用像 @heidi 提到的那种方法可能会更好。
在Go中监听MS SQL Server数据库变更是可行的,主要通过两种方式实现:
方案一:使用SQL Server Query Notifications(推荐)
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/denisenkom/go-mssqldb"
)
func main() {
connString := "server=localhost;user id=sa;password=yourpassword;database=YourDB"
db, err := sql.Open("mssql", connString)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 启用Service Broker(需要在SQL Server中先启用)
_, err = db.Exec(`
ALTER DATABASE YourDB SET ENABLE_BROKER;
ALTER DATABASE YourDB SET TRUSTWORTHY ON;
`)
if err != nil {
log.Printf("启用Service Broker失败: %v", err)
}
// 监听表变更
go listenForChanges(db, "YourTable")
select {} // 保持程序运行
}
func listenForChanges(db *sql.DB, tableName string) {
ctx := context.Background()
// 创建查询通知
query := fmt.Sprintf(`
SELECT id, name, modified_date
FROM %s
WHERE modified_date > @lastCheck
`, tableName)
for {
rows, err := db.QueryContext(ctx, query,
sql.Named("lastCheck", time.Now().Add(-1*time.Minute)))
if err != nil {
log.Printf("查询失败: %v", err)
time.Sleep(5 * time.Second)
continue
}
// 处理变更数据
for rows.Next() {
var id int
var name string
var modified time.Time
if err := rows.Scan(&id, &name, &modified); err != nil {
log.Printf("扫描行失败: %v", err)
continue
}
fmt.Printf("检测到变更 - ID: %d, Name: %s, 修改时间: %v\n",
id, name, modified)
}
rows.Close()
time.Sleep(2 * time.Second) // 轮询间隔
}
}
方案二:使用SQL Server Change Tracking功能
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
_ "github.com/denisenkom/go-mssqldb"
)
type ChangeTracking struct {
SYS_CHANGE_VERSION int64
SYS_CHANGE_CREATION_VERSION sql.NullInt64
SYS_CHANGE_OPERATION string
SYS_CHANGE_COLUMNS varbinary // SQL Server varbinary类型
id int
name string
}
func setupChangeTracking(db *sql.DB) error {
// 在数据库级别启用变更跟踪
_, err := db.Exec(`
ALTER DATABASE YourDB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
`)
if err != nil {
return err
}
// 在表级别启用变更跟踪
_, err = db.Exec(`
ALTER TABLE YourTable
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON)
`)
return err
}
func monitorChanges(db *sql.DB) {
var lastVersion int64 = 0
for {
rows, err := db.Query(`
SELECT
ct.SYS_CHANGE_VERSION,
ct.SYS_CHANGE_OPERATION,
t.id,
t.name
FROM CHANGETABLE(CHANGES YourTable, ?) AS ct
LEFT JOIN YourTable AS t ON t.id = ct.id
ORDER BY ct.SYS_CHANGE_VERSION
`, lastVersion)
if err != nil {
log.Printf("查询变更失败: %v", err)
time.Sleep(5 * time.Second)
continue
}
for rows.Next() {
var change ChangeTracking
err := rows.Scan(
&change.SYS_CHANGE_VERSION,
&change.SYS_CHANGE_OPERATION,
&change.id,
&change.name,
)
if err != nil {
log.Printf("扫描变更数据失败: %v", err)
continue
}
// 处理变更
processChange(change)
lastVersion = change.SYS_CHANGE_VERSION
}
rows.Close()
time.Sleep(1 * time.Second)
}
}
func processChange(change ChangeTracking) {
data, _ := json.MarshalIndent(change, "", " ")
fmt.Printf("检测到数据库变更:\n%s\n", string(data))
switch change.SYS_CHANGE_OPERATION {
case "I":
fmt.Println("操作类型: 插入")
case "U":
fmt.Println("操作类型: 更新")
case "D":
fmt.Println("操作类型: 删除")
}
}
方案三:使用触发器+轮询(兼容性最好)
package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/denisenkom/go-mssqldb"
)
// 创建变更日志表
func createChangeLogTable(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE ChangeLog (
id INT IDENTITY(1,1) PRIMARY KEY,
table_name NVARCHAR(100),
record_id INT,
operation NVARCHAR(10),
change_time DATETIME DEFAULT GETDATE(),
processed BIT DEFAULT 0
)
`)
return err
}
// 创建触发器
func createTrigger(db *sql.DB, tableName string) error {
triggerSQL := fmt.Sprintf(`
CREATE TRIGGER tr_%s_Change
ON %s
AFTER INSERT, UPDATE, DELETE
AS
BEGIN
DECLARE @Operation NVARCHAR(10)
IF EXISTS (SELECT * FROM inserted) AND EXISTS (SELECT * FROM deleted)
SET @Operation = 'UPDATE'
ELSE IF EXISTS (SELECT * FROM inserted)
SET @Operation = 'INSERT'
ELSE
SET @Operation = 'DELETE'
INSERT INTO ChangeLog (table_name, record_id, operation)
SELECT '%s',
COALESCE(inserted.id, deleted.id),
@Operation
FROM inserted FULL OUTER JOIN deleted
ON 1=0
END
`, tableName, tableName, tableName)
_, err := db.Exec(triggerSQL)
return err
}
// 轮询变更日志
func pollChanges(db *sql.DB) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
rows, err := db.Query(`
SELECT id, table_name, record_id, operation, change_time
FROM ChangeLog
WHERE processed = 0
ORDER BY change_time
`)
if err != nil {
log.Printf("查询变更日志失败: %v", err)
continue
}
for rows.Next() {
var logID int
var tableName string
var recordID int
var operation string
var changeTime time.Time
err := rows.Scan(&logID, &tableName, &recordID, &operation, &changeTime)
if err != nil {
log.Printf("扫描日志失败: %v", err)
continue
}
fmt.Printf("检测到变更: 表=%s, 记录ID=%d, 操作=%s, 时间=%v\n",
tableName, recordID, operation, changeTime)
// 标记为已处理
db.Exec("UPDATE ChangeLog SET processed = 1 WHERE id = ?", logID)
}
rows.Close()
}
}
快速启动示例
// main.go - 完整示例
package main
import (
"database/sql"
"log"
"sync"
_ "github.com/denisenkom/go-mssqldb"
)
func main() {
db, err := sql.Open("mssql",
"server=localhost;port=1433;database=YourDB;user id=sa;password=YourPassword")
if err != nil {
log.Fatal("数据库连接失败:", err)
}
defer db.Close()
// 测试连接
err = db.Ping()
if err != nil {
log.Fatal("数据库连接测试失败:", err)
}
var wg sync.WaitGroup
wg.Add(1)
// 启动变更监听
go func() {
defer wg.Done()
// 选择一种方案
// monitorChanges(db) // 方案二
// pollChanges(db) // 方案三
listenForChanges(db, "YourTable") // 方案一
}()
wg.Wait()
}
依赖安装
# 安装SQL Server驱动
go get github.com/denisenkom/go-mssqldb
# 如果需要JSON处理
go get github.com/tidwall/gjson
配置说明
-
SQL Server配置:
- 启用TCP/IP协议
- 配置SQL Server身份验证
- 为变更跟踪启用Service Broker
-
Go项目配置:
// 连接字符串示例 connString := fmt.Sprintf( "server=%s;user id=%s;password=%s;database=%s;encrypt=disable", server, user, password, database, ) -
性能优化:
db.SetMaxOpenConns(25) db.SetMaxIdleConns(25) db.SetConnMaxLifetime(5 * time.Minute)
这些方案都能在Go中实现SQL依赖关系监听,方案一(Query Notifications)最接近原生的SQL依赖概念,方案二(Change Tracking)更适合需要详细变更信息的场景,方案三(触发器)兼容性最好但性能开销较大。

