Golang实现灵活超时控制的协程用法

Golang实现灵活超时控制的协程用法 考虑这样一个场景:你有一个通过请求处理器处理请求的服务器(请参考图片)。处理器(通过其工作协程)可能需要很长时间才能完成,因此你需要设置一个超时,以防它永不终止。在我们的案例中,请求处理时间长是可以接受的,只要其中任何一个工作协程仍在进行中。然而,我们设置的初始超时时间限制了这个条件。(也就是说,有没有办法检测不活动状态,而不是设置一个固定的超时?)

Note 04 Dec 2023

例如,你有6个工作协程在一个包含5秒超时的上下文中工作。第 n 个工作协程需要 n 秒来完成(例如,第3个工作协程需要3秒完成)。 由于我们设置了5秒的超时,第6个工作协程将会超时。所以,我的问题是,能否在执行过程中的稍后阶段刷新这个计时器(例如,当一个工作协程完成或判定自己仍在进行时)?

请参考下方帖子中的图片

具体来说,你可以看看这个程序:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 10

	var wg sync.WaitGroup
	wg.Add(workers)

	timeout := 5 * time.Second

	ctx, _ := context.WithTimeout(context.Background(), timeout)

	for i := 1; i <= workers; i++ {
		go worker(ctx, i, &wg)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

// Worker will take index * time.Second to return
func worker(ctx context.Context, index int, wg *sync.WaitGroup) {
	defer wg.Done()
	// fmt.Println("Worker ", index, "waiting for ", index, "seconds")

	select {
	case <-ctx.Done():
		// Cancel worker
		fmt.Println("Worker ", index, "timed out")
		return
	case <-time.After(time.Duration(index) * time.Second):
		// Process finished after some time
		fmt.Println("Worker ", index, "finished")
	}
}

其输出为:

handling request
Worker  1 finished
Worker  2 finished
Worker  3 finished
Worker  4 finished
Worker  5 finished
Worker  6 timed out
All workers terminated

我如何才能允许第6个工作协程完成,因为它仍然在上一个不活动的协程的5秒时间范围内。


更多关于Golang实现灵活超时控制的协程用法的实战教程也可以访问 https://www.itying.com/category-94-b0.html

8 回复

一点建议,对于心跳通道,最好使用空结构体而不是布尔类型。

更多关于Golang实现灵活超时控制的协程用法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


只是一个建议,对于心跳通道,最好使用空结构体而不是布尔类型。

有什么具体原因吗?

我在这一行收到了一条消息;

ctx, _ := context.WithTimeout(context.Background(), timeout)

你需要添加一个变量来接收取消函数,所以: ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel()

而且我不知道对于你的用例来说,是否可以将超时秒数增加,比如说增加到10秒…

这只是一个良好的实践。空结构体不占用内存,而且能提高代码的可读性。因为当你看到一个使用空结构体的通道时,你就能明白在大多数情况下它是被用作信号量或锁。这比分配内存给一个未使用的布尔值要好,因为后者让人第一眼无法理解这个值是否真的在某个地方被需要。

Context 的设计初衷并非你所设想的使用方式。如果我的理解正确,你或许可以实现一个工作进程的心跳机制,并以此作为判断它们是否仍在活跃状态的依据。例如,你可以创建一个 goroutine 来监听工作进程的心跳,一旦某个进程停止心跳,它就给予其他进程 5 秒的时间来完成工作。如果在这 5 秒内又有其他进程完成,计时器会重新开始计时 5 秒,依此类推。这种机制或许能解决你的问题,但需要大量的额外工作,因为如果你仍然希望工作进程有超时机制,就必须传递它们各自的上下文,并在某处保存所有取消函数和心跳通道。

如果我的理解正确,您或许可以为工作者实现一个心跳机制,并将其作为判断它们是否不再活跃的依据。

是的,您的想法可以解决这个问题。例如,可以有一个公共进程作为心跳接收器。如果它在连续5秒内没有收到任何(来自任何工作者的)反馈,那么它就会使用上下文的取消函数来停止所有工作者。如果它在5秒内确实收到了反馈,那么它应该从头开始重启计时器。

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 6

	var wg sync.WaitGroup
	wg.Add(workers + 2)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	heartbeat := make(chan bool, 1)

	go heartbeatReceiver(ctx, heartbeat, cancel, &wg)

	go infiniteWorkers(ctx, heartbeat, &wg)

	for i := 1; i <= workers; i++ {
		go worker(ctx, i, heartbeat, &wg)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

func heartbeatReceiver(ctx context.Context, heartbeat <-chan bool, cancel context.CancelFunc, wg *sync.WaitGroup) {
	defer wg.Done()
	timeout := 5 * time.Second

	for {
		select {
		case <-time.After(timeout):
			// fmt.Println("timedout cancelling all go routines")
			cancel()
			return
		case <-heartbeat:
			fmt.Println("received heartbeat")
			// Restart timer
		}
	}
}

// Normal workers, will sleep for index * time.Second
func worker(ctx context.Context, index int, heartbeat chan<- bool, wg *sync.WaitGroup) {
	defer func() {
		// Send heartbeat when finished
		heartbeat <- true
		wg.Done()
	}()
	// fmt.Println("Worker ", index, "waiting for ", index, "seconds")

	select {
	case <-ctx.Done():
		// Cancel worker
		fmt.Println("Worker ", index, "timed out")
		return
	case <-time.After(time.Duration(index) * time.Second):
		// Process finished
		fmt.Println("Worker ", index, "finished")
	}
}

func infiniteWorkers(ctx context.Context, heartbeat chan<- bool, wg *sync.WaitGroup) {
	defer func() {
		// Send heartbeat when finished
		heartbeat <- true
		wg.Done()
	}()

	// worker will never finish
	fmt.Println("Worker will never finish unless cancelled...")

	select {
	case <-ctx.Done():
		// The context is over, stop processing results
		fmt.Println(" ->>> Worker cancelled!")
		return
	}
}

输出:

handling request
Worker will never finish unless cancelled...
Worker  1 finished
received heartbeat
Worker  2 finished
received heartbeat
Worker  3 finished
received heartbeat
Worker  4 finished
received heartbeat
Worker  5 finished
received heartbeat
Worker  6 finished
received heartbeat
 ->>> Worker cancelled!
All workers terminated

这符合预期;工作者6被允许完成其工作,而一个永不结束的工作者将被终止。 谢谢!

要实现灵活的协程超时控制,可以通过动态刷新超时计时器来实现。以下是两种实用的解决方案:

方案一:使用可重置的计时器

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 10

	var wg sync.WaitGroup
	wg.Add(workers)

	// 创建可取消的上下文
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 创建可重置的计时器
	resetTimer := time.NewTimer(5 * time.Second)
	defer resetTimer.Stop()

	// 监控工作协程完成情况
	done := make(chan int, workers)
	
	for i := 1; i <= workers; i++ {
		go worker(ctx, i, &wg, done)
	}

	// 协程完成监控器
	go func() {
		activeWorkers := workers
		for {
			select {
			case <-done:
				activeWorkers--
				// 重置计时器,只要有协程仍在活动
				if activeWorkers > 0 {
					if !resetTimer.Stop() {
						<-resetTimer.C
					}
					resetTimer.Reset(5 * time.Second)
				}
			case <-resetTimer.C:
				// 超时发生,取消所有协程
				cancel()
				return
			}
		}
	}()

	wg.Wait()
	fmt.Println("All workers terminated")
}

func worker(ctx context.Context, index int, wg *sync.WaitGroup, done chan<- int) {
	defer wg.Done()
	defer func() { done <- index }()

	select {
	case <-ctx.Done():
		fmt.Println("Worker", index, "timed out")
		return
	case <-time.After(time.Duration(index) * time.Second):
		fmt.Println("Worker", index, "finished")
	}
}

方案二:使用活动心跳机制

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 10

	var wg sync.WaitGroup
	wg.Add(workers)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 心跳通道
	heartbeat := make(chan struct{}, workers)
	
	// 超时监控器
	go func() {
		timeout := 5 * time.Second
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		for {
			select {
			case <-heartbeat:
				// 收到心跳,重置计时器
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(timeout)
			case <-timer.C:
				// 超时发生
				cancel()
				return
			}
		}
	}()

	for i := 1; i <= workers; i++ {
		go workerWithHeartbeat(ctx, i, &wg, heartbeat)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

func workerWithHeartbeat(ctx context.Context, index int, wg *sync.WaitGroup, heartbeat chan<- struct{}) {
	defer wg.Done()

	// 定期发送心跳
	heartbeatTicker := time.NewTicker(1 * time.Second)
	defer heartbeatTicker.Stop()

	// 工作完成信号
	done := make(chan struct{})

	go func() {
		time.Sleep(time.Duration(index) * time.Second)
		close(done)
	}()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("Worker", index, "timed out")
			return
		case <-heartbeatTicker.C:
			// 发送心跳
			select {
			case heartbeat <- struct{}{}:
			default:
			}
		case <-done:
			fmt.Println("Worker", index, "finished")
			return
		}
	}
}

