Golang并发编程问题解析与解决方案

Golang并发编程问题解析与解决方案

我有一个并发问题,程序无法正常终止。我查阅了许多资料但未找到答案,特此寻求帮助

我已在下方发布了程序。其功能是扫描C盘中的所有文件并将路径记录在txt文件中。我在标记为TODO的位置描述了我的问题。注释掉TODO位置的代码行会导致整个程序无法终止,这是一个奇怪的现象。我怀疑是CPU缓存问题,但这与实际情况不符。我正在寻求帮助。请不要介意代码中的中文。

我的想法是使用一个无限for循环来不断检查四个通道是否都没有内容,然后跳出for循环并结束程序。然而事实是,如果我不在for循环内部执行一些耗时操作,程序将永远不会结束。我的代码运行在Windows 10上。

经过研究,我简化了原始代码。下面第一部分是为了便于理解而简化的代码,第二部分是原始代码。

这是附加信息。简化代码是在找出问题根源后创建的,移除了不必要的逻辑以便于阅读。移除runtime.Gosched()后,程序可能无法正常退出。如果可能,尝试增加rec函数的输入参数,例如改为rec(80)。这可能与你的CPU核心数有关,CPU核心数越高,rec所需的参数值就越大。如果你调整通道的缓冲区大小,也需要相应修改rec的参数值。如果缓冲区增大,rec的参数值也必须增加,否则程序可能仍然无法正常退出。

简化代码:

package main

import (
    "fmt"
    "runtime"
)

var c = make(chan struct{}, 1)

func main() {
    rec(8)
    for {
        // TODO 不在这里执行一些耗时操作会导致整个程序无法终止。
        runtime.Gosched()
        if len(c) == 0 {
            fmt.Println("end")
            break
        }
    }
}
func rec(count int) {
    if count == 0 {
        return
    }
    c <- struct{}{}
    for i := 0; i < 5; i++ {
        go rec(count - 1)
    }
    <-c
}

原始代码:

package main

import (
    "bufio"
    "fmt"
    "os"
    "runtime"
    "time"
)

var (
    countDir   = 0
    countFile  = 0
    countErr   = 0
    dirsPath   = `\dirsPath`
    filesPath  = `\filesPath`
    errsPath   = `\errsPath`
    chanFile   = make(chan string)
    chanDir    = make(chan string)
    chanErr    = make(chan string)
    writeClose = make(chan struct{})
    goMax      chan struct{}
)

