Golang如何实现与数据库的并发事务处理

Golang如何实现与数据库的并发事务处理 我想创建一个安全的支付服务,用户可以进行存款、取款,以及两个账户之间的转账。 我希望实现事务的并发处理。我使用 PostgreSQL 作为数据库。 请帮我解答如何创建这个服务?

2 回复

据我所知,访问数据库不应该是并发的,因为一个 goroutine 可能会在另一个 goroutine 试图从数据库读取数据时改变数据库的状态。

更多关于Golang如何实现与数据库的并发事务处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现与PostgreSQL的并发事务处理,关键在于正确使用database/sql包的事务机制和适当的隔离级别。以下是一个完整的支付服务示例,包含存款、取款和转账功能:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/lib/pq"
)

type PaymentService struct {
    db *sql.DB
}

func NewPaymentService(connStr string) (*PaymentService, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }
    
    // 设置连接池参数
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)
    
    return &PaymentService{db: db}, nil
}

// 存款操作
func (s *PaymentService) Deposit(ctx context.Context, accountID string, amount float64) error {
    tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable, // 使用最高隔离级别
        ReadOnly:  false,
    })
    if err != nil {
        return fmt.Errorf("开始事务失败: %w", err)
    }
    defer tx.Rollback()

    // 更新账户余额
    _, err = tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
        amount, accountID)
    if err != nil {
        return fmt.Errorf("更新余额失败: %w", err)
    }

    // 记录交易
    _, err = tx.ExecContext(ctx,
        "INSERT INTO transactions (account_id, amount, type) VALUES ($1, $2, 'deposit')",
        accountID, amount)
    if err != nil {
        return fmt.Errorf("记录交易失败: %w", err)
    }

    return tx.Commit()
}

// 取款操作
func (s *PaymentService) Withdraw(ctx context.Context, accountID string, amount float64) error {
    tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelRepeatableRead,
        ReadOnly:  false,
    })
    if err != nil {
        return fmt.Errorf("开始事务失败: %w", err)
    }
    defer tx.Rollback()

    // 检查余额是否充足
    var currentBalance float64
    err = tx.QueryRowContext(ctx,
        "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
        accountID).Scan(&currentBalance)
    if err != nil {
        return fmt.Errorf("查询余额失败: %w", err)
    }

    if currentBalance < amount {
        return fmt.Errorf("余额不足")
    }

    // 更新账户余额
    _, err = tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
        amount, accountID)
    if err != nil {
        return fmt.Errorf("更新余额失败: %w", err)
    }

    // 记录交易
    _, err = tx.ExecContext(ctx,
        "INSERT INTO transactions (account_id, amount, type) VALUES ($1, $2, 'withdraw')",
        accountID, amount)
    if err != nil {
        return fmt.Errorf("记录交易失败: %w", err)
    }

    return tx.Commit()
}

// 转账操作 - 处理并发转账的关键方法
func (s *PaymentService) Transfer(ctx context.Context, fromAccountID, toAccountID string, amount float64) error {
    // 确保账户ID顺序一致,避免死锁
    if fromAccountID > toAccountID {
        fromAccountID, toAccountID = toAccountID, fromAccountID
        amount = -amount
    }

    tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable,
        ReadOnly:  false,
    })
    if err != nil {
        return fmt.Errorf("开始事务失败: %w", err)
    }
    defer tx.Rollback()

    // 使用SELECT FOR UPDATE锁定两个账户,按固定顺序避免死锁
    var fromBalance, toBalance float64
    
    // 锁定第一个账户
    err = tx.QueryRowContext(ctx,
        "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
        fromAccountID).Scan(&fromBalance)
    if err != nil {
        return fmt.Errorf("锁定转出账户失败: %w", err)
    }

    // 锁定第二个账户
    err = tx.QueryRowContext(ctx,
        "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
        toAccountID).Scan(&toBalance)
    if err != nil {
        return fmt.Errorf("锁定转入账户失败: %w", err)
    }

    // 检查转出账户余额
    if fromBalance < amount {
        return fmt.Errorf("转出账户余额不足")
    }

    // 执行转账
    _, err = tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
        amount, fromAccountID)
    if err != nil {
        return fmt.Errorf("更新转出账户失败: %w", err)
    }

    _, err = tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
        amount, toAccountID)
    if err != nil {
        return fmt.Errorf("更新转入账户失败: %w", err)
    }

    // 记录两笔交易
    _, err = tx.ExecContext(ctx,
        "INSERT INTO transactions (account_id, amount, type, related_account) VALUES ($1, $2, 'transfer_out', $3)",
        fromAccountID, -amount, toAccountID)
    if err != nil {
        return fmt.Errorf("记录转出交易失败: %w", err)
    }

    _, err = tx.ExecContext(ctx,
        "INSERT INTO transactions (account_id, amount, type, related_account) VALUES ($1, $2, 'transfer_in', $3)",
        toAccountID, amount, fromAccountID)
    if err != nil {
        return fmt.Errorf("记录转入交易失败: %w", err)
    }

    return tx.Commit()
}

