Golang实现灵活超时控制的协程用法
Golang实现灵活超时控制的协程用法 考虑这样一个场景:你有一个通过请求处理器处理请求的服务器(请参考图片)。处理器(通过其工作协程)可能需要很长时间才能完成,因此你需要设置一个超时,以防它永不终止。在我们的案例中,请求处理时间长是可以接受的,只要其中任何一个工作协程仍在进行中。然而,我们设置的初始超时时间限制了这个条件。(也就是说,有没有办法检测不活动状态,而不是设置一个固定的超时?)

例如,你有6个工作协程在一个包含5秒超时的上下文中工作。第 n 个工作协程需要 n 秒来完成(例如,第3个工作协程需要3秒完成)。 由于我们设置了5秒的超时,第6个工作协程将会超时。所以,我的问题是,能否在执行过程中的稍后阶段刷新这个计时器(例如,当一个工作协程完成或判定自己仍在进行时)?
请参考下方帖子中的图片
具体来说,你可以看看这个程序:
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
fmt.Println("handling request")
workers := 10
var wg sync.WaitGroup
wg.Add(workers)
timeout := 5 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
for i := 1; i <= workers; i++ {
go worker(ctx, i, &wg)
}
wg.Wait()
fmt.Println("All workers terminated")
}
// Worker will take index * time.Second to return
func worker(ctx context.Context, index int, wg *sync.WaitGroup) {
defer wg.Done()
// fmt.Println("Worker ", index, "waiting for ", index, "seconds")
select {
case <-ctx.Done():
// Cancel worker
fmt.Println("Worker ", index, "timed out")
return
case <-time.After(time.Duration(index) * time.Second):
// Process finished after some time
fmt.Println("Worker ", index, "finished")
}
}
其输出为:
handling request
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished
Worker 5 finished
Worker 6 timed out
All workers terminated
我如何才能允许第6个工作协程完成,因为它仍然在上一个不活动的协程的5秒时间范围内。
更多关于Golang实现灵活超时控制的协程用法的实战教程也可以访问 https://www.itying.com/category-94-b0.html
一点建议,对于心跳通道,最好使用空结构体而不是布尔类型。
更多关于Golang实现灵活超时控制的协程用法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html

