Golang中如何绕过最大并发连接数的限制

Golang中如何绕过最大并发连接数的限制 我有两个需要读取数据的 csv 文件:

  1. AllTransactions,包含 2_618_583 条记录
  2. SalesTransactions,包含 1_070_305 条记录

SalesTransactions(即较小的文件)中有一个字段在 AllTransactions(即较大的文件)中不存在。如果大文件中的交易是销售交易(ST_ID 以 “5” 开头),我需要为每条记录添加这个字段。

我想到的办法是,每当在大文件中找到与小文件匹配的条件时,就使用一个 for 循环来遍历小文件的每一行。这意味着,我需要将 SalesTransaction 中的 1_070_305 条记录读取 1_070_305 次。这超过了允许的 1_048_576 次。

因此,我考虑使用 goroutine,并编写了如下代码:

func NetTrx(item string, allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) {

	var wg sync.WaitGroup

	for _, v := range *allTrx {
		trx := models.NetTrasActions{
			ST_TYPE:      v.ST_TP,
			ITEM_ID:      v.ITEM_ID,
			ITEM_NAME:    v.ITEM_NAME,
			QTY:          v.QTY,
			CURNT_COST_L: 0,
			PRICE:        v.PRICE,
			TTL_VAL:      v.QTY * v.PRICE,
			TTL_CST:      0,
			SLS_CNTR_ID:  v.SLS_CNTR_ID,
			BARCODE:      v.BARCODE,
		}

		if strings.HasPrefix(v.ST_ID, "5") { // sales orders transactions starts with 5
			wg.Add(1)
			go func() {
				defer wg.Done()
				for _, s := range *salesTrx {
					if v.ST_ID == s.ST_ID && v.ITEM_ID == s.ITEM_ID {
						trx.CURNT_COST_L = s.CURNT_COST_L
						fmt.Println("matched found for item", v.ITEM_ID, "cost is:", s.CURNT_COST_L)
						break
					}
				}
				fmt.Println("Exit goroutine")
			}()
		}

    wg.Wait()

    // balance of the code
}

运行上述代码时,我遇到了以下错误:

PS D:\Desktop\Bravo for Power BI\2022\YTDTransactions> go run trx
Number of transactions: 2618583
Number of sales transactions: 1070305
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
matched found for item 100000105 cost is: 74
Exit goroutine
.
.
.
Panic: too many concurrent operations on a single file or socket (max 1048575)

我该如何绕过这个限制?难道 wg.Done() 不应该在 goroutine 完成后关闭不需要的 goroutine 吗?这样每次打印 Exit goroutine(它就在 wg.Done() 之前打印)时,goroutine 的数量就会减少 1

我的假设是,百万级的 goroutine 不应该同时运行,有些会开启,有些会关闭。每个 goroutine 中的操作都很快,所以 goroutine 应该会定期关闭,而不会达到百万个并发!

image


更多关于Golang中如何绕过最大并发连接数的限制的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

你是怎么调用 NetTrx 的?就我所见,NetTrx 本身除了向标准输出执行 fmt.Println 外,并没有进行任何 IO 操作。我怀疑问题出在别的地方。

更多关于Golang中如何绕过最大并发连接数的限制的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


唯一的IO操作是通过fmt.Println调用输出到标准输出。也许你创建了超过一百万个goroutine,而它们都在对fmt.Println的调用上阻塞了?这是相同的代码,但我将消息发送到一个通道,由单个goroutine逐一打印这些消息:

func NetTrx(item string, allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) {
	var wg sync.WaitGroup
	msgs := make(chan string, 128)
	defer close(msgs)
	println := func(args ...interface{}) {
		msgs <- fmt.Sprintln(args...)
	}
	go func() {
		for msg := range msgs {
			fmt.Println(msg)
		}
	}()
	for _, v := range *allTrx {
		trx := models.NetTrasActions{
			ST_TYPE:      v.ST_TP,
			ITEM_ID:      v.ITEM_ID,
			ITEM_NAME:    v.ITEM_NAME,
			QTY:          v.QTY,
			CURNT_COST_L: 0,
			PRICE:        v.PRICE,
			TTL_VAL:      v.QTY * v.PRICE,
			TTL_CST:      0,
			SLS_CNTR_ID:  v.SLS_CNTR_ID,
			BARCODE:      v.BARCODE,
		}
		if strings.HasPrefix(v.ST_ID, "5") { // sales orders transactions starts with 5
			wg.Add(1)
			go func() {
				defer wg.Done()
				for _, s := range *salesTrx {
					if v.ST_ID == s.ST_ID && v.ITEM_ID == s.ITEM_ID {
						trx.CURNT_COST_L = s.CURNT_COST_L
						println("matched found for item", v.ITEM_ID, "cost is:", s.CURNT_COST_L)
						break
					}
				}
				println("Exit goroutine")
			}()
		}
	wg.Wait()
	// balance of the code
}

这只是一个猜测;我目前还不是很确定!

你的代码存在并发设计问题。wg.Wait() 放在了循环内部,导致每次迭代都会等待所有 goroutine 完成,实际上变成了串行执行。更严重的是,对于大文件中每个以 “5” 开头的 ST_ID,你都会启动一个 goroutine 来遍历整个 salesTrx 切片,这会导致创建大量 goroutine。

以下是优化方案:

func NetTrx(item string, allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) []models.NetTrasActions {
    // 创建销售交易的查找映射,避免线性搜索
    salesMap := make(map[string]map[string]float64) // ST_ID -> ITEM_ID -> CURNT_COST_L
    
    for _, s := range *salesTrx {
        if _, exists := salesMap[s.ST_ID]; !exists {
            salesMap[s.ST_ID] = make(map[string]float64)
        }
        salesMap[s.ST_ID][s.ITEM_ID] = s.CURNT_COST_L
    }
    
    var result []models.NetTrasActions
    var mu sync.Mutex
    var wg sync.WaitGroup
    
    // 控制并发goroutine数量
    sem := make(chan struct{}, 1000) // 限制最大1000个并发
    
    for _, v := range *allTrx {
        trx := models.NetTrasActions{
            ST_TYPE:      v.ST_TP,
            ITEM_ID:      v.ITEM_ID,
            ITEM_NAME:    v.ITEM_NAME,
            QTY:          v.QTY,
            CURNT_COST_L: 0,
            PRICE:        v.PRICE,
            TTL_VAL:      v.QTY * v.PRICE,
            TTL_CST:      0,
            SLS_CNTR_ID:  v.SLS_CNTR_ID,
            BARCODE:      v.BARCODE,
        }
        
        if strings.HasPrefix(v.ST_ID, "5") {
            wg.Add(1)
            sem <- struct{}{} // 获取信号量
            
            go func(v models.MainModel, trx models.NetTrasActions) {
                defer wg.Done()
                defer func() { <-sem }() // 释放信号量
                
                if itemMap, exists := salesMap[v.ST_ID]; exists {
                    if cost, found := itemMap[v.ITEM_ID]; found {
                        trx.CURNT_COST_L = cost
                    }
                }
                
                mu.Lock()
                result = append(result, trx)
                mu.Unlock()
            }(v, trx)
        } else {
            // 非销售交易直接添加到结果
            mu.Lock()
            result = append(result, trx)
            mu.Unlock()
        }
    }
    
    wg.Wait()
    return result
}

更高效的批处理版本:

func NetTrxBatch(allTrx *[]models.MainModel, salesTrx *[]models.SalesTransactions) []models.NetTrasActions {
    // 构建查找映射
    salesMap := make(map[string]map[string]float64)
    for _, s := range *salesTrx {
        if _, exists := salesMap[s.ST_ID]; !exists {
            salesMap[s.ST_ID] = make(map[string]float64)
        }
        salesMap[s.ST_ID][s.ITEM_ID] = s.CURNT_COST_L
    }
    
    result := make([]models.NetTrasActions, len(*allTrx))
    
    // 使用工作池处理
    numWorkers := runtime.NumCPU() * 2
    chunkSize := len(*allTrx) / numWorkers
    
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        start := i * chunkSize
        end := start + chunkSize
        if i == numWorkers-1 {
            end = len(*allTrx)
        }
        
        go func(start, end int) {
            defer wg.Done()
            for j := start; j < end; j++ {
                v := (*allTrx)[j]
                trx := models.NetTrasActions{
                    ST_TYPE:      v.ST_TP,
                    ITEM_ID:      v.ITEM_ID,
                    ITEM_NAME:    v.ITEM_NAME,
                    QTY:          v.QTY,
                    CURNT_COST_L: 0,
                    PRICE:        v.PRICE,
                    TTL_VAL:      v.QTY * v.PRICE,
                    TTL_CST:      0,
                    SLS_CNTR_ID:  v.SLS_CNTR_ID,
                    BARCODE:      v.BARCODE,
                }
                
                if strings.HasPrefix(v.ST_ID, "5") {
                    if itemMap, exists := salesMap[v.ST_ID]; exists {
                        if cost, found := itemMap[v.ITEM_ID]; found {
                            trx.CURNT_COST_L = cost
                        }
                    }
                }
                result[j] = trx
            }
        }(start, end)
    }
    
    wg.Wait()
    return result
}

关键优化点:

  1. 使用映射(map)替代线性搜索,将 O(n²) 复杂度降为 O(n)
  2. 使用信号量或工作池控制并发数量
  3. 避免为每个匹配都创建 goroutine
  4. 使用批处理减少 goroutine 创建开销
回到顶部