Golang从channel接收数据时如何解决goroutine冻结问题
Golang从channel接收数据时如何解决goroutine冻结问题 我有一段如下代码运行正常,协程按预期执行了5次。
package main
import (
"fmt"
"log"
"sync"
pb "walistner/proto"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/types/known/emptypb"
)
// type server pb.StreamServiceServer
type server struct {
pb.UnimplementedStreamServiceServer
}
func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
if p, ok := peer.FromContext(srv.Context()); ok {
fmt.Println("Client ip is:", p.Addr.String())
}
md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})
srv.SetHeader(md)
//use wait group to allow process to be concurrent
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(count int64) {
defer wg.Done()
resp := myData
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
log.Printf("finishing request number : %d", count)
}(int64(i))
}
wg.Wait()
return nil
}
我想做一个小的改动,让它不是只运行5次,而是在从通道接收到输入数据时才运行。所以我把它改成了这样:
for { // 移除了计数
wg.Add(1)
go func() {
defer wg.Done()
resp := <- myData // 我把 mydata 改成了从通道获取
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}()
但做了这个改动后,我的应用就挂起/冻结了。
我犯了什么错误?
更多关于Golang从channel接收数据时如何解决goroutine冻结问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢您的澄清,非常感谢。
更多关于Golang从channel接收数据时如何解决goroutine冻结问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
看起来你正在无限制地创建 goroutine。 这是你想要的效果吗?
将帖子标题改为“无限循环永远运行”。此行为与从通道接收数据无关。
WaitGroup 是无意义的,因为无限循环没有退出条件。wg.Wait() 永远不会被调用,所以 wg.Add 和 wg.Done 没有任何作用。
我的代码中存在两个错误:
- 我在两个不同的地方监听同一个通道,因此数据有时在到达此通道之前就被消费掉了。
- 我在 goroutine 内部监听通道,而我应该在 goroutine 之前、但在同一个 for 循环内部监听它。
正确的代码变为:
var wg sync.WaitGroup
for {
resp := <- myData
wg.Add(1)
go func() {
defer wg.Done()
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}()
}
wg.Wait()
问题在于你的goroutine启动方式与通道接收的同步机制不匹配。原代码使用WaitGroup等待固定数量的goroutine完成,但修改后变成了无限循环启动goroutine,且没有控制goroutine数量的机制,这会导致goroutine泄漏和资源耗尽。
以下是正确的实现方式:
func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
if p, ok := peer.FromContext(srv.Context()); ok {
fmt.Println("Client ip is:", p.Addr.String())
}
md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})
srv.SetHeader(md)
// 创建带缓冲的通道来控制并发数量
workerCount := 5
sem := make(chan struct{}, workerCount)
errChan := make(chan error, 1)
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
sem <- struct{}{} // 获取信号量
go func() {
defer func() { <-sem }() // 释放信号量
select {
case resp := <-myData:
if err := srv.Send(&resp); err != nil {
select {
case errChan <- err:
default:
}
}
case <-done:
return
}
}()
}
}
}()
// 等待错误或上下文取消
select {
case err := <-errChan:
close(done)
return err
case <-srv.Context().Done():
close(done)
return srv.Context().Err()
}
}
或者使用更简洁的worker pool模式:
func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
// ... 初始化代码同上
workerCount := 5
var wg sync.WaitGroup
errChan := make(chan error, 1)
ctx := srv.Context()
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
select {
case resp := <-myData:
if err := srv.Send(&resp); err != nil {
select {
case errChan <- fmt.Errorf("worker %d: %v", workerID, err):
default:
}
return
}
case <-ctx.Done():
return
}
}
}(i)
}
// 等待所有worker完成或出错
wg.Wait()
close(errChan)
if err := <-errChan; err != nil {
return err
}
return ctx.Err()
}
关键点:
- 使用固定数量的worker goroutine而不是无限创建
- 正确处理上下文取消
- 使用带缓冲的通道避免goroutine泄漏
- 实现优雅的错误处理和退出机制

