Golang中的SQL依赖关系概念解析

Golang中的SQL依赖关系概念解析 我正在开发一个基于 MS SQL Server 数据库Golang API。我想在 Go 语言中实现 SQL 依赖 的概念。

这是否可行?如果可行,请提供解决方案。

如果您在 Go 语言中有类似 SQL 依赖的其他解决方案,请告诉我。

我希望能够通过 Go 语言 API 监听 SQL Server 数据库的变更。

请尽快给我解决方案。

5 回复

您能针对以下需求提供解决方案吗?

我希望通过 Golang API 监听 SQL Server 数据库的变更。

更多关于Golang中的SQL依赖关系概念解析的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我正在寻找 MS SQL Server 数据库 的解决方案 请给我在 Golang 中实现 MS SQL 表依赖的解决方案

这是一个很酷的SQL功能,即你可以“监听”数据库中的任何变化。

我找到了这个论坛帖子:sql - 如何使用Golang捕获新PostgreSQL记录的事件 - Stack Overflow

第二个回复推荐了GitHub - lib/pq: 用于 database/sql 的纯 Go Postgres 驱动,甚至还有一个简单的示例Go Playground - The Go Programming Language

首先,你需要使用 microsoft/go-mssqldb 这个 SQL 驱动程序来连接 MSSQL 数据库。其次:据我所知,SqlDependency 并不是 MSSQL 的一个特性。我相信它是 .NET 框架中的一个抽象概念。你可以深入研究一下源代码,看看他们是怎么实现的:

dotnet/SqlClient/blob/main/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlDependency.cs

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Runtime.CompilerServices;
#if NETFRAMEWORK
using System.IO;
using System.Runtime.Remoting;
using System.Runtime.Serialization;
using System.Runtime.Versioning;
using System.Security.Permissions;
#endif
using System.Text;
using System.Threading;
using System.Xml;
using Microsoft.Data.Common;

此文件已被截断。显示原文

简而言之:这是可能的,但不会很容易。根据你的需求,使用像 @heidi 提到的那种方法可能会更好。

在Go中监听MS SQL Server数据库变更是可行的,主要通过两种方式实现:

方案一:使用SQL Server Query Notifications(推荐)

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"
    _ "github.com/denisenkom/go-mssqldb"
)

func main() {
    connString := "server=localhost;user id=sa;password=yourpassword;database=YourDB"
    db, err := sql.Open("mssql", connString)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // 启用Service Broker(需要在SQL Server中先启用)
    _, err = db.Exec(`
        ALTER DATABASE YourDB SET ENABLE_BROKER;
        ALTER DATABASE YourDB SET TRUSTWORTHY ON;
    `)
    if err != nil {
        log.Printf("启用Service Broker失败: %v", err)
    }

    // 监听表变更
    go listenForChanges(db, "YourTable")
    
    select {} // 保持程序运行
}

