Golang中如何解决指令卡死导致的无限循环问题

Golang中如何解决指令卡死导致的无限循环问题 大家好 在以下代码中,我的循环在 c.ReadMessage 行无限期地卡住,只有在有消息返回时才会继续执行,但这个时间点未知。假设终止信号来了,但循环仍然卡在 c.ReadMessage 处无限期等待,那么我该如何终止这个无法继续执行的循环呢?

for {
		select {
		case <-sign:
                    break
		default:
			//我的循环卡在下面这行注释处
                       // 并且sign term收到了终止信号
			msg, _ := c.ReadMessage(-1)
                        go processMessage(msg)

	}

更多关于Golang中如何解决指令卡死导致的无限循环问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

19 回复

并且我们无法在ReadMessage()中使用时间

更多关于Golang中如何解决指令卡死导致的无限循环问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


谢谢,我会看看的。

我正在使用 confulentinc-kafak-go,而不是 segmentio 的。

是的,我一直在处理那部分,这个建议非常棒,谢谢。

如果我的循环卡在 readmessage 处,并且检测到终止信号,有什么方法可以跳出循环吗?

无法添加 time.After,因为我正在从服务器获取数据,而服务器可能在任何时间返回数据。

如果 AddWait 在不同的 goroutine 中执行,总会存在竞态条件。你必须尽量在同一个 goroutine 中执行 AddWait

为什么不能在 ReadMessage 中使用时间?从你给出的代码来看,这似乎是可行的。

如果在 readMessage 中使用时间参数,那么轮询将由我们控制,并且我们会以静态方式轮询服务器。但如果我们使用 -1,那么库会在收到消息时立即提供给我们,并自行进行轮询。如果服务器每秒有大量消息,服务将在静态时间内处于空闲状态,队列中会积压大量未处理的消息。

我正在开发的服务对时间非常敏感,因此我希望在未读取到消息时避免反复检查if条件。

for {
  if somecondition {
   // do this continue
  }
   select{
   case <-signterm:
   //terminate
   default: 
         msg,err:= c.readMessage(2*time.miliseconds)
// rest of program...
    }
}

taalhach:

如果我的循环卡在 readmessage 上,并且检测到终止信号,那么有什么方法可以中断它吗?

无法停止 ReadMessage 调用。唯一可能的方法是设置一个最大等待时间,强制 ReadMessage 在没有消息时返回。这种模式经常被使用,因为定义一个超时值比让一个函数可停止要简单得多。

请尝试我建议的代码,你会看到它是有效的。

实际上,当 readMessage 返回后,消息处理随即开始。如果任务完成,这些消息的索引会被写入一个通道,以便我们可以提交它们的索引。因此,如果处理已经开始,我们就不能取消上下文,必须等待其处理完成。但在我的 shutdown 函数中,我检查了正在处理的任务计数器(等待组),然后关闭了通道。由于竞态条件,处理器在检查计数器之后才开始运行,这导致我关闭了通道,而在处理消息时尝试向已经关闭的通道写入数据。

你能提供 c.ReadMessage() 函数的代码吗?

如果你想使这个函数能被 sign 中断,可以添加一个可取消的上下文。然后在一个goroutine的for循环中组合 ReadMessageprocessMessage,当上下文的Done通道关闭时返回。

另一种可能性是修改 ReadMessage 函数,使其包含一个select语句,当 sign 关闭时返回。

如果你无法修改 ReadMessage 的实现,那就没办法了。

我开发的服务对时间非常敏感,因此我希望在未读取到消息时,避免反复检查 if 条件。

我认为这完全不会降低你的性能。但无论如何,我看到你使用的库需要一个能够接收上下文的新读取器实现。你可以 Fork 它并自行实现。

// 代码示例:实现一个接收上下文的读取器
func NewReaderWithContext(ctx context.Context, r io.Reader) io.Reader {
    // 实现细节...
}

我刚刚检查了 Kafka Go 客户端库,看到了这个方法: m, err := r.ReadMessage(context.Background()) 你使用的是旧版本的库吗?

GitHub

segmentio/kafka-go

avatar

Kafka library in Go. Contribute to segmentio/kafka-go development by creating an account on GitHub.

将读取消息的代码移到select语句中,并添加超时处理。

类似这样:

package main

import (
	"fmt"
	"time"
)

func main() {
	sign := make(chan bool)
	messages := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			if i == 2 {
				time.Sleep(10)
			}
			messages <- i
		}
	}()

	for {
		select {
		case <-sign:
			break
		case msg, _ := <-messages:
			fmt.Println(msg)
			// go processMessage()
		case <-time.After(time.Second * 5):
			fmt.Println("timeouted")
			return
		}
	}
}

https://play.golang.org/p/8aYWKaxc15-

taalhach:

如果我在 readMessage 中使用时间参数,那么轮询将由我们这端控制,我们会静态地轮询服务器;但如果使用 -1,那么库会在收到消息时立即提供给我们,并自行轮询。如果服务器每秒有大量消息,服务将在静态时间内闲置,队列中会堆积大量未处理的消息。

