Golang中如何确定哪个goroutine阻塞了执行?

Golang中如何确定哪个goroutine阻塞了执行? 你好!我是 Golang 的新手。

我有一个小型解析器,它会将找到的数据写入 Postgres。我使用的数据库框架是 https://github.com/jackc/pgx

我将解析后的数据从多个 goroutine 写入一个无缓冲通道。

我有一个专门的 goroutine,用于从这个通道读取数据并将其写入数据库。

我正在调试应用程序,它有时会在之后永远挂起(可能是在等待连接池中的空闲数据库连接)。

如何确定是哪个 goroutine 阻塞了执行?

我听说有 pprof,但我从未使用过它。

谢谢。

最小示例:

我有一个类似这样的结构体:

ParsingResults struct {
    parser  DataParser
    data []*common.Data
    err     error
}

在单独的 goroutine 中,我像这样初始化无缓冲通道:

results = make(chan *ParsingResults)

然后我启动多个 goroutine,在其中运行解析器:

go fetcher.Parse(results)

每个解析器收集数据并将其传递到通道,像这样:

var (
    results chan<- *ParsingResults
    pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
    return
}

time.Sleep(p.provider.DelayBetweenPages)

并且在单独的 goroutine 中启动这样一个函数:

func (fetcher *Fetcher) waitForResults(ctx context.Context) {
    for {
        select {
        case results := <-fetcher.resultsChannel:
            provider := results.parser.GetProvider()
            if results.err != nil {
                common.Logger.Errorw("failed to fetch data from provider",
                    "provider", provider.Url,
                    "error", results.err)
                continue
            }
            data := fetcher.removeDuplicates(results.data)
            common.Logger.Infow("fetched some data",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
            _, err := fetcher.Repo.SaveFetchedData(ctx, data)
            if err != nil {
                common.Logger.Errorw("failed to save fetched data",
                    "provider", provider.Url,
                    "error", err)
                continue
            }
            common.Logger.Infow("fetched data were saved successfully",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
        case <-ctx.Done():
            return
        default:
            common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
        }
    }
}

数据在此函数中存储到数据库:

func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
    if len(rows) == 0 {
        return 0, nil
    }

    baseQB := sq.Insert(db.DataTableName).
        Columns(saveFetchedDataCols...).
        PlaceholderFormat(sq.Dollar)

    batch := &pgx.Batch{}
    for _, p := range rows {
        curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
        curQuery, curArgs, err := curQB.ToSql()

        if err != nil {
            return 0, fmt.Errorf("failed to generate SQL query: %w", err)
        }
        batch.Queue(curQuery, curArgs...)
    }

    br := repo.pool.SendBatch(ctx, batch)
    ct, err := br.Exec()
    if err != nil {
        return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
    }

    return ct.RowsAffected(), nil
}

更多关于Golang中如何确定哪个goroutine阻塞了执行?的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

非常感谢Chris,我会尝试你的指南。

更多关于Golang中如何确定哪个goroutine阻塞了执行?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


确实,pprof 可以成为你的好帮手。

以下是如何在你的代码中轻松添加 pprof 服务器的步骤。

  1. 启动一个 HTTP 服务器(除非你的代码已经有一个):
func main() {
    go func() {
        log.Println(http.ListenAndServe("localhost:8080", nil))
    }()

启动 goroutine 是必要的,因为 ListenAndServe 会在 HTTP 服务器的整个运行期间阻塞。

你的编辑器应该会自动添加导入;否则,请手动添加 “net/http” 的导入。

  1. 在你的导入列表中添加这个导入:
_ "net/http/pprof"

注意下划线("空白标识符"符号)。导入这个包纯粹是为了其副作用,因此你需要在这里使用空白标识符;否则 Go 会抱怨未使用的导入。

  1. 启动你的应用,并在浏览器中打开

http://localhost:7070/debug/pprof

然后,当你确定你的应用已经达到死锁状态时,点击 “full goroutine dump”。

在转储信息中,搜索一对 goroutine,其中要么两者的状态都是 [semacquire],要么一个处于 [chan read] 状态,另一个处于 [chan write] 状态。(我猜还有其他阻塞状态的组合。)

考虑到堆栈转储只是一个快照,因此你可能需要刷新页面几次。如果你等待超过一分钟,阻塞状态会显示没有变化的分钟数。

检查调用堆栈,找出哪些函数在那里相互阻塞。

这应该就是查找阻塞 goroutine 所需的全部步骤。任何错误都是我的,因为我在早餐桌上用几分钟时间输入了这些内容。

提示:除了使用浏览器,你也可以在终端中运行

curl http://localhost:7070/debug/pprof/goroutine?debug=2

来直接获取 goroutine 堆栈转储。

要确定哪个goroutine阻塞了执行,可以使用pprof的goroutine分析功能。以下是具体步骤:

首先,在代码中导入pprof并启动HTTP服务器:

import (
    _ "net/http/pprof"
    "net/http"
)

func main() {
    // 启动pprof服务器
    go func() {
        http.ListenAndServe("localhost:6060", nil)
    }()
    
    // 你的其他代码...
}

当程序挂起时,通过浏览器访问 http://localhost:6060/debug/pprof/goroutine?debug=2 获取详细的goroutine堆栈信息。

或者使用curl获取goroutine分析:

curl http://localhost:6060/debug/pprof/goroutine?debug=2 > goroutines.txt

分析输出文件,查找处于阻塞状态的goroutine。重点关注以下状态:

  • chan send - 表示goroutine在等待向通道发送数据
  • chan receive - 表示goroutine在等待从通道接收数据
  • select - 表示goroutine在select语句处阻塞

也可以使用go tool pprof进行交互式分析:

go tool pprof http://localhost:6060/debug/pprof/goroutine

在pprof交互界面中,使用 top 命令查看goroutine数量,使用 traces 命令查看具体堆栈跟踪。

对于数据库连接池阻塞,可以添加连接池监控:

import (
    "context"
    "time"
    "github.com/jackc/pgx/v5/pgxpool"
)

func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
    // 添加带超时的上下文
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()
    
    // 记录连接池状态
    stats := repo.pool.Stat()
    common.Logger.Infow("connection pool stats",
        "total_conns", stats.TotalConns(),
        "idle_conns", stats.IdleConns(),
        "max_conns", stats.MaxConns())
    
    // 原有代码...
}

在waitForResults函数中,移除default分支以避免空转:

func (fetcher *Fetcher) waitForResults(ctx context.Context) {
    for {
        select {
        case results := <-fetcher.resultsChannel:
            // 处理结果...
        case <-ctx.Done():
            return
        }
    }
}

这样可以更准确地识别阻塞在通道操作上的goroutine。

回到顶部