Golang中如何理解wg.Add()的这种行为?

Golang中如何理解wg.Add()的这种行为? 我遇到了一个让我困惑的问题。假设我有大约35万个XML文件(为了测试,我使用了同一个文件复制多次;文件名是sample_1.xml、sample_2.xml……)。我只想解析它们。以下是一个用于解析的模拟函数:

func singleRun(xmlFile fs.DirEntry) {
	xmlFileOpened, err := os.Open(filepath.Join("data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
		return
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// 我们在这里对数据做一些处理。
        // 这里,我什么都不做
	}
}

所以,这个函数只是打开文件,循环遍历作为XML字段的用户,然后关闭文件。

现在,这是该函数的第一个版本:

func PareseWithGoroutines() {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Print("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	var wg sync.WaitGroup
	wg.Add(len(xmlFiles))
	nFiles := 0

	for _, xmlFile := range xmlFiles {
		go func(file fs.DirEntry) {
			defer wg.Done()
			singleRun(file)
			nFiles += 1
		}(xmlFile)
	}
	wg.Wait()
	fmt.Println("Parsed", nFiles, "XML files.")
}

该函数的核心从 var wg sync.WaitGroup 开始。这个函数完全正常工作(已验证)。我想将其时间与一个略有不同的版本进行比较,在该版本中,我不使用 wg.Add(len(xmlFiles)),而是在循环中使用 wg.Add(1)。以下是这个版本:

func PareseWithGoroutinesV2() {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}

	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Print("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	var wg sync.WaitGroup
	nFiles := 0

	for _, xmlFile := range xmlFiles {
		wg.Add(1)
		go func(file fs.DirEntry) {
			defer wg.Done()
			singleRun(file)
			nFiles += 1
		}(xmlFile)
	}
	wg.Wait()
	fmt.Println("Parsed", nFiles, "XML files.")
}

请注意,唯一的区别是移除了 wg.Add(len(xmlFiles)),并在循环内、启动下一个goroutine之前添加了 wg.Add(1)

让我惊讶的是,这第二个版本不工作!它没有关闭文件,运行一段时间后,我收到很多行提示打开的文件太多:

...
2021/07/26 08:12:36 open data/sample_345429.xml: too many open files
2021/07/26 08:12:36 open data/sample_345556.xml: too many open files
2021/07/26 08:12:36 open data/sample_345652.xml: too many open files
...

坦白说,我不知道发生了什么,尽管这可能是我犯的一个非常简单的错误。singleRun() 函数有延迟关闭,循环在goroutine函数内部有 wg.Done(),所以我真的不知道这可能是怎么回事。有人知道发生了什么吗?

(我可以在这里复制完整的代码,但它需要生成数十万个XML文件,所以——既然这似乎是一个普遍问题——也许有人知道发生了什么,而无需运行实际代码?)


更多关于Golang中如何理解wg.Add()的这种行为?的实战教程也可以访问 https://www.itying.com/category-94-b0.html

14 回复

哦,天哪,看来这个解决方案并非每次都有效……

更多关于Golang中如何理解wg.Add()的这种行为?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我很高兴你找到了解决方案。我同意,这确实非常不直观。如果有人能进一步阐明这个问题,我也将不胜感激!

我很高兴你构建了一个可行的解决方案!是的——存在一个收益递减点是合理的。在某个时刻,如果同时打开并尝试读取太多文件,你的总吞吐量可能会降低。

这两段代码看起来基本没问题,所以我真的不明白为什么第二个版本不工作。 唯一让我注意到的是,你没有对共享变量 nFiles 的写入进行同步。 或许你应该将其改为使用 sync.atomic 的 Add 操作。 但这并不能解释你所看到的现象。 为什么不尝试启用竞态检测器来运行呢?

func main() {
    fmt.Println("hello world")
}

嗯,实际上,我觉得这并不是一个真正的并发解决方案……当我使用 go runReader(xmlFiles, chanFiles) 时,这只是一个 goroutine 在做所有的工作,再加上主 goroutine。我会继续研究这个问题,如果找到了真正的并发解决方案,我会在这里报告。(不过我必须承认,这对我来说是一个学习所有这些知识的好例子 😊。)

go runReader(xmlFiles, chanFiles)

非常感谢,Dean!

实际上,ulimit -n 给我的结果比 256 要多一点:65,536 😉。但多亏了你,我知道了问题所在。我确实会使用一个带缓冲的通道来处理这种情况。当我(或如果)成功找到解决方案时,我会在这里发布。

// 代码部分保持原样,此处仅为示例占位,实际HTML内容中无Go代码。
// 如果原始内容包含Go代码,应在此处按格式呈现。

顺便提一下,我相当失望地发现,在Python中使用multiprocessing和八个逻辑(四个物理)核心解析这些文件,速度比上述Go代码快了大约6倍。也许我可以优化Go代码,但目前我还没有能力做到这一点。此外,尽管Python本身很慢,但我使用的lxml库以其速度闻名。据我所知,它是用Cython编写的,而Cython是Python的一种强大子语言——因此脚本大部分时间运行在C语言层面,而不是Python层面。尽管如此,我没想到Go会比Python慢这么多。目前我还没有头绪,但既然我正在学习Go,也许我能找到让代码更快的方法。

你好 nyggus,

这似乎更像是一个与当前打开的文件描述符数量相关的 Unix 问题。至于为什么这会影响第二个解析函数而不是第一个,我不太确定,我知道这些信息本应对你最有帮助。

我在 Stack Overflow 上找到的两种方法包括:在 singleRun 函数结束时立即关闭打开的文件,而不是使用 defer;或者,使用计数信号量来限制打开文件的数量。

以下是相关链接: 方法 1: https://stackoverflow.com/questions/37804804/too-many-open-file-error-in-golang

方法 2: https://stackoverflow.com/questions/38824899/golang-too-many-open-files-in-go-function-goroutine

希望这对你有所帮助, Ed.

谢谢,Edward。遗憾的是,将文件关闭操作移到函数末尾并没有改变任何情况。那些讨论中提出的其他解决方案也都没有帮助。核心思路是限制打开文件的数量,但请注意,我确实在 singleRun() 中关闭了这些文件;而这正是奇怪之处,因为显然这没有奏效——文件没有被关闭。解析单个文件的时间极短。除非代码执行得如此之快,以至于同时打开了大量文件?但如果是那样,难道在代码的第一个版本(即使用 wg.Add(len(xmlFiles)) 的那个)中不会出现同样的问题吗?

这确实看起来是一件奇怪的事情,还因为第一个代码版本能工作而第二个却不能,它们之间的差异相当小(真的小吗?)。我看不出有什么能导致这种奇怪的行为,但由于我不是Go语言的专家,我希望我只是忽略了一些简单的东西。

好的,我找到了一个解决方案,可以在此处找到。

其思路是同时延迟关闭函数并return它(作为错误)。以下版本没有问题:

func singleRun(xmlFile fs.DirEntry) error {
	xmlFileOpened, err := os.Open(filepath.Join("data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
		return err
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	xmlFileOpened.Close()

	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// 我们在这里对数据做一些处理。
        // 这里,我什么也不做
	}
	return xmlFileOpened.Close()
}

当然,调用此函数的部分需要稍作修改,以读取并检查错误。但这确实有效,无论这个技巧看起来多么丑陋和不直观。

但如果有人能展示更符合 Go 语言习惯的写法,我将不胜感激!

你有35万个XML文件?ulimit -n的结果是什么?这是一个进程可以拥有的最大文件描述符数量。我的结果是256,这意味着用你的代码我很快就会遇到问题。你可以增加这个限制,但这并不是一个好的解决方案(你已经看到你的代码间歇性地工作,这是因为有时你的函数返回得足够快,没有达到最大描述符限制,而有时则不然)。参见Stack Overflow上的这条评论

即使你增加了ulimit,最终仍然会用尽文件描述符。尝试使用带缓冲的通道,并将打开文件的goroutine数量限制在ulimit以下。同时打开1000个文件应该绰绰有余了……

基本上,你只需要将同时打开的文件数量限制在一个合理的范围内,这个范围取决于你运行此程序的计算机。为此,可以使用:

好的,确实缓冲通道有效!

func goroutinesBufferedChannel(chanSize int) {
	files, err := os.ReadDir("data")
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Print("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	nFiles := 0
	chanFiles := make(chan fs.DirEntry, chanSize)
	go runReader(xmlFiles, chanFiles)
	for _ = range chanFiles {
		nFiles += 1
	}
	fmt.Println("Parsed", nFiles, "XML files.")
}

func runReader(xmlFiles []fs.DirEntry, chanFiles chan fs.DirEntry) {
	for _, xmlFile := range xmlFiles {
		_ = singleRun(xmlFile)
		chanFiles <- xmlFile
	}
	close(chanFiles)
}

顺便提一下,使用较小缓冲区(如10或100)的脚本,其运行速度比使用10,000缓冲区的要慢,但将缓冲区增加到20,000或50,000反而会降低脚本速度。我认为这取决于多种因素,包括XML文件的大小,当然也取决于系统本身。

好的!多亏了 @Dean_Davidson,我通过使用工作池解决了这个问题。以下是代码的简化版本(运行速度确实很快!):

// worker 基于 https://gobyexample.com/worker-pools
func worker(id int, jobs <-chan fs.DirEntry, results chan<- int) {
	for j := range jobs {
		_ = singleRun(j)
		// 向结果通道发送任意内容;稍后结果可以获取 singleRun 的返回值。
        results <- 1 
	}
}

// readAll 解析来自 dirPath 的所有 XML 文件。
// 它使用一个包含 noOfWorkers 的工作池;代码结构来自 https://gobyexample.com/worker-pools
func readAll(dirPath string, noOfWorkers int) {
	// 从数据目录读取所有文件
	files, err := os.ReadDir(dirPath)
	if err != nil {
		log.Fatal(err)
	}
	var xmlFiles []fs.DirEntry
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".xml") {
			xmlFiles = append(xmlFiles, file)
		}
	}
	if len(files) == len(xmlFiles) {
		log.Println("Found", len(files), "XML files in the data folder.")
	} else {
		log.Print("Among", len(files), "files in the data folder,", len(xmlFiles), "XML files were found.")
	}

	// 设置工作池
	numJobs := len(xmlFiles)
	jobs := make(chan fs.DirEntry, numJobs)
	results := make(chan int, numJobs)

	nFiles := 0
	for w := 1; w <= noOfWorkers; w++ {
		go worker(w, jobs, results)
	}

	for _, xmlFile := range xmlFiles {
		jobs <- xmlFile
	}
	close(jobs)

	for a := 1; a <= numJobs; a++ {
		<-results
	}
	fmt.Println("Parsed", nFiles, "XML files.")
}

// singleRun 解析单个 XML 文件
func singleRun(xmlFile fs.DirEntry) string {
	xmlFileOpened, err := os.Open(filepath.Join("..", "data", xmlFile.Name()))
	if err != nil {
		log.Println(err)
	}
	defer xmlFileOpened.Close()

	byteValue, _ := ioutil.ReadAll(xmlFileOpened)
	xmlFileOpened.Close()

	var users Users
	xml.Unmarshal(byteValue, &users)
	for i := 0; i < len(users.Users); i++ {
		// 我们可以在这里对数据做一些处理
	}
	// 返回任意内容以表明它运行正常 :-)
	return users.Users[0].Type
}

func main() {
	readAll("../data", 10000)
}

编写这个程序对我来说是很好的一课。谢谢大家!

这是一个典型的并发控制问题。问题的核心在于 wg.Add(1) 在循环中的位置导致了goroutine启动速度远超过系统文件描述符的限制。

在第一个版本中,wg.Add(len(xmlFiles)) 一次性设置了等待计数,然后所有goroutine几乎同时启动。虽然这也会快速消耗文件描述符,但由于 defer xmlFileOpened.Close() 的存在,文件会在每个goroutine执行完毕后及时关闭。

在第二个版本中,wg.Add(1) 在循环内部,每次迭代都会立即启动一个goroutine。当你有35万个文件时,系统会尝试同时打开大量文件,远超过操作系统对单个进程的文件描述符限制(通常是1024或4096)。

问题的关键在于:wg.Add(1) 在goroutine启动前调用,但goroutine的实际执行是异步的。这意味着循环会以极快的速度创建goroutine,而每个goroutine中的 singleRun() 函数需要时间执行。在文件关闭之前,新的文件已经被打开,导致同时打开的文件数量迅速超过系统限制。

这里有一个更清晰的示例来说明这个问题:

// 问题版本 - 会导致 "too many open files"
func problematicVersion() {
    var wg sync.WaitGroup
    
    for i := 0; i < 100000; i++ {
        wg.Add(1)  // 立即增加计数
        go func(id int) {
            defer wg.Done()
            // 模拟文件操作
            f, _ := os.Open(fmt.Sprintf("file_%d.txt", id))
            defer f.Close()
            time.Sleep(time.Millisecond * 10)  // 模拟处理时间
        }(i)
    }
    wg.Wait()
}

// 解决方案 - 使用工作池限制并发数
func fixedVersion() {
    var wg sync.WaitGroup
    maxConcurrent := 100  // 限制并发goroutine数量
    
    sem := make(chan struct{}, maxConcurrent)
    
    for i := 0; i < 100000; i++ {
        wg.Add(1)
        sem <- struct{}{}  // 获取信号量
        
        go func(id int) {
            defer wg.Done()
            defer func() { <-sem }()  // 释放信号量
            
            f, _ := os.Open(fmt.Sprintf("file_%d.txt", id))
            defer f.Close()
            time.Sleep(time.Millisecond * 10)
        }(i)
    }
    wg.Wait()
}

// 另一种解决方案 - 批量处理
func batchVersion() {
    var wg sync.WaitGroup
    batchSize := 1000
    
    files := make([]string, 100000)
    for i := range files {
        files[i] = fmt.Sprintf("file_%d.txt", i)
    }
    
    for i := 0; i < len(files); i += batchSize {
        end := i + batchSize
        if end > len(files) {
            end = len(files)
        }
        
        batch := files[i:end]
        wg.Add(len(batch))
        
        for _, file := range batch {
            go func(f string) {
                defer wg.Done()
                fp, _ := os.Open(f)
                defer fp.Close()
                time.Sleep(time.Millisecond * 10)
            }(file)
        }
        wg.Wait()  // 等待当前批次完成
    }
}

在你的具体案例中,虽然两个版本都有相同数量的goroutine,但第二个版本的goroutine启动节奏不同。第一个版本中,所有goroutine几乎同时启动和竞争执行,而第二个版本中,循环会持续快速地创建新的goroutine,导致系统资源被迅速耗尽。

要修复这个问题,你需要限制并发goroutine的数量。这里是一个适用于你场景的修正版本:

func PareseWithGoroutinesFixed() {
    files, err := os.ReadDir("data")
    if err != nil {
        log.Fatal(err)
    }

    var xmlFiles []fs.DirEntry
    for _, file := range files {
        if strings.HasSuffix(file.Name(), ".xml") {
            xmlFiles = append(xmlFiles, file)
        }
    }

    var wg sync.WaitGroup
    maxWorkers := 100  // 根据系统调整这个值
    sem := make(chan struct{}, maxWorkers)
    
    for _, xmlFile := range xmlFiles {
        wg.Add(1)
        sem <- struct{}{}  // 控制并发数
        
        go func(file fs.DirEntry) {
            defer wg.Done()
            defer func() { <-sem }()  // 释放信号量
            singleRun(file)
        }(xmlFile)
    }
    wg.Wait()
}

这个问题的根本原因是操作系统对单个进程同时打开文件数量的限制,而不是 sync.WaitGroup 本身的行为问题。wg.Add(1) 在循环内部本身没有错,但它会导致goroutine创建速度超过系统资源处理能力。

回到顶部