我在这一行收到了一条消息;
ctx, _ := context.WithTimeout(context.Background(), timeout)
你需要添加一个变量来接收取消函数,所以: ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel()
而且我不知道对于你的用例来说,是否可以将超时秒数增加,比如说增加到10秒…
这只是一个良好的实践。空结构体不占用内存,而且能提高代码的可读性。因为当你看到一个使用空结构体的通道时,你就能明白在大多数情况下它是被用作信号量或锁。这比分配内存给一个未使用的布尔值要好,因为后者让人第一眼无法理解这个值是否真的在某个地方被需要。
Context 的设计初衷并非你所设想的使用方式。如果我的理解正确,你或许可以实现一个工作进程的心跳机制,并以此作为判断它们是否仍在活跃状态的依据。例如,你可以创建一个 goroutine 来监听工作进程的心跳,一旦某个进程停止心跳,它就给予其他进程 5 秒的时间来完成工作。如果在这 5 秒内又有其他进程完成,计时器会重新开始计时 5 秒,依此类推。这种机制或许能解决你的问题,但需要大量的额外工作,因为如果你仍然希望工作进程有超时机制,就必须传递它们各自的上下文,并在某处保存所有取消函数和心跳通道。
如果我的理解正确,您或许可以为工作者实现一个心跳机制,并将其作为判断它们是否不再活跃的依据。
是的,您的想法可以解决这个问题。例如,可以有一个公共进程作为心跳接收器。如果它在连续5秒内没有收到任何(来自任何工作者的)反馈,那么它就会使用上下文的取消函数来停止所有工作者。如果它在5秒内确实收到了反馈,那么它应该从头开始重启计时器。
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
fmt.Println("handling request")
workers := 6
var wg sync.WaitGroup
wg.Add(workers + 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
heartbeat := make(chan bool, 1)
go heartbeatReceiver(ctx, heartbeat, cancel, &wg)
go infiniteWorkers(ctx, heartbeat, &wg)
for i := 1; i <= workers; i++ {
go worker(ctx, i, heartbeat, &wg)
}
wg.Wait()
fmt.Println("All workers terminated")
}
func heartbeatReceiver(ctx context.Context, heartbeat <-chan bool, cancel context.CancelFunc, wg *sync.WaitGroup) {
defer wg.Done()
timeout := 5 * time.Second
for {
select {
case <-time.After(timeout):
// fmt.Println("timedout cancelling all go routines")
cancel()
return
case <-heartbeat:
fmt.Println("received heartbeat")
// Restart timer
}
}
}
// Normal workers, will sleep for index * time.Second
func worker(ctx context.Context, index int, heartbeat chan<- bool, wg *sync.WaitGroup) {
defer func() {
// Send heartbeat when finished
heartbeat <- true
wg.Done()
}()
// fmt.Println("Worker ", index, "waiting for ", index, "seconds")
select {
case <-ctx.Done():
// Cancel worker
fmt.Println("Worker ", index, "timed out")
return
case <-time.After(time.Duration(index) * time.Second):
// Process finished
fmt.Println("Worker ", index, "finished")
}
}
func infiniteWorkers(ctx context.Context, heartbeat chan<- bool, wg *sync.WaitGroup) {
defer func() {
// Send heartbeat when finished
heartbeat <- true
wg.Done()
}()
// worker will never finish
fmt.Println("Worker will never finish unless cancelled...")
select {
case <-ctx.Done():
// The context is over, stop processing results
fmt.Println(" ->>> Worker cancelled!")
return
}
}
输出:
handling request
Worker will never finish unless cancelled...
Worker 1 finished
received heartbeat
Worker 2 finished
received heartbeat
Worker 3 finished
received heartbeat
Worker 4 finished
received heartbeat
Worker 5 finished
received heartbeat
Worker 6 finished
received heartbeat
->>> Worker cancelled!
All workers terminated
这符合预期;工作者6被允许完成其工作,而一个永不结束的工作者将被终止。 谢谢!
要实现灵活的协程超时控制,可以通过动态刷新超时计时器来实现。以下是两种实用的解决方案:
方案一:使用可重置的计时器
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
fmt.Println("handling request")
workers := 10
var wg sync.WaitGroup
wg.Add(workers)
// 创建可取消的上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建可重置的计时器
resetTimer := time.NewTimer(5 * time.Second)
defer resetTimer.Stop()
// 监控工作协程完成情况
done := make(chan int, workers)
for i := 1; i <= workers; i++ {
go worker(ctx, i, &wg, done)
}
// 协程完成监控器
go func() {
activeWorkers := workers
for {
select {
case <-done:
activeWorkers--
// 重置计时器,只要有协程仍在活动
if activeWorkers > 0 {
if !resetTimer.Stop() {
<-resetTimer.C
}
resetTimer.Reset(5 * time.Second)
}
case <-resetTimer.C:
// 超时发生,取消所有协程
cancel()
return
}
}
}()
wg.Wait()
fmt.Println("All workers terminated")
}
func worker(ctx context.Context, index int, wg *sync.WaitGroup, done chan<- int) {
defer wg.Done()
defer func() { done <- index }()
select {
case <-ctx.Done():
fmt.Println("Worker", index, "timed out")
return
case <-time.After(time.Duration(index) * time.Second):
fmt.Println("Worker", index, "finished")
}
}
方案二:使用活动心跳机制
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
fmt.Println("handling request")
workers := 10
var wg sync.WaitGroup
wg.Add(workers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 心跳通道
heartbeat := make(chan struct{}, workers)
// 超时监控器
go func() {
timeout := 5 * time.Second
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-heartbeat:
// 收到心跳,重置计时器
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
case <-timer.C:
// 超时发生
cancel()
return
}
}
}()
for i := 1; i <= workers; i++ {
go workerWithHeartbeat(ctx, i, &wg, heartbeat)
}
wg.Wait()
fmt.Println("All workers terminated")
}
func workerWithHeartbeat(ctx context.Context, index int, wg *sync.WaitGroup, heartbeat chan<- struct{}) {
defer wg.Done()
// 定期发送心跳
heartbeatTicker := time.NewTicker(1 * time.Second)
defer heartbeatTicker.Stop()
// 工作完成信号
done := make(chan struct{})
go func() {
time.Sleep(time.Duration(index) * time.Second)
close(done)
}()
for {
select {
case <-ctx.Done():
fmt.Println("Worker", index, "timed out")
return
case <-heartbeatTicker.C:
// 发送心跳
select {
case heartbeat <- struct{}{}:
default:
}
case <-done:
fmt.Println("Worker", index, "finished")
return
}
}
}
方案三:使用进度报告机制
package main
import (
"context"
"fmt"
"sync"
"time"
)
type ProgressReporter interface {
ReportProgress()
}
func main() {
fmt.Println("handling request")
workers := 10
var wg sync.WaitGroup
wg.Add(workers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 进度监控器
monitor := NewActivityMonitor(5*time.Second, cancel)
defer monitor.Stop()
for i := 1; i <= workers; i++ {
go workerWithProgress(ctx, i, &wg, monitor)
}
wg.Wait()
fmt.Println("All workers terminated")
}
type ActivityMonitor struct {
timeout time.Duration
cancel context.CancelFunc
timer *time.Timer
progress chan struct{}
stop chan struct{}
}
func NewActivityMonitor(timeout time.Duration, cancel context.CancelFunc) *ActivityMonitor {
m := &ActivityMonitor{
timeout: timeout,
cancel: cancel,
progress: make(chan struct{}, 100),
stop: make(chan struct{}),
timer: time.NewTimer(timeout),
}
go m.monitor()
return m
}
func (m *ActivityMonitor) ReportProgress() {
select {
case m.progress <- struct{}{}:
default:
}
}
func (m *ActivityMonitor) monitor() {
for {
select {
case <-m.progress:
if !m.timer.Stop() {
<-m.timer.C
}
m.timer.Reset(m.timeout)
case <-m.timer.C:
m.cancel()
return
case <-m.stop:
return
}
}
}
func (m *ActivityMonitor) Stop() {
close(m.stop)
m.timer.Stop()
}
func workerWithProgress(ctx context.Context, index int, wg *sync.WaitGroup, monitor *ActivityMonitor) {
defer wg.Done()
// 模拟工作进度
for i := 0; i < index; i++ {
select {
case <-ctx.Done():
fmt.Println("Worker", index, "timed out")
return
default:
// 报告进度
monitor.ReportProgress()
time.Sleep(1 * time.Second)
}
}
fmt.Println("Worker", index, "finished")
}
这些方案都允许在协程活动时重置超时计时器,确保只有在所有协程都真正不活动时才触发超时。方案一通过协程完成事件重置计时器,方案二使用心跳机制,方案三提供更细粒度的进度报告。


