Golang中如何优雅地清理Waitgroups和Channels

Golang中如何优雅地清理Waitgroups和Channels 关于使用等待组(WaitGroup)和通道(Channel),我有一个问题。我有一个简单的命令,它接收一个文件列表并并发地对它们进行哈希计算(使用 numRoutines 个协程)。我觉得我这里的代码可以变得更简洁,但也许我想得太复杂了。有人能看一下并给我一些反馈吗?

我特别好奇如何改进对协程返回哈希数量的检查(在 go/for/select 代码块中)。

func analyzeNode(log *logger.LoggerInstance, wg *sync.WaitGroup, in <-chan string, out chan<- models.Node, errored chan<- struct{}) {
	defer wg.Done()

	for filename := range in {
		fileStat, err := os.Stat(filename)
		if err != nil {
			log.Error("Can't read file", err, "file", filename)
			errored <- struct{}{}
			continue
		}

		f, err := os.Open(filename)
		if err != nil {
			log.Error("something went wrong opening file", err, "file", filename)
			errored <- struct{}{}
			continue
		}

		md5Hash := file_ops.MD5Hash(f)
		out <- models.Node{Name: filename, Size: fileStat.Size(), MD5: md5Hash}
		f.Close()
	}
}

var analyzeCmd = &cobra.Command{
	Use:   "analyze",
	Short: "Analyze a set of directories and/or files",
	Run: func(cmd *cobra.Command, args []string) {
		log := logger.CreateLogger("analyze")
		var nl models.NodeList

		log.Trace("analyze file called")

		numRoutines, err := cmd.Flags().GetInt("routines")
		if err != nil {
			log.Error("Can't get numRoutines. Defaulting to 1", err)
			numRoutines = 1
		}

		files := disk_inventory.BuildFileList(args...)
		var wg sync.WaitGroup
		supplier, receiver, errored, done := make(chan string), make(chan models.Node), make(chan struct{}), make(chan struct{})
		received := 0

		for i := 0; i < numRoutines; i++ {
			wg.Add(1)
			go analyzeNode(&log, &wg, supplier, receiver, errored)
		}

		go func() {
			for {
				select {
				case n := <-receiver:
					nl.Append(n)
					received++
				case <-errored:
					received++
				}

				if received == len(files) {
					done <- struct{}{}
					break
				}
			}
		}()

		for _, filename := range files {
			supplier <- filename
		}

		<-done
		close(supplier)
		close(receiver)
		wg.Wait()
	},
}

谢谢!


更多关于Golang中如何优雅地清理Waitgroups和Channels的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复
  • 我想知道是否可以直接从变量 f 获取文件大小。
  • 检查 len(out) 应该能告诉你通道中有多少条未读消息(注意:这不是并发安全的)。

更多关于Golang中如何优雅地清理Waitgroups和Channels的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢第一个提示!我完全忽略了那一点!

不过,我认为我不需要从通道中查找未读消息的数量。依赖它似乎会导致错误。

这是一个典型的并发模式优化问题。你的代码可以通过几种方式变得更简洁和安全。主要问题是手动跟踪 received 计数器和通道关闭逻辑不够优雅。

以下是改进版本:

func analyzeNode(log *logger.LoggerInstance, wg *sync.WaitGroup, in <-chan string, out chan<- models.Node, errored chan<- struct{}) {
	defer wg.Done()

	for filename := range in {
		fileStat, err := os.Stat(filename)
		if err != nil {
			log.Error("Can't read file", err, "file", filename)
			errored <- struct{}{}
			continue
		}

		f, err := os.Open(filename)
		if err != nil {
			log.Error("something went wrong opening file", err, "file", filename)
			errored <- struct{}{}
			continue
		}

		md5Hash := file_ops.MD5Hash(f)
		out <- models.Node{Name: filename, Size: fileStat.Size(), MD5: md5Hash}
		f.Close()
	}
}

var analyzeCmd = &cobra.Command{
	Use:   "analyze",
	Short: "Analyze a set of directories and/or files",
	Run: func(cmd *cobra.Command, args []string) {
		log := logger.CreateLogger("analyze")
		var nl models.NodeList

		log.Trace("analyze file called")

		numRoutines, err := cmd.Flags().GetInt("routines")
		if err != nil {
			log.Error("Can't get numRoutines. Defaulting to 1", err)
			numRoutines = 1
		}

		files := disk_inventory.BuildFileList(args...)
		var wg sync.WaitGroup
		supplier := make(chan string, len(files))
		receiver := make(chan models.Node, len(files))
		errored := make(chan struct{}, len(files))

		// 启动工作协程
		for i := 0; i < numRoutines; i++ {
			wg.Add(1)
			go analyzeNode(&log, &wg, supplier, receiver, errored)
		}

		// 发送所有文件到通道
		for _, filename := range files {
			supplier <- filename
		}
		close(supplier)

		// 等待所有工作协程完成
		wg.Wait()
		
		// 关闭接收通道
		close(receiver)
		close(errored)

		// 收集结果
		for node := range receiver {
			nl.Append(node)
		}

		// 统计错误数量
		errorCount := 0
		for range errored {
			errorCount++
		}

		log.Info(fmt.Sprintf("Processed %d files, %d errors", len(files), errorCount))
	},
}

关键改进:

  1. 使用缓冲通道:为 supplierreceivererrored 通道设置缓冲区大小等于文件数量,避免阻塞。

  2. 简化协程管理:先发送所有文件到 supplier 通道,然后立即关闭它。工作协程会在通道关闭后自动退出。

  3. 移除 done 通道:通过 wg.Wait() 等待所有工作协程完成,然后安全地关闭接收通道。

  4. 顺序收集结果:在工作协程全部完成后,顺序地从 receivererrored 通道读取结果,无需复杂的 select 语句。

  5. 自动清理:使用 defer wg.Done() 确保工作协程结束时递减计数器,close(supplier) 确保工作协程能正常退出。

这个版本更简洁,避免了竞态条件,并且资源清理更彻底。

回到顶部