Golang如何实现与数据库的并发事务处理
Golang如何实现与数据库的并发事务处理 我想创建一个安全的支付服务,用户可以进行存款、取款,以及两个账户之间的转账。 我希望实现事务的并发处理。我使用 PostgreSQL 作为数据库。 请帮我解答如何创建这个服务?
2 回复
据我所知,访问数据库不应该是并发的,因为一个 goroutine 可能会在另一个 goroutine 试图从数据库读取数据时改变数据库的状态。
更多关于Golang如何实现与数据库的并发事务处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中实现与PostgreSQL的并发事务处理,关键在于正确使用database/sql包的事务机制和适当的隔离级别。以下是一个完整的支付服务示例,包含存款、取款和转账功能:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/lib/pq"
)
type PaymentService struct {
db *sql.DB
}
func NewPaymentService(connStr string) (*PaymentService, error) {
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
// 设置连接池参数
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
return &PaymentService{db: db}, nil
}
// 存款操作
func (s *PaymentService) Deposit(ctx context.Context, accountID string, amount float64) error {
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable, // 使用最高隔离级别
ReadOnly: false,
})
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 更新账户余额
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, accountID)
if err != nil {
return fmt.Errorf("更新余额失败: %w", err)
}
// 记录交易
_, err = tx.ExecContext(ctx,
"INSERT INTO transactions (account_id, amount, type) VALUES ($1, $2, 'deposit')",
accountID, amount)
if err != nil {
return fmt.Errorf("记录交易失败: %w", err)
}
return tx.Commit()
}
// 取款操作
func (s *PaymentService) Withdraw(ctx context.Context, accountID string, amount float64) error {
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: false,
})
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 检查余额是否充足
var currentBalance float64
err = tx.QueryRowContext(ctx,
"SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
accountID).Scan(¤tBalance)
if err != nil {
return fmt.Errorf("查询余额失败: %w", err)
}
if currentBalance < amount {
return fmt.Errorf("余额不足")
}
// 更新账户余额
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, accountID)
if err != nil {
return fmt.Errorf("更新余额失败: %w", err)
}
// 记录交易
_, err = tx.ExecContext(ctx,
"INSERT INTO transactions (account_id, amount, type) VALUES ($1, $2, 'withdraw')",
accountID, amount)
if err != nil {
return fmt.Errorf("记录交易失败: %w", err)
}
return tx.Commit()
}
// 转账操作 - 处理并发转账的关键方法
func (s *PaymentService) Transfer(ctx context.Context, fromAccountID, toAccountID string, amount float64) error {
// 确保账户ID顺序一致,避免死锁
if fromAccountID > toAccountID {
fromAccountID, toAccountID = toAccountID, fromAccountID
amount = -amount
}
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
ReadOnly: false,
})
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 使用SELECT FOR UPDATE锁定两个账户,按固定顺序避免死锁
var fromBalance, toBalance float64
// 锁定第一个账户
err = tx.QueryRowContext(ctx,
"SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
fromAccountID).Scan(&fromBalance)
if err != nil {
return fmt.Errorf("锁定转出账户失败: %w", err)
}
// 锁定第二个账户
err = tx.QueryRowContext(ctx,
"SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
toAccountID).Scan(&toBalance)
if err != nil {
return fmt.Errorf("锁定转入账户失败: %w", err)
}
// 检查转出账户余额
if fromBalance < amount {
return fmt.Errorf("转出账户余额不足")
}
// 执行转账
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, fromAccountID)
if err != nil {
return fmt.Errorf("更新转出账户失败: %w", err)
}
_, err = tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, toAccountID)
if err != nil {
return fmt.Errorf("更新转入账户失败: %w", err)
}
// 记录两笔交易
_, err = tx.ExecContext(ctx,
"INSERT INTO transactions (account_id, amount, type, related_account) VALUES ($1, $2, 'transfer_out', $3)",
fromAccountID, -amount, toAccountID)
if err != nil {
return fmt.Errorf("记录转出交易失败: %w", err)
}
_, err = tx.ExecContext(ctx,
"INSERT INTO transactions (account_id, amount, type, related_account) VALUES ($1, $2, 'transfer_in', $3)",
toAccountID, amount, fromAccountID)
if err != nil {
return fmt.Errorf("记录转入交易失败: %w", err)
}
return tx.Commit()
}
// 并发处理示例
func (s *PaymentService) ProcessConcurrentTransfers(transfers []TransferRequest) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, 10) // 限制并发数
for _, transfer := range transfers {
wg.Add(1)
go func(t TransferRequest) {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 重试机制处理死锁
for retry := 0; retry < 3; retry++ {
err := s.Transfer(ctx, t.FromAccount, t.ToAccount, t.Amount)
if err == nil {
log.Printf("转账成功: %s -> %s 金额: %.2f",
t.FromAccount, t.ToAccount, t.Amount)
break
}
if retry == 2 {
log.Printf("转账失败: %v", err)
}
time.Sleep(time.Duration(retry*100) * time.Millisecond)
}
}(transfer)
}
wg.Wait()
}
type TransferRequest struct {
FromAccount string
ToAccount string
Amount float64
}
// 数据库表结构
const schema = `
CREATE TABLE IF NOT EXISTS accounts (
id VARCHAR(50) PRIMARY KEY,
balance DECIMAL(15,2) NOT NULL DEFAULT 0.00,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS transactions (
id SERIAL PRIMARY KEY,
account_id VARCHAR(50) NOT NULL,
related_account VARCHAR(50),
amount DECIMAL(15,2) NOT NULL,
type VARCHAR(20) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (account_id) REFERENCES accounts(id)
);
CREATE INDEX idx_transactions_account ON transactions(account_id);
CREATE INDEX idx_transactions_created ON transactions(created_at);
`
func main() {
// 初始化服务
service, err := NewPaymentService("host=localhost port=5432 user=postgres password=secret dbname=payment sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer service.db.Close()
// 创建表
_, err = service.db.Exec(schema)
if err != nil {
log.Fatal(err)
}
// 示例使用
ctx := context.Background()
// 存款示例
err = service.Deposit(ctx, "acc1", 1000.00)
if err != nil {
log.Printf("存款失败: %v", err)
}
// 并发转账示例
transfers := []TransferRequest{
{"acc1", "acc2", 100.00},
{"acc2", "acc3", 50.00},
{"acc1", "acc3", 75.00},
}
service.ProcessConcurrentTransfers(transfers)
}
关键点说明:
-
事务隔离级别:
LevelSerializable:最高隔离级别,防止幻读LevelRepeatableRead:适用于大多数读操作
-
死锁预防:
- 在转账操作中按固定顺序锁定账户
- 使用
SELECT FOR UPDATE明确锁定行
-
并发控制:
- 使用信号量限制并发数
- 实现重试机制处理事务冲突
- 设置合理的上下文超时
-
连接池配置:
- 设置最大连接数和空闲连接数
- 配置连接生命周期
这个实现确保了在高并发场景下的数据一致性,通过事务的ACID特性保证支付操作的安全性和可靠性。

