Golang从channel接收数据时如何解决goroutine冻结问题

Golang从channel接收数据时如何解决goroutine冻结问题 我有一段如下代码运行正常,协程按预期执行了5次。

package main

import (
	"fmt"
	"log"
	"sync"
	pb "walistner/proto"

	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/protobuf/types/known/emptypb"
)

// type server pb.StreamServiceServer
type server struct {
	pb.UnimplementedStreamServiceServer
}

func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
	if p, ok := peer.FromContext(srv.Context()); ok {
		fmt.Println("Client ip is:", p.Addr.String())
	}
	md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})
	srv.SetHeader(md)
	//use wait group to allow process to be concurrent
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {

		wg.Add(1)
		go func(count int64) {

			defer wg.Done()
     
			resp := myData

			if err := srv.Send(&resp); err != nil {
				log.Printf("send error %v", err)
			}
			log.Printf("finishing request number : %d", count)
		}(int64(i))
	}

	wg.Wait()
	return nil
}

我想做一个小的改动,让它不是只运行5次,而是在从通道接收到输入数据时才运行。所以我把它改成了这样:

	for { // 移除了计数

		wg.Add(1)
		go func() {

			defer wg.Done()
     
			resp := <- myData  // 我把 mydata 改成了从通道获取

			if err := srv.Send(&resp); err != nil {
				log.Printf("send error %v", err)
			}
		}()

但做了这个改动后,我的应用就挂起/冻结了。

我犯了什么错误?


更多关于Golang从channel接收数据时如何解决goroutine冻结问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

感谢您的澄清,非常感谢。

更多关于Golang从channel接收数据时如何解决goroutine冻结问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


看起来你正在无限制地创建 goroutine。 这是你想要的效果吗?

将帖子标题改为“无限循环永远运行”。此行为与从通道接收数据无关。

WaitGroup 是无意义的,因为无限循环没有退出条件。wg.Wait() 永远不会被调用,所以 wg.Add 和 wg.Done 没有任何作用。

我的代码中存在两个错误:

  1. 我在两个不同的地方监听同一个通道,因此数据有时在到达此通道之前就被消费掉了。
  2. 我在 goroutine 内部监听通道,而我应该在 goroutine 之前、但在同一个 for 循环内部监听它。

正确的代码变为:

	var wg sync.WaitGroup

	for {

		resp := <- myData

		wg.Add(1)

		go func() {
			defer wg.Done()

			if err := srv.Send(&resp); err != nil {
				log.Printf("send error %v", err)
			}

		}()
	}

	wg.Wait()

问题在于你的goroutine启动方式与通道接收的同步机制不匹配。原代码使用WaitGroup等待固定数量的goroutine完成,但修改后变成了无限循环启动goroutine,且没有控制goroutine数量的机制,这会导致goroutine泄漏和资源耗尽。

以下是正确的实现方式:

func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
	if p, ok := peer.FromContext(srv.Context()); ok {
		fmt.Println("Client ip is:", p.Addr.String())
	}
	md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})
	srv.SetHeader(md)

	// 创建带缓冲的通道来控制并发数量
	workerCount := 5
	sem := make(chan struct{}, workerCount)
	errChan := make(chan error, 1)
	done := make(chan struct{})

	go func() {
		for {
			select {
			case <-done:
				return
			default:
				sem <- struct{}{} // 获取信号量
				go func() {
					defer func() { <-sem }() // 释放信号量
					
					select {
					case resp := <-myData:
						if err := srv.Send(&resp); err != nil {
							select {
							case errChan <- err:
							default:
							}
						}
					case <-done:
						return
					}
				}()
			}
		}
	}()

	// 等待错误或上下文取消
	select {
	case err := <-errChan:
		close(done)
		return err
	case <-srv.Context().Done():
		close(done)
		return srv.Context().Err()
	}
}

或者使用更简洁的worker pool模式:

func (s server) WaMessageStreame(_ *emptypb.Empty, srv pb.StreamService_WaMessageStreameServer) error {
	// ... 初始化代码同上
	
	workerCount := 5
	var wg sync.WaitGroup
	errChan := make(chan error, 1)
	ctx := srv.Context()

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			
			for {
				select {
				case resp := <-myData:
					if err := srv.Send(&resp); err != nil {
						select {
						case errChan <- fmt.Errorf("worker %d: %v", workerID, err):
						default:
						}
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	// 等待所有worker完成或出错
	wg.Wait()
	close(errChan)
	
	if err := <-errChan; err != nil {
		return err
	}
	return ctx.Err()
}

关键点:

  1. 使用固定数量的worker goroutine而不是无限创建
  2. 正确处理上下文取消
  3. 使用带缓冲的通道避免goroutine泄漏
  4. 实现优雅的错误处理和退出机制
回到顶部