Golang微服务事务如何维护

Golang微服务事务如何维护 我计划创建一个微服务。我有以下问题:

  1. 我应该为微服务使用 Gin 框架还是 gRPC?
  2. 如果我构建微服务,它将被部署在多个端口上。
  3. 如何为多个微服务维护单一数据库事务?
5 回复

感谢您的帮助

更多关于Golang微服务事务如何维护的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我已经得到了前两个问题的答案。

第三个问题比较有挑战性。我有一个独立的会计应用程序,以及独立的销售和采购模块。我打算创建两个微服务……

这两个服务应该在一个单一的事务内协同工作。

两者应该在一个事务内完成

如果你使用单一数据库,能否创建一个数据库函数/存储过程,一步执行两个查询?

或者创建一个触发器,根据更新表1的结果自动更新表2?

考虑数据库层面——而不是微服务?

1 我应该为微服务使用 Gin 框架还是 gRPC

Gin 是一个额外的层(框架),而 gRPC 是一种协议。据我所知,你可以使用任何你想要的框架(或原生 Go)来使用任何协议。最常见的协议是 REST 和 gRPC。

2 如果我构建微服务,它将部署在多个端口上

每个端口一个微服务。

3 如何为多个微服务维护单一数据库事务

跨多个微服务的单一数据库事务(提交 - 回滚)?这可能是一个挑战。我不知道。

对于微服务中的事务管理,这是一个复杂但关键的问题。以下是针对您问题的具体解决方案:

1. Gin vs gRPC 选择

两者可以结合使用,根据场景选择:

// REST API (Gin) 示例
package main

import (
    "github.com/gin-gonic/gin"
    "net/http"
)

func main() {
    r := gin.Default()
    r.POST("/orders", createOrderHandler)
    r.Run(":8080")
}

func createOrderHandler(c *gin.Context) {
    // 处理订单创建逻辑
    c.JSON(http.StatusOK, gin.H{"message": "Order created"})
}
// gRPC 服务示例
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "your-package-path/order"
)

type server struct {
    pb.UnimplementedOrderServiceServer
}

func (s *server) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.CreateOrderResponse, error) {
    // 处理订单创建逻辑
    return &pb.CreateOrderResponse{OrderId: "123"}, nil
}

func main() {
    lis, _ := net.Listen("tcp", ":50051")
    s := grpc.NewServer()
    pb.RegisterOrderServiceServer(s, &server{})
    s.Serve(lis)
}

建议:内部服务间通信用gRPC,对外API用Gin。

2. 多端口部署配置

使用环境变量或配置文件管理端口:

package config

import "os"

type ServiceConfig struct {
    HTTPPort string
    GRPCPort string
    DBHost   string
}

func LoadConfig() ServiceConfig {
    return ServiceConfig{
        HTTPPort: getEnv("HTTP_PORT", "8080"),
        GRPCPort: getEnv("GRPC_PORT", "50051"),
        DBHost:   getEnv("DB_HOST", "localhost"),
    }
}

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

3. 分布式事务解决方案

方案A:Saga模式实现

package saga

import (
    "context"
    "errors"
)

type SagaStep struct {
    Execute   func(ctx context.Context) error
    Compensate func(ctx context.Context) error
}

type Saga struct {
    steps []SagaStep
}

func (s *Saga) AddStep(execute, compensate func(ctx context.Context) error) {
    s.steps = append(s.steps, SagaStep{
        Execute:   execute,
        Compensate: compensate,
    })
}

func (s *Saga) Execute(ctx context.Context) error {
    var completedSteps []int
    
    for i, step := range s.steps {
        if err := step.Execute(ctx); err != nil {
            // 执行补偿操作
            for j := len(completedSteps) - 1; j >= 0; j-- {
                s.steps[completedSteps[j]].Compensate(ctx)
            }
            return err
        }
        completedSteps = append(completedSteps, i)
    }
    return nil
}

