Golang Go语言中 channel 的关闭时机
go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。
目前这个代码跑起来的问题是会一直阻塞,要手动关闭
func main() {
var wg sync.WaitGroup
objchan := make(chan []string, 10)
wg.Add(1)
go func(och <-chan []string) {
defer wg.Done()
for objs := range och {
do_something(objs)
}
}(objchan)
for _, perfix := range []string{"test", "tc"} {
go Getfile(perfix, objchan)
}
wg.Wait()
}
func Getfile(dir string, filechan chan<- []string) {
// send files
…
filechan <- files
// 子目录递归
if dir {
go Getfile(dir, filechan)
}
}
Golang Go语言中 channel 的关闭时机
更多关于Golang Go语言中 channel 的关闭时机的实战教程也可以访问 https://www.itying.com/category-94-b0.html
WaitGroup 用法不对,在遍历文件时 Add(1)。
https://learnku.com/docs/gobyexample/2020/waitgroups/6286
如果想用协程池,参考这个:
https://sunwenfei.gitbook.io/sunwenfei/golang/golang-ji-chu-jiao-cheng/bing-fa/xie-cheng-chi
更多关于Golang Go语言中 channel 的关闭时机的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
for objs := range och 改成 for objs, ok := range och
ok 会在 channel 关闭后变成 false
记错了,for 不能用这个写法,正常读取可以
问题是递归,Getfile 不知道会跑多少次
感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)
改成了在历遍时候 add ,但是结果有点奇怪,只能随机处理"test", "tc"中的一个。
在 wg.Wait()后面 time.Sleep ,才能显示完整
发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
所以还是在接收端处理 chan 关闭比较好?
使用协程池,而不是每次都创建一个 chan 。
和是不是递归没有关系,只要能把数据全部写到 chan 就行。
func main() {
poolNum := 10
var wg sync.WaitGroup
pool := make(chan string, poolNum)
// 处理文件
for i := 0; i < poolNum; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup, ch <-chan string) {
defer wg.Done()
for filename := range ch {
fmt.Println(filename)
}
}(&wg, pool)
}
// 遍历文件
err := filepath.Walk(".",
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
pool <- path
return nil
})
if err != nil {
log.Println(err)
}
close(pool)
wg.Wait()
}
关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了
golang<br><br>import (<br> "fmt"<br> "io/ioutil"<br> "os"<br> "sync"<br>)<br><br>var DirPrefix string<br><br>func main() {<br> DirPrefix, _ = os.Getwd()<br> DirPrefix += "/dir/"<br> var wg sync.WaitGroup<br> objchan := make(chan string, 10)<br> wg.Add(1)<br> go func(och <-chan string) {<br> defer wg.Done()<br> for objs := range och {<br> fmt.Println(objs)<br> }<br> }(objchan)<br><br> wg.Add(1)<br> go func(och chan string) {<br> defer wg.Done()<br> var wgDir sync.WaitGroup<br> for _, perfix := range []string{"test", "tc"} {<br> wgDir.Add(1)<br> go GetFile(perfix, och, &wgDir)<br> }<br> wgDir.Wait()<br> close(objchan)<br> }(objchan)<br> wg.Wait()<br>}<br><br>func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) {<br> defer wg.Done()<br> // send files<br> dirNow := DirPrefix+dir<br> files, _ := ioutil.ReadDir(dirNow)<br> // 子目录递归<br> for _, v := range files {<br> filePath := DirPrefix+dir+"/"+<a target="_blank" href="http://v.Name" rel="nofollow noopener">v.Name</a>()<br> if IsDir(filePath) {<br> wg.Add(1)<br> go GetFile(filePath, fileChan, wg)<br> } else {<br> fileChan <- filePath<br> }<br> }<br>}<br><br>func IsDir(path string) bool{<br> s, err := os.Stat(path)<br> if err != nil {<br> return false<br> }<br> return s.IsDir()<br>}<br><br>
把读取文件的操作包装到单独的协程里;
在读取操作完成后,close chan ;
试试上面这段。
我试试,谢谢
您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:<br>func Getfile(dir string, filechan chan<- []string) {<br> // send files <br> ...<br> filechan <- files<br><br> // 子目录递归<br> if dir {<br> // 关闭当前 goroutine 中的 channel<br> close(filechan)<br> // 在新 goroutine 中继续遍历子目录<br> go Getfile(dir, filechan)<br> }<br>}<br>
这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。
channel 能重新打开?
按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行<br> wg.Add(1)<br> go func(och chan<- []cos.Object) {<br> defer wg.Done()<br> var wgg sync.WaitGroup<br> for _, perfix := range []string{"test", "tc"} {<br> wgg.Add(1)<br> go tools.GetObjs(cosClient, perfix, objchan, &wgg)<br> }<br> wgg.Wait()<br> close(och)<br> }(objchan)<br>
用#1 介绍的协程池方法,目前可行
但是协程池 感觉复杂了一层,一定要这样?
能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么
腾讯的对象存储,读写都是用 sdk 的
https://github.com/smallnest/chanx
无限缓存 channel ,可以实现无限写,写完再读
在Go语言中,channel的关闭时机是一个需要仔细考虑的问题,因为不当的关闭时机可能导致panic或者死锁。以下是一些关于channel关闭时机的专业建议:
-
生产者关闭:通常,当生产者确定不会再向channel发送任何数据时,可以关闭channel。这样做可以通知消费者没有更多的数据可以读取,消费者在读取到channel的零值并检测到channel已关闭时,可以优雅地退出循环。
-
避免在消费者中关闭:消费者通常不应该关闭channel,因为消费者无法确定是否还有其他的生产者正在向channel发送数据。如果消费者关闭了channel,而生产者还在尝试发送数据,将会导致panic。
-
使用
sync.WaitGroup
或其他同步机制:在有多生产者或多消费者的情况下,关闭channel可能变得更加复杂。可以使用sync.WaitGroup
来确保所有的生产者都完成了数据的发送,然后由一个明确的关闭者来关闭channel。 -
避免在select中使用未关闭的channel进行读写:在
select
语句中,如果channel未被关闭,而你又希望退出循环,可以使用一个额外的channel来发送退出信号,而不是依赖于channel的关闭。
总之,channel的关闭应该由明确知道何时数据发送完毕的一方来执行,且需要确保没有其他goroutine正在或将会向该channel发送数据。在复杂的情况下,使用同步机制来管理channel的关闭是一个好的实践。