Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)?

Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)? 例如: 在短信发送中,发送一条短信需要以下流程:

  1. 验证手机号码
  2. 检查用户余额并扣费
  3. 选择合适的短信服务提供商
  4. 派发给提供商

假设有100个用户正在发送短信,我需要跟踪用户A的任务,并希望在其流程的第2步暂停他的发送过程。 我希望能够从外部按用户管理任务。 准确地说,是否可以在工作池中管理任务/作业?

任何建议都会有帮助 🙂

2 回复

你尝试过什么?听起来 context.Context 正是你所需要的。

更多关于Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中,可以通过context和channel来实现对WorkerPool中特定任务的跟踪和控制。以下是一个示例实现:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type JobStatus int

const (
	JobPending JobStatus = iota
	JobRunning
	JobPaused
	JobCompleted
	JobCancelled
)

type Job struct {
	ID       string
	UserID   string
	Status   JobStatus
	Data     interface{}
	ctx      context.Context
	cancel   context.CancelFunc
	pauseCh  chan struct{}
	resumeCh chan struct{}
	mu       sync.RWMutex
}

type JobManager struct {
	jobs     map[string]*Job
	jobQueue chan *Job
	mu       sync.RWMutex
}

func NewJobManager(workerCount int) *JobManager {
	jm := &JobManager{
		jobs:     make(map[string]*Job),
		jobQueue: make(chan *Job, 100),
	}
	
	for i := 0; i < workerCount; i++ {
		go jm.worker(i)
	}
	
	return jm
}

func (jm *JobManager) AddJob(userID, jobID string, data interface{}) {
	ctx, cancel := context.WithCancel(context.Background())
	
	job := &Job{
		ID:       jobID,
		UserID:   userID,
		Status:   JobPending,
		Data:     data,
		ctx:      ctx,
		cancel:   cancel,
		pauseCh:  make(chan struct{}),
		resumeCh: make(chan struct{}),
	}
	
	jm.mu.Lock()
	jm.jobs[jobID] = job
	jm.mu.Unlock()
	
	jm.jobQueue <- job
}

func (jm *JobManager) worker(id int) {
	for job := range jm.jobQueue {
		job.mu.Lock()
		job.Status = JobRunning
		job.mu.Unlock()
		
		fmt.Printf("Worker %d processing job %s for user %s\n", id, job.ID, job.UserID)
		
		// 模拟短信发送流程
		steps := []func(*Job) error{
			job.step1ValidatePhone,
			job.step2CheckBalance,
			job.step3SelectProvider,
			job.step4SendToProvider,
		}
		
		for i, step := range steps {
			select {
			case <-job.ctx.Done():
				job.mu.Lock()
				job.Status = JobCancelled
				job.mu.Unlock()
				fmt.Printf("Job %s cancelled\n", job.ID)
				return
			case <-job.pauseCh:
				job.mu.Lock()
				job.Status = JobPaused
				job.mu.Unlock()
				fmt.Printf("Job %s paused at step %d\n", job.ID, i+1)
				
				// 等待恢复信号
				<-job.resumeCh
				job.mu.Lock()
				job.Status = JobRunning
				job.mu.Unlock()
				fmt.Printf("Job %s resumed\n", job.ID)
			default:
				if err := step(job); err != nil {
					fmt.Printf("Job %s failed at step %d: %v\n", job.ID, i+1, err)
					return
				}
			}
		}
		
		job.mu.Lock()
		job.Status = JobCompleted
		job.mu.Unlock()
		fmt.Printf("Job %s completed\n", job.ID)
	}
}

func (j *Job) step1ValidatePhone() error {
	fmt.Printf("Job %s: Validating phone for user %s\n", j.ID, j.UserID)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func (j *Job) step2CheckBalance() error {
	fmt.Printf("Job %s: Checking balance for user %s\n", j.ID, j.UserID)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func (j *Job) step3SelectProvider() error {
	fmt.Printf("Job %s: Selecting provider for user %s\n", j.ID, j.UserID)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func (j *Job) step4SendToProvider() error {
	fmt.Printf("Job %s: Sending to provider for user %s\n", j.ID, j.UserID)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func (jm *JobManager) PauseJob(jobID string) bool {
	jm.mu.RLock()
	job, exists := jm.jobs[jobID]
	jm.mu.RUnlock()
	
	if !exists {
		return false
	}
	
	job.mu.RLock()
	if job.Status != JobRunning {
		job.mu.RUnlock()
		return false
	}
	job.mu.RUnlock()
	
	select {
	case job.pauseCh <- struct{}{}:
		return true
	default:
		return false
	}
}

func (jm *JobManager) ResumeJob(jobID string) bool {
	jm.mu.RLock()
	job, exists := jm.jobs[jobID]
	jm.mu.RUnlock()
	
	if !exists {
		return false
	}
	
	job.mu.RLock()
	if job.Status != JobPaused {
		job.mu.RUnlock()
		return false
	}
	job.mu.RUnlock()
	
	select {
	case job.resumeCh <- struct{}{}:
		return true
	default:
		return false
	}
}

func (jm *JobManager) CancelJob(jobID string) bool {
	jm.mu.RLock()
	job, exists := jm.jobs[jobID]
	jm.mu.RUnlock()
	
	if !exists {
		return false
	}
	
	job.cancel()
	return true
}

func (jm *JobManager) GetJobStatus(jobID string) (JobStatus, bool) {
	jm.mu.RLock()
	job, exists := jm.jobs[jobID]
	jm.mu.RUnlock()
	
	if !exists {
		return JobPending, false
	}
	
	job.mu.RLock()
	status := job.Status
	job.mu.RUnlock()
	
	return status, true
}

func main() {
	jm := NewJobManager(3)
	
	// 添加示例任务
	for i := 1; i <= 5; i++ {
		jobID := fmt.Sprintf("job-%d", i)
		userID := fmt.Sprintf("user-%d", (i%3)+1) // 3个用户
		jm.AddJob(userID, jobID, "sms data")
	}
	
	// 等待一段时间让任务开始执行
	time.Sleep(300 * time.Millisecond)
	
	// 暂停特定任务
	if jm.PauseJob("job-2") {
		fmt.Println("Job-2 paused successfully")
	}
	
	time.Sleep(2 * time.Second)
	
	// 恢复任务
	if jm.ResumeJob("job-2") {
		fmt.Println("Job-2 resumed successfully")
	}
	
	// 取消特定任务
	if jm.CancelJob("job-4") {
		fmt.Println("Job-4 cancelled successfully")
	}
	
	// 检查任务状态
	if status, exists := jm.GetJobStatus("job-2"); exists {
		fmt.Printf("Job-2 status: %v\n", status)
	}
	
	time.Sleep(3 * time.Second)
}

这个实现提供了以下功能:

  1. 任务状态跟踪:每个Job有自己的状态(Pending/Running/Paused/Completed/Cancelled)

  2. 按用户管理:Job结构包含UserID字段,可以通过用户ID过滤和管理任务

  3. 精确控制

    • PauseJob():在任务执行到任何步骤时暂停
    • ResumeJob():恢复暂停的任务
    • CancelJob():终止任务执行
    • GetJobStatus():获取任务当前状态
  4. Worker池管理:多个worker并发处理任务队列

关键设计点:

  • 使用context实现任务取消
  • 使用channel实现暂停/恢复机制
  • 每个步骤都检查暂停和取消信号
  • 线程安全的map存储任务状态
  • 支持按用户ID进行任务分组和查询

这个方案可以扩展支持按用户ID查询所有任务、批量操作等功能。

回到顶部