Golang中如何优雅实现任务队列空闲时自动停止程序

Golang中如何优雅实现任务队列空闲时自动停止程序 我遇到这样一种情况:我有一个作业队列,工人在处理完一个作业后可以向队列中添加新作业。

为了说明这一点,在下面的代码中,一个作业包括计数到 JOB_COUNTING_TO,并且随机地,工人有 1/5 的概率会向队列中添加一个新作业。

因为我的工人可以向队列中添加作业,所以我认为我无法使用通道作为作业队列。这是因为向通道发送是阻塞的,即使使用缓冲通道,由于其递归性质(作业添加作业),这段代码很容易出现所有工人都在向通道发送而没有一个工人可以接收的情况。

这就是为什么我决定使用由互斥锁保护的共享队列。

现在,我希望当所有工人都空闲时程序停止。不幸的是,仅仅通过检查 len(jobQueue) == 0 是无法发现这一点的,因为队列可能为空,但一些工人仍在工作,并且可能在那之后添加新作业。

我想出的解决方案感觉有点笨拙,它使用了变量 var idleWorkerCount intvar isIdle [NB_WORKERS]bool 来跟踪空闲工人,当 idleWorkerCount == NB_WORKERS 时代码停止。

我的问题是,是否有我可以使用的并发模式来使这个逻辑更优雅?

另外,出于某种我不理解的原因,我目前使用的技术(下面的代码)在工人数量变得相当大(例如 300000 个工人)时效率变得非常低:对于相同数量的作业,NB_WORKERS = 300000 的代码会比 NB_WORKERS = 3000 慢 > 10 倍。

非常感谢!

package main

import (
	"math/rand"
	"sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {

	mu.Lock()

	if len(jobQueue) == 0 {
		if !isIdle[workerId] {
			idleWorkerCount += 1
		}
		isIdle[workerId] = true
		mu.Unlock()
		return
	}

	if isIdle[workerId] {
		idleWorkerCount -= 1
	}
	isIdle[workerId] = false

	var job int
	job, jobQueue = jobQueue[0], jobQueue[1:]
	mu.Unlock()

	for i := 0; i < job; i += 1 {
	}

	if rand.Intn(5) == 0 {
		mu.Lock()
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
		mu.Unlock()
	}

}

