Golang中如何在工作协程内关闭range遍历的channel
Golang中如何在工作协程内关闭range遍历的channel 你好,我是Go并发编程的新手。
我的问题具体围绕这几行代码:
wg.Wait()
close(results)
等待组(wait group)等待的goroutine正在对一个通道进行range操作,除非通道被关闭,否则这将无限期地阻塞。close语句发生在wait之后,所以通道永远不会关闭,工作协程(workers)就会一直阻塞。我在网上找到的几乎所有解释这个概念的例子中,工作协程都知道何时停止,也就是说,它们的循环条件不是range。
对于go do(work)的想法是,我不关心它们需要多长时间或何时完成,我只关心我能得到结果。有些任务可能会重试,有些花费的时间不定,所以我没有为它们使用等待组(这可能是个错误,只是我还没意识到)。go do(work)的完成速度会比工作协程处理结果的速度快,工作越多,工作协程花费的时间就越长。
工作协程应该能够并行访问已完成的任何工作,并以某种方式处理它。当我在主函数的阻塞部分添加time.Sleep时,这能产生预期的结果,但添加wg.Wait()会导致panic。我猜想是因为defer wg.Done()永远不会递减,而工作协程由于它们的range操作而一直挂起。
我能否对我的代码做一个小小的调整来使其正常工作,还是说我必须重新思考这个问题?
我尝试了各种其他方法来发出工作完成的信号,并使用select和各种流程控制语句以及辅助通道来安全地关闭通道,但我似乎无法完全让它按照我想要的方式工作。有人能给我指明正确的方向吗?

Go Playground - Go编程语言
package main
import (
"fmt"
"sync"
"time"
)
// 一些需要约500毫秒的复杂工作
func do(num int, ch chan<- int) {
// 在实际代码中,会有多个结果被传递到通道
time.Sleep(time.Duration(500 * time.Millisecond))
/*
这里会执行实际的工作来产生一个新值放入通道
有时会产生2或3个值,它们在此次调用中被添加到ch中
我没有用 num = num*num 来恰当地说明这一点,但返回的数字是不同的,
并且有时会添加多个条目
*/
num = num * num
ch <- num
}
func main() {
results := make(chan int)
// 一定数量的必需复杂工作
for i := 0; i < 53; i++ {
go do(i, results)
}
var wg sync.WaitGroup
// 启动3个可以处理结果的工作协程
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, results)
}(i)
}
// 处理在所有工作协程完成时关闭通道
// panic
wg.Wait()
close(results)
// 可以工作
//time.Sleep(time.Duration(10 * time.Second))
fmt.Println("donezo")
}
// 以有意义的方式处理do()的结果
func worker(id int, ch <-chan int) {
fmt.Println("starting worker", id)
for i := range ch {
fmt.Println("channel val:", i)
}
}
更多关于Golang中如何在工作协程内关闭range遍历的channel的实战教程也可以访问 https://www.itying.com/category-94-b0.html
理论指出,应该在发送方(在你的场景中是“do”函数)关闭通道。 close函数只会向通道发送一条消息,你的工作协程将结束其range循环。
祝好!
更多关于Golang中如何在工作协程内关闭range遍历的channel的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中,当工作协程使用range遍历通道时,需要在所有数据发送完成后关闭通道。你的问题在于关闭通道的顺序和位置不正确。
以下是修改后的代码:
package main
import (
"fmt"
"sync"
"time"
)
func do(num int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Duration(500 * time.Millisecond))
num = num * num
ch <- num
}
func main() {
results := make(chan int)
var producerWg sync.WaitGroup
// 启动生产者goroutine
for i := 0; i < 53; i++ {
producerWg.Add(1)
go do(i, results, &producerWg)
}
var consumerWg sync.WaitGroup
// 启动消费者goroutine
for i := 0; i < 3; i++ {
consumerWg.Add(1)
go func(id int) {
defer consumerWg.Done()
worker(id, results)
}(i)
}
// 等待所有生产者完成
producerWg.Wait()
// 关闭通道以通知消费者
close(results)
// 等待所有消费者完成
consumerWg.Wait()
fmt.Println("donezo")
}
func worker(id int, ch <-chan int) {
fmt.Println("starting worker", id)
for i := range ch {
fmt.Println("channel val:", i)
}
}
关键修改:
- 为生产者goroutine添加单独的
sync.WaitGroup - 生产者完成后关闭通道
- 消费者完成后等待
如果你需要更灵活的控制,可以使用context来取消操作:
package main
import (
"context"
"fmt"
"sync"
"time"
)
func do(ctx context.Context, num int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
select {
case <-time.After(time.Duration(500 * time.Millisecond)):
ch <- num * num
case <-ctx.Done():
return
}
}
func worker(ctx context.Context, id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("starting worker", id)
for {
select {
case i, ok := <-ch:
if !ok {
return
}
fmt.Println("channel val:", i)
case <-ctx.Done():
return
}
}
}
这样修改后,生产者完成后会关闭通道,消费者在通道关闭后会自动退出range循环,避免了goroutine泄漏。