func listenForChanges(db *sql.DB, tableName string) {
    ctx := context.Background()
    
    // 创建查询通知
    query := fmt.Sprintf(`
        SELECT id, name, modified_date 
        FROM %s 
        WHERE modified_date > @lastCheck
    `, tableName)
    
    for {
        rows, err := db.QueryContext(ctx, query, 
            sql.Named("lastCheck", time.Now().Add(-1*time.Minute)))
        if err != nil {
            log.Printf("查询失败: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }
        
        // 处理变更数据
        for rows.Next() {
            var id int
            var name string
            var modified time.Time
            if err := rows.Scan(&id, &name, &modified); err != nil {
                log.Printf("扫描行失败: %v", err)
                continue
            }
            fmt.Printf("检测到变更 - ID: %d, Name: %s, 修改时间: %v\n", 
                id, name, modified)
        }
        rows.Close()
        
        time.Sleep(2 * time.Second) // 轮询间隔
    }
}

方案二:使用SQL Server Change Tracking功能

package main

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    _ "github.com/denisenkom/go-mssqldb"
)

type ChangeTracking struct {
    SYS_CHANGE_VERSION int64
    SYS_CHANGE_CREATION_VERSION sql.NullInt64
    SYS_CHANGE_OPERATION string
    SYS_CHANGE_COLUMNS varbinary // SQL Server varbinary类型
    id int
    name string
}

func setupChangeTracking(db *sql.DB) error {
    // 在数据库级别启用变更跟踪
    _, err := db.Exec(`
        ALTER DATABASE YourDB 
        SET CHANGE_TRACKING = ON 
        (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
    `)
    if err != nil {
        return err
    }

    // 在表级别启用变更跟踪
    _, err = db.Exec(`
        ALTER TABLE YourTable
        ENABLE CHANGE_TRACKING
        WITH (TRACK_COLUMNS_UPDATED = ON)
    `)
    
    return err
}

func monitorChanges(db *sql.DB) {
    var lastVersion int64 = 0
    
    for {
        rows, err := db.Query(`
            SELECT 
                ct.SYS_CHANGE_VERSION,
                ct.SYS_CHANGE_OPERATION,
                t.id,
                t.name
            FROM CHANGETABLE(CHANGES YourTable, ?) AS ct
            LEFT JOIN YourTable AS t ON t.id = ct.id
            ORDER BY ct.SYS_CHANGE_VERSION
        `, lastVersion)
        
        if err != nil {
            log.Printf("查询变更失败: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }
        
        for rows.Next() {
            var change ChangeTracking
            err := rows.Scan(
                &change.SYS_CHANGE_VERSION,
                &change.SYS_CHANGE_OPERATION,
                &change.id,
                &change.name,
            )
            if err != nil {
                log.Printf("扫描变更数据失败: %v", err)
                continue
            }
            
            // 处理变更
            processChange(change)
            lastVersion = change.SYS_CHANGE_VERSION
        }
        rows.Close()
        
        time.Sleep(1 * time.Second)
    }
}

func processChange(change ChangeTracking) {
    data, _ := json.MarshalIndent(change, "", "  ")
    fmt.Printf("检测到数据库变更:\n%s\n", string(data))
    
    switch change.SYS_CHANGE_OPERATION {
    case "I":
        fmt.Println("操作类型: 插入")
    case "U":
        fmt.Println("操作类型: 更新")
    case "D":
        fmt.Println("操作类型: 删除")
    }
}

方案三:使用触发器+轮询(兼容性最好)

package main

import (
    "database/sql"
    "fmt"
    "log"
    "time"
    _ "github.com/denisenkom/go-mssqldb"
)

// 创建变更日志表
func createChangeLogTable(db *sql.DB) error {
    _, err := db.Exec(`
        CREATE TABLE ChangeLog (
            id INT IDENTITY(1,1) PRIMARY KEY,
            table_name NVARCHAR(100),
            record_id INT,
            operation NVARCHAR(10),
            change_time DATETIME DEFAULT GETDATE(),
            processed BIT DEFAULT 0
        )
    `)
    return err
}

// 创建触发器
func createTrigger(db *sql.DB, tableName string) error {
    triggerSQL := fmt.Sprintf(`
        CREATE TRIGGER tr_%s_Change
        ON %s
        AFTER INSERT, UPDATE, DELETE
        AS
        BEGIN
            DECLARE @Operation NVARCHAR(10)
            
            IF EXISTS (SELECT * FROM inserted) AND EXISTS (SELECT * FROM deleted)
                SET @Operation = 'UPDATE'
            ELSE IF EXISTS (SELECT * FROM inserted)
                SET @Operation = 'INSERT'
            ELSE
                SET @Operation = 'DELETE'
            
            INSERT INTO ChangeLog (table_name, record_id, operation)
            SELECT '%s', 
                   COALESCE(inserted.id, deleted.id),
                   @Operation
            FROM inserted FULL OUTER JOIN deleted 
            ON 1=0
        END
    `, tableName, tableName, tableName)
    
    _, err := db.Exec(triggerSQL)
    return err
}

// 轮询变更日志
func pollChanges(db *sql.DB) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        rows, err := db.Query(`
            SELECT id, table_name, record_id, operation, change_time
            FROM ChangeLog 
            WHERE processed = 0 
            ORDER BY change_time
        `)
        
        if err != nil {
            log.Printf("查询变更日志失败: %v", err)
            continue
        }
        
        for rows.Next() {
            var logID int
            var tableName string
            var recordID int
            var operation string
            var changeTime time.Time
            
            err := rows.Scan(&logID, &tableName, &recordID, &operation, &changeTime)
            if err != nil {
                log.Printf("扫描日志失败: %v", err)
                continue
            }
            
            fmt.Printf("检测到变更: 表=%s, 记录ID=%d, 操作=%s, 时间=%v\n",
                tableName, recordID, operation, changeTime)
            
            // 标记为已处理
            db.Exec("UPDATE ChangeLog SET processed = 1 WHERE id = ?", logID)
        }
        rows.Close()
    }
}

快速启动示例

// main.go - 完整示例
package main

import (
    "database/sql"
    "log"
    "sync"
    _ "github.com/denisenkom/go-mssqldb"
)

func main() {
    db, err := sql.Open("mssql", 
        "server=localhost;port=1433;database=YourDB;user id=sa;password=YourPassword")
    if err != nil {
        log.Fatal("数据库连接失败:", err)
    }
    defer db.Close()
    
    // 测试连接
    err = db.Ping()
    if err != nil {
        log.Fatal("数据库连接测试失败:", err)
    }
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    // 启动变更监听
    go func() {
        defer wg.Done()
        // 选择一种方案
        // monitorChanges(db)        // 方案二
        // pollChanges(db)           // 方案三
        listenForChanges(db, "YourTable") // 方案一
    }()
    
    wg.Wait()
}

依赖安装

# 安装SQL Server驱动
go get github.com/denisenkom/go-mssqldb

# 如果需要JSON处理
go get github.com/tidwall/gjson

配置说明

  1. SQL Server配置

    • 启用TCP/IP协议
    • 配置SQL Server身份验证
    • 为变更跟踪启用Service Broker
  2. Go项目配置

    // 连接字符串示例
    connString := fmt.Sprintf(
        "server=%s;user id=%s;password=%s;database=%s;encrypt=disable",
        server, user, password, database,
    )
    
  3. 性能优化

    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)
    

这些方案都能在Go中实现SQL依赖关系监听,方案一(Query Notifications)最接近原生的SQL依赖概念,方案二(Change Tracking)更适合需要详细变更信息的场景,方案三(触发器)兼容性最好但性能开销较大。

回到顶部