[已解决] Golang并发处理 - API请求与数据库更新实践
我正在从数据库中获取电子邮件,然后检查它们的垃圾邮件评分。数据库中存在重复的电子邮件。我可以按电子邮件哈希值对结果进行分组,以获取唯一的电子邮件列表。这样我就不会多次检查同一封电子邮件的垃圾邮件评分。在我获得电子邮件的垃圾邮件评分后,我想更新数据库中所有具有该哈希值的行。
因此,为了实现这个目标,我考虑可以使用几个工作协程来查询API,另一个协程来查询数据库以获取所有需要的记录。然后,当API返回结果时,用一个函数来保存记录。执行顺序无关紧要,所以我认为这是一个学习并发的好机会。
我已经尝试了下面代码的几种变体。输出结果也在下面。程序永远不会退出。我想我在弄清楚该在哪里关闭哪个通道时遇到了困难。
谢谢 🙂
package main
import (
"log"
"strconv"
"github.com/fatih/color"
"github.com/golevi/comint/models"
"github.com/golevi/spamcheck"
)
// worker receives emails from in until that channel is closed. For each one
// it requests a spam score from spamcheck, parses the returned score string
// as a float64, stores it in email.SpamScore and sends the email on out.
//
// NOTE(review): when CheckScore returns an error it is only logged; execution
// falls through and resp.Score is still read. Whether resp is usable after a
// failed call depends on spamcheck — confirm.
// NOTE(review): the error returned by strconv.ParseFloat is assigned but never
// checked, so the email is forwarded with whatever value ParseFloat returned
// alongside that error.
func worker(in, out chan models.Email) {
for email := range in {
color.Yellow("Working %s\n", email.Hash)
scr := spamcheck.NewRequest(email.Mail)
resp, err := scr.CheckScore()
if err != nil {
log.Println(err)
}
score, err := strconv.ParseFloat(resp.Score, 64)
email.SpamScore = score
// out is created unbuffered in main, so this send blocks until
// receiveResults takes the value.
out <- email
// log.Println(email.Hash, email.SpamScore)
}
color.Yellow("Worker quitting")
}
// sendWorkIn queries the database for at most limit rows with spam_score = 0,
// grouped by hash, sends each resulting email on in, and then closes in.
//
// NOTE(review): the query selects only the "hash" column while worker reads
// email.Mail — presumably Mail is empty on these values; verify against the
// model and the intended query.
// NOTE(review): the value returned by Find is discarded, so any error it
// carries is not checked here.
func sendWorkIn(in chan models.Email, limit int) {
var emails []models.Email
models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails)
color.Red("Found %d hashes", len(emails))
for _, email := range emails {
color.Cyan("Sending %s\n", email.Hash)
in <- email
}
// Closing in ends the "for email := range in" loop in every worker.
close(in)
color.Red("Finished sending emails")
}
// receiveResults begins a transaction, receives exactly limit emails from
// out, issues one update per email that sets SpamScore on the rows whose hash
// matches, and finally commits the transaction.
//
// NOTE(review): the loop always performs limit receives. Nothing in this
// program closes out, so if fewer than limit emails are ever sent on it (for
// example because the query in sendWorkIn returned fewer rows) the receive
// below blocks forever and Commit is never reached.
// NOTE(review): the values returned by Begin, Updates and Commit are not
// inspected for errors.
func receiveResults(out chan models.Email, limit int) {
color.Red("Receiving results...")
tx := models.GetDB().Begin()
for i := 0; i < limit; i++ {
email := <-out
tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
color.Green("Updated %s\n", email.Hash)
}
tx.Commit()
color.Red("Finished updating database")
}
// main wires the pipeline together: it creates the in and out channels,
// starts 10 worker goroutines and one sendWorkIn goroutine, and then runs
// receiveResults on the main goroutine.
//
// NOTE(review): nothing here closes out or waits for the workers; main
// returns only once receiveResults has received limit results.
func main() {
// Both channels are unbuffered, so each send blocks until it is received.
in, out := make(chan models.Email), make(chan models.Email)
color.Red("Starting...")
workers := 10
for i := 0; i < workers; i++ {
go worker(in, out)
}
limit := 100
go sendWorkIn(in, limit)
// Blocks until receiveResults has received limit results from out.
receiveResults(out, limit)
}
更多关于[已解决] Golang并发处理 - API请求与数据库更新实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html
与其使用下面这种写法(`case <-out` 已经从通道中取走并丢弃了一个值,随后的 `email := <-out` 又会再取一个,相当于每轮循环丢掉一条结果):
for {
select {
case <-out:
email := <- out
// ...
不如直接:
for email := range out
更多关于[已解决] Golang并发处理 - API请求与数据库更新实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这是一个典型的并发控制问题。程序没有退出的原因是:receiveResults 函数固定要从 out 通道接收 limit 个结果,但实际产生的结果数量可能少于这个值(例如数据库中符合条件的哈希不足 limit 个),而 out 通道又从未被关闭,于是主协程会永远阻塞在 `email := <-out` 这一行上。
以下是修复后的代码:
package main
import (
"log"
"strconv"
"sync"
"github.com/fatih/color"
"github.com/golevi/comint/models"
"github.com/golevi/spamcheck"
)
// worker consumes emails from in until the channel is closed, scores each one
// through spamcheck and forwards the scored email on out. An email whose
// score request or score parsing fails is logged and dropped. wg.Done is
// called when the worker returns.
func worker(in, out chan models.Email, wg *sync.WaitGroup) {
	defer wg.Done()

	for msg := range in {
		color.Yellow("Working %s\n", msg.Hash)

		req := spamcheck.NewRequest(msg.Mail)
		res, err := req.CheckScore()
		if err != nil {
			log.Println(err)
			continue
		}

		// msg is this iteration's private copy, so writing the score before
		// the error check is harmless: on failure the copy is discarded.
		if msg.SpamScore, err = strconv.ParseFloat(res.Score, 64); err != nil {
			log.Println(err)
			continue
		}

		out <- msg
	}

	color.Yellow("Worker quitting")
}
// sendWorkIn queries the database for at most limit rows with spam_score = 0,
// grouped by hash, and sends each resulting email on in.
//
// in is closed on every return path, including a failed query, so the
// workers ranging over it always terminate. A query error is logged and no
// work is sent.
//
// NOTE(review): the query selects only the "hash" column while worker reads
// email.Mail — presumably Mail is empty on these values; verify against the
// model and the intended query.
// NOTE(review): the .Error check assumes GetDB returns a GORM-style handle
// whose query result exposes an Error field — confirm.
func sendWorkIn(in chan models.Email, limit int) {
	defer close(in)

	var emails []models.Email
	res := models.GetDB().
		Select("hash").
		Group("hash").
		Where("spam_score = 0").
		Limit(limit).
		Find(&emails)
	if res.Error != nil {
		log.Println(res.Error)
		return
	}
	color.Red("Found %d hashes", len(emails))

	for _, email := range emails {
		color.Cyan("Sending %s\n", email.Hash)
		in <- email
	}
	color.Red("Finished sending emails")
}
// receiveResults stores every scored email that arrives on out and returns
// once out has been closed and drained.
//
// All updates run inside a single transaction that is committed after the
// loop. Database errors are logged instead of being silently dropped: a
// failed update is logged and skipped, and a failed commit is logged. If the
// transaction cannot be started, out is still drained so that workers blocked
// on a send to out can finish.
//
// wg is not used by this function; the parameter is kept so existing call
// sites keep compiling.
//
// NOTE(review): the .Error checks assume GetDB returns a GORM-style handle —
// confirm. If it is GORM, Updates with a struct skips zero-value fields, so a
// score of exactly 0 would not be written — confirm whether that matters.
func receiveResults(out chan models.Email, wg *sync.WaitGroup) {
	color.Red("Receiving results...")

	tx := models.GetDB().Begin()
	if tx.Error != nil {
		log.Println(tx.Error)
		// Keep receiving so no worker stays blocked on an unbuffered send.
		for range out {
		}
		return
	}

	for email := range out {
		res := tx.Model(models.Email{}).
			Where("hash = ?", email.Hash).
			Updates(models.Email{SpamScore: email.SpamScore})
		if res.Error != nil {
			log.Println(res.Error)
			continue
		}
		color.Green("Updated %s\n", email.Hash)
	}

	if err := tx.Commit().Error; err != nil {
		log.Println(err)
		return
	}
	color.Red("Finished updating database")
}
// main runs the pipeline: sendWorkIn feeds in, a pool of workers turns in
// into out, and receiveResults consumes out on the main goroutine. A helper
// goroutine closes out once every worker has returned, which is what lets
// receiveResults' range loop — and therefore main — finish.
func main() {
	const (
		workers = 10
		limit   = 100
	)

	in := make(chan models.Email)
	out := make(chan models.Email)
	color.Red("Starting...")

	var wg sync.WaitGroup
	for n := workers; n > 0; n-- {
		wg.Add(1)
		go worker(in, out, &wg)
	}

	go sendWorkIn(in, limit)

	// Wait for all workers to finish, then close the result channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	receiveResults(out, &wg)
}
或者把发送和接收逻辑都内联到 main 中,用两个 sync.WaitGroup 分别等待 worker 协程和结果接收协程:
package main
import (
"log"
"strconv"
"sync"
"github.com/fatih/color"
"github.com/golevi/comint/models"
"github.com/golevi/spamcheck"
)
// worker reads emails from in until it is closed. Each email is scored via
// spamcheck; on success the parsed score is stored in SpamScore and the email
// is sent on out. If the score request or the parse fails, the error is
// logged and that email is skipped. wg.Done runs when the worker returns.
func worker(in, out chan models.Email, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range in {
		color.Yellow("Working %s\n", job.Hash)

		checker := spamcheck.NewRequest(job.Mail)
		result, checkErr := checker.CheckScore()
		if checkErr != nil {
			log.Println(checkErr)
			continue
		}

		parsed, parseErr := strconv.ParseFloat(result.Score, 64)
		if parseErr != nil {
			log.Println(parseErr)
			continue
		}

		job.SpamScore = parsed
		out <- job
	}
}
// main runs the whole pipeline from one function:
//
//   - 10 worker goroutines read from in and write to out;
//   - a receiver goroutine writes every result from out to the database
//     inside one transaction;
//   - a sender goroutine queries the work, feeds in, closes in, waits for
//     the workers and then closes out.
//
// main returns once the receiver has drained out. Database errors are logged
// instead of being silently ignored, and the channels are closed on every
// path so that no goroutine is left blocked after a failure.
//
// NOTE(review): the .Error checks assume GetDB returns a GORM-style handle —
// confirm.
func main() {
	in, out := make(chan models.Email), make(chan models.Email)
	color.Red("Starting...")

	workers := 10
	var workerWg sync.WaitGroup
	var resultWg sync.WaitGroup

	workerWg.Add(workers)
	for i := 0; i < workers; i++ {
		go worker(in, out, &workerWg)
	}

	// Result receiver goroutine.
	resultWg.Add(1)
	go func() {
		defer resultWg.Done()
		color.Red("Receiving results...")

		tx := models.GetDB().Begin()
		if tx.Error != nil {
			log.Println(tx.Error)
			// Keep receiving so no worker stays blocked on an unbuffered send.
			for range out {
			}
			return
		}

		for email := range out {
			res := tx.Model(models.Email{}).
				Where("hash = ?", email.Hash).
				Updates(models.Email{SpamScore: email.SpamScore})
			if res.Error != nil {
				log.Println(res.Error)
				continue
			}
			color.Green("Updated %s\n", email.Hash)
		}

		if err := tx.Commit().Error; err != nil {
			log.Println(err)
			return
		}
		color.Red("Finished updating database")
	}()

	// Send the work.
	limit := 100
	go func() {
		// Deferred calls run in reverse order: in is closed first so the
		// workers can finish, then we wait for them and close out so the
		// receiver's range loop ends. This also runs if the query fails.
		defer func() {
			workerWg.Wait()
			close(out)
		}()
		defer close(in)

		var emails []models.Email
		res := models.GetDB().
			Select("hash").
			Group("hash").
			Where("spam_score = 0").
			Limit(limit).
			Find(&emails)
		if res.Error != nil {
			log.Println(res.Error)
			return
		}
		color.Red("Found %d hashes", len(emails))

		for _, email := range emails {
			color.Cyan("Sending %s\n", email.Hash)
			in <- email
		}
		color.Red("Finished sending emails")
	}()

	// Wait until every result has been processed.
	resultWg.Wait()
}
关键修改:
- 使用 `sync.WaitGroup` 跟踪 worker 协程
- 在所有 worker 完成后关闭结果通道
- 使用 `for email := range out` 循环接收所有结果,而不是固定次数
- 添加错误处理,避免单个 API 请求失败影响其他处理