方案三:使用进度报告机制

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ProgressReporter interface {
	ReportProgress()
}

func main() {
	fmt.Println("handling request")
	workers := 10

	var wg sync.WaitGroup
	wg.Add(workers)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 进度监控器
	monitor := NewActivityMonitor(5*time.Second, cancel)
	defer monitor.Stop()

	for i := 1; i <= workers; i++ {
		go workerWithProgress(ctx, i, &wg, monitor)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

type ActivityMonitor struct {
	timeout   time.Duration
	cancel    context.CancelFunc
	timer     *time.Timer
	progress  chan struct{}
	stop      chan struct{}
}

func NewActivityMonitor(timeout time.Duration, cancel context.CancelFunc) *ActivityMonitor {
	m := &ActivityMonitor{
		timeout:  timeout,
		cancel:   cancel,
		progress: make(chan struct{}, 100),
		stop:     make(chan struct{}),
		timer:    time.NewTimer(timeout),
	}

	go m.monitor()
	return m
}

func (m *ActivityMonitor) ReportProgress() {
	select {
	case m.progress <- struct{}{}:
	default:
	}
}

func (m *ActivityMonitor) monitor() {
	for {
		select {
		case <-m.progress:
			if !m.timer.Stop() {
				<-m.timer.C
			}
			m.timer.Reset(m.timeout)
		case <-m.timer.C:
			m.cancel()
			return
		case <-m.stop:
			return
		}
	}
}

func (m *ActivityMonitor) Stop() {
	close(m.stop)
	m.timer.Stop()
}

func workerWithProgress(ctx context.Context, index int, wg *sync.WaitGroup, monitor *ActivityMonitor) {
	defer wg.Done()

	// 模拟工作进度
	for i := 0; i < index; i++ {
		select {
		case <-ctx.Done():
			fmt.Println("Worker", index, "timed out")
			return
		default:
			// 报告进度
			monitor.ReportProgress()
			time.Sleep(1 * time.Second)
		}
	}
	fmt.Println("Worker", index, "finished")
}

这些方案都允许在协程活动时重置超时计时器,确保只有在所有协程都真正不活动时才触发超时。方案一通过协程完成事件重置计时器,方案二使用心跳机制,方案三提供更细粒度的进度报告。

回到顶部