这种说法不正确。我们传递给 ReadMessage 的参数是一个超时值,而不是它返回前会阻塞的时间。即使设置了 2 或 5 秒的超时,ReadMessage 仍然会在每条消息到达时立即返回。唯一的区别是,如果 2 或 5 秒内没有消息,ReadMessage 会返回一个错误和 nil 消息。这正是你检查终止信号的时候。对于试图终止程序的人来说,2 到 5 秒的等待时间是合理的。

我们指定的时间不是轮询的周期,而是等待消息的最长时间。

这确实很棘手。所以您想要等待所有 processingMessage 协程完成,并且能够中断 ReadMessage

等待 processingMessage 完成应该使用一个 sync.WaitGroup 来实现,您在启动 processMessage 协程之前递增它。在协程中,您需要在开头放置一条 defer wg.Done() 指令。当您想要终止并确保所有 processMessage 都已完成时,调用 wg.Wait()

现在,通过关闭 sign 来中断 ReadMessage 是不可能的,因为该函数中没有 select。它调用的 Poll 本身就是一个阻塞函数。

看起来解决方案是使用一个不等于 -1 的超时值。请参考以下代码示例。我们指定了 2 秒的超时值,但也可以是更长的时间,比如 5 秒。这将是等待 sign 关闭被考虑到的最大时间。

for {
    select {
        case <-sign:
            break
        default:
            //我的循环卡在下面这行注释处
            // 并且 sign term 收到终止信号
            msg, _ := c.ReadMessage(2*time.Second)
            if msg != nil {
                go processMessage(msg)
            }
    }
}

这是Kafka库中的一个长函数

func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {

	var absTimeout time.Time
	var timeoutMs int

	if timeout > 0 {
		absTimeout = time.Now().Add(timeout)
		timeoutMs = (int)(timeout.Seconds() * 1000.0)
	} else {
		timeoutMs = (int)(timeout)
	}
	for {
		ev := c.Poll(timeoutMs)
		switch e := ev.(type) {
		case *Message:
			if e.TopicPartition.Error != nil {
				return e, e.TopicPartition.Error
			}
			return e, nil
		case Error:
			return nil, e
		default:
			// Ignore other event types
		}

		if timeout > 0 {
			// Calculate remaining time
			timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
		}

		if timeoutMs == 0 && ev == nil {
			//log.Println("Here is something",newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT))
			return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
		}
	}

}

在WebSocket读取操作中处理阻塞问题,可以使用带超时的上下文或设置读取截止时间。以下是几种解决方案:

方案1:使用带超时的上下文(推荐)

for {
    select {
    case <-sign:
        return // 使用return而不是break
    default:
        // 创建带超时的上下文
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        
        // 使用通道接收读取结果
        msgChan := make(chan []byte)
        errChan := make(chan error)
        
        go func() {
            msg, err := c.ReadMessage(-1)
            if err != nil {
                errChan <- err
                return
            }
            msgChan <- msg
        }()
        
        select {
        case <-ctx.Done():
            cancel()
            continue // 超时后继续循环检查终止信号
        case msg := <-msgChan:
            cancel()
            go processMessage(msg)
        case err := <-errChan:
            cancel()
            // 处理错误
            continue
        }
    }
}

方案2:设置读取截止时间

// 在连接上设置读取截止时间
c.SetReadDeadline(time.Now().Add(5 * time.Second))

for {
    select {
    case <-sign:
        return
    default:
        msg, err := c.ReadMessage(-1)
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                // 重置截止时间并继续
                c.SetReadDeadline(time.Now().Add(5 * time.Second))
                continue
            }
            // 其他错误处理
            return
        }
        
        // 重置截止时间
        c.SetReadDeadline(time.Now().Add(5 * time.Second))
        go processMessage(msg)
    }
}

方案3:使用goroutine和select组合

done := make(chan struct{})
msgChan := make(chan []byte)
errChan := make(chan error)

// 启动读取goroutine
go func() {
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            errChan <- err
            return
        }
        msgChan <- msg
    }
}()

for {
    select {
    case <-sign:
        close(done)
        return
    case msg := <-msgChan:
        go processMessage(msg)
    case err := <-errChan:
        // 处理错误
        return
    }
}

方案4:使用net.Conn的SetReadDeadline(针对原始连接)

// 获取底层连接
conn := c.UnderlyingConn()
if conn != nil {
    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
}

for {
    select {
    case <-sign:
        return
    default:
        msg, err := c.ReadMessage(-1)
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                // 检查终止信号
                select {
                case <-sign:
                    return
                default:
                    // 重置截止时间
                    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
                    continue
                }
            }
            return
        }
        
        // 重置截止时间
        conn.SetReadDeadline(time.Now().Add(5 * time.Second))
        go processMessage(msg)
    }
}

关键点:

  1. break只能跳出select,不能跳出for循环,应该使用return
  2. 通过设置超时或截止时间来避免永久阻塞
  3. 使用goroutine分离读取操作,通过通道进行通信
  4. 每次读取后需要重置截止时间
回到顶部