Golang Worker池 - 如何等待所有goroutine完成工作

Golang Worker池 - 如何等待所有goroutine完成工作 大家好!我写了第一个Go语言应用,一切运行正常,除了一个问题——我使用工作池来并行执行任务,但我注意到并非所有的goroutine都在等待执行。我参考了这个例子

func (provider *GoogleDriveProvider) FetchVideos() {
resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()

  jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
  for i := 0; i < googleWorkersAmount; i++ {
       	go provider.executeWork(jobs)
  }
  for _, row := range resp.Values {
	  if len(row) <= minRowLengthForSheets {
		continue
	  }

	  dataSets := provider.getDataSets(row)
	  for _, sheetsVideo := range dataSets {
		  jobs <- sheetsVideo
	  }

  }
  close(jobs)
  provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo) {
    for sheetsVideo := range jobs {
	  googleVideoFile, _ := sheetsVideo.fetchVideoData()
	  provider.buildVideo(googleVideoFile, sheetsVideo)
	  provider.Logger.Info("Video saved")
    }
} 

我原本期望控制台最后显示的信息是“Obtaining videos from google Drive is finished, good job!”,但实际上我看到的是: "Obtaining videos from google Drive is finished, good job! “Video saved” “Video saved”

我尝试稍微重写了一下代码,并按照上面链接中的所有步骤(添加了结果通道):

func (provider *GoogleDriveProvider) FetchVideos() {
  resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
  jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
  results := make(chan int, googleWorkersAmount)
  for i := 0; i < googleWorkersAmount; i++ {
	  go provider.executeWork(jobs, results)
  }
  counter:=0
  for _, row := range resp.Values {
	  if len(row) <= minRowLengthForSheets {
	  	continue
	  }
	  counter++
	  dataSets := provider.getDataSets(row)
	  for _, sheetsVideo := range dataSets {
		jobs <- sheetsVideo
	  }

  }
  provider.Logger.WithFields(logFields).Info("DEBUGGGGGGG")
  for a := 0; a < counter; a++ {
	  <-results
  }
  close(jobs)
  provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo, results chan<- int) {
for sheetsVideo := range jobs {
	googleVideoFile, _ := sheetsVideo.fetchVideoData()
	video, _ := provider.buildVideo(googleVideoFile, sheetsVideo)
	results <- 1
	provider.Logger.WithFields(logrus.Fields{
		"fileName": video.FileName,
	}).Info("video saved")
 }
}

在这种情况下,我的程序卡住了,我没有看到任何错误,甚至没有看到我的调试信息“DEBUGGGGGGG”。有人能解释一下哪里出了问题吗? PS:我知道有WaitGroup,但我想弄清楚工作池是如何工作的。


更多关于Golang Worker池 - 如何等待所有goroutine完成工作的实战教程也可以访问 https://www.itying.com/category-94-b0.html

8 回复

感谢您的解释!!

更多关于Golang Worker池 - 如何等待所有goroutine完成工作的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


请不要忘记将问题标记为已解决/已答复。

我原以为,通道是另一种无需等待组就能等待所有协程完成的方法。

你好!感谢你的回复。我期望看到最后一条消息显示“从Google Drive获取视频已完成,干得好!”,但在第一种情况下,我看到它之后还有几条“视频已保存”的消息。

考虑到这一点,我得出的结论是主协程没有等待所有工作协程完成。

你好,

在你的第一个例子中,独立的 goroutine 中的 range 循环会一直执行,直到通道中没有数据为止,然后退出循环。但是主 goroutine 在从通道读取之前就已经完成了向通道的写入,所以它会继续执行下去。

我尝试了第二个例子,它抛出了 goroutine 休眠的错误(在添加了另一个类似 result 的通道之后)。我会再深入研究一下第二个例子,然后告诉你。

如果我理解有误,请随时纠正我。

