Golang中如何优雅地清理Waitgroups和Channels
Golang中如何优雅地清理Waitgroups和Channels 关于使用等待组(WaitGroup)和通道(Channel),我有一个问题。我有一个简单的命令,它接收一个文件列表并并发地对它们进行哈希计算(使用 numRoutines 个协程)。我觉得我这里的代码可以变得更简洁,但也许我想得太复杂了。有人能看一下并给我一些反馈吗?
我特别好奇如何改进对协程返回哈希数量的检查(在 go/for/select 代码块中)。
func analyzeNode(log *logger.LoggerInstance, wg *sync.WaitGroup, in <-chan string, out chan<- models.Node, errored chan<- struct{}) {
defer wg.Done()
for filename := range in {
fileStat, err := os.Stat(filename)
if err != nil {
log.Error("Can't read file", err, "file", filename)
errored <- struct{}{}
continue
}
f, err := os.Open(filename)
if err != nil {
log.Error("something went wrong opening file", err, "file", filename)
errored <- struct{}{}
continue
}
md5Hash := file_ops.MD5Hash(f)
out <- models.Node{Name: filename, Size: fileStat.Size(), MD5: md5Hash}
f.Close()
}
}
var analyzeCmd = &cobra.Command{
Use: "analyze",
Short: "Analyze a set of directories and/or files",
Run: func(cmd *cobra.Command, args []string) {
log := logger.CreateLogger("analyze")
var nl models.NodeList
log.Trace("analyze file called")
numRoutines, err := cmd.Flags().GetInt("routines")
if err != nil {
log.Error("Can't get numRoutines. Defaulting to 1", err)
numRoutines = 1
}
files := disk_inventory.BuildFileList(args...)
var wg sync.WaitGroup
supplier, receiver, errored, done := make(chan string), make(chan models.Node), make(chan struct{}), make(chan struct{})
received := 0
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go analyzeNode(&log, &wg, supplier, receiver, errored)
}
go func() {
for {
select {
case n := <-receiver:
nl.Append(n)
received++
case <-errored:
received++
}
if received == len(files) {
done <- struct{}{}
break
}
}
}()
for _, filename := range files {
supplier <- filename
}
<-done
close(supplier)
close(receiver)
wg.Wait()
},
}
谢谢!
更多关于Golang中如何优雅地清理Waitgroups和Channels的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢第一个提示!我完全忽略了那一点!
不过,我认为我不需要从通道中查找未读消息的数量。依赖它似乎会导致错误。
这是一个典型的并发模式优化问题。你的代码可以通过几种方式变得更简洁和安全。主要问题是手动跟踪 received 计数器和通道关闭逻辑不够优雅。
以下是改进版本:
func analyzeNode(log *logger.LoggerInstance, wg *sync.WaitGroup, in <-chan string, out chan<- models.Node, errored chan<- struct{}) {
defer wg.Done()
for filename := range in {
fileStat, err := os.Stat(filename)
if err != nil {
log.Error("Can't read file", err, "file", filename)
errored <- struct{}{}
continue
}
f, err := os.Open(filename)
if err != nil {
log.Error("something went wrong opening file", err, "file", filename)
errored <- struct{}{}
continue
}
md5Hash := file_ops.MD5Hash(f)
out <- models.Node{Name: filename, Size: fileStat.Size(), MD5: md5Hash}
f.Close()
}
}
var analyzeCmd = &cobra.Command{
Use: "analyze",
Short: "Analyze a set of directories and/or files",
Run: func(cmd *cobra.Command, args []string) {
log := logger.CreateLogger("analyze")
var nl models.NodeList
log.Trace("analyze file called")
numRoutines, err := cmd.Flags().GetInt("routines")
if err != nil {
log.Error("Can't get numRoutines. Defaulting to 1", err)
numRoutines = 1
}
files := disk_inventory.BuildFileList(args...)
var wg sync.WaitGroup
supplier := make(chan string, len(files))
receiver := make(chan models.Node, len(files))
errored := make(chan struct{}, len(files))
// 启动工作协程
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go analyzeNode(&log, &wg, supplier, receiver, errored)
}
// 发送所有文件到通道
for _, filename := range files {
supplier <- filename
}
close(supplier)
// 等待所有工作协程完成
wg.Wait()
// 关闭接收通道
close(receiver)
close(errored)
// 收集结果
for node := range receiver {
nl.Append(node)
}
// 统计错误数量
errorCount := 0
for range errored {
errorCount++
}
log.Info(fmt.Sprintf("Processed %d files, %d errors", len(files), errorCount))
},
}
关键改进:
-
使用缓冲通道:为
supplier、receiver和errored通道设置缓冲区大小等于文件数量,避免阻塞。 -
简化协程管理:先发送所有文件到
supplier通道,然后立即关闭它。工作协程会在通道关闭后自动退出。 -
移除
done通道:通过wg.Wait()等待所有工作协程完成,然后安全地关闭接收通道。 -
顺序收集结果:在工作协程全部完成后,顺序地从
receiver和errored通道读取结果,无需复杂的select语句。 -
自动清理:使用
defer wg.Done()确保工作协程结束时递减计数器,close(supplier)确保工作协程能正常退出。
这个版本更简洁,避免了竞态条件,并且资源清理更彻底。


