Golang中如何在工作协程内关闭range遍历的channel

Golang中如何在工作协程内关闭range遍历的channel 你好,我是Go并发编程的新手。

我的问题具体围绕这几行代码:

	wg.Wait()
	close(results)

等待组(wait group)等待的goroutine正在对一个通道进行range操作,除非通道被关闭,否则这将无限期地阻塞。close语句发生在wait之后,所以通道永远不会关闭,工作协程(workers)就会一直阻塞。我在网上找到的几乎所有解释这个概念的例子中,工作协程都知道何时停止,也就是说,它们的循环条件不是range

对于go do(work)的想法是,我不关心它们需要多长时间或何时完成,我只关心我能得到结果。有些任务可能会重试,有些花费的时间不定,所以我没有为它们使用等待组(这可能是个错误,只是我还没意识到)。go do(work)的完成速度会比工作协程处理结果的速度快,工作越多,工作协程花费的时间就越长。

工作协程应该能够并行访问已完成的任何工作,并以某种方式处理它。当我在主函数的阻塞部分添加time.Sleep时,这能产生预期的结果,但添加wg.Wait()会导致panic。我猜想是因为defer wg.Done()永远不会递减,而工作协程由于它们的range操作而一直挂起。

我能否对我的代码做一个小小的调整来使其正常工作,还是说我必须重新思考这个问题?

我尝试了各种其他方法来发出工作完成的信号,并使用select和各种流程控制语句以及辅助通道来安全地关闭通道,但我似乎无法完全让它按照我想要的方式工作。有人能给我指明正确的方向吗?

图片

Go Playground - Go编程语言

package main

import (
	"fmt"
	"sync"
	"time"
)

// 一些需要约500毫秒的复杂工作
func do(num int, ch chan<- int) {
	// 在实际代码中,会有多个结果被传递到通道
	time.Sleep(time.Duration(500 * time.Millisecond))

	/*
		这里会执行实际的工作来产生一个新值放入通道
                有时会产生2或3个值,它们在此次调用中被添加到ch中
                我没有用 num = num*num 来恰当地说明这一点,但返回的数字是不同的,
                并且有时会添加多个条目
	*/
        num = num * num
	ch <- num
}

func main() {

	results := make(chan int)

	// 一定数量的必需复杂工作
	for i := 0; i < 53; i++ {
		go do(i, results)
	}

	var wg sync.WaitGroup

	// 启动3个可以处理结果的工作协程
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, results)
		}(i)
	}

	// 处理在所有工作协程完成时关闭通道

        // panic
	wg.Wait()
	close(results)

	// 可以工作
	//time.Sleep(time.Duration(10 * time.Second))

	fmt.Println("donezo")
}

// 以有意义的方式处理do()的结果
func worker(id int, ch <-chan int) {
	fmt.Println("starting worker", id)

	for i := range ch {
		fmt.Println("channel val:", i)
	}
}

更多关于Golang中如何在工作协程内关闭range遍历的channel的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

理论指出,应该在发送方(在你的场景中是“do”函数)关闭通道。 close函数只会向通道发送一条消息,你的工作协程将结束其range循环。

祝好!

更多关于Golang中如何在工作协程内关闭range遍历的channel的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中,当工作协程使用range遍历通道时,需要在所有数据发送完成后关闭通道。你的问题在于关闭通道的顺序和位置不正确。

以下是修改后的代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

func do(num int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(time.Duration(500 * time.Millisecond))
	num = num * num
	ch <- num
}

func main() {
	results := make(chan int)
	var producerWg sync.WaitGroup

	// 启动生产者goroutine
	for i := 0; i < 53; i++ {
		producerWg.Add(1)
		go do(i, results, &producerWg)
	}

	var consumerWg sync.WaitGroup

	// 启动消费者goroutine
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			worker(id, results)
		}(i)
	}

	// 等待所有生产者完成
	producerWg.Wait()
	// 关闭通道以通知消费者
	close(results)
	// 等待所有消费者完成
	consumerWg.Wait()

	fmt.Println("donezo")
}

func worker(id int, ch <-chan int) {
	fmt.Println("starting worker", id)
	for i := range ch {
		fmt.Println("channel val:", i)
	}
}

关键修改:

  1. 为生产者goroutine添加单独的sync.WaitGroup
  2. 生产者完成后关闭通道
  3. 消费者完成后等待

如果你需要更灵活的控制,可以使用context来取消操作:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func do(ctx context.Context, num int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	select {
	case <-time.After(time.Duration(500 * time.Millisecond)):
		ch <- num * num
	case <-ctx.Done():
		return
	}
}

func worker(ctx context.Context, id int, ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("starting worker", id)
	for {
		select {
		case i, ok := <-ch:
			if !ok {
				return
			}
			fmt.Println("channel val:", i)
		case <-ctx.Done():
			return
		}
	}
}

这样修改后,生产者完成后会关闭通道,消费者在通道关闭后会自动退出range循环,避免了goroutine泄漏。

回到顶部