Golang并发设计中的死锁问题探讨

Golang并发设计中的死锁问题探讨 我是Go语言的新手。前几天决定开始学习它,主要是因为其通过通道实现的一流并发特性。

我构建了一个玩具应用程序,作为学习如何使用通道的一种方式。 这个应用程序:

  1. 从源位置下载文件。
  2. 对它们应用一些转换。
  3. 将转换后的文件上传到输出位置。

转换步骤是最慢的。 我理想的设计是让下载器一次收集少量文件,刚好足以持续供应并让转换器池保持忙碌。当一个转换器处理完一个文件后,它会将其交给完成器进行上传。

以下是我对这个设计的尝试,使用 time.Sleep 来模拟长时间的处理过程。

package main

import (
	"fmt"
	"strconv"
	"time"
)

type task struct {
	name string
	path string
	startTime time.Time
}

func downloader(id int, toDownload chan task, toProcess chan task) {
	for {
		task := <- toDownload
		fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
		time.Sleep(5 * time.Second)
		toProcess <- task
	}
}

func transformer(id int, toProcess chan task, toFinish chan task) {
	for {
		task := <- toProcess
		fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
		time.Sleep(20 * time.Second)
		toFinish <- task
	}
}

func finisher(id int, toFinish chan task, finished chan task) {
	for {
		task := <- toFinish
		fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
		time.Sleep(7 * time.Second)
		finished <- task
	}
}

func fetchJobs(n int) *[]task {
	tasks := make([]task, n)
	for i := 0; i < n; i++ {
		name := "job_" + strconv.Itoa(i);
		tasks[i] = task{name: name}
	}
	return &tasks
}

func main() {
	start := time.Now()

	n := 10

	toDownload := make(chan task)
	toProcess := make(chan task)
	toFinish := make(chan task)
	finished := make(chan task)


	for i := 0; i < 4; i++ {
		go downloader(i, toDownload, toProcess)
	}

	for i := 0; i < 10; i++ {
		go transformer(i, toProcess, toFinish)
	}

	for i := 0; i < 4; i++ {
		go finisher(i, toFinish, finished)
	}

	jobs := fetchJobs(n)
	for _, j := range *jobs {
		toDownload <- j
	}

	for task := range finished {
		fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime) / time.Second)
	}

	fmt.Printf("all tasks finished in %s", time.Since(start) / time.Second)
}

以下是部分输出。

[processor_1]: processing job_0
[processor_3]: processing job_6
[processor_7]: processing job_5
[processor_6]: processing job_4
[downloader_2]: downloading job_9
[processor_5]: processing job_7
[downloader_1]: downloading job_8
[processor_8]: processing job_8
[processor_9]: processing job_9
[finisher_1]: uploading job_0
[finisher_3]: uploading job_1
[finisher_2]: uploading job_3
[finisher_0]: uploading job_2
[finisher_0]: uploading job_4
finished job_2 in 9.223372036s
finished job_1 in 9.223372036s
finished job_3 in 9.223372036s
finished job_0 in 9.223372036s
[finisher_3]: uploading job_5
[finisher_1]: uploading job_7
[finisher_2]: uploading job_6
[finisher_2]: uploading job_9
finished job_6 in 9.223372036s
finished job_7 in 9.223372036s
finished job_5 in 9.223372036s
finished job_4 in 9.223372036s
[finisher_1]: uploading job_8
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
	/Users/.../go/src/drive_uploader/main.go:79 +0x4b9

