Golang中context.Context的标准用法是什么

Golang中context.Context的标准用法是什么 问题: 从一个 goroutine A 中,我将生成多个 goroutine B1, B2, … Bk,我希望:

  • 让 A 等待直到所有 Bi 都结束
  • 能够从 A 取消某个特定的 Bi

解决方案: 选项 1:让每个 Bi 接收一个 context.Context 作为参数,在一个循环中执行少量工作并检查上下文是否已结束,同时让每个 Bi 接收一个 sync.WaitGroup 作为输入,并 defer wg.Done()

我的父 goroutine A 将执行 wg.Wait() 来等待所有 Bi 结束。我如何让 A 等待(例如 3 秒,或者其他事件,比如 95% 的 Bi 完成),查看哪些 Bi goroutine 仍在运行,然后只关闭它们?

选项 2:让所有 Bi 接收一个 context.Context,但同时让它们返回一个完成通道,每个 Bi 都 defer close(finish)

这样我在 A 中可以有很高的灵活性,等待某些条件,然后有选择地取消任何剩余的 Bi。

你们如何解决这个问题,惯用的做法是什么?

func main() {
    fmt.Println("hello world")
}

更多关于Golang中context.Context的标准用法是什么的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中context.Context的标准用法是什么的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中,使用context.Contextsync.WaitGroup组合是处理goroutine生命周期管理的标准做法。以下是针对你需求的惯用实现:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func worker(ctx context.Context, wg *sync.WaitGroup, id int) {
    defer wg.Done()
    
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d cancelled: %v\n", id, ctx.Err())
            return
        default:
            // 模拟工作
            fmt.Printf("Worker %d working...\n", id)
            time.Sleep(500 * time.Millisecond)
            
            // 检查上下文是否已取消
            if ctx.Err() != nil {
                fmt.Printf("Worker %d detected cancellation\n", id)
                return
            }
        }
    }
}

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    
    // 创建多个worker
    workers := make([]context.CancelFunc, 5)
    for i := 0; i < 5; i++ {
        // 为每个worker创建独立的子上下文
        workerCtx, workerCancel := context.WithCancel(ctx)
        workers[i] = workerCancel
        
        wg.Add(1)
        go worker(workerCtx, &wg, i)
    }
    
    // 等待3秒
    time.Sleep(3 * time.Second)
    
    // 取消特定的worker(例如索引2和3)
    fmt.Println("\nCancelling workers 2 and 3...")
    workers[2]()
    workers[3]()
    
    // 再等待2秒
    time.Sleep(2 * time.Second)
    
    // 取消所有剩余的worker
    fmt.Println("\nCancelling all remaining workers...")
    cancel()
    
    // 等待所有worker结束
    wg.Wait()
    fmt.Println("All workers finished")
}

对于更复杂的条件控制(如95%完成),可以结合使用通道和原子计数器:

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type Worker struct {
    ctx    context.Context
    cancel context.CancelFunc
    done   chan struct{}
    id     int
}

func (w *Worker) run(wg *sync.WaitGroup, completion *int32, totalWorkers int32) {
    defer wg.Done()
    defer close(w.done)
    
    for i := 0; i < 10; i++ {
        select {
        case <-w.ctx.Done():
            fmt.Printf("Worker %d cancelled\n", w.id)
            return
        default:
            // 模拟工作
            time.Sleep(200 * time.Millisecond)
            fmt.Printf("Worker %d completed task %d\n", w.id, i)
        }
    }
    
    // 工作完成,增加完成计数
    completed := atomic.AddInt32(completion, 1)
    fmt.Printf("Worker %d finished. Completed: %d/%d\n", w.id, completed, totalWorkers)
}

func main() {
    const totalWorkers = 10
    var wg sync.WaitGroup
    var completed int32
    
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    workers := make([]*Worker, totalWorkers)
    
    // 启动所有worker
    for i := 0; i < totalWorkers; i++ {
        workerCtx, workerCancel := context.WithCancel(ctx)
        workers[i] = &Worker{
            ctx:    workerCtx,
            cancel: workerCancel,
            done:   make(chan struct{}),
            id:     i,
        }
        
        wg.Add(1)
        go workers[i].run(&wg, &completed, totalWorkers)
    }
    
    // 监控完成进度
    go func() {
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        
        for range ticker.C {
            current := atomic.LoadInt32(&completed)
            percentage := float64(current) / float64(totalWorkers) * 100
            
            if percentage >= 95.0 {
                fmt.Printf("\n95%% completed (%d/%d), cancelling remaining workers\n", 
                    current, totalWorkers)
                
                // 取消所有未完成的worker
                for _, w := range workers {
                    select {
                    case <-w.done:
                        // worker已完成
                    default:
                        // worker未完成,取消它
                        w.cancel()
                    }
                }
                return
            }
        }
    }()
    
    // 等待所有worker结束
    wg.Wait()
    fmt.Println("All workers finished")
}

这种模式提供了:

  1. 使用context.Context进行取消控制
  2. 使用sync.WaitGroup等待所有goroutine完成
  3. 能够单独取消特定goroutine
  4. 支持基于条件的批量取消
  5. 符合Go的并发模式惯例
回到顶部