使用Golang保持RDBMS数据同步的方法与实践

使用Golang保持RDBMS数据同步的方法与实践 我接手了一个项目,需要查询我们环境中的各种API(如CloudStack和Kubernetes),并更新一个用于报告和容量规划的RDBMS数据库(目前是MySQL 8)。目前有一个基于LAMP的门户用于报告,所以我暂时只能在他们现有基础上进行扩展;长期计划是将其迁移到Kubernetes,并使用像Elasticsearch或Solr这样的工具替代RDBMS。

无论如何……就目前而言。

在数据库行数据变动频繁(有些行被更新,新行被插入,有些行被删除)的情况下,保持数据库最新状态的最佳方法是什么?更新和插入操作相当直接,我更好奇的是,对于删除那些引用环境中已不存在对象的行,最佳的处理方法是什么。

对于处理需要删除的对象,是否有比以下方法更“巧妙”的方式:

  1. 运行我的收集任务,获取当前对象列表
  2. 查询数据库中的对象列表
  3. 删除在步骤1结果中不存在的项目

这种方法成本很高,我想知道是否有更好的方法、库或其他解决方案?请记住,我目前无法更改数据库。

非常感谢。 BH


更多关于使用Golang保持RDBMS数据同步的方法与实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

不清楚您期望得到什么样的答案。看起来您需要使用 MySQL。

我的建议是将多个插入、更新或删除操作分组到每个 SQL 请求中。这能显著提升速度。

我曾经需要将日志记录插入到 MySQL 数据库中。我的做法是将记录排队,一旦队列达到 200 条记录,就执行插入操作。我还设置了一个定时器,定期发送队列中的所有内容。这个时间间隔可以设置得较长,具体取决于您能容忍的数据库更新延迟。

从一开始就实现这个功能是值得的。如果每个 SQL 请求只执行单个插入、更新或删除操作,性能会很差。但对于您的使用场景来说,这可能已经足够了。这取决于您自己。

更多关于使用Golang保持RDBMS数据同步的方法与实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


针对你提到的场景,在无法修改数据库结构的前提下,保持RDBMS数据同步确实需要一些策略。以下是几种实用的Golang实现方案:

方案一:标记删除法(推荐)

通过软删除标记替代物理删除,避免全量对比:

package main

import (
    "database/sql"
    "time"
)

type SyncManager struct {
    db *sql.DB
}

// 标记删除而不是物理删除
func (sm *SyncManager) SyncWithMarkDeletion(currentIDs []string) error {
    tx, err := sm.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. 标记所有记录为待删除
    _, err = tx.Exec(`
        UPDATE resources 
        SET sync_status = 'pending_deletion',
            updated_at = ?
        WHERE sync_status != 'deleted'
    `, time.Now())
    if err != nil {
        return err
    }

    // 2. 更新或插入当前存在的记录
    for _, id := range currentIDs {
        _, err = tx.Exec(`
            INSERT INTO resources (id, sync_status, updated_at) 
            VALUES (?, 'active', ?)
            ON DUPLICATE KEY UPDATE 
                sync_status = 'active',
                updated_at = ?
        `, id, time.Now(), time.Now())
        if err != nil {
            return err
        }
    }

    // 3. 物理删除标记为待删除的记录
    _, err = tx.Exec(`
        DELETE FROM resources 
        WHERE sync_status = 'pending_deletion'
    `)
    if err != nil {
        return err
    }

    return tx.Commit()
}

方案二:批次对比删除

使用分批处理减少内存占用:

func (sm *SyncManager) BatchSyncDeletion(currentIDs map[string]bool, batchSize int) error {
    offset := 0
    
    for {
        var dbIDs []string
        
        // 分批查询数据库ID
        rows, err := sm.db.Query(`
            SELECT id FROM resources 
            LIMIT ? OFFSET ?
        `, batchSize, offset)
        if err != nil {
            return err
        }
        
        for rows.Next() {
            var id string
            if err := rows.Scan(&id); err != nil {
                rows.Close()
                return err
            }
            dbIDs = append(dbIDs, id)
        }
        rows.Close()
        
        if len(dbIDs) == 0 {
            break
        }
        
        // 批量删除不存在的ID
        var deleteIDs []string
        for _, dbID := range dbIDs {
            if !currentIDs[dbID] {
                deleteIDs = append(deleteIDs, dbID)
            }
        }
        
        if len(deleteIDs) > 0 {
            query, args, err := sqlx.In(`
                DELETE FROM resources WHERE id IN (?)
            `, deleteIDs)
            if err != nil {
                return err
            }
            
            _, err = sm.db.Exec(query, args...)
            if err != nil {
                return err
            }
        }
        
        offset += batchSize
    }
    
    return nil
}

