Golang中grpc的错误处理机制探讨

Golang中grpc的错误处理机制探讨 大家好

我正在开发一个包含gRPC服务器部分的Web服务器。到目前为止一切运行正常。如果发生panic,我能够从中恢复。

因为我不想在每次请求到来时都建立数据库连接,所以为服务器实例使用了一个数据库连接。如果数据库连接关闭了怎么办?gRPC函数中的panic会自行恢复,但数据库连接将无法重新建立。

处理这种情况有哪些好的模式?

3 回复

感谢您的回复。问题表述得不够清楚:

我想我的问题是如何在出现错误时处理数据库连接,但我发现 sql.DB 结构已经处理了这个问题……

更多关于Golang中grpc的错误处理机制探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我们需要更多信息来理解这个问题:

Michael_Hugi: 如果发生恐慌,我能够从中恢复。

如果什么发生?

Michael_Hugi: 我有一个用于服务器实例的数据库连接。

你所说的“数据库连接”是什么意思?是指 *sql.DB 还是 *sql.Conn,或者是其他东西?你所说的“用于服务器实例”又是什么意思?是指你有一个全局变量吗?

在gRPC中处理数据库连接失效的问题,可以通过连接池和健康检查机制来解决。下面是一个结合连接重试和健康检查的示例:

package main

import (
    "context"
    "database/sql"
    "errors"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    _ "github.com/lib/pq"
)

type DatabasePool struct {
    db        *sql.DB
    maxRetries int
    retryDelay time.Duration
}

func NewDatabasePool(connStr string, maxRetries int) (*DatabasePool, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }
    
    pool := &DatabasePool{
        db:         db,
        maxRetries: maxRetries,
        retryDelay: 2 * time.Second,
    }
    
    // 设置连接池参数
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)
    
    return pool, nil
}

func (p *DatabasePool) QueryWithRetry(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    var rows *sql.Rows
    var err error
    
    for i := 0; i < p.maxRetries; i++ {
        rows, err = p.db.QueryContext(ctx, query, args...)
        if err == nil {
            return rows, nil
        }
        
        // 检查是否是连接错误
        if errors.Is(err, sql.ErrConnDone) || p.isConnectionError(err) {
            time.Sleep(p.retryDelay)
            continue
        }
        
        // 非连接错误直接返回
        return nil, err
    }
    
    return nil, status.Errorf(codes.Unavailable, "database unavailable after %d retries", p.maxRetries)
}

func (p *DatabasePool) isConnectionError(err error) bool {
    // 根据数据库驱动判断连接错误
    return err.Error() == "driver: bad connection" || 
           err.Error() == "connection reset by peer"
}

// gRPC拦截器处理数据库错误
func DatabaseInterceptor(pool *DatabasePool) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // 执行健康检查
        if err := pool.HealthCheck(ctx); err != nil {
            return nil, status.Errorf(codes.Unavailable, "service temporarily unavailable")
        }
        
        resp, err := handler(ctx, req)
        return resp, err
    }
}

func (p *DatabasePool) HealthCheck(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    
    return p.db.PingContext(ctx)
}

// gRPC服务实现示例
type UserService struct {
    dbPool *DatabasePool
}

func (s *UserService) GetUser(ctx context.Context, req *GetUserRequest) (*UserResponse, error) {
    rows, err := s.dbPool.QueryWithRetry(ctx, 
        "SELECT id, name, email FROM users WHERE id = $1", 
        req.UserId,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    // 处理查询结果
    // ...
    
    return &UserResponse{}, nil
}

对于更复杂的场景,可以使用连接池包装器配合context超时控制:

type ResilientDB struct {
    db          *sql.DB
    connStr     string
    mu          sync.RWMutex
    healthCheck *time.Ticker
}

func NewResilientDB(connStr string) *ResilientDB {
    rdb := &ResilientDB{
        connStr:     connStr,
        healthCheck: time.NewTicker(30 * time.Second),
    }
    
    rdb.connect()
    
    go rdb.monitorConnection()
    
    return rdb
}

func (rdb *ResilientDB) connect() error {
    rdb.mu.Lock()
    defer rdb.mu.Unlock()
    
    db, err := sql.Open("postgres", rdb.connStr)
    if err != nil {
        return err
    }
    
    if rdb.db != nil {
        rdb.db.Close()
    }
    
    rdb.db = db
    return nil
}

func (rdb *ResilientDB) monitorConnection() {
    for range rdb.healthCheck.C {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        
        if err := rdb.db.PingContext(ctx); err != nil {
            rdb.connect()
        }
    }
}

func (rdb *ResilientDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
    rdb.mu.RLock()
    defer rdb.mu.RUnlock()
    
    return rdb.db.ExecContext(ctx, query, args...)
}

func (rdb *ResilientDB) Close() {
    rdb.healthCheck.Stop()
    rdb.db.Close()
}

在gRPC服务器中集成:

func main() {
    rdb := NewResilientDB("postgres://user:pass@localhost/db")
    defer rdb.Close()
    
    server := grpc.NewServer(
        grpc.UnaryInterceptor(DatabaseInterceptor(rdb)),
    )
    
    // 注册服务
    // ...
}

这种模式通过连接池管理、自动重试机制和健康检查,确保数据库连接中断时能够自动恢复,同时通过gRPC拦截器在请求层面处理数据库不可用的情况。

回到顶部