func main() {

	// Filling up the queue with initial jobs
	for i := 0; i < NB_INITIAL_JOBS; i += 1 {
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
	}

	var wg sync.WaitGroup
	for i := 0; i < NB_WORKERS; i += 1 {
		wg.Add(1)
		go func(workerId int) {
			for idleWorkerCount != NB_WORKERS {
				doJob(workerId)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}

更多关于Golang中如何优雅实现任务队列空闲时自动停止程序的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

你好 @tcosmo

我刚刚看到你的问题,六天后这仍然是个问题吗?

关于你最初的假设和结论,我有两个疑问。

  1. 为什么工作者需要生成新任务,而不是仅仅处理他们接收到的任务?换句话说,是否可以在不允许工作者向队列添加新任务的情况下实现业务目标?
  2. 你说缓冲通道不起作用,因为如果所有工作者都因试图向已满的通道发送新任务而被阻塞,他们便无法接收新任务。这听起来好像工作者运行在单个 goroutine 中。那么,在工作者代码中添加一个额外的 goroutine 来将任务提交回队列怎么样?这样,即使发送 goroutine 被阻塞,工作者也能保持可接收新任务的状态。

更多关于Golang中如何优雅实现任务队列空闲时自动停止程序的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中,可以使用sync.WaitGroup配合context.Context来实现优雅的任务队列空闲停止。以下是改进后的实现:

package main

import (
	"context"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

type JobQueue struct {
	mu        sync.Mutex
	jobs      []int
	active    int32
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
	workerWg  sync.WaitGroup
}

func NewJobQueue() *JobQueue {
	ctx, cancel := context.WithCancel(context.Background())
	return &JobQueue{
		jobs:   make([]int, 0, NB_INITIAL_JOBS),
		ctx:    ctx,
		cancel: cancel,
	}
}

func (q *JobQueue) AddJob(job int) {
	q.mu.Lock()
	q.jobs = append(q.jobs, job)
	q.mu.Unlock()
	atomic.AddInt32(&q.active, 1)
}

func (q *JobQueue) GetJob() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	
	if len(q.jobs) == 0 {
		return 0, false
	}
	
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

func (q *JobQueue) Worker(id int) {
	defer q.workerWg.Done()
	
	for {
		select {
		case <-q.ctx.Done():
			return
		default:
			job, ok := q.GetJob()
			if !ok {
				// 队列为空,等待一段时间再检查
				time.Sleep(10 * time.Millisecond)
				continue
			}
			
			// 执行作业
			atomic.AddInt32(&q.active, -1)
			for i := 0; i < job; i++ {
				// 模拟工作
			}
			
			// 有1/5概率添加新作业
			if rand.Intn(5) == 0 {
				q.AddJob(JOB_COUNTING_TO)
			}
		}
	}
}

func (q *JobQueue) Start() {
	q.workerWg.Add(NB_WORKERS)
	for i := 0; i < NB_WORKERS; i++ {
		go q.Worker(i)
	}
	
	// 监控协程:当队列为空且没有活跃作业时停止
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		
		for {
			select {
			case <-q.ctx.Done():
				return
			case <-ticker.C:
				q.mu.Lock()
				queueEmpty := len(q.jobs) == 0
				q.mu.Unlock()
				
				if queueEmpty && atomic.LoadInt32(&q.active) == 0 {
					// 等待一小段时间确认真的空闲
					time.Sleep(200 * time.Millisecond)
					q.mu.Lock()
					queueEmpty = len(q.jobs) == 0
					q.mu.Unlock()
					
					if queueEmpty && atomic.LoadInt32(&q.active) == 0 {
						q.cancel()
						return
					}
				}
			}
		}
	}()
}

func (q *JobQueue) Wait() {
	q.workerWg.Wait()
}

func main() {
	queue := NewJobQueue()
	
	// 添加初始作业
	for i := 0; i < NB_INITIAL_JOBS; i++ {
		queue.AddJob(JOB_COUNTING_TO)
	}
	
	queue.Start()
	queue.Wait()
}

另一种更简洁的实现使用通道和原子计数器:

package main

import (
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

type WorkerPool struct {
	jobs      chan int
	active    int32
	wg        sync.WaitGroup
	stopOnce  sync.Once
	stopChan  chan struct{}
}

func NewWorkerPool(workers int, bufferSize int) *WorkerPool {
	return &WorkerPool{
		jobs:     make(chan int, bufferSize),
		stopChan: make(chan struct{}),
	}
}

func (p *WorkerPool) AddJob(job int) {
	select {
	case p.jobs <- job:
		atomic.AddInt32(&p.active, 1)
	case <-p.stopChan:
		return
	}
}

func (p *WorkerPool) worker(id int) {
	defer p.wg.Done()
	
	for {
		select {
		case <-p.stopChan:
			return
		case job, ok := <-p.jobs:
			if !ok {
				return
			}
			
			// 执行作业
			atomic.AddInt32(&p.active, -1)
			for i := 0; i < job; i++ {
				// 模拟工作
			}
			
			// 有1/5概率添加新作业
			if rand.Intn(5) == 0 {
				p.AddJob(JOB_COUNTING_TO)
			}
		}
	}
}

func (p *WorkerPool) Start(workers int) {
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker(i)
	}
	
	// 空闲检测协程
	go p.monitor()
}

func (p *WorkerPool) monitor() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	
	for {
		select {
		case <-p.stopChan:
			return
		case <-ticker.C:
			if len(p.jobs) == 0 && atomic.LoadInt32(&p.active) == 0 {
				// 双重检查避免竞态条件
				time.Sleep(100 * time.Millisecond)
				if len(p.jobs) == 0 && atomic.LoadInt32(&p.active) == 0 {
					p.Stop()
					return
				}
			}
		}
	}
}

func (p *WorkerPool) Stop() {
	p.stopOnce.Do(func() {
		close(p.stopChan)
		close(p.jobs)
	})
}

func (p *WorkerPool) Wait() {
	p.wg.Wait()
}

func main() {
	pool := NewWorkerPool(NB_WORKERS, NB_INITIAL_JOBS*2)
	
	// 添加初始作业
	for i := 0; i < NB_INITIAL_JOBS; i++ {
		pool.AddJob(JOB_COUNTING_TO)
	}
	
	pool.Start(NB_WORKERS)
	pool.Wait()
}

关于性能问题,当工人数量很大时(300000),每个工人都在竞争互斥锁,导致大量的锁竞争。解决方案是使用工作窃取(work stealing)模式:

package main

import (
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

type WorkStealingPool struct {
	queues    []*deque
	active    int32
	stopChan  chan struct{}
	wg        sync.WaitGroup
}

type deque struct {
	mu    sync.Mutex
	jobs  []int
}

func NewWorkStealingPool(workers int) *WorkStealingPool {
	queues := make([]*deque, workers)
	for i := range queues {
		queues[i] = &deque{
			jobs: make([]int, 0, 32),
		}
	}
	
	return &WorkStealingPool{
		queues:   queues,
		stopChan: make(chan struct{}),
	}
}

func (p *WorkStealingPool) worker(id int) {
	defer p.wg.Done()
	
	for {
		select {
		case <-p.stopChan:
			return
		default:
			// 尝试从自己的队列获取工作
			if job, ok := p.getJobFromQueue(id); ok {
				atomic.AddInt32(&p.active, 1)
				p.processJob(job, id)
				atomic.AddInt32(&p.active, -1)
				continue
			}
			
			// 工作窃取:从其他队列获取工作
			stolen := false
			for i := 0; i < len(p.queues); i++ {
				if i == id {
					continue
				}
				if job, ok := p.stealJob(i); ok {
					atomic.AddInt32(&p.active, 1)
					p.processJob(job, id)
					atomic.AddInt32(&p.active, -1)
					stolen = true
					break
				}
			}
			
			if !stolen {
				// 没有工作可做,短暂休眠
				time.Sleep(10 * time.Millisecond)
				
				// 检查是否应该停止
				if atomic.LoadInt32(&p.active) == 0 {
					select {
					case <-p.stopChan:
						return
					default:
					}
				}
			}
		}
	}
}

func (p *WorkStealingPool) getJobFromQueue(id int) (int, bool) {
	q := p.queues[id]
	q.mu.Lock()
	defer q.mu.Unlock()
	
	if len(q.jobs) == 0 {
		return 0, false
	}
	
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return job, true
}

func (p *WorkStealingPool) stealJob(victim int) (int, bool) {
	q := p.queues[victim]
	q.mu.Lock()
	defer q.mu.Unlock()
	
	if len(q.jobs) == 0 {
		return 0, false
	}
	
	job := q.jobs[len(q.jobs)-1]
	q.jobs = q.jobs[:len(q.jobs)-1]
	return job, true
}

func (p *WorkStealingPool) processJob(job int, workerId int) {
	// 执行作业
	for i := 0; i < job; i++ {
		// 模拟工作
	}
	
	// 有1/5概率添加新作业到随机队列
	if rand.Intn(5) == 0 {
		target := rand.Intn(len(p.queues))
		q := p.queues[target]
		q.mu.Lock()
		q.jobs = append(q.jobs, JOB_COUNTING_TO)
		q.mu.Unlock()
	}
}

func (p *WorkStealingPool) Start(workers int) {
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker(i)
	}
	
	// 初始化作业分配到各个队列
	for i := 0; i < NB_INITIAL_JOBS; i++ {
		target := i % workers
		q := p.queues[target]
		q.mu.Lock()
		q.jobs = append(q.jobs, JOB_COUNTING_TO)
		q.mu.Unlock()
	}
	
	// 监控协程
	go p.monitor()
}

func (p *WorkStealingPool) monitor() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	
	idleCount := 0
	for {
		select {
		case <-p.stopChan:
			return
		case <-ticker.C:
			if atomic.LoadInt32(&p.active) == 0 {
				idleCount++
				if idleCount >= 3 { // 连续3次检测到空闲
					close(p.stopChan)
					return
				}
			} else {
				idleCount = 0
			}
		}
	}
}

func (p *WorkStealingPool) Wait() {
	p.wg.Wait()
}

func main() {
	pool := NewWorkStealingPool(NB_WORKERS)
	pool.Start(NB_WORKERS)
	pool.Wait()
}

这些实现通过以下方式解决原问题:

  1. 使用原子计数器跟踪活跃作业数
  2. 通过定期检测队列状态和活跃作业数来判断空闲
  3. 工作窃取模式减少锁竞争,提高大量worker时的性能
  4. 使用context或channel实现优雅停止
回到顶部