Golang并发设计中的死锁问题探讨
Golang并发设计中的死锁问题探讨 我是Go语言的新手。前几天决定开始学习它,主要是因为其通过通道实现的一流并发特性。
我构建了一个玩具应用程序,作为学习如何使用通道的一种方式。 这个应用程序:
- 从源位置下载文件。
- 对它们应用一些转换。
- 将转换后的文件上传到输出位置。
转换步骤是最慢的。 我理想的设计是让下载器一次收集少量文件,刚好足以持续供应并让转换器池保持忙碌。当一个转换器处理完一个文件后,它会将其交给完成器进行上传。
以下是我对这个设计的尝试,使用 time.Sleep 来模拟长时间的处理过程。
package main
import (
"fmt"
"strconv"
"time"
)
type task struct {
name string
path string
startTime time.Time
}
func downloader(id int, toDownload chan task, toProcess chan task) {
for {
task := <- toDownload
fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
time.Sleep(5 * time.Second)
toProcess <- task
}
}
func transformer(id int, toProcess chan task, toFinish chan task) {
for {
task := <- toProcess
fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
time.Sleep(20 * time.Second)
toFinish <- task
}
}
func finisher(id int, toFinish chan task, finished chan task) {
for {
task := <- toFinish
fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
time.Sleep(7 * time.Second)
finished <- task
}
}
func fetchJobs(n int) *[]task {
tasks := make([]task, n)
for i := 0; i < n; i++ {
name := "job_" + strconv.Itoa(i);
tasks[i] = task{name: name}
}
return &tasks
}
func main() {
start := time.Now()
n := 10
toDownload := make(chan task)
toProcess := make(chan task)
toFinish := make(chan task)
finished := make(chan task)
for i := 0; i < 4; i++ {
go downloader(i, toDownload, toProcess)
}
for i := 0; i < 10; i++ {
go transformer(i, toProcess, toFinish)
}
for i := 0; i < 4; i++ {
go finisher(i, toFinish, finished)
}
jobs := fetchJobs(n)
for _, j := range *jobs {
toDownload <- j
}
for task := range finished {
fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime) / time.Second)
}
fmt.Printf("all tasks finished in %s", time.Since(start) / time.Second)
}
以下是部分输出。
[processor_1]: processing job_0
[processor_3]: processing job_6
[processor_7]: processing job_5
[processor_6]: processing job_4
[downloader_2]: downloading job_9
[processor_5]: processing job_7
[downloader_1]: downloading job_8
[processor_8]: processing job_8
[processor_9]: processing job_9
[finisher_1]: uploading job_0
[finisher_3]: uploading job_1
[finisher_2]: uploading job_3
[finisher_0]: uploading job_2
[finisher_0]: uploading job_4
finished job_2 in 9.223372036s
finished job_1 in 9.223372036s
finished job_3 in 9.223372036s
finished job_0 in 9.223372036s
[finisher_3]: uploading job_5
[finisher_1]: uploading job_7
[finisher_2]: uploading job_6
[finisher_2]: uploading job_9
finished job_6 in 9.223372036s
finished job_7 in 9.223372036s
finished job_5 in 9.223372036s
finished job_4 in 9.223372036s
[finisher_1]: uploading job_8
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/Users/.../go/src/drive_uploader/main.go:79 +0x4b9
goroutine 6 [chan receive]:
main.downloader(0x0, 0xc000066060, 0xc0000660c0)
/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
/Users/.../go/src/drive_uploader/main.go:63 +0x135
goroutine 7 [chan receive]:
main.downloader(0x1, 0xc000066060, 0xc0000660c0)
/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
/Users/.../go/src/drive_uploader/main.go:63 +0x135
...
...
不幸的是,我遇到了死锁,并且不清楚是什么原因导致的。 我尝试调整每个goroutine的数量;有些组合能够成功运行而不会死锁。但这并不理想。
我也尝试过使用带缓冲的通道以及WaitGroup。同样……有些组合会成功运行,有些则不会。
任何帮助或反馈都将不胜感激!我特别想知道在Go中是否有更符合语言习惯的方法。
更多关于Golang并发设计中的死锁问题探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html
对于返回两个值的函数:如果我忽略第二个值,就会得到一个编译错误。那么这些是特殊情况吗?
是的
不仅仅是通道,在进行类型断言、使用键从映射中访问值时,第二个变量也可以被忽略。
对于返回两个值的函数:如果我忽略第二个值,就会得到一个编译错误。那么这些是特殊情况吗?
啊,我现在明白如何正确使用 WaitGroup 和 close() 了。
我曾尝试在每个 goroutine 的函数体内部使用 close(),但现在我明白了为什么那样不行……它会为整个工作池关闭通道。
谢谢 Girish!
请不要介意我的英语
没什么好道歉的……你的英语很棒!
这是另一种使用通道的有趣方式。
select 语句仍然让我有点困惑……基本上,如果通道 c 阻塞了,那么它会尝试从通道 d 读取,依此类推?
case cake, ok := <-c:
cake := <- c
这是通道特有的功能吗?可以忽略第二个值 ok 吗?
ksil:
select仍然让我有点困惑……基本上,如果通道 c 阻塞,那么它会尝试从通道 d 读取,依此类推?
select 语句允许一个 goroutine 等待多个通信操作。
select 会阻塞,直到其某个分支可以执行,然后它执行该分支。如果多个分支都准备就绪,它会随机选择一个。来源
ksil:
这是通道特有的功能吗?可以忽略第二个值
ok?
不仅限于通道,在进行类型断言、使用键从映射中访问值时,也可以忽略第二个变量。
var x interface{} = 23
y, ok := x.(int) // 如果类型断言成功,ok 为 true
z := x.(int) // 可以忽略第二个值,如果类型断言不成功,z 将是零值
m := make(map[string]int)
m["abc"], m["xyz"] = 123, 1123
value, ok := m["abc"] // 如果键 "abc" 存在于映射中,ok 为 true,value = 123
value = m["xyz"] // 忽略第二个值
value, ok = m["pqr"] // 这里 ok = false,value = 0(零值)

