Golang中如何解决goroutine无法从channel读取数据的问题

Golang中如何解决goroutine无法从channel读取数据的问题 我编写了以下代码。

package main

import (
	"time"
	"fmt"
)
var ch chan int
var ptimer map[string]*time.Timer
var m map[string]int
var startTime int64
func fn(name string, i int) {
	fmt.Println("this my name:", name)
	flag := false
	for {
		endTime := time.Now().UnixNano()
		fmt.Println("	in for duration:", float64((endTime-startTime)))
		select {
		case tmp := <-ch:
			fmt.Println("	this is ch, tmp:", tmp)
			flag = true
			break
		case <-ptimer[name].C:
			_, ok := m[name]
			if !ok{
				fmt.Println("	", name, "not in m")
				flag = true
				break
			}
			if 1==1 {
				fmt.Println("	this is in 1==1!!!!!!!!!!!!!!!!!!!!")
				ptimer[name].Reset(time.Second*20)
				break
			}
		}
		if flag == true {
			fmt.Println("	in flag == true no.:", i, "\n")
			break
		}
	}
}
func fib(a int) int {
	if a==0 {
		return 0
	}else if a==1{
		return 1
	}else {
		return fib(a-1)+fib(a-2)
	}
}
func main() {
	fmt.Println("this is in main")
	name := "hello"
	m = make(map[string]int)
	ptimer = make(map[string]*time.Timer)
	startTime := time.Now().UnixNano()

	i:=1
	for{
		m[name] = 30
		ptimer[name] = time.NewTimer(time.Second*20)
		go fn(name, i)
		fmt.Println("fib:", fib(m[name]))
		endTime := time.Now().UnixNano()
		fmt.Println("duration:", float64((endTime-startTime)))
		delete(m, name)
		ch = make(chan int, 1)
		ch <- i
		fmt.Println("after send exit_overtime\n")
		i++
		time.Sleep(time.Second)
	}
}

当我将 fib 函数的参数设置为 10 时,一切运行正常。

然而,当我将其设置为 30 时,发生了意想不到的情况。我发现第一个 fn goroutine(其参数 i 为 1)无法接收到 ch 通道的消息。

我在执行过程中打印了一些信息。结果显示,当参数为 30 时,程序会首先进入 fn 函数中的 for 循环。之后,它才完成 fib 函数的执行并将 i 发送到 ch 通道。而在参数为 10 的情况下,它可以在进入 fn 函数之前完成 fib 的计算。因此,第一个 ‘fn’ goroutine 可以接收到 ‘ch’ 通道并退出。

问题是,第一个 ‘fn’ goroutine 只有在 ptimer 超时时才能退出。如果我没有这个 ptimer,第一个 fn goroutine 是否就永远无法退出?

我想知道,为什么 fn 中的 for 循环在主函数发送 ich 通道之前执行,会导致第一个 fn 无法接收到 ch。另外,第一个 ch 接收到的消息将会在第二个 fn goroutine 中被接收。

我想知道这个问题是否有解决方案?更具体地说,在不改变执行顺序的前提下,如何在第一次发送 ich 时让第一个 fn goroutine 退出。这个顺序是 go fn(name, i) 需要在 fib 执行之前执行。


更多关于Golang中如何解决goroutine无法从channel读取数据的问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

我找到了一个解决方案,但我仍然不明白为什么在 fn 函数中,for 循环在主函数向 ch 通道发送 i 之前执行,会导致第一个 fn 无法接收到 ch。如果有人知道原因,能否为我解释一下?谢谢。

更多关于Golang中如何解决goroutine无法从channel读取数据的问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你好 @5151

欢迎来到 Go 论坛。

你的斐波那契函数没问题,但我很难理解 fn() 函数应该做什么,以及全局变量和局部变量的作用。 与其深究逻辑,让我先从在我看来最明显的问题开始。

  1. main()fn() 都并发地访问全局变量。具体来说,是两个映射 mptimer。你在这里遇到数据竞争。要解决这个问题,要么使用互斥锁保护访问,要么(可能更清晰)通过通道传递数据。记住 Go 谚语:“不要通过共享内存来通信,而要通过通信来共享内存。”

    提示:使用 -race 标志编译你的代码,运行代码,并检查发出的数据竞争警告。

  2. 函数 main() 不断用新的计时器覆盖 ptimer["hello"] 中的值,而 fn() 协程则尝试从该映射条目中读取。我猜你希望每个协程有一个计时器,所以也许在协程内部启动该计时器会更直接。

我的问题:

  1. 这些建议有帮助吗?
  2. 如果没有帮助,你能将代码缩减到仍然能复现问题的最简代码吗?

这是一个典型的goroutine调度和channel通信时序问题。问题根源在于goroutine的启动和调度时机。

fib(30)计算量较大时,主goroutine会长时间阻塞在计算中,此时第一个fn goroutine已经启动并进入了select循环。但由于主goroutine尚未执行到ch <- i,第一个fn goroutine会在select中等待。