方案三:使用哈希值对比

为每个记录计算哈希值,只同步有变化的记录:

import (
    "crypto/sha256"
    "encoding/hex"
    "github.com/jmoiron/sqlx"
)

type ResourceHash struct {
    ID   string `db:"id"`
    Hash string `db:"hash"`
}

func (sm *SyncManager) SyncByHashComparison(currentResources map[string]string) error {
    // 获取数据库现有记录的哈希值
    var existingHashes []ResourceHash
    err := sqlx.Select(sm.db, &existingHashes, 
        "SELECT id, hash FROM resources")
    if err != nil {
        return err
    }
    
    // 构建哈希映射
    existingHashMap := make(map[string]string)
    for _, h := range existingHashes {
        existingHashMap[h.ID] = h.Hash
    }
    
    // 对比并同步
    for id, currentHash := range currentResources {
        existingHash, exists := existingHashMap[id]
        
        if !exists {
            // 插入新记录
            _, err := sm.db.Exec(`
                INSERT INTO resources (id, hash, data) 
                VALUES (?, ?, ?)
            `, id, currentHash, getResourceData(id))
            if err != nil {
                return err
            }
        } else if existingHash != currentHash {
            // 更新已变化的记录
            _, err := sm.db.Exec(`
                UPDATE resources 
                SET hash = ?, data = ?, updated_at = ?
                WHERE id = ?
            `, currentHash, getResourceData(id), time.Now(), id)
            if err != nil {
                return err
            }
        }
        // 从映射中删除已处理的ID
        delete(existingHashMap, id)
    }
    
    // 删除数据库中不再存在的记录
    if len(existingHashMap) > 0 {
        deleteIDs := make([]string, 0, len(existingHashMap))
        for id := range existingHashMap {
            deleteIDs = append(deleteIDs, id)
        }
        
        query, args, err := sqlx.In(`
            DELETE FROM resources WHERE id IN (?)
        `, deleteIDs)
        if err != nil {
            return err
        }
        
        _, err = sm.db.Exec(query, args...)
        return err
    }
    
    return nil
}

func calculateHash(data []byte) string {
    hash := sha256.Sum256(data)
    return hex.EncodeToString(hash[:])
}

方案四:增量同步模式

使用时间戳或版本号进行增量同步:

type IncrementalSync struct {
    db *sql.DB
}

func (is *IncrementalSync) SyncIncrementally(lastSyncTime time.Time) error {
    // 获取自上次同步以来的变更
    currentChanges, err := fetchChangesSince(lastSyncTime)
    if err != nil {
        return err
    }
    
    tx, err := is.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 处理变更
    for _, change := range currentChanges {
        switch change.Operation {
        case "INSERT", "UPDATE":
            _, err = tx.Exec(`
                INSERT INTO resources (id, data, updated_at) 
                VALUES (?, ?, ?)
                ON DUPLICATE KEY UPDATE 
                    data = VALUES(data),
                    updated_at = VALUES(updated_at)
            `, change.ID, change.Data, change.Timestamp)
            
        case "DELETE":
            _, err = tx.Exec(`
                DELETE FROM resources WHERE id = ?
            `, change.ID)
        }
        
        if err != nil {
            return err
        }
    }
    
    return tx.Commit()
}

性能优化建议

// 使用连接池优化
func initDB() *sql.DB {
    db, err := sql.Open("mysql", "user:pass@/dbname")
    if err != nil {
        panic(err)
    }
    
    // 优化连接池设置
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)
    
    return db
}

// 使用预处理语句
func prepareStatements(db *sql.DB) (*sql.Stmt, *sql.Stmt, error) {
    insertStmt, err := db.Prepare(`
        INSERT INTO resources (id, data) VALUES (?, ?)
        ON DUPLICATE KEY UPDATE data = VALUES(data)
    `)
    if err != nil {
        return nil, nil, err
    }
    
    deleteStmt, err := db.Prepare(`
        DELETE FROM resources WHERE id = ?
    `)
    if err != nil {
        return nil, nil, err
    }
    
    return insertStmt, deleteStmt, nil
}

这些方案各有优劣:标记删除法对现有系统影响最小,哈希对比法效率最高但需要计算哈希,增量同步法最理想但需要API支持。根据你的具体场景选择最合适的方案。

回到顶部