Golang并发与并行生态系统的实践指南

Golang并发与并行生态系统的实践指南 我在Java中使用线程的用例是: 一个定时任务读取数据库中特定表的特定定时表达式 定时任务触发一个事件 线程开始为多个用户(2000个)执行以下操作 对于每个用户:

  • 步骤 a: 调用外部API获取财务规则
  • 步骤 b: 如果 a 成功,则使用从 a 得到的结果继续调用另一个外部API
  • 如果 步骤 b 成功
    1. 执行计算操作
    2. 发送短信
    3. 将数据填充到表中

注意: 当所有这些步骤中出现异常时,会有一个将错误记录到表中的步骤,并且这些步骤不会停止,直到处理完所有2000个用户。

如何在Golang的并发与并行生态系统中实现这一点? 根据我的伪代码提供一个有用的示例将会很有帮助。 谢谢


更多关于Golang并发与并行生态系统的实践指南的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

感谢您的回复。我已更新我的需求,如果表述清晰,您的帮助对我而言将非常宝贵。

更多关于Golang并发与并行生态系统的实践指南的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


不完全理解你的需求,但“在goroutine上每x秒执行某个任务而不阻塞你的主应用程序”相对简单。类似下面的代码应该能让你开始:

// Context for our goroutine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Mutext to protect our long running job and make sure 
// it doesn't run concurrently
mu := sync.Mutex{}

// Spin up goroutine to run our job every 10 seconds
// Obviously adjust timer as needed.
go func() {
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return // exit goroutine
		case <-t.C:
			mu.Lock()
			doLongRunningJobWithoutBlockingApp()
			mu.Unlock()
		}
	}
}()

你也可以看看基于cron的库,例如 robfig/cron。我曾用它来处理诸如夜间报告生成和批量邮件处理(待处理账户到期通知等)等任务,效果很好。

在Go中实现这个并发处理场景,我们可以利用goroutine、channel和sync包来构建一个高效的解决方案。以下是一个完整的示例实现:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"sync"
	"time"
)

// 用户数据结构
type User struct {
	ID   int
	Name string
}

// 外部API调用结果
type APIResult struct {
	Success bool
	Data    interface{}
	Error   error
}

// 主处理函数
func main() {
	// 模拟从数据库获取2000个用户
	users := getUsersFromDB()
	
	// 创建context用于超时控制
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	// 启动定时任务
	go startScheduledTask(ctx, users)
	
	// 保持主程序运行
	select {
	case <-ctx.Done():
		log.Println("任务执行完成或超时")
	}
}

// 定时任务
func startScheduledTask(ctx context.Context, users []User) {
	ticker := time.NewTicker(1 * time.Hour) // 每小时执行一次
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			processUsersConcurrently(ctx, users)
		case <-ctx.Done():
			return
		}
	}
}

// 并发处理用户
func processUsersConcurrently(ctx context.Context, users []User) {
	var wg sync.WaitGroup
	userChan := make(chan User, 100) // 缓冲channel控制并发度
	
	// 启动worker池
	workerCount := 50 // 根据实际情况调整
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for user := range userChan {
				processSingleUser(ctx, user)
			}
		}(i)
	}
	
	// 发送用户到channel
	for _, user := range users {
		select {
		case userChan <- user:
		case <-ctx.Done():
			close(userChan)
			wg.Wait()
			return
		}
	}
	
	close(userChan)
	wg.Wait()
	log.Println("所有用户处理完成")
}

// 处理单个用户
func processSingleUser(ctx context.Context, user User) {
	// 步骤a: 调用外部API获取财务规则
	ruleResult := callFinancialAPI(ctx, user)
	if !ruleResult.Success {
		logErrorToDB(user, "步骤a失败", ruleResult.Error)
		return
	}
	
	// 步骤b: 调用另一个外部API
	apiResult := callAnotherAPI(ctx, user, ruleResult.Data)
	if !apiResult.Success {
		logErrorToDB(user, "步骤b失败", apiResult.Error)
		return
	}
	
	// 执行计算操作
	calculationResult := performCalculation(apiResult.Data)
	
	// 发送短信
	if err := sendSMS(user, calculationResult); err != nil {
		logErrorToDB(user, "发送短信失败", err)
		return
	}
	
	// 将数据填充到表中
	if err := insertDataToDB(user, calculationResult); err != nil {
		logErrorToDB(user, "数据插入失败", err)
		return
	}
	
	log.Printf("用户 %s 处理成功", user.Name)
}

