Golang中如何使用Errgroup终止所有运行中的goroutine及子goroutine

Golang中如何使用Errgroup终止所有运行中的goroutine及子goroutine 你好,

我正在尝试在某个协程失败时终止所有正在运行的协程。通常这很简单,但我无法让它在包含子协程的协程中正常工作。对于这个示例,我需要的是,当迭代进行到 ID:4 COUNTER:2 这一步时,所有操作都应该失败。然而,所有协程都继续运行直到结束。我将 errgroupmain 函数注入到 run 函数中,但这并没有改变任何结果。我也尝试将组移到循环内部,但同样没有效果。

谢谢

2 回复

如果你想实现 goroutine 的取消操作,那么你需要使用 errgroup.WithContext,然后在每个 goroutine 中检查上下文是否已退出。

类似这样(playground

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, gCtx := errgroup.WithContext(context.Background())

	// 从10个goroutine中打印随机掷骰子结果,直到其中一个掷出双六
	for i := 0; i < 10; i++ {
		id := i
		g.Go(func() error {
			for gCtx.Err() == nil {
				a, b := rand.Intn(6)+1, rand.Intn(6)+1

				fmt.Printf("Go routine %d rolls %d and %d\n", id, a, b)
				if a == 6 && b == 6 {
					fmt.Printf("Error from %d\n", id)
					return errors.New("Double six")
				}
				time.Sleep(1 * time.Millisecond)
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("error %v", err)
	}
}

更多关于Golang中如何使用Errgroup终止所有运行中的goroutine及子goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中,使用errgroup终止所有goroutine(包括子goroutine)需要正确管理上下文传播。你的示例中,子goroutine没有接收到父goroutine的取消信号,因此会继续运行。

以下是修正后的代码示例:

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func run(ctx context.Context, id int, g *errgroup.Group) error {
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            fmt.Printf("ID:%d COUNTER:%d\n", id, i)
            time.Sleep(1 * time.Second)
            
            if id == 4 && i == 2 {
                return fmt.Errorf("error from goroutine %d at counter %d", id, i)
            }
            
            // 启动子goroutine
            g.Go(func() error {
                childCtx, cancel := context.WithCancel(ctx)
                defer cancel()
                
                return process(childCtx, id, i)
            })
        }
    }
    return nil
}

func process(ctx context.Context, parentID, counter int) error {
    for j := 0; j < 3; j++ {
        select {
        case <-ctx.Done():
            fmt.Printf("子goroutine %d-%d 已取消\n", parentID, counter)
            return ctx.Err()
        default:
            fmt.Printf("子goroutine: 父ID:%d 计数:%d 子计数:%d\n", parentID, counter, j)
            time.Sleep(500 * time.Millisecond)
        }
    }
    return nil
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    
    for i := 0; i < 5; i++ {
        id := i
        g.Go(func() error {
            return run(ctx, id, g)
        })
    }
    
    if err := g.Wait(); err != nil {
        fmt.Printf("发生错误: %v\n", err)
    }
    
    fmt.Println("所有goroutine已终止")
}

关键修改点:

  1. 上下文传播:将ctx作为参数传递给所有函数,确保取消信号能传递到子goroutine
  2. 子goroutine的上下文:使用context.WithCancel(ctx)创建子上下文,确保父上下文取消时子上下文也取消
  3. select监听取消:在所有循环中添加select语句监听ctx.Done()
  4. errgroup复用:将同一个errgroup.Group实例传递给子goroutine,确保错误能正确传播

当ID为4的goroutine在计数器为2时返回错误,errgroup会取消上下文,所有监听该上下文的goroutine都会收到取消信号并终止。

运行结果会显示:

  • 主goroutine在ID:4 COUNTER:2时返回错误
  • 所有正在运行的goroutine和子goroutine都会收到取消信号
  • 程序立即终止,不会继续执行后续迭代

这种方式确保了错误发生时,整个goroutine树都能被正确清理。

回到顶部