Golang中如何追踪任务序列的执行进度

Golang中如何追踪任务序列的执行进度 我正在编写一个Go程序,该程序执行一系列需要几分钟才能完成的任务。其中一些任务本身由多个并行子任务组成,只有所有子任务都完成后,整个任务才被视为"完成"。

我希望构建一个库,允许这些独立任务报告进度,然后将进度汇总成某种形式,以便外部观察者能够了解整个流程的总体进度,例如以0到1之间的浮点数形式表示。

目前各个任务并不知道自己在整个流程中的位置,因此虽然它们可以报告自身进度,但要计算总体进度,还需要知道已经完成了多少任务以及还有多少任务待完成。此外,某些任务预计比其他任务耗时更长,应该提供某种方式来设置手动预期值,并将其用于总体进度的计算。

另外,在计算一组并行子任务的进度时,每个子任务可以报告自身进度,但这些子任务可能不会同时完成,因此我需要一种合理的方法来计算这种情况下的总体进度。

还有更多问题需要考虑。这个领域是否已有现成的解决方案?


更多关于Golang中如何追踪任务序列的执行进度的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

不要使用 WaitGroup(如某个回复中的示例),而是使用一个通道,在子任务完成时向该通道发送内容(空结构体即可)。如果你知道有多少个步骤,只需从通道接收数据就能轻松计算进度。

更多关于Golang中如何追踪任务序列的执行进度的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


是的,我知道等待组和协程 - 我真正需要的是能够跟踪整个流水线的进度(例如,如果 SubTask 需要几分钟才能完成,我希望从 main 函数获得一个 0…100 的进度指示器来显示完成进度)

检查一下:

package main

import (
        "log"
        "sync"
)

func RootTask(task int, wg *sync.WaitGroup) {
        for i := 0; i < 5; i++ {
                SubTask(task, i)
        }
        log.Println("Task", task, "finished with all sub tasks!")
        wg.Done()
        return
}

func SubTask(root int, i int) {
        log.Println("sub Task", i, "of root", root, "Running")

}
func main() {
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
                wg.Add(1)
                go RootTask(i, &wg)
        }
        wg.Wait()
        log.Println("All root tasks finished!")
}

GitHub GitHub

头像

opentracing/opentracing-go

opentracing-go - Go语言的OpenTracing API


GitHub GitHub

头像

jaegertracing/jaeger-client-go

jaeger-client-go - Go OpenTracing API的Jaeger绑定实现

我不知道这个领域是否已有现成的包,但我会设计类似这样的接口:

type ProgressTracker interface {
  func NewTask(weight float64) Task
  func Current() float64
  func Done() bool
}