goroutine 6 [chan receive]:
main.downloader(0x0, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135

goroutine 7 [chan receive]:
main.downloader(0x1, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135
...
...

不幸的是,我遇到了死锁,并且不清楚是什么原因导致的。 我尝试调整每个goroutine的数量;有些组合能够成功运行而不会死锁。但这并不理想。

我也尝试过使用带缓冲的通道以及WaitGroup。同样……有些组合会成功运行,有些则不会。

任何帮助或反馈都将不胜感激!我特别想知道在Go中是否有更符合语言习惯的方法。


更多关于Golang并发设计中的死锁问题探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

8 回复

明白了。 谢谢你的帮助!

更多关于Golang并发设计中的死锁问题探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


对于返回两个值的函数:如果我忽略第二个值,就会得到一个编译错误。那么这些是特殊情况吗?

是的

不仅仅是通道,在进行类型断言、使用键从映射中访问值时,第二个变量也可以被忽略。

对于返回两个值的函数:如果我忽略第二个值,就会得到一个编译错误。那么这些是特殊情况吗?

啊,我现在明白如何正确使用 WaitGroupclose() 了。 我曾尝试在每个 goroutine 的函数体内部使用 close(),但现在我明白了为什么那样不行……它会为整个工作池关闭通道。

谢谢 Girish!

请不要介意我的英语

没什么好道歉的……你的英语很棒!

这是另一种使用通道的有趣方式。

select 语句仍然让我有点困惑……基本上,如果通道 c 阻塞了,那么它会尝试从通道 d 读取,依此类推?

case cake, ok := <-c:
cake := <- c

这是通道特有的功能吗?可以忽略第二个值 ok 吗?

ksil:

select 仍然让我有点困惑……基本上,如果通道 c 阻塞,那么它会尝试从通道 d 读取,依此类推?

select 语句允许一个 goroutine 等待多个通信操作。 select 会阻塞,直到其某个分支可以执行,然后它执行该分支。如果多个分支都准备就绪,它会随机选择一个。来源

ksil:

这是通道特有的功能吗?可以忽略第二个值 ok

不仅限于通道,在进行类型断言、使用键从映射中访问值时,也可以忽略第二个变量。

var x interface{} = 23
y, ok := x.(int)   // 如果类型断言成功,ok 为 true
z := x.(int)  // 可以忽略第二个值,如果类型断言不成功,z 将是零值

m := make(map[string]int)
m["abc"], m["xyz"] = 123, 1123

value, ok := m["abc"]   // 如果键 "abc" 存在于映射中,ok 为 true,value = 123
value = m["xyz"]  // 忽略第二个值
value, ok = m["pqr"]  // 这里 ok = false,value = 0(零值)

slight_smile

package main

import (
	"fmt"
	"runtime"
	"strconv"
	"time"
)


func main() {
	start := time.Now()
	runtime.GOMAXPROCS(4)

	c := make(chan string)
	d := make(chan string)
	s := make(chan string)
	
	go makeCakeAndSend(c, "vanilla", 1)
	go makeCakeAndSend(d, "choco", 1)
	go makeCakeAndSend(s, "strabery", 4)

	go receiveAndPack(c, d, s)
	//time.Sleep(2 * 1e9)
	fmt.Printf("Process took %s \n", time.Since(start))
	fmt.Println(time.Now().Format("January 01, 2006 3:4:5 pm"))
}

func makeCakeAndSend(c chan string, flavour string, count int) {
	defer close(c)

	for i := 0; i < count; i++ {
		cakeName := flavour + " cake " + strconv.Itoa(i)
		c <- cakeName
	}
}

func receiveAndPack(c chan string, d chan string, s chan string) {
	cClosed, dClosed, sClosed := false, false,false
	for {
		if cClosed && dClosed && sClosed {
			return
		}
		//fmt.Println("Waiting for a new cake ...")
		select {
		case cake, ok := <-c:
			if ok == false {
				cClosed = true
				fmt.Println(" ... vanila channel closed!")
			} else {
				fmt.Println(cake)
			}
		case cake, ok := <-d:
			if ok == false {
				dClosed = true
				fmt.Println(" ... choco channel closed!")
			} else {
				fmt.Println(cake)
			}
		case cake, ok := <-s:
			if ok == false {
				sClosed = true
				fmt.Println(" ... strabery channel closed!")
			} else {
				fmt.Println(cake)
			}
		default:
			fmt.Println(" ... all channels closed!")
		}
	}

}

也许这能帮到你。

你的代码出现了死锁,主要原因是finished通道没有被关闭,导致main函数中的for task := range finished循环无法正常结束。当所有任务处理完成后,没有goroutine关闭finished通道,主goroutine会一直阻塞在range循环上,而其他goroutine也在等待通道操作,最终导致所有goroutine都进入休眠状态。

以下是修复后的代码:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

type task struct {
	name      string
	path      string
	startTime time.Time
}

func downloader(id int, toDownload chan task, toProcess chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toDownload {
		fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
		time.Sleep(5 * time.Second)
		toProcess <- task
	}
}

func transformer(id int, toProcess chan task, toFinish chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toProcess {
		fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
		time.Sleep(20 * time.Second)
		toFinish <- task
	}
}

func finisher(id int, toFinish chan task, finished chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toFinish {
		fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
		time.Sleep(7 * time.Second)
		finished <- task
	}
}

func fetchJobs(n int) []task {
	tasks := make([]task, n)
	for i := 0; i < n; i++ {
		name := "job_" + strconv.Itoa(i)
		tasks[i] = task{name: name}
	}
	return tasks
}

func main() {
	start := time.Now()

	n := 10

	toDownload := make(chan task)
	toProcess := make(chan task)
	toFinish := make(chan task)
	finished := make(chan task)

	var wg sync.WaitGroup

	// 启动downloader goroutines
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go downloader(i, toDownload, toProcess, &wg)
	}

	// 启动transformer goroutines
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go transformer(i, toProcess, toFinish, &wg)
	}

	// 启动finisher goroutines
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go finisher(i, toFinish, finished, &wg)
	}

	// 启动一个goroutine来等待所有worker完成并关闭finished通道
	go func() {
		wg.Wait()
		close(finished)
	}()

	// 发送任务到下载队列
	go func() {
		jobs := fetchJobs(n)
		for _, j := range jobs {
			toDownload <- j
		}
		close(toDownload)
	}()

	// 收集完成的任务
	for task := range finished {
		fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime)/time.Second)
	}

	fmt.Printf("all tasks finished in %s", time.Since(start)/time.Second)
}

主要修改:

  1. 使用sync.WaitGroup来跟踪所有worker goroutine的完成状态
  2. 在发送完所有任务后关闭toDownload通道,触发下游的连锁关闭
  3. 添加一个单独的goroutine等待所有worker完成,然后关闭finished通道
  4. 每个worker函数使用for task := range channel模式,在通道关闭时自动退出
  5. fetchJobs的返回值改为切片而不是指针,更符合Go的习惯

这个实现确保了:

  • 所有任务都能被正确处理
  • 所有goroutine都能正常退出
  • 不会产生死锁
  • 主函数能够正常结束

通道关闭的顺序很重要:toDownloadtoProcesstoFinishfinished,每个阶段的worker在输入通道关闭且处理完所有任务后会自动退出。

回到顶部