Golang Go语言中 channel 的关闭时机

go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。

目前这个代码跑起来的问题是会一直阻塞,要手动关闭

func main() {
	var wg sync.WaitGroup
objchan := make(chan []string, 10)

wg.Add(1)
go func(och <-chan []string) {
	defer wg.Done()
	for objs := range och {
		do_something(objs)
	}
}(objchan)

for _, perfix := range []string{"test", "tc"} {
	go Getfile(perfix, objchan)
}

wg.Wait()

}

func Getfile(dir string, filechan chan<- []string) { // send files … filechan <- files // 子目录递归 if dir { go Getfile(dir, filechan) } }


Golang Go语言中 channel 的关闭时机

更多关于Golang Go语言中 channel 的关闭时机的实战教程也可以访问 https://www.itying.com/category-94-b0.html

21 回复

更多关于Golang Go语言中 channel 的关闭时机的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


for objs := range och 改成 for objs, ok := range och
ok 会在 channel 关闭后变成 false

记错了,for 不能用这个写法,正常读取可以

问题是递归,Getfile 不知道会跑多少次

感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)



改成了在历遍时候 add ,但是结果有点奇怪,只能随机处理"test", "tc"中的一个。
在 wg.Wait()后面 time.Sleep ,才能显示完整

发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
所以还是在接收端处理 chan 关闭比较好?

使用协程池,而不是每次都创建一个 chan 。
和是不是递归没有关系,只要能把数据全部写到 chan 就行。

func main() {
poolNum := 10

var wg sync.WaitGroup
pool := make(chan string, poolNum)

// 处理文件
for i := 0; i < poolNum; i++ {
wg.Add(1)

go func(wg *sync.WaitGroup, ch <-chan string) {
defer wg.Done()

for filename := range ch {
fmt.Println(filename)
}
}(&wg, pool)
}

// 遍历文件
err := filepath.Walk(".",
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

pool <- path
return nil
})
if err != nil {
log.Println(err)
}

close(pool)
wg.Wait()
}

关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了

golang<br><br>import (<br> "fmt"<br> "io/ioutil"<br> "os"<br> "sync"<br>)<br><br>var DirPrefix string<br><br>func main() {<br> DirPrefix, _ = os.Getwd()<br> DirPrefix += "/dir/"<br> var wg sync.WaitGroup<br> objchan := make(chan string, 10)<br> wg.Add(1)<br> go func(och &lt;-chan string) {<br> defer wg.Done()<br> for objs := range och {<br> fmt.Println(objs)<br> }<br> }(objchan)<br><br> wg.Add(1)<br> go func(och chan string) {<br> defer wg.Done()<br> var wgDir sync.WaitGroup<br> for _, perfix := range []string{"test", "tc"} {<br> wgDir.Add(1)<br> go GetFile(perfix, och, &amp;wgDir)<br> }<br> wgDir.Wait()<br> close(objchan)<br> }(objchan)<br> wg.Wait()<br>}<br><br>func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) {<br> defer wg.Done()<br> // send files<br> dirNow := DirPrefix+dir<br> files, _ := ioutil.ReadDir(dirNow)<br> // 子目录递归<br> for _, v := range files {<br> filePath := DirPrefix+dir+"/"+<a target="_blank" href="http://v.Name" rel="nofollow noopener">v.Name</a>()<br> if IsDir(filePath) {<br> wg.Add(1)<br> go GetFile(filePath, fileChan, wg)<br> } else {<br> fileChan &lt;- filePath<br> }<br> }<br>}<br><br>func IsDir(path string) bool{<br> s, err := os.Stat(path)<br> if err != nil {<br> return false<br> }<br> return s.IsDir()<br>}<br><br>

把读取文件的操作包装到单独的协程里;
在读取操作完成后,close chan ;
试试上面这段。




我试试,谢谢

您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:

<br>func Getfile(dir string, filechan chan&lt;- []string) {<br> // send files <br> ...<br> filechan &lt;- files<br><br> // 子目录递归<br> if dir {<br> // 关闭当前 goroutine 中的 channel<br> close(filechan)<br> // 在新 goroutine 中继续遍历子目录<br> go Getfile(dir, filechan)<br> }<br>}<br>
这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。

channel 能重新打开?

目前代码是这样,问题是为什么后面不加 sleep 就只能随机显示 test ,tc 其中一个的内容?

<br>func main() {<br> var wg sync.WaitGroup<br><br> objchan := make(chan []string, 10)<br><br> go func(och &lt;-chan []string) {<br> for objs := range och {<br> println(objs)<br> }<br> }(objchan)<br><br> for _, perfix := range []string{"test", "tc"} {<br> wg.Add(1)<br> go Getfile(perfix, objchan, &amp;wg)<br> }<br><br> wg.Wait()<br> time.Sleep(1)<br>}<br><br>func Getfile(dir string, filechan chan&lt;- []string, wg *sync.WaitGroup) {<br> defer wg.Done()<br> // send files <br> ...<br> filechan &lt;- files<br> // 子目录递归<br> for _, dir := range dirs {<br> wg.Add(1)<br> go Getfile(dir, filechan)<br> }<br>}<br>

按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行

<br> wg.Add(1)<br> go func(och chan&lt;- []cos.Object) {<br> defer wg.Done()<br> var wgg sync.WaitGroup<br> for _, perfix := range []string{"test", "tc"} {<br> wgg.Add(1)<br> go tools.GetObjs(cosClient, perfix, objchan, &amp;wgg)<br> }<br> wgg.Wait()<br> close(och)<br> }(objchan)<br>

用#1 介绍的协程池方法,目前可行

但是协程池 感觉复杂了一层,一定要这样?

能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么

腾讯的对象存储,读写都是用 sdk 的

https://github.com/smallnest/chanx

无限缓存 channel ,可以实现无限写,写完再读

在Go语言中,channel的关闭时机是一个需要仔细考虑的问题,因为不当的关闭时机可能导致panic或者死锁。以下是一些关于channel关闭时机的专业建议:

  1. 生产者关闭:通常,当生产者确定不会再向channel发送任何数据时,可以关闭channel。这样做可以通知消费者没有更多的数据可以读取,消费者在读取到channel的零值并检测到channel已关闭时,可以优雅地退出循环。

  2. 避免在消费者中关闭:消费者通常不应该关闭channel,因为消费者无法确定是否还有其他的生产者正在向channel发送数据。如果消费者关闭了channel,而生产者还在尝试发送数据,将会导致panic。

  3. 使用sync.WaitGroup或其他同步机制:在有多生产者或多消费者的情况下,关闭channel可能变得更加复杂。可以使用sync.WaitGroup来确保所有的生产者都完成了数据的发送,然后由一个明确的关闭者来关闭channel。

  4. 避免在select中使用未关闭的channel进行读写:在select语句中,如果channel未被关闭,而你又希望退出循环,可以使用一个额外的channel来发送退出信号,而不是依赖于channel的关闭。

总之,channel的关闭应该由明确知道何时数据发送完毕的一方来执行,且需要确保没有其他goroutine正在或将会向该channel发送数据。在复杂的情况下,使用同步机制来管理channel的关闭是一个好的实践。

回到顶部