Golang并发与并行生态系统的实践指南
Golang并发与并行生态系统的实践指南 我在Java中使用线程的用例是: 一个定时任务读取数据库中特定表的特定定时表达式 定时任务触发一个事件 线程开始为多个用户(2000个)执行以下操作 对于每个用户:
- 步骤 a: 调用外部API获取财务规则
- 步骤 b: 如果 a 成功,则使用从 a 得到的结果继续调用另一个外部API
- 如果 步骤 b 成功
- 执行计算操作
- 发送短信
- 将数据填充到表中
注意: 当所有这些步骤中出现异常时,会有一个将错误记录到表中的步骤,并且这些步骤不会停止,直到处理完所有2000个用户。
如何在Golang的并发与并行生态系统中实现这一点? 根据我的伪代码提供一个有用的示例将会很有帮助。 谢谢
更多关于Golang并发与并行生态系统的实践指南的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢您的回复。我已更新我的需求,如果表述清晰,您的帮助对我而言将非常宝贵。
更多关于Golang并发与并行生态系统的实践指南的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我不完全理解你的需求,但“在goroutine上每x秒执行某个任务而不阻塞你的主应用程序”相对简单。类似下面的代码应该能让你开始:
// Context for our goroutine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Mutext to protect our long running job and make sure
// it doesn't run concurrently
mu := sync.Mutex{}
// Spin up goroutine to run our job every 10 seconds
// Obviously adjust timer as needed.
go func() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return // exit goroutine
case <-t.C:
mu.Lock()
doLongRunningJobWithoutBlockingApp()
mu.Unlock()
}
}
}()
你也可以看看基于cron的库,例如 robfig/cron。我曾用它来处理诸如夜间报告生成和批量邮件处理(待处理账户到期通知等)等任务,效果很好。
在Go中实现这个并发处理场景,我们可以利用goroutine、channel和sync包来构建一个高效的解决方案。以下是一个完整的示例实现:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"sync"
"time"
)
// 用户数据结构
type User struct {
ID int
Name string
}
// 外部API调用结果
type APIResult struct {
Success bool
Data interface{}
Error error
}
// 主处理函数
func main() {
// 模拟从数据库获取2000个用户
users := getUsersFromDB()
// 创建context用于超时控制
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 启动定时任务
go startScheduledTask(ctx, users)
// 保持主程序运行
select {
case <-ctx.Done():
log.Println("任务执行完成或超时")
}
}
// 定时任务
func startScheduledTask(ctx context.Context, users []User) {
ticker := time.NewTicker(1 * time.Hour) // 每小时执行一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
processUsersConcurrently(ctx, users)
case <-ctx.Done():
return
}
}
}
// 并发处理用户
func processUsersConcurrently(ctx context.Context, users []User) {
var wg sync.WaitGroup
userChan := make(chan User, 100) // 缓冲channel控制并发度
// 启动worker池
workerCount := 50 // 根据实际情况调整
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for user := range userChan {
processSingleUser(ctx, user)
}
}(i)
}
// 发送用户到channel
for _, user := range users {
select {
case userChan <- user:
case <-ctx.Done():
close(userChan)
wg.Wait()
return
}
}
close(userChan)
wg.Wait()
log.Println("所有用户处理完成")
}
// 处理单个用户
func processSingleUser(ctx context.Context, user User) {
// 步骤a: 调用外部API获取财务规则
ruleResult := callFinancialAPI(ctx, user)
if !ruleResult.Success {
logErrorToDB(user, "步骤a失败", ruleResult.Error)
return
}
// 步骤b: 调用另一个外部API
apiResult := callAnotherAPI(ctx, user, ruleResult.Data)
if !apiResult.Success {
logErrorToDB(user, "步骤b失败", apiResult.Error)
return
}
// 执行计算操作
calculationResult := performCalculation(apiResult.Data)
// 发送短信
if err := sendSMS(user, calculationResult); err != nil {
logErrorToDB(user, "发送短信失败", err)
return
}
// 将数据填充到表中
if err := insertDataToDB(user, calculationResult); err != nil {
logErrorToDB(user, "数据插入失败", err)
return
}
log.Printf("用户 %s 处理成功", user.Name)
}
// 模拟外部API调用 - 财务规则
func callFinancialAPI(ctx context.Context, user User) APIResult {
// 这里实现实际的API调用逻辑
// 使用context控制超时
select {
case <-time.After(100 * time.Millisecond):
return APIResult{
Success: true,
Data: fmt.Sprintf("财务规则数据-%d", user.ID),
}
case <-ctx.Done():
return APIResult{
Success: false,
Error: ctx.Err(),
}
}
}
// 模拟另一个外部API调用
func callAnotherAPI(ctx context.Context, user User, ruleData interface{}) APIResult {
select {
case <-time.After(150 * time.Millisecond):
return APIResult{
Success: true,
Data: fmt.Sprintf("API数据-%d", user.ID),
}
case <-ctx.Done():
return APIResult{
Success: false,
Error: ctx.Err(),
}
}
}
// 执行计算操作
func performCalculation(data interface{}) interface{} {
// 实现计算逻辑
return fmt.Sprintf("计算结果-%v", data)
}
// 发送短信
func sendSMS(user User, data interface{}) error {
// 实现短信发送逻辑
log.Printf("向用户 %s 发送短信: %v", user.Name, data)
return nil
}
// 插入数据到数据库
func insertDataToDB(user User, data interface{}) error {
// 实现数据库插入逻辑
log.Printf("插入用户 %s 的数据到数据库", user.Name)
return nil
}
// 错误记录到数据库
func logErrorToDB(user User, step string, err error) {
// 实现错误日志记录逻辑
log.Printf("用户 %s 在步骤 %s 出错: %v", user.Name, step, err)
// 实际实现中这里会有数据库插入操作
// db.Exec("INSERT INTO error_logs (user_id, step, error) VALUES (?, ?, ?)",
// user.ID, step, err.Error())
}
// 模拟从数据库获取用户
func getUsersFromDB() []User {
var users []User
for i := 1; i <= 2000; i++ {
users = append(users, User{
ID: i,
Name: fmt.Sprintf("用户-%d", i),
})
}
return users
}
对于更高级的控制,可以使用errgroup和semaphore来优化:
import "golang.org/x/sync/errgroup"
import "golang.org/x/sync/semaphore"
func processUsersWithErrGroup(ctx context.Context, users []User) error {
g, ctx := errgroup.WithContext(ctx)
// 使用信号量控制并发数量
sem := semaphore.NewWeighted(50)
for _, user := range users {
user := user // 创建局部变量
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
g.Go(func() error {
defer sem.Release(1)
return processUserWithErrorHandling(ctx, user)
})
}
return g.Wait()
}
func processUserWithErrorHandling(ctx context.Context, user User) error {
// 处理逻辑,返回错误
// 错误会在errgroup中收集
return nil
}
关键特性说明:
- goroutine池模式:通过channel控制并发goroutine数量,避免创建过多goroutine
- context超时控制:确保整个处理过程不会无限期运行
- 错误隔离:单个用户处理失败不会影响其他用户
- 资源管理:使用WaitGroup确保所有goroutine完成
- 缓冲channel:平衡生产者和消费者的速度差异
这个实现充分利用了Go的并发原语,提供了高性能、可扩展且健壮的解决方案来处理你的业务场景。

