Golang微服务事务如何维护
Golang微服务事务如何维护 我计划创建一个微服务。我有以下问题:
- 我应该为微服务使用 Gin 框架还是 gRPC?
- 如果我构建微服务,它将被部署在多个端口上。
- 如何为多个微服务维护单一数据库事务?
5 回复
我已经得到了前两个问题的答案。
第三个问题比较有挑战性。我有一个独立的会计应用程序,以及独立的销售和采购模块。我打算创建两个微服务……
这两个服务应该在一个单一的事务内协同工作。
两者应该在一个事务内完成
如果你使用单一数据库,能否创建一个数据库函数/存储过程,一步执行两个查询?
或者创建一个触发器,根据更新表1的结果自动更新表2?
考虑数据库层面——而不是微服务?
1 我应该为微服务使用 Gin 框架还是 gRPC
Gin 是一个额外的层(框架),而 gRPC 是一种协议。据我所知,你可以使用任何你想要的框架(或原生 Go)来使用任何协议。最常见的协议是 REST 和 gRPC。
2 如果我构建微服务,它将部署在多个端口上
每个端口一个微服务。
3 如何为多个微服务维护单一数据库事务
跨多个微服务的单一数据库事务(提交 - 回滚)?这可能是一个挑战。我不知道。
对于微服务中的事务管理,这是一个复杂但关键的问题。以下是针对您问题的具体解决方案:
1. Gin vs gRPC 选择
两者可以结合使用,根据场景选择:
// REST API (Gin) 示例
package main
import (
"github.com/gin-gonic/gin"
"net/http"
)
func main() {
r := gin.Default()
r.POST("/orders", createOrderHandler)
r.Run(":8080")
}
func createOrderHandler(c *gin.Context) {
// 处理订单创建逻辑
c.JSON(http.StatusOK, gin.H{"message": "Order created"})
}
// gRPC 服务示例
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "your-package-path/order"
)
type server struct {
pb.UnimplementedOrderServiceServer
}
func (s *server) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.CreateOrderResponse, error) {
// 处理订单创建逻辑
return &pb.CreateOrderResponse{OrderId: "123"}, nil
}
func main() {
lis, _ := net.Listen("tcp", ":50051")
s := grpc.NewServer()
pb.RegisterOrderServiceServer(s, &server{})
s.Serve(lis)
}
建议:内部服务间通信用gRPC,对外API用Gin。
2. 多端口部署配置
使用环境变量或配置文件管理端口:
package config
import "os"
type ServiceConfig struct {
HTTPPort string
GRPCPort string
DBHost string
}
func LoadConfig() ServiceConfig {
return ServiceConfig{
HTTPPort: getEnv("HTTP_PORT", "8080"),
GRPCPort: getEnv("GRPC_PORT", "50051"),
DBHost: getEnv("DB_HOST", "localhost"),
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
3. 分布式事务解决方案
方案A:Saga模式实现
package saga
import (
"context"
"errors"
)
type SagaStep struct {
Execute func(ctx context.Context) error
Compensate func(ctx context.Context) error
}
type Saga struct {
steps []SagaStep
}
func (s *Saga) AddStep(execute, compensate func(ctx context.Context) error) {
s.steps = append(s.steps, SagaStep{
Execute: execute,
Compensate: compensate,
})
}
func (s *Saga) Execute(ctx context.Context) error {
var completedSteps []int
for i, step := range s.steps {
if err := step.Execute(ctx); err != nil {
// 执行补偿操作
for j := len(completedSteps) - 1; j >= 0; j-- {
s.steps[completedSteps[j]].Compensate(ctx)
}
return err
}
completedSteps = append(completedSteps, i)
}
return nil
}
// 使用示例
func createOrderSaga(ctx context.Context) error {
saga := &Saga{}
saga.AddStep(
// 扣减库存
func(ctx context.Context) error {
// 调用库存服务
return nil
},
// 补偿:恢复库存
func(ctx context.Context) error {
// 调用库存恢复接口
return nil
},
)
saga.AddStep(
// 创建订单
func(ctx context.Context) error {
// 调用订单服务
return nil
},
// 补偿:取消订单
func(ctx context.Context) error {
// 调用订单取消接口
return nil
},
)
return saga.Execute(ctx)
}
方案B:使用DTM分布式事务框架
package main
import (
"context"
"github.com/dtm-labs/dtm/client/dtmgrpc"
pb "your-package-path/order"
)
func createOrderWithDTM() error {
// DTM服务器地址
dtmServer := "localhost:36789"
// 创建SAGA事务
saga := dtmgrpc.NewSaga(dtmServer, "order-saga").
Add("localhost:50051/order.OrderService/CreateOrder",
"localhost:50051/order.OrderService/CancelOrder",
&pb.CreateOrderRequest{UserId: "123"}).
Add("localhost:50052/inventory.InventoryService/ReduceStock",
"localhost:50052/inventory.InventoryService/RevertStock",
&pb.ReduceStockRequest{ProductId: "456", Quantity: 1})
// 提交事务
return saga.Submit()
}
方案C:本地消息表+事件驱动
package event
import (
"context"
"database/sql"
"encoding/json"
"time"
)
type OutboxMessage struct {
ID string
Topic string
Payload []byte
Status string
CreatedAt time.Time
}
type Outbox struct {
db *sql.DB
}
func (o *Outbox) SaveMessage(ctx context.Context, topic string, payload interface{}) error {
data, _ := json.Marshal(payload)
tx, _ := o.db.BeginTx(ctx, nil)
// 业务操作
_, err := tx.ExecContext(ctx,
"INSERT INTO orders (...) VALUES (...)")
if err != nil {
tx.Rollback()
return err
}
// 保存消息到本地表
_, err = tx.ExecContext(ctx,
"INSERT INTO outbox_messages (id, topic, payload, status) VALUES (?, ?, ?, 'pending')",
generateID(), topic, data)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// 独立进程发送消息
func (o *Outbox) ProcessPendingMessages() {
for {
rows, _ := o.db.Query("SELECT * FROM outbox_messages WHERE status = 'pending'")
for rows.Next() {
var msg OutboxMessage
rows.Scan(&msg.ID, &msg.Topic, &msg.Payload, &msg.Status)
// 发送到消息队列
sendToKafka(msg.Topic, msg.Payload)
// 更新状态
o.db.Exec("UPDATE outbox_messages SET status = 'sent' WHERE id = ?", msg.ID)
}
time.Sleep(5 * time.Second)
}
}
数据库层面:每个服务独立数据库
// 订单服务数据库
type OrderDB struct {
db *sql.DB
}
func (o *OrderDB) CreateOrder(ctx context.Context, order Order) error {
tx, _ := o.db.BeginTx(ctx, nil)
_, err := tx.ExecContext(ctx,
"INSERT INTO orders (id, user_id, amount) VALUES (?, ?, ?)",
order.ID, order.UserID, order.Amount)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// 库存服务数据库独立
type InventoryDB struct {
db *sql.DB
}
func (i *InventoryDB) ReduceStock(ctx context.Context, productID string, quantity int) error {
tx, _ := i.db.BeginTx(ctx, nil)
_, err := tx.ExecContext(ctx,
"UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?",
quantity, productID, quantity)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
关键点:
- 避免跨服务数据库事务
- 使用Saga、TCC或消息队列保证最终一致性
- 每个微服务拥有独立数据库
- 通过协调器模式管理分布式事务

