Golang中如何正确处理通道及无限for select循环的程序退出流程

Golang中如何正确处理通道及无限for select循环的程序退出流程 我需要使用gRPC实现观察者模式。 有一个服务器向客户端广播消息,然后客户端返回确认。

使用双向流: rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);

其中服务器向客户端发送CommandMessage。 客户端则回复AcknowMessage。

服务器有一个函数,用于在需要时广播消息, 同时该函数跟踪确认消息或超时,并返回一个结果映射,例如 map[clientId]status。

于是我询问了ChatGPT并得到了下面的代码,程序运行良好。 即使确认信号在超时后到达,也不会被计入。 代码如下:

type client struct {
	id     string
	stream pb.CommandService_CommandStreamServer
	ackCh  chan *pb.Acknowledgement
}

type server struct {
	pb.UnimplementedCommandServiceServer
	mu      sync.Mutex
	clients map[string]*client
}

func newServer() *server {
	return &server{
		clients: make(map[string]*client),
	}
}
//Wen client start, it send CommandStream to suscribe
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
	p, _ := peer.FromContext(stream.Context())
	// del IP
	clientID := p.Addr.String() //
	//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())

	c := &client{
		id:     clientID,
		stream: stream,
		ackCh:  make(chan *pb.Acknowledgement),
	}

	s.mu.Lock()
	s.clients[clientID] = c
	s.mu.Unlock()
	log.Printf("Client connected: %s", clientID)

	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", clientID)
	}()

	// Load the ack response in the client channel
	for {
		in, err := stream.Recv() //Wait for answer
		if err != nil {
			return err
		}
		if ack := in.GetAck(); ack != nil {
			//log.Printf("Received ACK from %s: %+v", clientID, ack)
			log.Printf("Receive cmd: %s", ack.CommandId)
			c.ackCh <- ack
		}
	}
}


func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()

	commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
	log.Printf("Broadcast command: %v", commandID)

	cmd := &pb.CommandMessage{
		Payload: &pb.CommandMessage_Command{
			Command: &pb.Command{
				Id:       commandID,
				Name:     name,
				JsonArgs: jsonArgs,
			},
		},
	}

	results := make(map[string]string)
	var wg sync.WaitGroup
	mu := sync.Mutex{} // Protect results map

	for _, c := range s.clients {
		wg.Add(1)

		go func(cl *client) {
			defer wg.Done()

			// Send command
			if err := c.stream.Send(cmd); err != nil {
				log.Printf("Error sending to %s: %v", c.id, err)
				mu.Lock()
				results[cl.id] = "send_failed"
				mu.Unlock()
				return
			}

			// Wait for ack or timeout
			ackCh := make(chan string, 1)
			begin := time.Now().UnixNano()			
			fmt.Printf(" Wait for ack or timeout  \n")
			reads := 0
		
			go func() {

				for {

					select {
					case ack := <-c.ackCh:
						fmt.Printf("  read channel after: %d  \n", time.Now().UnixNano()-begin)
						fmt.Printf("               cmdId: %s  \n", ack.CommandId)
						reads++
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):
						fmt.Printf("  timeout after: %d  \n", time.Now().UnixNano()-begin)
						ackCh <- "timeout"

					}
				}

			}()

			//fmt.Printf("many reads: %d  \n", reads) // Always print 0, Why ??

			status := <-ackCh
			mu.Lock()
			results[cl.id] = status
			mu.Unlock()
		}(c)
	}

	wg.Wait()
	return results
}

但是,在广播函数中有一段代码我无法理解:

			go func() {

				for {
					reads++
					select {
					case ack := <-c.ackCh:						
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):						
						ackCh <- "timeout"
					}
				}

			}()
         fmt.Printf("many reads: %d  \n", reads) // Always print 0, Why ??

为什么读取计数器总是0? 程序流是如何在没有return的情况下退出无限循环的?

根据我对通道的了解,这里缺少一个return,但代码却运行良好。 如果我加上一个return,for循环就只执行一次,并且有时会有之前的确认消息在超时后到达, 因为之前超时后到达的命令被写入了通道,而写操作会一直阻塞,直到有通道读取。 发送的goroutine会一直阻塞,直到另一个goroutine准备好从该通道接收。 我总是需要丢弃迟到的消息,只获取最后一个。

有没有更好的方法来编写这段代码?


更多关于Golang中如何正确处理通道及无限for select循环的程序退出流程的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

谢谢,这解释了为什么读取次数总是等于0。 下一个问题是:

无限循环没有返回语句,流程是如何退出的?

更多关于Golang中如何正确处理通道及无限for select循环的程序退出流程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


  1. 那么,如果我每秒触发一次广播,一小时后我将有3600个goroutine在运行并等待panic或程序结束吗?
  2. 你会如何实现done输出?for循环应该接收所有消息但只保留最后一条。
func main() {
    fmt.Println("hello world")
}

看一下你提供的代码的简化版本:

go func() {
    // 循环向通道发送值
}()
fmt.Printf("many reads: %d  \n", reads)

