Golang中如何实现多协程的超时控制

Golang中如何实现多协程的超时控制 你好,

我正在尝试在 Go 语言中使用上下文(context)实现超时机制。

然而,我无法使其正常工作。即使我将超时设置为 1 秒,我仍然没有收到任何错误,而且 goroutine 的响应在 5-6 秒后才收到。

在理想情况下,我希望在达到超时时间时停止进程并返回空结果。

你能帮我理解我在这里遗漏了什么吗?

func GetResponse() {
  ctx, cancel := context.WithTimeout(context.Background(), time.Duration(searchData.TimeOut)*time.Second)
  defer cancel()
  InfoList := info.GetList()
  count := len(InfoList)
  call_status := make(chan bool)
  for _, info := range InfoList {
     go info.call_first(call_status, ctx,info)
     go info.call_second(call_status, ctx,info)
     go info.call_third(call_status, ctx,info)
     go info.call_n(call_status, ctx,info)
 }
 for i := 0; i < count; i++ {
	<-call_status
 }
}

Goroutine 函数如下:

func call_first(call_status, ctx,info){
 /* some processing here */
 call_status <- true
}

更多关于Golang中如何实现多协程的超时控制的实战教程也可以访问 https://www.itying.com/category-94-b0.html

10 回复

您能告诉我这里是否遗漏了什么吗?谢谢。

更多关于Golang中如何实现多协程的超时控制的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢 @skillian@christophberger。 我是否也需要在 GetResponse() 中检查上下文?

我认为这取决于具体情况。假设超时被触发。如果 goroutine 在退出前向 call_status 发送状态,那么 GetResponse() 可以读取所有状态并正常终止。否则,GetResponse() 确实需要自行检查超时情况。

你能分享完整的可运行代码吗?(最好通过 play.golang.org 的分享链接)

你似乎不小心粘贴了一个无效的 call_first 版本。这里粘贴的函数甚至无法编译,因为 rows 未定义,value 未使用,并且 processdata(Info) 应该写作 processdata(info)(使用小写的 i)。

当上下文被取消时,协程不会自动取消。您需要自行添加代码来检查上下文是否被取消并返回。例如:

func call_first(ctx context.Context, callInfo chan bool, info Info) {
    if err := ctx.Err(); err != nil {
        // 上下文已被取消或处于错误状态。根据错误进行相应处理。
    }

    /* 此处进行一些处理 */
    call_status <- true
}

我无法分享确切的应用程序代码。但我创建了类似的代码:https://play.golang.org/p/wRcn7CtNxbj

在 main() 中,我将超时设置为 5 传递给 context。在 call_first()call_second() 中,有一个 5 次的循环和对 processdata() 的调用。

processdata() 中,我添加了睡眠超时来模拟延迟响应,并期望在 call_second() 中得到 context deadline exceeded 错误。但我现在没有收到任何错误。

我猜测 main() 函数过早退出了。从 call_status 收集布尔值的循环遍历数组长度,但两个 goroutine 各自循环调用 processdata() 5次。因此 main() 过早退出。

defer cancel() 在 main 退出时运行,但 goroutines 可能没有时间对取消操作做出反应,因为它们随着 main() 的退出而硬性停止。

(不过,当我反复运行 playground 代码时,偶尔会看到错误信息。)

func main() {
    fmt.Println("hello world")
}

我在call_first()函数内的循环中尝试了这种方法。然而,我一直遇到上下文截止时间超出的错误。它没有处理数据,也没有返回期望的响应。如果我注释掉select语句,就能得到响应。我也尝试过增加超时时间。

func call_first(ctx context.Context, call_status chan bool, info Info) {
	if err := ctx.Err(); err != nil {
		log.Println(err)
		call_status <- true
	}
	for _, value := range rows {
		select {
		case <-ctx.Done():
			log.Println(ctx.Err())
			call_status <- true
		default:
			
		}
		processdata(Info)
	}
}

除了 @skillian 的解决方案,如果 goroutine 包含一个长时间运行的循环,你也可以在循环内部通过 select 语句检查上下文的“done”通道来寻找超时。