通道非常适合广播信号,通常是一个终止信号。这通常是一个主 Go 协程通知一组其他 Go 协程它们应该停止。这是通过关闭一个通道使其变为非阻塞来实现的。

这里,主 Go 协程需要等待所有其他 Go 协程终止。这种用例通常使用 WaitGroup。

你也可以用通道实现类似的功能,但不会那么直接。例如,你可以为每个工作协程创建一个通道,主协程将它们存储在一个切片中。当一个 Go 协程终止时,它会关闭通道以发出终止信号。主协程会遍历切片中的所有通道并从它们读取数据。一旦一个通道被关闭,读取操作就会解除阻塞,主协程可以继续处理下一个通道。当它到达切片末尾时,所有通道都已被关闭。

这种方法也有效,但 WaitGroup 计数器比一个通道切片更简单。

你的第一段代码是正确的。缺少的是同步。打印 "Obtaining videos from google Drive is finished, good job!" 的主例程应该等待所有 goroutine 在通道关闭后都终止。

以下是你可以如何使用它:

// 同步变量
var wg sync.WaitGroup

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
    wg.Add(googleWorkersAmount)
    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    for i := 0; i < googleWorkersAmount; i++ {
        go provider.executeWork(jobs)
    }
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
	    continue
        }

        dataSets := provider.getDataSets(row)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    close(jobs)
    wg.Wait()
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        provider.buildVideo(googleVideoFile, sheetsVideo)
        provider.Logger.Info("Video saved")
    }
    wg.Done()
}

我添加了 wg 变量及其方法的调用。

WaitGroup 是一个计数器,通过 Add() 递增,通过 Done() 递减。你最初添加将要启动的 goroutine 数量,这样计数器的值就是 goroutine 的数量。Wait() 方法会阻塞,直到计数器达到 0。每个 goroutine 在其任务终止时调用 Done() 来递减计数器。

Wait() 调用必须在 close(jobs) 之后执行,因为关闭通道会通知 goroutine 它们可以终止了。它是在最终打印消息之前执行的,该消息将在所有 goroutine 都终止后执行。

问题在于你的计数器逻辑不正确。counter 统计的是行数,但实际需要等待的是所有任务完成,而不是行数。当 dataSets 包含多个元素时,每个行会生成多个任务,但 counter 只递增一次,导致 results 通道接收的数量少于实际任务数,造成死锁。

以下是修正后的代码,使用 sync.WaitGroup 等待所有 goroutine 完成:

import "sync"

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
    
    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    var wg sync.WaitGroup
    
    // 启动 worker
    for i := 0; i < googleWorkersAmount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            provider.executeWork(jobs)
        }()
    }
    
    // 发送任务
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
            continue
        }
        
        dataSets := provider.getDataSets(row)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    
    close(jobs)
    wg.Wait() // 等待所有 worker 完成
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        provider.buildVideo(googleVideoFile, sheetsVideo)
        provider.Logger.Info("Video saved")
    }
}

如果坚持使用通道同步,需要准确统计任务总数:

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
    
    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    results := make(chan int, googleWorkersAmount)
    
    // 启动 worker
    for i := 0; i < googleWorkersAmount; i++ {
        go provider.executeWork(jobs, results)
    }
    
    // 统计总任务数
    totalTasks := 0
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
            continue
        }
        
        dataSets := provider.getDataSets(row)
        totalTasks += len(dataSets)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    
    // 等待所有任务完成
    for i := 0; i < totalTasks; i++ {
        <-results
    }
    
    close(jobs)
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo, results chan<- int) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        provider.buildVideo(googleVideoFile, sheetsVideo)
        results <- 1
        provider.Logger.Info("Video saved")
    }
}

关键点:

  1. totalTasks 需要准确统计所有 sheetsVideo 的数量
  2. results 通道的接收次数必须等于发送次数
  3. 通道操作完成后需要关闭通道避免 goroutine 泄漏
回到顶部