go 关键字会启动一个并行的 goroutine(类似于一个迷你线程),它运行指定的函数,在这个例子中就是你那个向通道发送值的匿名函数。fmt.Printf 这行代码会在 goroutine 创建后立即执行;它不会等待 goroutine 做任何事情。

  1. 没错。
  2. 根据我对你代码的理解,你实际上并不需要一个单独的 goroutine 来等待确认。以下是我改进代码的方式:
		go func(cl *client) {
			// 发送命令...

			// 等待确认或超时
			begin := time.Now().UnixNano()			
			fmt.Printf(" Wait for ack or timeout  \n")
			var status string
			select {
			case ack := <-c.ackCh:
				fmt.Printf("  read channel after: %d  \n", time.Now().UnixNano()-begin)
				fmt.Printf("               cmdId: %s  \n", ack.CommandId)
				status = ack.Status
			case <-time.After(500 * time.Millisecond):
				fmt.Printf("  timeout after: %d  \n", time.Now().UnixNano()-begin)
				status = "timeout"
			}

			// 将状态添加到结果中...
		}(c)

我认为这里的主要问题和疑问源于不仅对通道,而且对 goroutine 整体缺乏理解。这可能导致未来产生更多的误解和错误观念。你可以通过例如在通道上进行范围遍历来解决这个问题:

package main

import (
	"fmt"
	"time"
)

func main() {
	var i int

	ch := make(chan int)

	go func() {
		defer close(ch)

		for j := range 10 {
			ch <- j
			time.Sleep(time.Second)
		}
	}()

	for i = range ch {
	}

	fmt.Println(i)
}

在这个例子中,最后它会得到从通道发送的最后一个值。你可以添加一个空结构体通道来发出退出信号。你可以在你的发送/接收通道周围实现这一点。目前问题比答案多,因为如果没有更多代码,很难确切判断。例如,c.ackCh 在哪里关闭?“timeout” 应该做什么?是用于日志记录吗?

另一个非常重要的话题是你何时以及如何使用缓冲通道。

在您的代码中,reads 计数器总是为 0 的原因是因为 goroutine 的并发执行特性。fmt.Printf("many reads: %d \n", reads) 在主 goroutine 中执行,而 reads++ 在子 goroutine 中执行,两者之间没有同步机制。主 goroutine 在启动子 goroutine 后立即执行打印语句,此时子 goroutine 可能还没有执行到 reads++,或者即使执行了,由于内存可见性问题,主 goroutine 也看不到更新后的值。

关于无限循环的退出问题:当 ackCh <- ack.StatusackCh <- "timeout" 执行后,主 goroutine 中的 status := <-ackCh 会接收到值,但子 goroutine 中的无限循环会继续运行。这会导致以下问题:

  1. 子 goroutine 会泄漏,永远不会退出
  2. 如果后续有新的广播,会创建新的 goroutine,导致 goroutine 数量不断增长
  3. 迟到的确认消息可能会被错误的广播处理

以下是改进后的代码示例,解决了这些问题:

func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()

	commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
	log.Printf("Broadcast command: %v", commandID)

	cmd := &pb.CommandMessage{
		Payload: &pb.CommandMessage_Command{
			Command: &pb.Command{
				Id:       commandID,
				Name:     name,
				JsonArgs: jsonArgs,
			},
		},
	}

	results := make(map[string]string)
	var wg sync.WaitGroup
	mu := sync.Mutex{}

	for _, c := range s.clients {
		wg.Add(1)

		go func(cl *client) {
			defer wg.Done()

			// Send command
			if err := cl.stream.Send(cmd); err != nil {
				log.Printf("Error sending to %s: %v", cl.id, err)
				mu.Lock()
				results[cl.id] = "send_failed"
				mu.Unlock()
				return
			}

			// Create buffered channel for ack response
			ackCh := make(chan string, 1)
			done := make(chan struct{})
			defer close(done)

			// Start goroutine to wait for ack
			go func() {
				select {
				case ack := <-cl.ackCh:
					// Filter by command ID to ignore stale acks
					if ack.CommandId == commandID {
						select {
						case ackCh <- ack.Status:
						case <-done:
							// Receiver already timed out
						}
					}
				case <-done:
					// Timeout or command completed
				}
			}()

			// Wait for ack or timeout
			select {
			case status := <-ackCh:
				mu.Lock()
				results[cl.id] = status
				mu.Unlock()
			case <-time.After(500 * time.Millisecond):
				mu.Lock()
				results[cl.id] = "timeout"
				mu.Unlock()
			}
		}(c)
	}

	wg.Wait()
	return results
}

关键改进:

  1. 使用带缓冲的通道ackCh := make(chan string, 1) 避免 goroutine 泄漏
  2. 添加 done 通道:用于通知子 goroutine 退出
  3. 命令 ID 过滤:只处理当前命令的确认,忽略迟到的消息
  4. select 中的 done 检查:确保超时后子 goroutine 能正确退出
  5. defer close(done):保证 done 通道一定会被关闭

对于 CommandStream 方法,也需要确保只传递当前命令的确认:

func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
	p, _ := peer.FromContext(stream.Context())
	clientID := p.Addr.String()

	c := &client{
		id:     clientID,
		stream: stream,
		ackCh:  make(chan *pb.Acknowledgement, 10), // 添加缓冲区防止阻塞
	}

	s.mu.Lock()
	s.clients[clientID] = c
	s.mu.Unlock()
	log.Printf("Client connected: %s", clientID)

	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", clientID)
	}()

	for {
		in, err := stream.Recv()
		if err != nil {
			return err
		}
		if ack := in.GetAck(); ack != nil {
			select {
			case c.ackCh <- ack:
				// Successfully sent ack to channel
			default:
				// Channel is full, discard old ack
				// This prevents blocking if no one is listening
			}
		}
	}
}

这些改进确保了:

  • 没有 goroutine 泄漏
  • 迟到的确认消息被正确忽略
  • 内存使用可控
  • 程序能正确清理资源
回到顶部