[已解决] Golang并发处理 - API请求与数据库更新实践

我正在从数据库中获取电子邮件,然后检查它们的垃圾邮件评分。数据库中存在重复的电子邮件,所以我按电子邮件哈希值对结果进行分组,得到唯一的电子邮件列表,这样就不会多次检查同一封电子邮件的垃圾邮件评分。获得某封电子邮件的垃圾邮件评分后,我想更新数据库中所有具有该哈希值的行。

因此,为了实现这个目标,我考虑可以使用几个工作协程来查询API,另一个协程来查询数据库以获取所有需要的记录。然后,当API返回结果时,用一个函数来保存记录。执行顺序无关紧要,所以我认为这是一个学习并发的好机会。

我已经尝试了下面代码的几种变体。输出结果也在下面。程序永远不会退出。我想我在弄清楚该在哪里关闭哪个通道时遇到了困难。

谢谢 🙂

package main

import (
	"log"
	"strconv"

	"github.com/fatih/color"
	"github.com/golevi/comint/models"
	"github.com/golevi/spamcheck"
)

// worker reads emails from in, asks the spamcheck service for each one's
// spam score, and sends the email on to out.
//
// Exactly one value is sent on out for every value received from in, even
// when the lookup or the score parsing fails. In that case the error is
// logged and the email is forwarded with SpamScore left as it was. Keeping
// this one-in/one-out invariant matters because receiveResults waits for a
// fixed number of results.
//
// The worker returns once in has been closed and drained.
func worker(in, out chan models.Email) {
	for email := range in {
		color.Yellow("Working %s\n", email.Hash)
		scr := spamcheck.NewRequest(email.Mail)
		resp, err := scr.CheckScore()
		if err != nil {
			// resp is not meaningful after an error, so resp.Score must not
			// be read here. NOTE(review): if CheckScore returns a pointer,
			// reading it would panic.
			log.Println(err)
			out <- email
			continue
		}
		score, err := strconv.ParseFloat(resp.Score, 64)
		if err != nil {
			// Don't store a bogus score (0 or ±Inf) from a failed parse.
			log.Println(err)
			out <- email
			continue
		}
		email.SpamScore = score
		out <- email
	}
	color.Yellow("Worker quitting")
}

// sendWorkIn queries up to limit distinct hashes whose spam_score is 0 (one
// row per hash via GROUP BY). It sends each resulting models.Email on in and
// then closes in, so that workers ranging over it can return.
//
// A failed query is logged and treated as "no work": in is still closed.
//
// NOTE(review): only the "hash" column is selected, so the Mail field of each
// email sent here is presumably left at its zero value when the worker passes
// it to spamcheck.NewRequest. Confirm, and if so select the message column too.
func sendWorkIn(in chan models.Email, limit int) {
	var emails []models.Email
	if err := models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails).Error; err != nil {
		log.Println(err)
	}
	color.Red("Found %d hashes", len(emails))

	for _, email := range emails {
		color.Cyan("Sending %s\n", email.Hash)
		in <- email
	}
	// Closing in is what lets every worker's range loop terminate.
	close(in)
	color.Red("Finished sending emails")
}

// receiveResults reads exactly limit results from out. For each one it
// updates every row sharing the email's hash inside a single transaction,
// and commits after the limit-th result.
//
// Because it waits for exactly limit values, it blocks forever on <-out
// whenever fewer than limit emails are sent. sendWorkIn sends one email per
// hash it finds, which can be fewer than limit. This is why the program never
// exits. Ranging over out, and closing out once all workers have finished,
// avoids the problem.
//
// Errors from Begin, each update and Commit are logged. A failed update is
// not reported as "Updated", and a failed commit skips the final message.
func receiveResults(out chan models.Email, limit int) {
	color.Red("Receiving results...")
	tx := models.GetDB().Begin()
	if tx.Error != nil {
		log.Println(tx.Error)
	}
	for i := 0; i < limit; i++ {
		email := <-out
		res := tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
		if res.Error != nil {
			log.Println(res.Error)
			continue
		}
		color.Green("Updated %s\n", email.Hash)
	}
	if err := tx.Commit().Error; err != nil {
		log.Println(err)
		return
	}
	color.Red("Finished updating database")
}

// main wires up the pipeline:
//   - ten workers read from in and write to out;
//   - sendWorkIn fills in from the database in the background;
//   - the main goroutine waits in receiveResults for exactly limit (100)
//     results.
//
// Nothing ever closes out, so the program only finishes if receiveResults
// actually gets limit results.
func main() {
	const (
		workerCount = 10
		limit       = 100
	)

	in := make(chan models.Email)
	out := make(chan models.Email)
	color.Red("Starting...")

	for n := 0; n < workerCount; n++ {
		go worker(in, out)
	}
	go sendWorkIn(in, limit)

	// Block until receiveResults has consumed limit results.
	receiveResults(out, limit)
}


更多关于[已解决] Golang并发处理 - API请求与数据库更新实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

与其使用下面这种写法(注意它在每次循环中从 out 接收了两次:case <-out 先接收一个值并把它丢弃,随后 email := <-out 又接收下一个值,因此每两个结果就会丢失一个):

for {
    select {
    case <-out:
        email := <- out
        // ...

不如直接用 range 遍历通道(前提是所有发送方结束后会关闭 out,否则 range 循环永远不会结束):

for email := range out

更多关于[已解决] Golang并发处理 - API请求与数据库更新实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


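这是一个典型的并发控制问题。程序没有退出的原因是:receiveResults 函数只接收固定数量(limit)的结果,但实际发送的结果数量可能少于这个值(例如数据库中满足条件的哈希不足 limit 个),于是 receiveResults 会永远阻塞在 <-out 上,等待永远不会到来的结果。正确的做法是在所有 worker 结束后关闭 out,并用 range 接收结果。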

以下是修复后的代码:

package main

import (
	"log"
	"strconv"
	"sync"

	"github.com/fatih/color"
	"github.com/golevi/comint/models"
	"github.com/golevi/spamcheck"
)

// worker scores the emails it receives on in until in is closed. For each
// email it asks the spamcheck service for a score. On a request or parse
// error it logs the error and skips that email; otherwise it sends the email,
// with SpamScore set, on out. It signals wg when it returns.
func worker(in, out chan models.Email, wg *sync.WaitGroup) {
	defer wg.Done()

	for msg := range in {
		color.Yellow("Working %s\n", msg.Hash)

		request := spamcheck.NewRequest(msg.Mail)
		resp, reqErr := request.CheckScore()
		if reqErr != nil {
			log.Println(reqErr)
			continue
		}

		parsed, parseErr := strconv.ParseFloat(resp.Score, 64)
		if parseErr != nil {
			log.Println(parseErr)
			continue
		}

		msg.SpamScore = parsed
		out <- msg
	}

	color.Yellow("Worker quitting")
}

// sendWorkIn queries up to limit distinct hashes whose spam_score is 0 (one
// row per hash via GROUP BY). It sends each resulting models.Email on in and
// then closes in, so that workers ranging over it can return.
//
// A failed query is logged and treated as "no work": in is still closed.
//
// NOTE(review): only the "hash" column is selected, so the Mail field of each
// email sent here is presumably left at its zero value when the worker passes
// it to spamcheck.NewRequest. Confirm, and if so select the message column too.
func sendWorkIn(in chan models.Email, limit int) {
	var emails []models.Email
	if err := models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails).Error; err != nil {
		log.Println(err)
	}
	color.Red("Found %d hashes", len(emails))

	for _, email := range emails {
		color.Cyan("Sending %s\n", email.Hash)
		in <- email
	}
	// Closing in is what lets every worker's range loop terminate.
	close(in)
	color.Red("Finished sending emails")
}

// receiveResults consumes scored emails from out until out is closed. For
// each one it updates every row sharing the email's hash inside a single
// transaction, then commits once out is drained.
//
// The caller must close out after the last send (main does this once all
// workers are done); otherwise the range loop never ends.
//
// wg is not used here; it is kept only so existing callers still compile.
//
// Errors from Begin, each update and Commit are logged rather than ignored.
// A failed update is not reported as "Updated", and a failed commit skips the
// final message. Note that the transaction stays open while results are still
// arriving from the workers' API calls.
func receiveResults(out chan models.Email, wg *sync.WaitGroup) {
	color.Red("Receiving results...")
	tx := models.GetDB().Begin()
	if tx.Error != nil {
		log.Println(tx.Error)
	}

	for email := range out {
		res := tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
		if res.Error != nil {
			log.Println(res.Error)
			continue
		}
		color.Green("Updated %s\n", email.Hash)
	}

	if err := tx.Commit().Error; err != nil {
		log.Println(err)
		return
	}
	color.Red("Finished updating database")
}

// main starts ten workers, feeds them via sendWorkIn, and processes their
// results on the main goroutine. A helper goroutine waits for every worker
// to finish and then closes out. That close is what lets receiveResults'
// range loop, and therefore main, return.
func main() {
	in := make(chan models.Email)
	out := make(chan models.Email)
	color.Red("Starting...")

	const workers = 10
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go worker(in, out, &wg)
	}

	const limit = 100
	go sendWorkIn(in, limit)

	// out may only be closed once no worker can send on it any more.
	closeWhenDone := func() {
		wg.Wait()
		close(out)
	}
	go closeWhenDone()

	receiveResults(out, &wg)
}

或者使用另一种写法:把发送和接收逻辑都放进 main 中的匿名协程,用一个 sync.WaitGroup 等待所有 worker 结束后关闭 out,再用另一个 sync.WaitGroup 等待结果处理完成:

package main

import (
	"log"
	"strconv"
	"sync"

	"github.com/fatih/color"
	"github.com/golevi/comint/models"
	"github.com/golevi/spamcheck"
)

// worker pulls emails off in until in is closed, scoring each one with the
// spamcheck service. Emails whose lookup or score parsing fails are logged
// and dropped; the rest are sent on out with SpamScore filled in. wg.Done is
// deferred, so it runs however the worker returns.
func worker(in, out chan models.Email, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		email, open := <-in
		if !open {
			return
		}
		color.Yellow("Working %s\n", email.Hash)

		req := spamcheck.NewRequest(email.Mail)
		resp, err := req.CheckScore()
		if err != nil {
			log.Println(err)
			continue
		}

		score, err := strconv.ParseFloat(resp.Score, 64)
		if err != nil {
			log.Println(err)
			continue
		}

		email.SpamScore = score
		out <- email
	}
}

// main runs the whole pipeline inline:
//   - ten workers score emails read from in and send them to out;
//   - a result goroutine applies the scores to the database in a single
//     transaction until out is closed;
//   - a producer goroutine loads up to limit unscored hashes, feeds them to
//     in, closes in, waits for the workers, and then closes out.
//
// main returns once the result goroutine has finished committing (or has
// failed to).
//
// Database errors from the query, each update, Begin and Commit are logged
// instead of being silently ignored.
//
// NOTE(review): the query selects only "hash", so email.Mail is presumably
// empty when the workers call spamcheck. Confirm this.
func main() {
	in, out := make(chan models.Email), make(chan models.Email)
	color.Red("Starting...")

	workers := 10
	var workerWg sync.WaitGroup
	var resultWg sync.WaitGroup

	workerWg.Add(workers)
	for i := 0; i < workers; i++ {
		go worker(in, out, &workerWg)
	}

	// Result goroutine: drains out until it is closed and writes the scores.
	resultWg.Add(1)
	go func() {
		defer resultWg.Done()
		color.Red("Receiving results...")
		tx := models.GetDB().Begin()
		if tx.Error != nil {
			log.Println(tx.Error)
		}

		for email := range out {
			res := tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
			if res.Error != nil {
				log.Println(res.Error)
				continue
			}
			color.Green("Updated %s\n", email.Hash)
		}

		if err := tx.Commit().Error; err != nil {
			log.Println(err)
			return
		}
		color.Red("Finished updating database")
	}()

	// Producer goroutine: sends the work, then closes the channels in order.
	limit := 100
	go func() {
		var emails []models.Email
		if err := models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails).Error; err != nil {
			log.Println(err)
		}
		color.Red("Found %d hashes", len(emails))

		for _, email := range emails {
			color.Cyan("Sending %s\n", email.Hash)
			in <- email
		}
		close(in)
		color.Red("Finished sending emails")

		// out may only be closed once no worker can still send on it.
		workerWg.Wait()
		close(out)
	}()

	// Wait for the result goroutine to finish.
	resultWg.Wait()
}

关键修改:

  1. 使用 sync.WaitGroup 跟踪worker协程
  2. 在所有worker完成后关闭结果通道
  3. 使用 for email := range out 循环接收所有结果,而不是固定次数
  4. 添加错误处理:API 请求或评分解析失败时记录日志并跳过该邮件,而不是在出错后继续使用 resp,从而避免单个API请求失败影响其他处理
回到顶部