Golang中缓冲通道与死锁问题求解答

Golang中缓冲通道与死锁问题求解答 大家好,

首先,我想感谢大家花时间查看我的代码并帮助我理解这个问题。

下面的代码是一个简单的缓冲通道执行示例。我假设这里发生的情况是,由于通道的缓冲区大小为1,在短时间内多次写入会导致缓冲区溢出。但是,由于出现了死锁错误信息,我不确定是否确实如此。这就是我感到困惑的原因,希望能得到一些帮助来理解这个概念。

以下是代码:

package main

import (
	"fmt"
)

func printer(reqChan chan string, resChan chan string){
	for {
		select {
			case m := <- reqChan:
				fmt.Printf("received job [%s]\n", m)
				resChan <- m
		}
	}
}

func main(){
	jobs := 5
	numWorkers := 1
	queSize := 1

	bufChan := make(chan string, queSize)
	resChan := make(chan string, queSize)

	for i := 0; i < numWorkers; i++ {
		go printer(bufChan, resChan)
	}

	// send jobs
	for i := 0; i < jobs; i++ {
		fmt.Println("sending job", i)
		bufChan <- fmt.Sprintf("buff %d", i)
		fmt.Println("sent job", i)
	}

	// collect responses
	for i := 0; i < jobs; i++ {
		fmt.Println(<- resChan)
	}
}

以下是输出和错误:

sending job 0
sent job 0
sending job 1
sent job 1
sending job 2
received job [buff 0]
received job [buff 1]
sent job 2
sending job 3
fatal error: all goroutines are asleep - deadlock!

再次感谢!


更多关于Golang中缓冲通道与死锁问题求解答的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

不,这是因为消息没有被消费。

更多关于Golang中缓冲通道与死锁问题求解答的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


@iegomez 感谢您的回复。我认为这对我来说有一定道理。如果我对您的回答理解正确的话,`resChan` 无法写入是因为任务涌入速度过快吗?

非常感谢,我现在明白了。resChan 的输出从未成功传递。实际上,也没有 ‘buff…’ 的输出,现在完全说得通了。我需要重新设计这部分。

// 代码示例
func main() {
    // 此处需要重新实现通道逻辑
}

问题在于,printer 接收了 job 0,向容量为1的 resChan 写入数据,然后接收 job 1,但由于第一个响应尚未被消费,无法再次写入 resChan。另一方面,main 协程成功写入作业0和1(这些作业已被消费),但在尝试发送作业3时,printer 无法接收,因为它正阻塞等待 main 协程消费响应。这种情况永远不会发生,因为 main 协程仍在等待完成作业发送,从而导致 printermain 相互等待,形成死锁。

你的代码确实存在死锁问题,我来分析原因并提供解决方案。

问题分析

死锁发生在以下情况:

  1. 主goroutine向bufChan发送了3个任务后阻塞
  2. 工作goroutine从bufChan接收了2个任务并向resChan发送了2个响应
  3. 主goroutine从resChan接收了0个响应(因为循环还没开始执行)
  4. 所有goroutine都处于等待状态:主goroutine等待向bufChan发送,工作goroutine等待从resChan读取

关键问题在于printer函数中的无限循环没有退出机制,且resChan的发送操作会阻塞。

解决方案

方案1:使用独立的goroutine关闭通道

package main

import (
	"fmt"
	"sync"
)

func printer(reqChan chan string, resChan chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for m := range reqChan {
		fmt.Printf("received job [%s]\n", m)
		resChan <- m
	}
}

func main() {
	jobs := 5
	numWorkers := 1
	queSize := 1

	bufChan := make(chan string, queSize)
	resChan := make(chan string, jobs) // 缓冲区大小设为jobs数量

	var wg sync.WaitGroup
	wg.Add(numWorkers)

	for i := 0; i < numWorkers; i++ {
		go printer(bufChan, resChan, &wg)
	}

	// 发送任务
	for i := 0; i < jobs; i++ {
		fmt.Println("sending job", i)
		bufChan <- fmt.Sprintf("buff %d", i)
		fmt.Println("sent job", i)
	}
	close(bufChan) // 关闭通道,通知工作goroutine停止

	// 等待所有工作goroutine完成
	wg.Wait()
	close(resChan)

	// 收集响应
	for res := range resChan {
		fmt.Println(res)
	}
}

方案2:使用context控制goroutine生命周期

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func printer(ctx context.Context, reqChan chan string, resChan chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-reqChan:
			if !ok {
				return
			}
			fmt.Printf("received job [%s]\n", m)
			resChan <- m
		}
	}
}

func main() {
	jobs := 5
	numWorkers := 1
	queSize := 1

	bufChan := make(chan string, queSize)
	resChan := make(chan string, jobs)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(numWorkers)

	for i := 0; i < numWorkers; i++ {
		go printer(ctx, bufChan, resChan, &wg)
	}

	// 发送任务
	for i := 0; i < jobs; i++ {
		fmt.Println("sending job", i)
		bufChan <- fmt.Sprintf("buff %d", i)
		fmt.Println("sent job", i)
	}

	// 等待所有任务处理完成
	time.Sleep(100 * time.Millisecond)
	cancel()
	wg.Wait()
	close(resChan)

	// 收集响应
	for res := range resChan {
		fmt.Println(res)
	}
}

方案3:简化版本(推荐)

package main

import (
	"fmt"
)

func printer(reqChan chan string, resChan chan string, done chan bool) {
	for m := range reqChan {
		fmt.Printf("received job [%s]\n", m)
		resChan <- m
	}
	done <- true
}

func main() {
	jobs := 5
	numWorkers := 1
	queSize := 1

	bufChan := make(chan string, queSize)
	resChan := make(chan string, jobs)
	done := make(chan bool, numWorkers)

	for i := 0; i < numWorkers; i++ {
		go printer(bufChan, resChan, done)
	}

	// 发送任务
	for i := 0; i < jobs; i++ {
		fmt.Println("sending job", i)
		bufChan <- fmt.Sprintf("buff %d", i)
		fmt.Println("sent job", i)
	}
	close(bufChan)

	// 等待所有工作goroutine完成
	for i := 0; i < numWorkers; i++ {
		<-done
	}
	close(resChan)

	// 收集响应
	for res := range resChan {
		fmt.Println(res)
	}
}

主要修改点:

  1. resChan提供足够的缓冲区或确保及时消费
  2. 添加goroutine退出机制
  3. 使用close()range安全地处理通道
  4. 使用sync.WaitGroup或完成通道协调goroutine

这些修改可以避免死锁,确保所有goroutine都能正常退出。

回到顶部