Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)?
Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)? 例如: 在短信发送中,发送一条短信需要以下流程:
- 验证手机号码
- 检查用户余额并扣费
- 选择合适的短信服务提供商
- 派发给提供商
假设有100个用户正在发送短信,我需要跟踪用户A的任务,并希望在其流程的第2步暂停他的发送过程。 我希望能够从外部按用户管理任务。 准确地说,是否可以在工作池中管理任务/作业?
任何建议都会有帮助 🙂
2 回复
你尝试过什么?听起来 context.Context 正是你所需要的。
更多关于Golang中如何跟踪并控制WorkerPool中特定Job/Task的状态(暂停/恢复/终止)?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中,可以通过context和channel来实现对WorkerPool中特定任务的跟踪和控制。以下是一个示例实现:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// JobStatus identifies the lifecycle state a Job is currently in.
type JobStatus int

// Lifecycle states of a Job, numbered from zero in declaration order.
const (
	// JobPending is the zero value: the job exists but no worker has started it.
	JobPending JobStatus = iota
	// JobRunning means a worker is executing the job's steps.
	JobRunning
	// JobPaused means the job is suspended and waiting for a resume signal.
	JobPaused
	// JobCompleted means every step of the job finished without error.
	JobCompleted
	// JobCancelled means the job's context was cancelled before it finished.
	JobCancelled
)
// Job is one unit of work handled by the pool, bundled with the handles that
// let it be observed and controlled from outside the worker running it.
//
// A Job embeds a mutex and must therefore always be passed by pointer.
type Job struct {
	// Identity and payload.
	ID     string
	UserID string
	Data   interface{}

	// mu guards Status; take it for every read or write of Status.
	mu     sync.RWMutex
	Status JobStatus

	// ctx is done once cancel has been called for this job.
	ctx    context.Context
	cancel context.CancelFunc

	// Signalling channels used to request a pause and a later resume.
	pauseCh  chan struct{}
	resumeCh chan struct{}
}
// JobManager owns the registry of known jobs and the queue that feeds the
// worker goroutines.
type JobManager struct {
	// mu guards jobs.
	mu sync.RWMutex
	// jobs maps a job ID to its Job.
	jobs map[string]*Job
	// jobQueue carries jobs that are waiting for a worker.
	jobQueue chan *Job
}
// NewJobManager creates a JobManager with an empty job registry and a queue
// buffered for 100 jobs, then starts workerCount worker goroutines that read
// from that queue. A workerCount of zero or less starts no workers.
//
// Nothing in the code visible here closes the queue, so the workers run for
// the lifetime of the process.
func NewJobManager(workerCount int) *JobManager {
	manager := new(JobManager)
	manager.jobs = make(map[string]*Job)
	manager.jobQueue = make(chan *Job, 100)

	for workerID := 0; workerID < workerCount; workerID++ {
		go manager.worker(workerID)
	}

	return manager
}
// AddJob creates a job for userID under jobID, registers it and queues it for
// the worker pool.
//
// The job starts in JobPending with its own cancellable context. If jobID is
// already registered, the registry entry is replaced by the new job. The call
// blocks while the queue is full.
//
// pauseCh is buffered with capacity one so that a pause request sent with a
// non-blocking send while the worker is busy inside a step is kept until the
// next step boundary instead of being dropped. resumeCh stays unbuffered so a
// resume can only be delivered to a job that is actually waiting for it.
func (jm *JobManager) AddJob(userID, jobID string, data interface{}) {
	ctx, cancel := context.WithCancel(context.Background())
	job := &Job{
		ID:       jobID,
		UserID:   userID,
		Status:   JobPending,
		Data:     data,
		ctx:      ctx,
		cancel:   cancel,
		pauseCh:  make(chan struct{}, 1),
		resumeCh: make(chan struct{}),
	}

	jm.mu.Lock()
	jm.jobs[jobID] = job
	jm.mu.Unlock()

	jm.jobQueue <- job
}

// worker is the loop run by each pool goroutine. It takes jobs off the queue
// one at a time until the queue is closed. A job that fails or is cancelled
// ends only that job; the worker then moves on to the next one.
func (jm *JobManager) worker(id int) {
	for job := range jm.jobQueue {
		job.run(id)
	}
}

