Golang中grpc的错误处理机制探讨
Golang中grpc的错误处理机制探讨 大家好
我正在开发一个包含gRPC服务器部分的Web服务器。到目前为止一切运行正常。如果发生panic,我能够从中恢复。
因为我不想在每次请求到来时都建立数据库连接,所以为服务器实例使用了一个数据库连接。如果数据库连接关闭了怎么办?gRPC函数中的panic会自行恢复,但数据库连接将无法重新建立。
处理这种情况有哪些好的模式?
感谢您的回复。问题表述得不够清楚:
我想我的问题是如何在出现错误时处理数据库连接,但我发现 sql.DB 结构已经处理了这个问题……
更多关于Golang中grpc的错误处理机制探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我们需要更多信息来理解这个问题:
Michael_Hugi: 如果发生恐慌,我能够从中恢复。
如果什么发生?
Michael_Hugi: 我有一个用于服务器实例的数据库连接。
你所说的“数据库连接”是什么意思?是指 *sql.DB 还是 *sql.Conn,或者是其他东西?你所说的“用于服务器实例”又是什么意思?是指你有一个全局变量吗?
在gRPC中处理数据库连接失效的问题,可以通过连接池和健康检查机制来解决。下面是一个结合连接重试和健康检查的示例:
package main
import (
"context"
"database/sql"
"errors"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
_ "github.com/lib/pq"
)
type DatabasePool struct {
db *sql.DB
maxRetries int
retryDelay time.Duration
}
func NewDatabasePool(connStr string, maxRetries int) (*DatabasePool, error) {
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
pool := &DatabasePool{
db: db,
maxRetries: maxRetries,
retryDelay: 2 * time.Second,
}
// 设置连接池参数
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
return pool, nil
}
func (p *DatabasePool) QueryWithRetry(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
var rows *sql.Rows
var err error
for i := 0; i < p.maxRetries; i++ {
rows, err = p.db.QueryContext(ctx, query, args...)
if err == nil {
return rows, nil
}
// 检查是否是连接错误
if errors.Is(err, sql.ErrConnDone) || p.isConnectionError(err) {
time.Sleep(p.retryDelay)
continue
}
// 非连接错误直接返回
return nil, err
}
return nil, status.Errorf(codes.Unavailable, "database unavailable after %d retries", p.maxRetries)
}
func (p *DatabasePool) isConnectionError(err error) bool {
// 根据数据库驱动判断连接错误
return err.Error() == "driver: bad connection" ||
err.Error() == "connection reset by peer"
}
// gRPC拦截器处理数据库错误
func DatabaseInterceptor(pool *DatabasePool) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 执行健康检查
if err := pool.HealthCheck(ctx); err != nil {
return nil, status.Errorf(codes.Unavailable, "service temporarily unavailable")
}
resp, err := handler(ctx, req)
return resp, err
}
}
func (p *DatabasePool) HealthCheck(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return p.db.PingContext(ctx)
}
// gRPC服务实现示例
type UserService struct {
dbPool *DatabasePool
}
func (s *UserService) GetUser(ctx context.Context, req *GetUserRequest) (*UserResponse, error) {
rows, err := s.dbPool.QueryWithRetry(ctx,
"SELECT id, name, email FROM users WHERE id = $1",
req.UserId,
)
if err != nil {
return nil, err
}
defer rows.Close()
// 处理查询结果
// ...
return &UserResponse{}, nil
}
对于更复杂的场景,可以使用连接池包装器配合context超时控制:
type ResilientDB struct {
db *sql.DB
connStr string
mu sync.RWMutex
healthCheck *time.Ticker
}
func NewResilientDB(connStr string) *ResilientDB {
rdb := &ResilientDB{
connStr: connStr,
healthCheck: time.NewTicker(30 * time.Second),
}
rdb.connect()
go rdb.monitorConnection()
return rdb
}
func (rdb *ResilientDB) connect() error {
rdb.mu.Lock()
defer rdb.mu.Unlock()
db, err := sql.Open("postgres", rdb.connStr)
if err != nil {
return err
}
if rdb.db != nil {
rdb.db.Close()
}
rdb.db = db
return nil
}
func (rdb *ResilientDB) monitorConnection() {
for range rdb.healthCheck.C {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := rdb.db.PingContext(ctx); err != nil {
rdb.connect()
}
}
}
func (rdb *ResilientDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
rdb.mu.RLock()
defer rdb.mu.RUnlock()
return rdb.db.ExecContext(ctx, query, args...)
}
func (rdb *ResilientDB) Close() {
rdb.healthCheck.Stop()
rdb.db.Close()
}
在gRPC服务器中集成:
func main() {
rdb := NewResilientDB("postgres://user:pass@localhost/db")
defer rdb.Close()
server := grpc.NewServer(
grpc.UnaryInterceptor(DatabaseInterceptor(rdb)),
)
// 注册服务
// ...
}
这种模式通过连接池管理、自动重试机制和健康检查,确保数据库连接中断时能够自动恢复,同时通过gRPC拦截器在请求层面处理数据库不可用的情况。