// 并发处理示例
func (s *PaymentService) ProcessConcurrentTransfers(transfers []TransferRequest) {
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, 10) // 限制并发数

    for _, transfer := range transfers {
        wg.Add(1)
        go func(t TransferRequest) {
            defer wg.Done()
            
            semaphore <- struct{}{}
            defer func() { <-semaphore }()

            ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
            defer cancel()

            // 重试机制处理死锁
            for retry := 0; retry < 3; retry++ {
                err := s.Transfer(ctx, t.FromAccount, t.ToAccount, t.Amount)
                if err == nil {
                    log.Printf("转账成功: %s -> %s 金额: %.2f", 
                        t.FromAccount, t.ToAccount, t.Amount)
                    break
                }
                
                if retry == 2 {
                    log.Printf("转账失败: %v", err)
                }
                time.Sleep(time.Duration(retry*100) * time.Millisecond)
            }
        }(transfer)
    }
    
    wg.Wait()
}

type TransferRequest struct {
    FromAccount string
    ToAccount   string
    Amount      float64
}

// 数据库表结构
const schema = `
CREATE TABLE IF NOT EXISTS accounts (
    id VARCHAR(50) PRIMARY KEY,
    balance DECIMAL(15,2) NOT NULL DEFAULT 0.00,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS transactions (
    id SERIAL PRIMARY KEY,
    account_id VARCHAR(50) NOT NULL,
    related_account VARCHAR(50),
    amount DECIMAL(15,2) NOT NULL,
    type VARCHAR(20) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (account_id) REFERENCES accounts(id)
);

CREATE INDEX idx_transactions_account ON transactions(account_id);
CREATE INDEX idx_transactions_created ON transactions(created_at);
`

func main() {
    // 初始化服务
    service, err := NewPaymentService("host=localhost port=5432 user=postgres password=secret dbname=payment sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer service.db.Close()

    // 创建表
    _, err = service.db.Exec(schema)
    if err != nil {
        log.Fatal(err)
    }

    // 示例使用
    ctx := context.Background()
    
    // 存款示例
    err = service.Deposit(ctx, "acc1", 1000.00)
    if err != nil {
        log.Printf("存款失败: %v", err)
    }

    // 并发转账示例
    transfers := []TransferRequest{
        {"acc1", "acc2", 100.00},
        {"acc2", "acc3", 50.00},
        {"acc1", "acc3", 75.00},
    }
    
    service.ProcessConcurrentTransfers(transfers)
}

关键点说明:

  1. 事务隔离级别

    • LevelSerializable:最高隔离级别,防止幻读
    • LevelRepeatableRead:适用于大多数读操作
  2. 死锁预防

    • 在转账操作中按固定顺序锁定账户
    • 使用SELECT FOR UPDATE明确锁定行
  3. 并发控制

    • 使用信号量限制并发数
    • 实现重试机制处理事务冲突
    • 设置合理的上下文超时
  4. 连接池配置

    • 设置最大连接数和空闲连接数
    • 配置连接生命周期

这个实现确保了在高并发场景下的数据一致性,通过事务的ACID特性保证支付操作的安全性和可靠性。

回到顶部