type Task interface {
  Progress(v float64) // 0.0 到 1.0,1.0 表示任务完成
  Done() // 由于浮点数精度问题,最好在任务完成时提供明确指示

假设你有几个权重不同(即预期耗时不同)的任务和对应的处理例程,你可以这样操作:

p := NewProgressTracker()

t1 := p.NewTask(5.0)
go doSomeTask(t1, ...)

t2 := p.NewTask(2.0)
go doSomeOtherTask(t2, ...)

for !p.Done() {
  fmt.Println("Current progress: %.1f %%", p.Current() * 100)
  time.Sleep(time.Second)
}

实际的任务例程会这样实现:

func doSomeTask(t Task, ...) {
  defer t.Done()
  for {
    // 工作强度
    t.Progress(...)
  }
}

当然,Progress 和 Task 的具体实现就留给读者作为练习了 🙂。Progress.Current() 方法只需将各个任务的进度乘以权重再除以总值求和,Done() 方法则是所有任务 Done() 的 && 运算结果。

可以设想其他包含上下文和等待组等元素的接口设计。也许你可以注册一个等待 waitgroup 的任务,这样实际任务如果是短期运行的,就不需要理解 Task 接口。也许 Progress 应该包含等待组处理,以便你可以 Wait() 等待其完成。

还可以有其他 Task 的实现方式,比如那些具有总大小并实现 Reader 和 Writer 来跟踪读写进度的实现。诸如此类。

在Go中实现任务序列进度追踪可以通过构建一个进度管理器来处理。以下是一个完整的解决方案示例:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// ProgressReporter 定义进度报告接口
type ProgressReporter interface {
    ReportProgress(progress float64)
}

// TaskProgress 表示单个任务的进度
type TaskProgress struct {
    ID         string
    Weight     float64  // 任务权重
    Progress   float64  // 0-1之间的进度
    SubTasks   []*TaskProgress
    IsParallel bool
    mu         sync.RWMutex
}

// ProgressManager 管理整体进度
type ProgressManager struct {
    tasks      []*TaskProgress
    totalWeight float64
    listener   func(overallProgress float64)
    mu         sync.RWMutex
}

// NewProgressManager 创建新的进度管理器
func NewProgressManager() *ProgressManager {
    return &ProgressManager{
        tasks:      make([]*TaskProgress, 0),
        totalWeight: 0,
    }
}

// AddTask 添加任务到进度管理器
func (pm *ProgressManager) AddTask(id string, weight float64, isParallel bool) *TaskProgress {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    
    task := &TaskProgress{
        ID:         id,
        Weight:     weight,
        Progress:   0,
        SubTasks:   make([]*TaskProgress, 0),
        IsParallel: isParallel,
    }
    
    pm.tasks = append(pm.tasks, task)
    pm.totalWeight += weight
    
    return task
}

// AddSubTask 为任务添加子任务
func (tp *TaskProgress) AddSubTask(id string, weight float64) *TaskProgress {
    tp.mu.Lock()
    defer tp.mu.Unlock()
    
    subTask := &TaskProgress{
        ID:       id,
        Weight:   weight,
        Progress: 0,
        SubTasks: make([]*TaskProgress, 0),
    }
    
    tp.SubTasks = append(tp.SubTasks, subTask)
    return subTask
}

// ReportProgress 报告任务进度
func (tp *TaskProgress) ReportProgress(progress float64) {
    tp.mu.Lock()
    defer tp.mu.Unlock()
    
    if progress < 0 {
        progress = 0
    }
    if progress > 1 {
        progress = 1
    }
    
    tp.Progress = progress
}

// CalculateOverallProgress 计算整体进度
func (pm *ProgressManager) CalculateOverallProgress() float64 {
    pm.mu.RLock()
    defer pm.mu.RUnlock()
    
    if pm.totalWeight == 0 {
        return 0
    }
    
    var totalProgress float64
    for _, task := range pm.tasks {
        taskProgress := task.calculateTaskProgress()
        totalProgress += taskProgress * task.Weight
    }
    
    return totalProgress / pm.totalWeight
}

// calculateTaskProgress 计算单个任务的进度(考虑子任务)
func (tp *TaskProgress) calculateTaskProgress() float64 {
    tp.mu.RLock()
    defer tp.mu.RUnlock()
    
    if len(tp.SubTasks) == 0 {
        return tp.Progress
    }
    
    if tp.IsParallel {
        // 并行子任务:取平均进度
        var totalProgress float64
        var totalWeight float64
        
        for _, subTask := range tp.SubTasks {
            subTaskProgress := subTask.calculateTaskProgress()
            totalProgress += subTaskProgress * subTask.Weight
            totalWeight += subTask.Weight
        }
        
        if totalWeight == 0 {
            return 0
        }
        return totalProgress / totalWeight
    } else {
        // 串行子任务:取最小进度(最慢的任务决定整体)
        minProgress := 1.0
        for _, subTask := range tp.SubTasks {
            subTaskProgress := subTask.calculateTaskProgress()
            if subTaskProgress < minProgress {
                minProgress = subTaskProgress
            }
        }
        return minProgress
    }
}

// SetProgressListener 设置进度监听器
func (pm *ProgressManager) SetProgressListener(listener func(overallProgress float64)) {
    pm.listener = listener
}

// NotifyListener 通知监听器(如果设置)
func (pm *ProgressManager) NotifyListener() {
    if pm.listener != nil {
        progress := pm.CalculateOverallProgress()
        pm.listener(progress)
    }
}

// 使用示例
func main() {
    pm := NewProgressManager()
    
    // 设置进度监听器
    pm.SetProgressListener(func(progress float64) {
        fmt.Printf("总体进度: %.2f%%\n", progress*100)
    })
    
    // 添加主任务
    task1 := pm.AddTask("data_processing", 0.4, false)
    task2 := pm.AddTask("file_upload", 0.3, true)
    task3 := pm.AddTask("validation", 0.3, false)
    
    // 为任务1添加串行子任务
    subTask1_1 := task1.AddSubTask("data_extraction", 0.4)
    subTask1_2 := task1.AddSubTask("data_transformation", 0.6)
    
    // 为任务2添加并行子任务
    subTask2_1 := task2.AddSubTask("upload_file1", 0.5)
    subTask2_2 := task2.AddSubTask("upload_file2", 0.5)
    
    // 模拟任务执行
    go func() {
        // 任务1的子任务
        for i := 0; i <= 10; i++ {
            subTask1_1.ReportProgress(float64(i) / 10)
            pm.NotifyListener()
            time.Sleep(100 * time.Millisecond)
        }
        
        for i := 0; i <= 10; i++ {
            subTask1_2.ReportProgress(float64(i) / 10)
            pm.NotifyListener()
            time.Sleep(150 * time.Millisecond)
        }
    }()
    
    go func() {
        // 任务2的并行子任务
        for i := 0; i <= 5; i++ {
            subTask2_1.ReportProgress(float64(i) / 5)
            subTask2_2.ReportProgress(float64(i) / 5)
            pm.NotifyListener()
            time.Sleep(200 * time.Millisecond)
        }
    }()
    
    go func() {
        // 任务3
        for i := 0; i <= 10; i++ {
            task3.ReportProgress(float64(i) / 10)
            pm.NotifyListener()
            time.Sleep(120 * time.Millisecond)
        }
    }()
    
    // 等待所有任务完成
    time.Sleep(3 * time.Second)
    finalProgress := pm.CalculateOverallProgress()
    fmt.Printf("最终进度: %.2f%%\n", finalProgress*100)
}

这个实现提供了:

  1. 权重支持:每个任务可以设置不同的权重来反映其重要性或耗时
  2. 并行/串行任务处理:并行任务取平均进度,串行任务取最慢进度
  3. 嵌套任务支持:支持多级子任务
  4. 线程安全:使用读写锁保护并发访问
  5. 进度监听:可以设置回调函数实时获取进度更新

对于现成的解决方案,社区中有一些相关库:

  • github.com/vbauerster/mpb - 多进度条库
  • github.com/cheggaaa/pb - 进度条库
  • github.com/schollz/progressbar - 简单进度条

但这些库主要专注于进度条显示,对于复杂的嵌套任务进度追踪,通常需要根据具体需求定制实现。

回到顶部