func main() {
    goMax = make(chan struct{}, runtime.NumCPU()-1)
    date := time.Now().Format("20060102")
    go print()

    root := `C:\`
    pwd, _ := os.Getwd()
    go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date), fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))

    recTraverse(root)

    for {
        // TODO 不在这里执行一些耗时操作会导致整个程序无法终止。
        runtime.Gosched()
        if len(goMax) == 0 && len(chanDir) == 0 && len(chanFile) == 0 && len(chanErr) == 0 {
            close(chanDir)
            close(chanFile)
            close(chanErr)
            writeClose <- struct{}{}
            break
        }
    }

    fmt.Printf("执行完毕, 共记录文件 %d 个, 文件夹 %d 个, 错误 %d 条 \n", countFile, countDir, countErr)
    // for {
    // }
}

func recTraverse(path string) {
    goMax <- struct{}{}
    entries, err := os.ReadDir(path)
    if err != nil {
        chanErr <- fmt.Sprintf("目录读取失败 %s ; 日志: %s ", path, err)
    }
    for _, entry := range entries {
        // 检查目录项是否是目录
        if entry.IsDir() {
            newPath := path + entry.Name()
            chanDir <- newPath
            go recTraverse(newPath + `\`)
        } else {
            chanFile <- path + entry.Name()
        }
    }
    <-goMax
}

func write(dirPath string, filePath string, errPath string) {

    // 打开文件,获取文件句柄和 bufio.Writer
    dirFile, err := os.Create(dirPath)
    fFile, err := os.Create(filePath)
    errFile, err := os.Create(errPath)
    if err != nil {
        panic(err)
    }
    defer dirFile.Close()
    defer fFile.Close()
    defer errFile.Close()
    dirWriter := bufio.NewWriter(dirFile)
    fileWriter := bufio.NewWriter(fFile)
    errWriter := bufio.NewWriter(errFile)

    for {
        select {
        case data, _ := <-chanDir:
            countDir++
            fmt.Fprintf(dirWriter, "%s\n", data)
        case data, _ := <-chanFile:
            countFile++
            fmt.Fprintf(fileWriter, "%s\n", data)
        case data, _ := <-chanErr:
            countErr++
            fmt.Fprintf(errWriter, "%s\n", data)
        case <-writeClose:
            break
        }
    }

    // 将缓冲区的内容写入文件
    dirWriter.Flush()
    fileWriter.Flush()
    errWriter.Flush()
}

func print() {
    fmt.Printf("\n")
    start := time.Now()
    ticker := time.Tick(time.Millisecond)
    for {
        select {
        case <-ticker:
            elapsed := time.Since(start)
            fmt.Printf("运行时间: %.3f 秒 \r", elapsed.Seconds())
        case <-writeClose:
            elapsed := time.Since(start)
            fmt.Printf("运行时间: %.3f 秒 \n", elapsed.Seconds())
            return
        }
    }
}

更多关于Golang并发编程问题解析与解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

16 回复

在并发方面,在 main 函数中调用 rec 是同步的,因此对 len(c) == 0 的检查发生在 goroutines 之后。

更多关于Golang并发编程问题解析与解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你尝试过移除 runtime.Gosched() 之后吗?

在注释掉 runtime.Gosched() 之后,运行程序导致其永不结束。调试显示通道已满。如果我增加通道的缓冲区大小,我还需要增加递归函数 rec 的输入参数。否则,程序可能仍然无法正常退出。

我也不确定。我只能确认这个程序与CPU线程数量有关。如果它在你的电脑上正常运行,我也不明白原因。也许你可以请几个朋友在他们的电脑上运行一下,看看会发生什么。这个程序导致我的电脑和我朋友的电脑上的程序无法正常退出。

@FIshInInnkGIT, 你好,

write 函数中存在一个错误:你从内部的 switch 语句中 break,而不是从外部的无限 for 循环中跳出。write 函数将永远不会退出。

我相当确定,在简化的示例中,len(c) == 0 的检查发生在 rec 协程启动之前,因此计数仍然是 0,程序也就终止了。

skillian:

runtime.Gosched()

简化的代码只是让问题更容易被观察到。如果你能在注释掉 runtime.Gosched() 之后运行代码,可以尝试增加递归函数的输入参数,例如,将其改为 rec(80)。如果你使用的是简化后的代码的话。

runtime.Gosched()

是的,我确实移除了 Gosched(),它工作得很好。你自己看看吧:Go Playground - Go 编程语言

我甚至在 go1.20 和 go1.19 上都运行过,工作得很好。告诉我我还应该改什么。

@FIshInInnkGIT 你是说你简化的代码修复了问题,还是它展示了问题?我尝试了两种方式运行:注释掉 runtime.Gosched() 和取消注释,两种情况下程序都完成了。

func main() {
    fmt.Println("hello world")
}

FIshInInnkGIT:

你好 @FIshInInnkGIT

你通过简化程序做得很好。这是一个很棒的想法,有助于更好地理解任何问题。

然而,我能够正常运行这段简化的代码,它工作正常。我已经将 rec(8) 增加到 rec(8000),仍然运行良好。

在你的机器上表现如何?

观察该算法的指数级特性。如果将每次 rec 调用视为树中的一个节点,那么树的较低层级拥有海量的节点。

因此,如果一次只能运行少数几个 goroutine,那么调度所有低层级的节点将耗费大量时间。假设有 1 个 goroutine 已经向通道写入数据,但在它从通道读取之前被抢占。在这个 goroutine 两次连续运行之间的间隙,需要调度并运行大量其他 goroutine,而由于通道已满,它们全都无事可做。

不完全正确,这个程序并不会导致恐慌。它与CPU相关的原因在于,当通道缓冲区满时,程序会陷入死锁(或无限循环)。如果CPU资源充足,递归逻辑甚至在通道满之前就会结束。据我推测,这个程序不退出的原因是通道已满,导致goroutine阻塞,而只有主goroutine在运行for循环。主goroutine没有机会唤醒其他goroutine来消费通道。这就是为什么添加runtime.Gosched()可以让程序结束。然而,考虑到每次rec函数结束时都应该消费通道,通道为什么会满仍然说不通。这可能与GPM调度机制和锁定机制有关,但我还没有找到答案。

FIshInInnkGIT:

这个程序与CPU线程数量有关。

可能就是这个问题。你的电脑资源可能比我的或Go Playground的少,仅仅是生成过多的goroutine就会让它变得非常慢。看起来它永远不会结束,但它最终会因恐慌而结束(对单个文件或套接字进行了太多并发操作),因为你启动了太多的goroutine……但在恐慌发生之前,由于计算机变得非常慢,需要一段时间。

你能测试一下吗:尝试运行8、20、40个,找出它不会退出的最小数量。然后以那个数量运行并等待半小时,它应该要么完成,要么恐慌(对单个文件或套接字进行了太多并发操作)。

telo_tade:

FIshInInnkGIT:

你好 @FIshInInnkGIT, 写入函数中存在一个错误:你从内部的 switch 语句中 break,而不是从外部的无限 for 循环中跳出。写入函数将永远不会退出。

并非如此,经过我的研究,我将代码简化为只包含递归方法和主方法。问题并不在于写入函数。以下是简化后的代码:

package main

import (
    "fmt"
    "runtime"
)

var c = make(chan struct{}, 1)

func main() {
    rec(8)
    for {
        runtime.Gosched() // TODO
        if len(c) == 0 {
            fmt.Println("end")
            break
        }
    }
}
func rec(count int) {
    if count == 0 {
        return
    }
    c <- struct{}{}
    for i := 0; i < 5; i++ {
        go rec(count - 1)
    }
    <-c
}

skillian:

在简化的示例中,len(c) == 0 检查发生在之前

@skillian,

如果对 len(c) 的检查很可能发生在第一个 goroutine 启动之前,并且在 rec 写入通道然后从通道消费之后。这将导致程序正常结束,只是那些 goroutine 没有运行。程序结束,而且很快。

如果我们移除这个检查,并用一个足够长的休眠来替代,对于较小的值,程序会正常结束。对于 rec(8000),我得到“panic: 单个文件或套接字上的并发操作过多 (最大 1048575)”。

我看不到这个程序会永远运行(死锁)的场景,它会结束,而且很可能很快。什么场景会导致它死锁呢?

不相关:这个程序的复杂度是指数级的 O(5 的初始 count 次方,即调用 rec 时传入的初始值)。实现任何指数级算法都是一个非常糟糕的主意(即使对于很小的输入,你也会很快耗尽 CPU,遇到整数溢出等问题)。

这是一个典型的Go并发编程中的goroutine调度和通道同步问题。问题的核心在于主goroutine在检查通道长度时,其他goroutine可能尚未被调度执行

问题分析

在简化代码中,当移除runtime.Gosched()后,主goroutine会持续检查len(c),但由于Go调度器可能没有机会切换到其他goroutine,导致:

  1. rec()函数中的goroutine没有被执行
  2. 通道c永远不会被填充
  3. 主goroutine陷入无限循环

解决方案

使用正确的同步原语替代len(channel)检查。以下是修改后的简化代码:

package main

import (
    "fmt"
    "sync"
)

var (
    c  = make(chan struct{}, 1)
    wg sync.WaitGroup
)

func main() {
    rec(8)
    
    // 等待所有goroutine完成
    wg.Wait()
    
    // 清空通道
    for len(c) > 0 {
        <-c
    }
    
    fmt.Println("end")
}

func rec(count int) {
    if count == 0 {
        return
    }
    
    wg.Add(1)
    go func(cnt int) {
        defer wg.Done()
        
        c <- struct{}{}
        for i := 0; i < 5; i++ {
            go rec(cnt - 1)
        }
        <-c
    }(count)
}

原始代码的修复方案

对于原始代码,应该使用sync.WaitGroup来跟踪所有recTraverse goroutine的完成状态:

package main

import (
    "bufio"
    "fmt"
    "os"
    "runtime"
    "sync"
    "time"
)

var (
    countDir   = 0
    countFile  = 0
    countErr   = 0
    dirsPath   = `\dirsPath`
    filesPath  = `\filesPath`
    errsPath   = `\errsPath`
    chanFile   = make(chan string)
    chanDir    = make(chan string)
    chanErr    = make(chan string)
    writeClose = make(chan struct{})
    goMax      chan struct{}
    wg         sync.WaitGroup
)

func main() {
    goMax = make(chan struct{}, runtime.NumCPU()-1)
    date := time.Now().Format("20060102")
    go print()

    root := `C:\`
    pwd, _ := os.Getwd()
    go write(fmt.Sprintf("%s\\%s-%s.txt", pwd, dirsPath, date), 
             fmt.Sprintf("%s\\%s-%s.txt", pwd, filesPath, date), 
             fmt.Sprintf("%s\\%s-%s.txt", pwd, errsPath, date))

    wg.Add(1)
    go recTraverse(root)

    // 等待所有遍历goroutine完成
    wg.Wait()
    
    // 关闭输入通道
    close(chanDir)
    close(chanFile)
    close(chanErr)
    
    // 通知写入goroutine结束
    writeClose <- struct{}{}
    
    // 等待写入goroutine完成(如果需要)
    // time.Sleep(time.Millisecond * 100)

    fmt.Printf("执行完毕, 共记录文件 %d 个, 文件夹 %d 个, 错误 %d 条 \n", countFile, countDir, countErr)
}

func recTraverse(path string) {
    defer wg.Done()
    
    goMax <- struct{}{}
    defer func() { <-goMax }()
    
    entries, err := os.ReadDir(path)
    if err != nil {
        chanErr <- fmt.Sprintf("目录读取失败 %s ; 日志: %s ", path, err)
        return
    }
    
    for _, entry := range entries {
        if entry.IsDir() {
            newPath := path + entry.Name()
            chanDir <- newPath
            
            wg.Add(1)
            go recTraverse(newPath + `\`)
        } else {
            chanFile <- path + entry.Name()
        }
    }
}

func write(dirPath string, filePath string, errPath string) {
    dirFile, _ := os.Create(dirPath)
    fFile, _ := os.Create(filePath)
    errFile, _ := os.Create(errPath)
    
    defer dirFile.Close()
    defer fFile.Close()
    defer errFile.Close()
    
    dirWriter := bufio.NewWriter(dirFile)
    fileWriter := bufio.NewWriter(fFile)
    errWriter := bufio.NewWriter(errFile)
    
    defer dirWriter.Flush()
    defer fileWriter.Flush()
    defer errWriter.Flush()

    for {
        select {
        case data, ok := <-chanDir:
            if !ok {
                chanDir = nil
            } else {
                countDir++
                fmt.Fprintf(dirWriter, "%s\n", data)
            }
        case data, ok := <-chanFile:
            if !ok {
                chanFile = nil
            } else {
                countFile++
                fmt.Fprintf(fileWriter, "%s\n", data)
            }
        case data, ok := <-chanErr:
            if !ok {
                chanErr = nil
            } else {
                countErr++
                fmt.Fprintf(errWriter, "%s\n", data)
            }
        case <-writeClose:
            return
        }
        
        // 所有通道都关闭后退出
        if chanDir == nil && chanFile == nil && chanErr == nil {
            return
        }
    }
}

func print() {
    fmt.Printf("\n")
    start := time.Now()
    ticker := time.NewTicker(time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            elapsed := time.Since(start)
            fmt.Printf("运行时间: %.3f 秒 \r", elapsed.Seconds())
        case <-writeClose:
            elapsed := time.Since(start)
            fmt.Printf("运行时间: %.3f 秒 \n", elapsed.Seconds())
            return
        }
    }
}

关键改进点

  1. 使用sync.WaitGroup替代通道长度检查:确保所有goroutine完成后再继续执行
  2. 正确关闭通道:使用通道关闭状态检测而非len(channel)
  3. 移除runtime.Gosched():使用正确的同步原语,避免依赖调度器行为
  4. 添加defer语句:确保资源正确释放和计数器递减

这种方案避免了竞态条件,不依赖于goroutine调度顺序,是Go并发编程中的标准做法。

回到顶部