// run executes the simulated SMS pipeline for one job on behalf of worker
// workerID, honouring pause and cancel requests before every step.
func (j *Job) run(workerID int) {
	// Release the job's context once the job is finished, whatever the outcome.
	defer j.cancel()

	j.setStatus(JobRunning)
	fmt.Printf("Worker %d processing job %s for user %s\n", workerID, j.ID, j.UserID)

	// Simulated SMS sending pipeline. Method expressions are required here:
	// they have type func(*Job) error, whereas a method value bound to a job
	// has type func() error and does not fit the slice element type.
	steps := []func(*Job) error{
		(*Job).step1ValidatePhone,
		(*Job).step2CheckBalance,
		(*Job).step3SelectProvider,
		(*Job).step4SendToProvider,
	}

	for i, step := range steps {
		if !j.awaitTurn(i + 1) {
			j.setStatus(JobCancelled)
			fmt.Printf("Job %s cancelled\n", j.ID)
			return
		}
		if err := step(j); err != nil {
			// No failure state is declared for jobs, so Status is left as is.
			fmt.Printf("Job %s failed at step %d: %v\n", j.ID, i+1, err)
			return
		}
	}

	j.setStatus(JobCompleted)
	fmt.Printf("Job %s completed\n", j.ID)
}

// awaitTurn is the checkpoint run before step number stepNo. It reports
// whether the step may run: false means the job has been cancelled.
//
// If a pause request is pending, the job is marked JobPaused and the call
// blocks until a resume signal or a cancellation arrives. The step that was
// about to run is not skipped; it runs after the job is resumed.
func (j *Job) awaitTurn(stepNo int) bool {
	// Cancellation wins over a pending pause request.
	if j.ctx.Err() != nil {
		return false
	}

	select {
	case <-j.pauseCh:
	default:
		return true
	}

	j.setStatus(JobPaused)
	fmt.Printf("Job %s paused at step %d\n", j.ID, stepNo)

	// Wait for the resume signal, but stay cancellable while paused.
	select {
	case <-j.resumeCh:
	case <-j.ctx.Done():
		return false
	}

	j.setStatus(JobRunning)
	fmt.Printf("Job %s resumed\n", j.ID)

	// Both signals may have arrived together; do not run a cancelled job.
	return j.ctx.Err() == nil
}