package main
import (
"fmt"
"runtime"
"strconv"
"time"
)
func main() {
start := time.Now()
runtime.GOMAXPROCS(4)
c := make(chan string)
d := make(chan string)
s := make(chan string)
go makeCakeAndSend(c, "vanilla", 1)
go makeCakeAndSend(d, "choco", 1)
go makeCakeAndSend(s, "strabery", 4)
go receiveAndPack(c, d, s)
//time.Sleep(2 * 1e9)
fmt.Printf("Process took %s \n", time.Since(start))
fmt.Println(time.Now().Format("January 01, 2006 3:4:5 pm"))
}
func makeCakeAndSend(c chan string, flavour string, count int) {
defer close(c)
for i := 0; i < count; i++ {
cakeName := flavour + " cake " + strconv.Itoa(i)
c <- cakeName
}
}
func receiveAndPack(c chan string, d chan string, s chan string) {
cClosed, dClosed, sClosed := false, false,false
for {
if cClosed && dClosed && sClosed {
return
}
//fmt.Println("Waiting for a new cake ...")
select {
case cake, ok := <-c:
if ok == false {
cClosed = true
fmt.Println(" ... vanila channel closed!")
} else {
fmt.Println(cake)
}
case cake, ok := <-d:
if ok == false {
dClosed = true
fmt.Println(" ... choco channel closed!")
} else {
fmt.Println(cake)
}
case cake, ok := <-s:
if ok == false {
sClosed = true
fmt.Println(" ... strabery channel closed!")
} else {
fmt.Println(cake)
}
default:
fmt.Println(" ... all channels closed!")
}
}
}
也许这能帮到你。
你的代码出现了死锁,主要原因是finished通道没有被关闭,导致main函数中的for task := range finished循环无法正常结束。当所有任务处理完成后,没有goroutine关闭finished通道,主goroutine会一直阻塞在range循环上,而其他goroutine也在等待通道操作,最终导致所有goroutine都进入休眠状态。
以下是修复后的代码:
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
type task struct {
name string
path string
startTime time.Time
}
func downloader(id int, toDownload chan task, toProcess chan task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range toDownload {
fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
time.Sleep(5 * time.Second)
toProcess <- task
}
}
func transformer(id int, toProcess chan task, toFinish chan task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range toProcess {
fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
time.Sleep(20 * time.Second)
toFinish <- task
}
}
func finisher(id int, toFinish chan task, finished chan task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range toFinish {
fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
time.Sleep(7 * time.Second)
finished <- task
}
}
func fetchJobs(n int) []task {
tasks := make([]task, n)
for i := 0; i < n; i++ {
name := "job_" + strconv.Itoa(i)
tasks[i] = task{name: name}
}
return tasks
}
func main() {
start := time.Now()
n := 10
toDownload := make(chan task)
toProcess := make(chan task)
toFinish := make(chan task)
finished := make(chan task)
var wg sync.WaitGroup
// 启动downloader goroutines
for i := 0; i < 4; i++ {
wg.Add(1)
go downloader(i, toDownload, toProcess, &wg)
}
// 启动transformer goroutines
for i := 0; i < 10; i++ {
wg.Add(1)
go transformer(i, toProcess, toFinish, &wg)
}
// 启动finisher goroutines
for i := 0; i < 4; i++ {
wg.Add(1)
go finisher(i, toFinish, finished, &wg)
}
// 启动一个goroutine来等待所有worker完成并关闭finished通道
go func() {
wg.Wait()
close(finished)
}()
// 发送任务到下载队列
go func() {
jobs := fetchJobs(n)
for _, j := range jobs {
toDownload <- j
}
close(toDownload)
}()
// 收集完成的任务
for task := range finished {
fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime)/time.Second)
}
fmt.Printf("all tasks finished in %s", time.Since(start)/time.Second)
}
主要修改:
- 使用
sync.WaitGroup来跟踪所有worker goroutine的完成状态 - 在发送完所有任务后关闭
toDownload通道,触发下游的连锁关闭 - 添加一个单独的goroutine等待所有worker完成,然后关闭
finished通道 - 每个worker函数使用
for task := range channel模式,在通道关闭时自动退出 - 将
fetchJobs的返回值改为切片而不是指针,更符合Go的习惯
这个实现确保了:
- 所有任务都能被正确处理
- 所有goroutine都能正常退出
- 不会产生死锁
- 主函数能够正常结束
通道关闭的顺序很重要:toDownload → toProcess → toFinish → finished,每个阶段的worker在输入通道关闭且处理完所有任务后会自动退出。