当主goroutine完成计算并发送数据到channel时,第一个fn goroutine可能正在执行select的某个case(很可能是timer case),导致错过了channel的接收。

以下是解决方案:

package main

import (
	"time"
	"fmt"
	"sync"
)

var ch chan int
var ptimer map[string]*time.Timer
var m map[string]int
var startTime int64
var mu sync.Mutex

func fn(name string, i int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("this my name:", name)
	flag := false
	
	// 确保在进入select循环前channel已准备好
	mu.Lock()
	if ch == nil {
		ch = make(chan int, 1)
	}
	mu.Unlock()
	
	for {
		endTime := time.Now().UnixNano()
		fmt.Println("	in for duration:", float64((endTime-startTime)))
		select {
		case tmp := <-ch:
			fmt.Println("	this is ch, tmp:", tmp)
			flag = true
		case <-ptimer[name].C:
			_, ok := m[name]
			if !ok {
				fmt.Println("	", name, "not in m")
				flag = true
			}
			if 1 == 1 {
				fmt.Println("	this is in 1==1!!!!!!!!!!!!!!!!!!!!")
				ptimer[name].Reset(time.Second * 20)
			}
		}
		if flag {
			fmt.Println("	in flag == true no.:", i, "\n")
			break
		}
	}
}

func fib(a int) int {
	if a == 0 {
		return 0
	} else if a == 1 {
		return 1
	} else {
		return fib(a-1) + fib(a-2)
	}
}

func main() {
	fmt.Println("this is in main")
	name := "hello"
	m = make(map[string]int)
	ptimer = make(map[string]*time.Timer)
	startTime = time.Now().UnixNano()

	// 提前初始化channel
	ch = make(chan int, 1)
	
	var wg sync.WaitGroup
	i := 1
	for {
		m[name] = 30
		ptimer[name] = time.NewTimer(time.Second * 20)
		
		wg.Add(1)
		go fn(name, i, &wg)
		
		// 立即发送数据到channel
		ch <- i
		fmt.Println("after send exit_overtime\n")
		
		fmt.Println("fib:", fib(m[name]))
		endTime := time.Now().UnixNano()
		fmt.Println("duration:", float64((endTime-startTime)))
		
		delete(m, name)
		i++
		
		// 等待goroutine完成
		wg.Wait()
		time.Sleep(time.Second)
		
		// 重置channel用于下一次循环
		ch = make(chan int, 1)
	}
}

或者使用带缓冲的channel和同步机制:

package main

import (
	"time"
	"fmt"
	"context"
)

var ch chan int
var ptimer map[string]*time.Timer
var m map[string]int
var startTime int64

func fn(name string, i int, ctx context.Context) {
	fmt.Println("this my name:", name)
	flag := false
	for {
		select {
		case <-ctx.Done():
			fmt.Println("	context cancelled, exiting")
			return
		case tmp := <-ch:
			fmt.Println("	this is ch, tmp:", tmp)
			flag = true
		case <-ptimer[name].C:
			_, ok := m[name]
			if !ok {
				fmt.Println("	", name, "not in m")
				flag = true
			}
			if 1 == 1 {
				fmt.Println("	this is in 1==1!!!!!!!!!!!!!!!!!!!!")
				ptimer[name].Reset(time.Second * 20)
			}
		}
		if flag {
			fmt.Println("	in flag == true no.:", i, "\n")
			break
		}
	}
}

func fib(a int) int {
	if a == 0 {
		return 0
	} else if a == 1 {
		return 1
	} else {
		return fib(a-1) + fib(a-2)
	}
}

func main() {
	fmt.Println("this is in main")
	name := "hello"
	m = make(map[string]int)
	ptimer = make(map[string]*time.Timer)
	startTime = time.Now().UnixNano()

	// 使用带缓冲的channel
	ch = make(chan int, 10)
	
	i := 1
	for {
		m[name] = 30
		ptimer[name] = time.NewTimer(time.Second * 20)
		
		// 创建context用于控制goroutine生命周期
		ctx, cancel := context.WithCancel(context.Background())
		
		// 先发送数据到channel
		ch <- i
		
		// 再启动goroutine
		go fn(name, i, ctx)
		
		fmt.Println("after send exit_overtime\n")
		
		fmt.Println("fib:", fib(m[name]))
		endTime := time.Now().UnixNano()
		fmt.Println("duration:", float64((endTime-startTime)))
		
		delete(m, name)
		i++
		
		// 取消context确保goroutine退出
		cancel()
		time.Sleep(time.Second)
	}
}

关键点:

  1. 在启动goroutine前先初始化channel
  2. 使用带缓冲的channel确保数据不会丢失
  3. 使用同步机制(WaitGroup或Context)控制goroutine生命周期
  4. 确保发送操作在goroutine开始监听前执行

这样无论fib函数执行时间多长,第一个fn goroutine都能正确接收到channel的数据。

回到顶部