// 使用示例
func createOrderSaga(ctx context.Context) error {
    saga := &Saga{}
    
    saga.AddStep(
        // 扣减库存
        func(ctx context.Context) error {
            // 调用库存服务
            return nil
        },
        // 补偿:恢复库存
        func(ctx context.Context) error {
            // 调用库存恢复接口
            return nil
        },
    )
    
    saga.AddStep(
        // 创建订单
        func(ctx context.Context) error {
            // 调用订单服务
            return nil
        },
        // 补偿:取消订单
        func(ctx context.Context) error {
            // 调用订单取消接口
            return nil
        },
    )
    
    return saga.Execute(ctx)
}

方案B:使用DTM分布式事务框架

package main

import (
    "context"
    "github.com/dtm-labs/dtm/client/dtmgrpc"
    pb "your-package-path/order"
)

func createOrderWithDTM() error {
    // DTM服务器地址
    dtmServer := "localhost:36789"
    
    // 创建SAGA事务
    saga := dtmgrpc.NewSaga(dtmServer, "order-saga").
        Add("localhost:50051/order.OrderService/CreateOrder", 
            "localhost:50051/order.OrderService/CancelOrder", 
            &pb.CreateOrderRequest{UserId: "123"}).
        Add("localhost:50052/inventory.InventoryService/ReduceStock", 
            "localhost:50052/inventory.InventoryService/RevertStock", 
            &pb.ReduceStockRequest{ProductId: "456", Quantity: 1})
    
    // 提交事务
    return saga.Submit()
}

方案C:本地消息表+事件驱动

package event

import (
    "context"
    "database/sql"
    "encoding/json"
    "time"
)

type OutboxMessage struct {
    ID        string
    Topic     string
    Payload   []byte
    Status    string
    CreatedAt time.Time
}

type Outbox struct {
    db *sql.DB
}

func (o *Outbox) SaveMessage(ctx context.Context, topic string, payload interface{}) error {
    data, _ := json.Marshal(payload)
    
    tx, _ := o.db.BeginTx(ctx, nil)
    
    // 业务操作
    _, err := tx.ExecContext(ctx, 
        "INSERT INTO orders (...) VALUES (...)")
    if err != nil {
        tx.Rollback()
        return err
    }
    
    // 保存消息到本地表
    _, err = tx.ExecContext(ctx,
        "INSERT INTO outbox_messages (id, topic, payload, status) VALUES (?, ?, ?, 'pending')",
        generateID(), topic, data)
    if err != nil {
        tx.Rollback()
        return err
    }
    
    return tx.Commit()
}

// 独立进程发送消息
func (o *Outbox) ProcessPendingMessages() {
    for {
        rows, _ := o.db.Query("SELECT * FROM outbox_messages WHERE status = 'pending'")
        
        for rows.Next() {
            var msg OutboxMessage
            rows.Scan(&msg.ID, &msg.Topic, &msg.Payload, &msg.Status)
            
            // 发送到消息队列
            sendToKafka(msg.Topic, msg.Payload)
            
            // 更新状态
            o.db.Exec("UPDATE outbox_messages SET status = 'sent' WHERE id = ?", msg.ID)
        }
        
        time.Sleep(5 * time.Second)
    }
}

数据库层面:每个服务独立数据库

// 订单服务数据库
type OrderDB struct {
    db *sql.DB
}

func (o *OrderDB) CreateOrder(ctx context.Context, order Order) error {
    tx, _ := o.db.BeginTx(ctx, nil)
    
    _, err := tx.ExecContext(ctx,
        "INSERT INTO orders (id, user_id, amount) VALUES (?, ?, ?)",
        order.ID, order.UserID, order.Amount)
    
    if err != nil {
        tx.Rollback()
        return err
    }
    
    return tx.Commit()
}

// 库存服务数据库独立
type InventoryDB struct {
    db *sql.DB
}

func (i *InventoryDB) ReduceStock(ctx context.Context, productID string, quantity int) error {
    tx, _ := i.db.BeginTx(ctx, nil)
    
    _, err := tx.ExecContext(ctx,
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?",
        quantity, productID, quantity)
    
    if err != nil {
        tx.Rollback()
        return err
    }
    
    return tx.Commit()
}

关键点:

  1. 避免跨服务数据库事务
  2. 使用Saga、TCC或消息队列保证最终一致性
  3. 每个微服务拥有独立数据库
  4. 通过协调器模式管理分布式事务
回到顶部