Golang中如何正确处理通道及无限for select循环的程序退出流程
Golang中如何正确处理通道及无限for select循环的程序退出流程 我需要使用gRPC实现观察者模式。 有一个服务器向客户端广播消息,然后客户端返回确认。
使用双向流: rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);
其中服务器向客户端发送CommandMessage。 客户端则回复AcknowMessage。
服务器有一个函数,用于在需要时广播消息, 同时该函数跟踪确认消息或超时,并返回一个结果映射,例如 map[clientId]status。
于是我询问了ChatGPT并得到了下面的代码,程序运行良好。 即使确认信号在超时后到达,也不会被计入。 代码如下:
type client struct {
id string
stream pb.CommandService_CommandStreamServer
ackCh chan *pb.Acknowledgement
}
type server struct {
pb.UnimplementedCommandServiceServer
mu sync.Mutex
clients map[string]*client
}
func newServer() *server {
return &server{
clients: make(map[string]*client),
}
}
//Wen client start, it send CommandStream to suscribe
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
p, _ := peer.FromContext(stream.Context())
// del IP
clientID := p.Addr.String() //
//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())
c := &client{
id: clientID,
stream: stream,
ackCh: make(chan *pb.Acknowledgement),
}
s.mu.Lock()
s.clients[clientID] = c
s.mu.Unlock()
log.Printf("Client connected: %s", clientID)
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
s.mu.Unlock()
log.Printf("Client disconnected: %s", clientID)
}()
// Load the ack response in the client channel
for {
in, err := stream.Recv() //Wait for answer
if err != nil {
return err
}
if ack := in.GetAck(); ack != nil {
//log.Printf("Received ACK from %s: %+v", clientID, ack)
log.Printf("Receive cmd: %s", ack.CommandId)
c.ackCh <- ack
}
}
}
func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
s.mu.Lock()
defer s.mu.Unlock()
commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
log.Printf("Broadcast command: %v", commandID)
cmd := &pb.CommandMessage{
Payload: &pb.CommandMessage_Command{
Command: &pb.Command{
Id: commandID,
Name: name,
JsonArgs: jsonArgs,
},
},
}
results := make(map[string]string)
var wg sync.WaitGroup
mu := sync.Mutex{} // Protect results map
for _, c := range s.clients {
wg.Add(1)
go func(cl *client) {
defer wg.Done()
// Send command
if err := c.stream.Send(cmd); err != nil {
log.Printf("Error sending to %s: %v", c.id, err)
mu.Lock()
results[cl.id] = "send_failed"
mu.Unlock()
return
}
// Wait for ack or timeout
ackCh := make(chan string, 1)
begin := time.Now().UnixNano()
fmt.Printf(" Wait for ack or timeout \n")
reads := 0
go func() {
for {
select {
case ack := <-c.ackCh:
fmt.Printf(" read channel after: %d \n", time.Now().UnixNano()-begin)
fmt.Printf(" cmdId: %s \n", ack.CommandId)
reads++
ackCh <- ack.Status
case <-time.After(500 * time.Millisecond):
fmt.Printf(" timeout after: %d \n", time.Now().UnixNano()-begin)
ackCh <- "timeout"
}
}
}()
//fmt.Printf("many reads: %d \n", reads) // Always print 0, Why ??
status := <-ackCh
mu.Lock()
results[cl.id] = status
mu.Unlock()
}(c)
}
wg.Wait()
return results
}
但是,在广播函数中有一段代码我无法理解:
go func() {
for {
reads++
select {
case ack := <-c.ackCh:
ackCh <- ack.Status
case <-time.After(500 * time.Millisecond):
ackCh <- "timeout"
}
}
}()
fmt.Printf("many reads: %d \n", reads) // Always print 0, Why ??
为什么读取计数器总是0? 程序流是如何在没有return的情况下退出无限循环的?
根据我对通道的了解,这里缺少一个return,但代码却运行良好。 如果我加上一个return,for循环就只执行一次,并且有时会有之前的确认消息在超时后到达, 因为之前超时后到达的命令被写入了通道,而写操作会一直阻塞,直到有通道读取。 发送的goroutine会一直阻塞,直到另一个goroutine准备好从该通道接收。 我总是需要丢弃迟到的消息,只获取最后一个。
有没有更好的方法来编写这段代码?
更多关于Golang中如何正确处理通道及无限for select循环的程序退出流程的实战教程也可以访问 https://www.itying.com/category-94-b0.html
谢谢,这解释了为什么读取次数总是等于0。 下一个问题是:
无限循环没有返回语句,流程是如何退出的?
更多关于Golang中如何正确处理通道及无限for select循环的程序退出流程的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
- 那么,如果我每秒触发一次广播,一小时后我将有3600个goroutine在运行并等待panic或程序结束吗?
- 你会如何实现done输出?for循环应该接收所有消息但只保留最后一条。
func main() {
fmt.Println("hello world")
}
看一下你提供的代码的简化版本:
go func() {
// 循环向通道发送值
}()
fmt.Printf("many reads: %d \n", reads)
go 关键字会启动一个并行的 goroutine(类似于一个迷你线程),它运行指定的函数,在这个例子中就是你那个向通道发送值的匿名函数。fmt.Printf 这行代码会在 goroutine 创建后立即执行;它不会等待 goroutine 做任何事情。
- 没错。
- 根据我对你代码的理解,你实际上并不需要一个单独的 goroutine 来等待确认。以下是我改进代码的方式:
go func(cl *client) {
// 发送命令...
// 等待确认或超时
begin := time.Now().UnixNano()
fmt.Printf(" Wait for ack or timeout \n")
var status string
select {
case ack := <-c.ackCh:
fmt.Printf(" read channel after: %d \n", time.Now().UnixNano()-begin)
fmt.Printf(" cmdId: %s \n", ack.CommandId)
status = ack.Status
case <-time.After(500 * time.Millisecond):
fmt.Printf(" timeout after: %d \n", time.Now().UnixNano()-begin)
status = "timeout"
}
// 将状态添加到结果中...
}(c)
我认为这里的主要问题和疑问源于不仅对通道,而且对 goroutine 整体缺乏理解。这可能导致未来产生更多的误解和错误观念。你可以通过例如在通道上进行范围遍历来解决这个问题:
package main
import (
"fmt"
"time"
)
func main() {
var i int
ch := make(chan int)
go func() {
defer close(ch)
for j := range 10 {
ch <- j
time.Sleep(time.Second)
}
}()
for i = range ch {
}
fmt.Println(i)
}
在这个例子中,最后它会得到从通道发送的最后一个值。你可以添加一个空结构体通道来发出退出信号。你可以在你的发送/接收通道周围实现这一点。目前问题比答案多,因为如果没有更多代码,很难确切判断。例如,c.ackCh 在哪里关闭?“timeout” 应该做什么?是用于日志记录吗?
另一个非常重要的话题是你何时以及如何使用缓冲通道。
在您的代码中,reads 计数器总是为 0 的原因是因为 goroutine 的并发执行特性。fmt.Printf("many reads: %d \n", reads) 在主 goroutine 中执行,而 reads++ 在子 goroutine 中执行,两者之间没有同步机制。主 goroutine 在启动子 goroutine 后立即执行打印语句,此时子 goroutine 可能还没有执行到 reads++,或者即使执行了,由于内存可见性问题,主 goroutine 也看不到更新后的值。
关于无限循环的退出问题:当 ackCh <- ack.Status 或 ackCh <- "timeout" 执行后,主 goroutine 中的 status := <-ackCh 会接收到值,但子 goroutine 中的无限循环会继续运行。这会导致以下问题:
- 子 goroutine 会泄漏,永远不会退出
- 如果后续有新的广播,会创建新的 goroutine,导致 goroutine 数量不断增长
- 迟到的确认消息可能会被错误的广播处理
以下是改进后的代码示例,解决了这些问题:
func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
s.mu.Lock()
defer s.mu.Unlock()
commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
log.Printf("Broadcast command: %v", commandID)
cmd := &pb.CommandMessage{
Payload: &pb.CommandMessage_Command{
Command: &pb.Command{
Id: commandID,
Name: name,
JsonArgs: jsonArgs,
},
},
}
results := make(map[string]string)
var wg sync.WaitGroup
mu := sync.Mutex{}
for _, c := range s.clients {
wg.Add(1)
go func(cl *client) {
defer wg.Done()
// Send command
if err := cl.stream.Send(cmd); err != nil {
log.Printf("Error sending to %s: %v", cl.id, err)
mu.Lock()
results[cl.id] = "send_failed"
mu.Unlock()
return
}
// Create buffered channel for ack response
ackCh := make(chan string, 1)
done := make(chan struct{})
defer close(done)
// Start goroutine to wait for ack
go func() {
select {
case ack := <-cl.ackCh:
// Filter by command ID to ignore stale acks
if ack.CommandId == commandID {
select {
case ackCh <- ack.Status:
case <-done:
// Receiver already timed out
}
}
case <-done:
// Timeout or command completed
}
}()
// Wait for ack or timeout
select {
case status := <-ackCh:
mu.Lock()
results[cl.id] = status
mu.Unlock()
case <-time.After(500 * time.Millisecond):
mu.Lock()
results[cl.id] = "timeout"
mu.Unlock()
}
}(c)
}
wg.Wait()
return results
}
关键改进:
- 使用带缓冲的通道:
ackCh := make(chan string, 1)避免 goroutine 泄漏 - 添加 done 通道:用于通知子 goroutine 退出
- 命令 ID 过滤:只处理当前命令的确认,忽略迟到的消息
- select 中的 done 检查:确保超时后子 goroutine 能正确退出
- defer close(done):保证 done 通道一定会被关闭
对于 CommandStream 方法,也需要确保只传递当前命令的确认:
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
p, _ := peer.FromContext(stream.Context())
clientID := p.Addr.String()
c := &client{
id: clientID,
stream: stream,
ackCh: make(chan *pb.Acknowledgement, 10), // 添加缓冲区防止阻塞
}
s.mu.Lock()
s.clients[clientID] = c
s.mu.Unlock()
log.Printf("Client connected: %s", clientID)
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
s.mu.Unlock()
log.Printf("Client disconnected: %s", clientID)
}()
for {
in, err := stream.Recv()
if err != nil {
return err
}
if ack := in.GetAck(); ack != nil {
select {
case c.ackCh <- ack:
// Successfully sent ack to channel
default:
// Channel is full, discard old ack
// This prevents blocking if no one is listening
}
}
}
}
这些改进确保了:
- 没有 goroutine 泄漏
- 迟到的确认消息被正确忽略
- 内存使用可控
- 程序能正确清理资源

