各位大佬,请教一个 Golang Go语言多线程的阻塞问题

编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

taskChan 的缓冲区为 1000 时,阻塞的日志如下:

[DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
[DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
[DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
[DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]

如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
[INFO]:目录扫描完毕
[DEBUG]:func GetAllFilePath end
[DEBUG]:func StartScan end
[DEBUG]:func btnStartScanOnclick end

代码如下:

package core

import ( “DopliGo/logs” “github.com/panjf2000/ants/v2” “os” “path/filepath” “sync” “sync/atomic” )

func GetAllFilePath(rootPath string) { //logs.IsLogDebug = false logs.Debug(“func GetAllFilePath start”) // 创建任务通道和结果通道 var taskChan = make(chan string, 1000000) var resultChan = make(chan string, 1000000) var wg sync.WaitGroup var counter int64 = 0

// 创建生产者 goroutine 池
producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
	produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
})

logs.Debug("cap:%d", producerPool.Cap())
defer producerPool.Release()

taskChan <- rootPath
wg.Add(1) // 这里增加计数器
atomic.AddInt64(&counter, 1)
logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))

// 启动生产者
go func() {
	//defer logs.Debug("生产者退出")
	for task := range taskChan {
		err := producerPool.Invoke(task)
		if err != nil {
			logs.Error("failed to producerPool Invoke, err: %s", err)
			return
		}
	}
}()

// 启动结果处理 goroutine
go func() {
	//defer logs.Debug("消费者退出")
	for result := range resultChan {
		_ = result
	}
}()

// 等待所有任务完成
wg.Wait()
close(resultChan)
close(taskChan)
logs.Info("目录扫描完毕")
logs.Debug("func GetAllFilePath end")

}

func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) { defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done // logs.Debug(“func produceTasks start”)

entries, err := os.ReadDir(rootPath)
if err != nil {
	logs.Error("failed to read dir: %s , err: %s", rootPath, err)
	return
}

for _, entry := range entries {
	path := filepath.Join(rootPath, entry.Name())
	if entry.IsDir() {
		wg.Add(1)
		atomic.AddInt64(counter, 1)
		select {
		case taskChan <- path:
			// 发送成功
		default:
			// 发送失败,通道已满
			logs.Error("写入通道失败...")
		}
		logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
	} else {
		resultChan <- path
	}
}

atomic.AddInt64(counter, -1)
logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
//logs.Debug("func produceTasks end")

}


各位大佬,请教一个 Golang Go语言多线程的阻塞问题

更多关于各位大佬,请教一个 Golang Go语言多线程的阻塞问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

19 回复

select 的时候 写入到 chan 不阻塞 chan 满的时候会直接执行 default

更多关于各位大佬,请教一个 Golang Go语言多线程的阻塞问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


go<br> if entry.IsDir() {<br> atomic.AddInt64(counter, 1)<br> select {<br> case taskChan &lt;- path:<br> // 发送成功<br> wg.Add(1)<br> default:<br> // 发送失败,通道已满<br> logs.Error("写入通道失败...")<br> }<br><br>
你把 wg.Add(1) 放到里面,然后 channel 容量设置大点就可以了,这样只有发送成功才处理

produceTasks 中 for 循环 wg.add(1)多次,但是只 done 了一次(函数结束)为什么?而且你这个 wg 用的好奇怪

default 删了,如果你想控制退出,把 default 换成 context 。

分析了一下你这个代码打印一定是阻塞在了读取 taskChan ,为什么堵塞,大概率是协程池 invoke 的时候堵塞了,我换成 go 携程跑没问题。具体为什么可能需要探索下 ants
go func() {
for task := range taskChan {
fmt.Printf(“task: %s\n”, task)
// err := producerPool.Invoke(task)
// if err != nil {
// fmt.Printf(“failed to producerPool Invoke, err: %s\n”, err)
// return
// }
go produceTasks(task, taskChan, resultChan, &counter, &wg)
}
}()

#5 produceTasks 中另外 defalut 要删掉,通道满了应该也要等吧,不然可能会漏?

大佬牛逼,taskChan 设置为 1000 我用协程跑也没问题了,ants 我得再去看看怎么用比较合适; default 是我为了诊断是不是 taskChan 写不进去才加的,正常应该是没有。
另外请教下,为什么 taskChan <- path 写不进去,我实时打印 len ,占用都只有几十,我长度设置的 1000🤣。

是的,但是我打印的日志 taskChan 占用只有几十,容量设置是 1000 ,应该不会满才对

#7 #7 taskChan = [%d]", len(resultChan) 打印错变量了

#9 感谢大佬,我这低级错误过分了,但是我修改正确之后,结果还是一样的😂

用 runtime/pprof 。

default:
wg.Done()
logs.Error(“Failed to write to channel…”)


写入通过失败了你没有减少锁,所以一直卡住了。

wg 计数的时候注意,一定要确保可以执行 wg.Done()操作,否则就卡住了。

#13 大佬,还是不行 ,我现在是使用的协程直接跑,taskChan 是 1000 ,然后 wg.done 也加了。跑起来就挂 ,如果把 taskChan 改成 100 万就很正常

#7 并不是占用的问题,是用 ants 的时候你的 channel 大小小于了文件数量,导致死锁的,用 go 携程可以正常等待结束了继续往通道里面放。这种问题直接用极端的办法,把 channel 大小设置为 1 ,看会不会死锁。你用 go 原生协程跑,channel 为 1 都 ok 的,只是慢一点

#7 跑不满通道的原因可能是处理比较快,一直都没有满过,只能说明设置为 1000 没必要哈哈哈

#16
明白了,大佬,感谢感谢!

#13 大佬,不好意思,可以了,添加了 wg.done 是 ok 的,我运行错程序了 ,感谢指点

各位大佬好,关于Golang中多线程(在Go中通常称为goroutine)的阻塞问题,这是一个比较核心且常见的议题。

在Go中,goroutine是一种轻量级的线程,由Go运行时管理。当涉及到阻塞问题时,通常是因为goroutine在等待某些资源(如I/O操作、channel通信、锁等)而无法继续执行。

解决或处理Goroutine阻塞问题的关键通常在于:

  1. 避免死锁:确保所有使用锁的goroutine在适当的时候能够释放锁。使用defer语句可以帮助确保在函数返回前释放锁。

  2. 合理使用channel:channel是Go中实现goroutine间通信的主要方式。确保发送和接收操作能够匹配,避免无缓冲channel导致的永久阻塞。

  3. 监控和调试:使用Go的工具(如pprofrace detector)来监控和分析程序的运行情况,识别可能的阻塞点。

  4. 优化I/O操作:对于网络或文件I/O操作,考虑使用非阻塞或异步I/O库来减少goroutine的阻塞时间。

  5. 设置超时:对于可能阻塞的操作(如数据库查询、HTTP请求),设置合理的超时时间,以便在必要时放弃等待。

总之,处理Go中的多线程阻塞问题需要综合考虑程序设计、资源管理和工具使用等多个方面。希望这些建议对你有所帮助!如果有更具体的问题或代码示例,欢迎继续讨论。

回到顶部