// 模拟外部API调用 - 财务规则
func callFinancialAPI(ctx context.Context, user User) APIResult {
	// 这里实现实际的API调用逻辑
	// 使用context控制超时
	select {
	case <-time.After(100 * time.Millisecond):
		return APIResult{
			Success: true,
			Data:    fmt.Sprintf("财务规则数据-%d", user.ID),
		}
	case <-ctx.Done():
		return APIResult{
			Success: false,
			Error:   ctx.Err(),
		}
	}
}

// 模拟另一个外部API调用
func callAnotherAPI(ctx context.Context, user User, ruleData interface{}) APIResult {
	select {
	case <-time.After(150 * time.Millisecond):
		return APIResult{
			Success: true,
			Data:    fmt.Sprintf("API数据-%d", user.ID),
		}
	case <-ctx.Done():
		return APIResult{
			Success: false,
			Error:   ctx.Err(),
		}
	}
}

// 执行计算操作
func performCalculation(data interface{}) interface{} {
	// 实现计算逻辑
	return fmt.Sprintf("计算结果-%v", data)
}

// 发送短信
func sendSMS(user User, data interface{}) error {
	// 实现短信发送逻辑
	log.Printf("向用户 %s 发送短信: %v", user.Name, data)
	return nil
}

// 插入数据到数据库
func insertDataToDB(user User, data interface{}) error {
	// 实现数据库插入逻辑
	log.Printf("插入用户 %s 的数据到数据库", user.Name)
	return nil
}

// 错误记录到数据库
func logErrorToDB(user User, step string, err error) {
	// 实现错误日志记录逻辑
	log.Printf("用户 %s 在步骤 %s 出错: %v", user.Name, step, err)
	
	// 实际实现中这里会有数据库插入操作
	// db.Exec("INSERT INTO error_logs (user_id, step, error) VALUES (?, ?, ?)",
	//     user.ID, step, err.Error())
}

// 模拟从数据库获取用户
func getUsersFromDB() []User {
	var users []User
	for i := 1; i <= 2000; i++ {
		users = append(users, User{
			ID:   i,
			Name: fmt.Sprintf("用户-%d", i),
		})
	}
	return users
}

对于更高级的控制,可以使用errgroupsemaphore来优化:

import "golang.org/x/sync/errgroup"
import "golang.org/x/sync/semaphore"

func processUsersWithErrGroup(ctx context.Context, users []User) error {
	g, ctx := errgroup.WithContext(ctx)
	
	// 使用信号量控制并发数量
	sem := semaphore.NewWeighted(50)
	
	for _, user := range users {
		user := user // 创建局部变量
		
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}
		
		g.Go(func() error {
			defer sem.Release(1)
			return processUserWithErrorHandling(ctx, user)
		})
	}
	
	return g.Wait()
}

func processUserWithErrorHandling(ctx context.Context, user User) error {
	// 处理逻辑,返回错误
	// 错误会在errgroup中收集
	return nil
}

关键特性说明:

  1. goroutine池模式:通过channel控制并发goroutine数量,避免创建过多goroutine
  2. context超时控制:确保整个处理过程不会无限期运行
  3. 错误隔离:单个用户处理失败不会影响其他用户
  4. 资源管理:使用WaitGroup确保所有goroutine完成
  5. 缓冲channel:平衡生产者和消费者的速度差异

这个实现充分利用了Go的并发原语,提供了高性能、可扩展且健壮的解决方案来处理你的业务场景。

回到顶部