Golang并发编程问题解析与解决方案
Golang并发编程问题解析与解决方案
我有一个并发问题,程序无法正常终止。我查阅了许多资料但未找到答案,特此寻求帮助
我已在下方发布了程序。其功能是扫描C盘中的所有文件并将路径记录在txt文件中。我在标记为TODO的位置描述了我的问题。注释掉TODO位置的代码行会导致整个程序无法终止,这是一个奇怪的现象。我怀疑是CPU缓存问题,但这与实际情况不符。我正在寻求帮助。请不要介意代码中的中文。
我的想法是使用一个无限for循环来不断检查四个通道是否都没有内容,然后跳出for循环并结束程序。然而事实是,如果我不在for循环内部执行一些耗时操作,程序将永远不会结束。我的代码运行在Windows 10上。
经过研究,我简化了原始代码。下面第一部分是为了便于理解而简化的代码,第二部分是原始代码。
这是附加信息。简化代码是在找出问题根源后创建的,移除了不必要的逻辑以便于阅读。移除runtime.Gosched()后,程序可能无法正常退出。如果可能,尝试增加rec函数的输入参数,例如改为rec(80)。这可能与你的CPU核心数有关,CPU核心数越高,rec所需的参数值就越大。如果你调整通道的缓冲区大小,也需要相应修改rec的参数值。如果缓冲区增大,rec的参数值也必须增加,否则程序可能仍然无法正常退出。
简化代码:
package main
import (
"fmt"
"runtime"
)
var c = make(chan struct{}, 1)
func main() {
rec(8)
for {
// TODO 不在这里执行一些耗时操作会导致整个程序无法终止。
runtime.Gosched()
if len(c) == 0 {
fmt.Println("end")
break
}
}
}
func rec(count int) {
if count == 0 {
return
}
c <- struct{}{}
for i := 0; i < 5; i++ {
go rec(count - 1)
}
<-c
}
原始代码:
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"time"
)
var (
countDir = 0
countFile = 0
countErr = 0
dirsPath = `\dirsPath`
filesPath = `\filesPath`
errsPath = `\errsPath`
chanFile = make(chan string)
chanDir = make(chan string)
chanErr = make(chan string)
writeClose = make(chan struct{})
goMax chan struct{}
)
func main() {
goMax = make(chan struct{}, runtime.NumCPU()-1)
date := time.Now().Format("20060102")
go print()
root := `C:\`
pwd, _ := os.Getwd()
go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))
recTraverse(root)
for {
// TODO 不在这里执行一些耗时操作会导致整个程序无法终止。
runtime.Gosched()
if len(goMax) == 0 && len(chanDir) == 0 && len(chanFile) == 0 && len(chanErr) == 0 {
close(chanDir)
close(chanFile)
close(chanErr)
writeClose <- struct{}{}
break
}
}
fmt.Printf("执行完毕, 共记录文件 %d 个, 文件夹 %d 个, 错误 %d 条 \n", countFile, countDir, countErr)
// for {
// }
}
func recTraverse(path string) {
goMax <- struct{}{}
entries, err := os.ReadDir(path)
if err != nil {
chanErr <- fmt.Sprintf("目录读取失败 %s ; 日志: %s ", path, err)
}
for _, entry := range entries {
// 检查目录项是否是目录
if entry.IsDir() {
newPath := path + entry.Name()
chanDir <- newPath
go recTraverse(newPath + `\`)
} else {
chanFile <- path + entry.Name()
}
}
<-goMax
}
func write(dirPath string, filePath string, errPath string) {
// 打开文件,获取文件句柄和 bufio.Writer
dirFile, err := os.Create(dirPath)
fFile, err := os.Create(filePath)
errFile, err := os.Create(errPath)
if err != nil {
panic(err)
}
defer dirFile.Close()
defer fFile.Close()
defer errFile.Close()
dirWriter := bufio.NewWriter(dirFile)
fileWriter := bufio.NewWriter(fFile)
errWriter := bufio.NewWriter(errFile)
for {
select {
case data, _ := <-chanDir:
countDir++
fmt.Fprintf(dirWriter, "%s\n", data)
case data, _ := <-chanFile:
countFile++
fmt.Fprintf(fileWriter, "%s\n", data)
case data, _ := <-chanErr:
countErr++
fmt.Fprintf(errWriter, "%s\n", data)
case <-writeClose:
break
}
}
// 将缓冲区的内容写入文件
dirWriter.Flush()
fileWriter.Flush()
errWriter.Flush()
}
func print() {
fmt.Printf("\n")
start := time.Now()
ticker := time.Tick(time.Millisecond)
for {
select {
case <-ticker:
elapsed := time.Since(start)
fmt.Printf("运行时间: %.3f 秒 \r", elapsed.Seconds())
case <-writeClose:
elapsed := time.Since(start)
fmt.Printf("运行时间: %.3f 秒 \n", elapsed.Seconds())
return
}
}
}
更多关于Golang并发编程问题解析与解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html
在并发方面,在 main 函数中调用 rec 是同步的,因此对 len(c) == 0 的检查发生在 goroutines 之后。
更多关于Golang并发编程问题解析与解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
你尝试过移除 runtime.Gosched() 之后吗?
在注释掉 runtime.Gosched() 之后,运行程序导致其永不结束。调试显示通道已满。如果我增加通道的缓冲区大小,我还需要增加递归函数 rec 的输入参数。否则,程序可能仍然无法正常退出。
我也不确定。我只能确认这个程序与CPU线程数量有关。如果它在你的电脑上正常运行,我也不明白原因。也许你可以请几个朋友在他们的电脑上运行一下,看看会发生什么。这个程序导致我的电脑和我朋友的电脑上的程序无法正常退出。
@FIshInInnkGIT, 你好,
write 函数中存在一个错误:你从内部的 switch 语句中 break,而不是从外部的无限 for 循环中跳出。write 函数将永远不会退出。
我相当确定,在简化的示例中,len(c) == 0 的检查发生在 rec 协程启动之前,因此计数仍然是 0,程序也就终止了。
skillian:
runtime.Gosched()
简化的代码只是让问题更容易被观察到。如果你能在注释掉 runtime.Gosched() 之后运行代码,可以尝试增加递归函数的输入参数,例如,将其改为 rec(80)。如果你使用的是简化后的代码的话。
runtime.Gosched()
是的,我确实移除了 Gosched(),它工作得很好。你自己看看吧:Go Playground - Go 编程语言
我甚至在 go1.20 和 go1.19 上都运行过,工作得很好。告诉我我还应该改什么。
@FIshInInnkGIT 你是说你简化的代码修复了问题,还是它展示了问题?我尝试了两种方式运行:注释掉 runtime.Gosched() 和取消注释,两种情况下程序都完成了。
func main() {
fmt.Println("hello world")
}
FIshInInnkGIT:
你好 @FIshInInnkGIT,
你通过简化程序做得很好。这是一个很棒的想法,有助于更好地理解任何问题。
然而,我能够正常运行这段简化的代码,它工作正常。我已经将 rec(8) 增加到 rec(8000),仍然运行良好。
在你的机器上表现如何?
观察该算法的指数级特性。如果将每次 rec 调用视为树中的一个节点,那么树的较低层级拥有海量的节点。
因此,如果一次只能运行少数几个 goroutine,那么调度所有低层级的节点将耗费大量时间。假设有 1 个 goroutine 已经向通道写入数据,但在它从通道读取之前被抢占。在这个 goroutine 两次连续运行之间的间隙,需要调度并运行大量其他 goroutine,而由于通道已满,它们全都无事可做。
不完全正确,这个程序并不会导致恐慌。它与CPU相关的原因在于,当通道缓冲区满时,程序会陷入死锁(或无限循环)。如果CPU资源充足,递归逻辑甚至在通道满之前就会结束。据我推测,这个程序不退出的原因是通道已满,导致goroutine阻塞,而只有主goroutine在运行for循环。主goroutine没有机会唤醒其他goroutine来消费通道。这就是为什么添加runtime.Gosched()可以让程序结束。然而,考虑到每次rec函数结束时都应该消费通道,通道为什么会满仍然说不通。这可能与GPM调度机制和锁定机制有关,但我还没有找到答案。
FIshInInnkGIT:
这个程序与CPU线程数量有关。
可能就是这个问题。你的电脑资源可能比我的或Go Playground的少,仅仅是生成过多的goroutine就会让它变得非常慢。看起来它永远不会结束,但它最终会因恐慌而结束(对单个文件或套接字进行了太多并发操作),因为你启动了太多的goroutine……但在恐慌发生之前,由于计算机变得非常慢,需要一段时间。
你能测试一下吗:尝试运行8、20、40个,找出它不会退出的最小数量。然后以那个数量运行并等待半小时,它应该要么完成,要么恐慌(对单个文件或套接字进行了太多并发操作)。
telo_tade:
FIshInInnkGIT:
你好 @FIshInInnkGIT, 写入函数中存在一个错误:你从内部的
switch语句中break,而不是从外部的无限for循环中跳出。写入函数将永远不会退出。
并非如此,经过我的研究,我将代码简化为只包含递归方法和主方法。问题并不在于写入函数。以下是简化后的代码:
package main
import (
"fmt"
"runtime"
)
var c = make(chan struct{}, 1)
func main() {
rec(8)
for {
runtime.Gosched() // TODO
if len(c) == 0 {
fmt.Println("end")
break
}
}
}
func rec(count int) {
if count == 0 {
return
}
c <- struct{}{}
for i := 0; i < 5; i++ {
go rec(count - 1)
}
<-c
}
skillian:
在简化的示例中,
len(c) == 0检查发生在之前
如果对 len(c) 的检查很可能发生在第一个 goroutine 启动之前,并且在 rec 写入通道然后从通道消费之后。这将导致程序正常结束,只是那些 goroutine 没有运行。程序结束,而且很快。
如果我们移除这个检查,并用一个足够长的休眠来替代,对于较小的值,程序会正常结束。对于 rec(8000),我得到“panic: 单个文件或套接字上的并发操作过多 (最大 1048575)”。
我看不到这个程序会永远运行(死锁)的场景,它会结束,而且很可能很快。什么场景会导致它死锁呢?
不相关:这个程序的复杂度是指数级的 O(5 的初始 count 次方,即调用 rec 时传入的初始值)。实现任何指数级算法都是一个非常糟糕的主意(即使对于很小的输入,你也会很快耗尽 CPU,遇到整数溢出等问题)。
这是一个典型的Go并发编程中的goroutine调度和通道同步问题。问题的核心在于主goroutine在检查通道长度时,其他goroutine可能尚未被调度执行。
问题分析
在简化代码中,当移除runtime.Gosched()后,主goroutine会持续检查len(c),但由于Go调度器可能没有机会切换到其他goroutine,导致:
rec()函数中的goroutine没有被执行- 通道
c永远不会被填充 - 主goroutine陷入无限循环
解决方案
使用正确的同步原语替代len(channel)检查。以下是修改后的简化代码:
package main
import (
"fmt"
"sync"
)
var (
c = make(chan struct{}, 1)
wg sync.WaitGroup
)
func main() {
rec(8)
// 等待所有goroutine完成
wg.Wait()
// 清空通道
for len(c) > 0 {
<-c
}
fmt.Println("end")
}
func rec(count int) {
if count == 0 {
return
}
wg.Add(1)
go func(cnt int) {
defer wg.Done()
c <- struct{}{}
for i := 0; i < 5; i++ {
go rec(cnt - 1)
}
<-c
}(count)
}
原始代码的修复方案
对于原始代码,应该使用sync.WaitGroup来跟踪所有recTraverse goroutine的完成状态:
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"sync"
"time"
)
var (
countDir = 0
countFile = 0
countErr = 0
dirsPath = `\dirsPath`
filesPath = `\filesPath`
errsPath = `\errsPath`
chanFile = make(chan string)
chanDir = make(chan string)
chanErr = make(chan string)
writeClose = make(chan struct{})
goMax chan struct{}
wg sync.WaitGroup
)
func main() {
goMax = make(chan struct{}, runtime.NumCPU()-1)
date := time.Now().Format("20060102")
go print()
root := `C:\`
pwd, _ := os.Getwd()
go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date),
fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date),
fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))
wg.Add(1)
go recTraverse(root)
// 等待所有遍历goroutine完成
wg.Wait()
// 关闭输入通道
close(chanDir)
close(chanFile)
close(chanErr)
// 通知写入goroutine结束
writeClose <- struct{}{}
// 等待写入goroutine完成(如果需要)
// time.Sleep(time.Millisecond * 100)
fmt.Printf("执行完毕, 共记录文件 %d 个, 文件夹 %d 个, 错误 %d 条 \n", countFile, countDir, countErr)
}
func recTraverse(path string) {
defer wg.Done()
goMax <- struct{}{}
defer func() { <-goMax }()
entries, err := os.ReadDir(path)
if err != nil {
chanErr <- fmt.Sprintf("目录读取失败 %s ; 日志: %s ", path, err)
return
}
for _, entry := range entries {
if entry.IsDir() {
newPath := path + entry.Name()
chanDir <- newPath
wg.Add(1)
go recTraverse(newPath + `\`)
} else {
chanFile <- path + entry.Name()
}
}
}
func write(dirPath string, filePath string, errPath string) {
dirFile, _ := os.Create(dirPath)
fFile, _ := os.Create(filePath)
errFile, _ := os.Create(errPath)
defer dirFile.Close()
defer fFile.Close()
defer errFile.Close()
dirWriter := bufio.NewWriter(dirFile)
fileWriter := bufio.NewWriter(fFile)
errWriter := bufio.NewWriter(errFile)
defer dirWriter.Flush()
defer fileWriter.Flush()
defer errWriter.Flush()
for {
select {
case data, ok := <-chanDir:
if !ok {
chanDir = nil
} else {
countDir++
fmt.Fprintf(dirWriter, "%s\n", data)
}
case data, ok := <-chanFile:
if !ok {
chanFile = nil
} else {
countFile++
fmt.Fprintf(fileWriter, "%s\n", data)
}
case data, ok := <-chanErr:
if !ok {
chanErr = nil
} else {
countErr++
fmt.Fprintf(errWriter, "%s\n", data)
}
case <-writeClose:
return
}
// 所有通道都关闭后退出
if chanDir == nil && chanFile == nil && chanErr == nil {
return
}
}
}
func print() {
fmt.Printf("\n")
start := time.Now()
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
elapsed := time.Since(start)
fmt.Printf("运行时间: %.3f 秒 \r", elapsed.Seconds())
case <-writeClose:
elapsed := time.Since(start)
fmt.Printf("运行时间: %.3f 秒 \n", elapsed.Seconds())
return
}
}
}
关键改进点
- 使用
sync.WaitGroup替代通道长度检查:确保所有goroutine完成后再继续执行 - 正确关闭通道:使用通道关闭状态检测而非
len(channel) - 移除
runtime.Gosched():使用正确的同步原语,避免依赖调度器行为 - 添加
defer语句:确保资源正确释放和计数器递减
这种方案避免了竞态条件,不依赖于goroutine调度顺序,是Go并发编程中的标准做法。