// setStatus stores s as the job's status while holding the job's mutex.
func (j *Job) setStatus(s JobStatus) {
	j.mu.Lock()
	j.Status = s
	j.mu.Unlock()
}
// step1ValidatePhone is the first pipeline stage. It logs the phone validation
// and sleeps for half a second to simulate the work. It always returns nil.
func (j *Job) step1ValidatePhone() error {
	const simulatedWork = 500 * time.Millisecond

	fmt.Printf("Job %s: Validating phone for user %s\n", j.ID, j.UserID)
	time.Sleep(simulatedWork)

	return nil
}
// step2CheckBalance is the second pipeline stage. It logs the balance check
// and sleeps for half a second to simulate the work. It always returns nil.
func (j *Job) step2CheckBalance() error {
	const simulatedWork = 500 * time.Millisecond

	fmt.Printf("Job %s: Checking balance for user %s\n", j.ID, j.UserID)
	time.Sleep(simulatedWork)

	return nil
}
// step3SelectProvider is the third pipeline stage. It logs the provider
// selection and sleeps for half a second to simulate the work. It always
// returns nil.
func (j *Job) step3SelectProvider() error {
	const simulatedWork = 500 * time.Millisecond

	fmt.Printf("Job %s: Selecting provider for user %s\n", j.ID, j.UserID)
	time.Sleep(simulatedWork)

	return nil
}
// step4SendToProvider is the fourth and last pipeline stage. It logs the
// dispatch to the provider and sleeps for half a second to simulate the work.
// It always returns nil.
func (j *Job) step4SendToProvider() error {
	const simulatedWork = 500 * time.Millisecond

	fmt.Printf("Job %s: Sending to provider for user %s\n", j.ID, j.UserID)
	time.Sleep(simulatedWork)

	return nil
}
// PauseJob asks the job registered under jobID to pause.
//
// It returns false when no such job is registered, when the job's status is
// not JobRunning, or when the pause signal cannot be sent without blocking.
// It returns true once the signal has been sent on the job's pause channel.
func (jm *JobManager) PauseJob(jobID string) bool {
	jm.mu.RLock()
	job, found := jm.jobs[jobID]
	jm.mu.RUnlock()
	if !found {
		return false
	}

	job.mu.RLock()
	running := job.Status == JobRunning
	job.mu.RUnlock()
	if !running {
		return false
	}

	// Never block the caller: give up if the signal cannot be sent right now.
	select {
	case job.pauseCh <- struct{}{}:
		return true
	default:
		return false
	}
}
// ResumeJob asks the job registered under jobID to continue after a pause.
//
// It returns false when no such job is registered, when the job's status is
// not JobPaused, or when the resume signal cannot be sent without blocking.
// It returns true once the signal has been sent on the job's resume channel.
func (jm *JobManager) ResumeJob(jobID string) bool {
	jm.mu.RLock()
	job, found := jm.jobs[jobID]
	jm.mu.RUnlock()
	if !found {
		return false
	}

	job.mu.RLock()
	paused := job.Status == JobPaused
	job.mu.RUnlock()
	if !paused {
		return false
	}

	// Never block the caller: give up if the signal cannot be sent right now.
	select {
	case job.resumeCh <- struct{}{}:
		return true
	default:
		return false
	}
}
// CancelJob cancels the context of the job registered under jobID.
//
// It reports whether such a job is registered. The job's current status is
// not consulted, so true is returned for any registered job.
func (jm *JobManager) CancelJob(jobID string) bool {
	jm.mu.RLock()
	job, found := jm.jobs[jobID]
	jm.mu.RUnlock()

	if found {
		job.cancel()
	}
	return found
}
// GetJobStatus returns the current status of the job registered under jobID.
//
// The boolean reports whether such a job is registered. When it is false the
// returned status is JobPending and carries no meaning.
func (jm *JobManager) GetJobStatus(jobID string) (JobStatus, bool) {
	jm.mu.RLock()
	job, found := jm.jobs[jobID]
	jm.mu.RUnlock()
	if !found {
		return JobPending, false
	}

	job.mu.RLock()
	defer job.mu.RUnlock()
	return job.Status, true
}
// main is a small demonstration: it starts a pool of three workers, queues
// five jobs, then tries to pause and resume job-2, cancel job-4 and print the
// status of job-2, using fixed sleeps to space the calls out.
func main() {
	manager := NewJobManager(3)

	// Queue the sample jobs, spread over three users.
	for n := 1; n <= 5; n++ {
		job := fmt.Sprintf("job-%d", n)
		user := fmt.Sprintf("user-%d", (n%3)+1)
		manager.AddJob(user, job, "sms data")
	}

	// Give the workers a moment to pick the jobs up.
	time.Sleep(300 * time.Millisecond)

	// Pause one specific job.
	if paused := manager.PauseJob("job-2"); paused {
		fmt.Println("Job-2 paused successfully")
	}

	time.Sleep(2 * time.Second)

	// Resume it again.
	if resumed := manager.ResumeJob("job-2"); resumed {
		fmt.Println("Job-2 resumed successfully")
	}

	// Cancel another specific job.
	if cancelled := manager.CancelJob("job-4"); cancelled {
		fmt.Println("Job-4 cancelled successfully")
	}

	// Report the status of the job that was paused and resumed.
	status, known := manager.GetJobStatus("job-2")
	if known {
		fmt.Printf("Job-2 status: %v\n", status)
	}

	// Leave the workers time to finish before the process exits.
	time.Sleep(3 * time.Second)
}
这个实现提供了以下功能:
- 任务状态跟踪:每个Job有自己的状态(Pending/Running/Paused/Completed/Cancelled)
- 按用户管理:Job结构包含UserID字段,可以通过用户ID过滤和管理任务
- 精确控制:
  - PauseJob():暂停任务(暂停只在两个步骤之间的检查点生效,不会打断正在执行的步骤)
  - ResumeJob():恢复暂停的任务
  - CancelJob():终止任务执行
  - GetJobStatus():获取任务当前状态
- Worker池管理:多个worker并发处理任务队列
关键设计点:
- 使用context实现任务取消
- 使用channel实现暂停/恢复机制
- 每个步骤开始之前都会检查暂停和取消信号(步骤执行过程中不会被打断)
- 线程安全的map存储任务状态
- Job中记录了UserID,为按用户ID进行任务分组和查询提供了基础(示例代码本身尚未实现按用户查询的接口)
这个方案可以扩展支持按用户ID查询所有任务、批量操作等功能。