func call_first(ctx context.Context, call_status chan bool, info Info) {
    ... 
    /* 长时间运行的循环 */
    for /* 某个条件 */ {
        select {
        case <-ctx.Done():
            fmt.Println(ctx.Err()) // 以某种方式处理错误
            return // 返回,或者也可以跳出循环
        default: // 需要此分支,以使 select 语句不会阻塞
        }
        /* 尚未超时或出错 -> 进行一些处理 */
    }
    call_status <- true
}

在Go中实现多协程的超时控制,需要正确使用context的取消机制。你的代码存在几个关键问题:

  1. context没有传递给goroutine使用:你的goroutine函数签名显示接收context,但实际调用时没有传递
  2. goroutine没有检查context状态:即使传递了context,也需要在goroutine内部检查ctx.Done()
  3. channel使用不当call_status channel应该能传递错误信息

以下是修正后的实现:

func GetResponse() ([]Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(searchData.TimeOut)*time.Second)
    defer cancel()
    
    InfoList := info.GetList()
    count := len(InfoList)
    
    // 使用带缓冲的channel,避免goroutine泄漏
    results := make(chan Result, count*4)
    errCh := make(chan error, 1)
    
    // 启动所有goroutine
    for _, info := range InfoList {
        go info.call_first(ctx, info, results, errCh)
        go info.call_second(ctx, info, results, errCh)
        go info.call_third(ctx, info, results, errCh)
        go info.call_n(ctx, info, results, errCh)
    }
    
    // 收集结果,直到超时或所有goroutine完成
    var collectedResults []Result
    for i := 0; i < count*4; i++ {
        select {
        case <-ctx.Done():
            // 超时发生,返回已收集的结果
            return collectedResults, ctx.Err()
        case result := <-results:
            collectedResults = append(collectedResults, result)
        case err := <-errCh:
            // 发生错误,立即返回
            return collectedResults, err
        }
    }
    
    return collectedResults, nil
}

goroutine函数需要这样实现:

func (i *Info) call_first(ctx context.Context, info Info, results chan<- Result, errCh chan<- error) {
    // 创建一个带取消的context,确保不会阻塞
    done := make(chan struct{})
    
    go func() {
        defer close(done)
        // 模拟耗时操作
        time.Sleep(5 * time.Second)
        
        select {
        case <-ctx.Done():
            return // 超时了,直接返回
        default:
            // 正常处理逻辑
            result := process(info)
            select {
            case results <- result:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    select {
    case <-ctx.Done():
        // 超时发生
        return
    case <-done:
        // 正常完成
        return
    }
}

或者更简洁的实现方式:

func (i *Info) call_first(ctx context.Context, info Info, results chan<- Result, errCh chan<- error) {
    // 使用select监听context取消
    select {
    case <-ctx.Done():
        return // 超时或取消
    default:
        // 继续执行
    }
    
    // 执行实际工作
    result, err := doWork(info)
    if err != nil {
        select {
        case errCh <- err:
        case <-ctx.Done():
        }
        return
    }
    
    // 发送结果,同时监听context
    select {
    case results <- result:
    case <-ctx.Done():
        // 超时了,丢弃结果
    }
}

对于需要强制停止长时间运行操作的情况,可以使用context.WithCancel

func GetResponseWithForceStop() ([]Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    // 使用sync.WaitGroup跟踪goroutine
    var wg sync.WaitGroup
    results := make(chan Result, 100)
    
    for _, info := range InfoList {
        wg.Add(4)
        
        go func(info Info) {
            defer wg.Done()
            if err := doWorkWithContext(ctx, info); err != nil {
                // 处理错误
                return
            }
            // 发送结果
            select {
            case results <- result:
            case <-ctx.Done():
            }
        }(info)
        
        // 其他goroutine类似...
    }
    
    // 等待所有goroutine完成或超时
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    var collected []Result
    for {
        select {
        case <-ctx.Done():
            return collected, ctx.Err()
        case result, ok := <-results:
            if !ok {
                return collected, nil
            }
            collected = append(collected, result)
        }
    }
}

关键点:

  1. 每个goroutine必须定期检查ctx.Done()通道
  2. 所有阻塞操作都应该支持context取消
  3. 使用select语句同时监听结果和context状态
  4. 确保channel有足够的缓冲区或使用非阻塞发送
回到顶部