Golang中如何追踪任务序列的执行进度
Golang中如何追踪任务序列的执行进度 我正在编写一个Go程序,该程序执行一系列需要几分钟才能完成的任务。其中一些任务本身由多个并行子任务组成,只有所有子任务都完成后,整个任务才被视为"完成"。
我希望构建一个库,允许这些独立任务报告进度,然后将进度汇总成某种形式,以便外部观察者能够了解整个流程的总体进度,例如以0到1之间的浮点数形式表示。
目前各个任务并不知道自己在整个流程中的位置,因此虽然它们可以报告自身进度,但要计算总体进度,还需要知道已经完成了多少任务以及还有多少任务待完成。此外,某些任务预计比其他任务耗时更长,应该提供某种方式来设置手动预期值,并将其用于总体进度的计算。
另外,在计算一组并行子任务的进度时,每个子任务可以报告自身进度,但这些子任务可能不会同时完成,因此我需要一种合理的方法来计算这种情况下的总体进度。
还有更多问题需要考虑。这个领域是否已有现成的解决方案?
更多关于Golang中如何追踪任务序列的执行进度的实战教程也可以访问 https://www.itying.com/category-94-b0.html
不要使用 WaitGroup(如某个回复中的示例),而是使用一个通道,在子任务完成时向该通道发送内容(空结构体即可)。如果你知道有多少个步骤,只需从通道接收数据就能轻松计算进度。
更多关于Golang中如何追踪任务序列的执行进度的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
是的,我知道等待组和协程 - 我真正需要的是能够跟踪整个流水线的进度(例如,如果 SubTask 需要几分钟才能完成,我希望从 main 函数获得一个 0…100 的进度指示器来显示完成进度)
opentracing/opentracing-go
opentracing-go - Go语言的OpenTracing API
jaegertracing/jaeger-client-go
jaeger-client-go - Go OpenTracing API的Jaeger绑定实现
我不知道这个领域是否已有现成的包,但我会设计类似这样的接口:
type ProgressTracker interface {
func NewTask(weight float64) Task
func Current() float64
func Done() bool
}
type Task interface {
Progress(v float64) // 0.0 到 1.0,1.0 表示任务完成
Done() // 由于浮点数精度问题,最好在任务完成时提供明确指示
假设你有几个权重不同(即预期耗时不同)的任务和对应的处理例程,你可以这样操作:
p := NewProgressTracker()
t1 := p.NewTask(5.0)
go doSomeTask(t1, ...)
t2 := p.NewTask(2.0)
go doSomeOtherTask(t2, ...)
for !p.Done() {
fmt.Println("Current progress: %.1f %%", p.Current() * 100)
time.Sleep(time.Second)
}
实际的任务例程会这样实现:
func doSomeTask(t Task, ...) {
defer t.Done()
for {
// 工作强度
t.Progress(...)
}
}
当然,Progress 和 Task 的具体实现就留给读者作为练习了 🙂。Progress.Current() 方法只需将各个任务的进度乘以权重再除以总值求和,Done() 方法则是所有任务 Done() 的 && 运算结果。
可以设想其他包含上下文和等待组等元素的接口设计。也许你可以注册一个等待 waitgroup 的任务,这样实际任务如果是短期运行的,就不需要理解 Task 接口。也许 Progress 应该包含等待组处理,以便你可以 Wait() 等待其完成。
还可以有其他 Task 的实现方式,比如那些具有总大小并实现 Reader 和 Writer 来跟踪读写进度的实现。诸如此类。
在Go中实现任务序列进度追踪可以通过构建一个进度管理器来处理。以下是一个完整的解决方案示例:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// ProgressReporter 定义进度报告接口
type ProgressReporter interface {
ReportProgress(progress float64)
}
// TaskProgress 表示单个任务的进度
type TaskProgress struct {
ID string
Weight float64 // 任务权重
Progress float64 // 0-1之间的进度
SubTasks []*TaskProgress
IsParallel bool
mu sync.RWMutex
}
// ProgressManager 管理整体进度
type ProgressManager struct {
tasks []*TaskProgress
totalWeight float64
listener func(overallProgress float64)
mu sync.RWMutex
}
// NewProgressManager 创建新的进度管理器
func NewProgressManager() *ProgressManager {
return &ProgressManager{
tasks: make([]*TaskProgress, 0),
totalWeight: 0,
}
}
// AddTask 添加任务到进度管理器
func (pm *ProgressManager) AddTask(id string, weight float64, isParallel bool) *TaskProgress {
pm.mu.Lock()
defer pm.mu.Unlock()
task := &TaskProgress{
ID: id,
Weight: weight,
Progress: 0,
SubTasks: make([]*TaskProgress, 0),
IsParallel: isParallel,
}
pm.tasks = append(pm.tasks, task)
pm.totalWeight += weight
return task
}
// AddSubTask 为任务添加子任务
func (tp *TaskProgress) AddSubTask(id string, weight float64) *TaskProgress {
tp.mu.Lock()
defer tp.mu.Unlock()
subTask := &TaskProgress{
ID: id,
Weight: weight,
Progress: 0,
SubTasks: make([]*TaskProgress, 0),
}
tp.SubTasks = append(tp.SubTasks, subTask)
return subTask
}
// ReportProgress 报告任务进度
func (tp *TaskProgress) ReportProgress(progress float64) {
tp.mu.Lock()
defer tp.mu.Unlock()
if progress < 0 {
progress = 0
}
if progress > 1 {
progress = 1
}
tp.Progress = progress
}
// CalculateOverallProgress 计算整体进度
func (pm *ProgressManager) CalculateOverallProgress() float64 {
pm.mu.RLock()
defer pm.mu.RUnlock()
if pm.totalWeight == 0 {
return 0
}
var totalProgress float64
for _, task := range pm.tasks {
taskProgress := task.calculateTaskProgress()
totalProgress += taskProgress * task.Weight
}
return totalProgress / pm.totalWeight
}
// calculateTaskProgress 计算单个任务的进度(考虑子任务)
func (tp *TaskProgress) calculateTaskProgress() float64 {
tp.mu.RLock()
defer tp.mu.RUnlock()
if len(tp.SubTasks) == 0 {
return tp.Progress
}
if tp.IsParallel {
// 并行子任务:取平均进度
var totalProgress float64
var totalWeight float64
for _, subTask := range tp.SubTasks {
subTaskProgress := subTask.calculateTaskProgress()
totalProgress += subTaskProgress * subTask.Weight
totalWeight += subTask.Weight
}
if totalWeight == 0 {
return 0
}
return totalProgress / totalWeight
} else {
// 串行子任务:取最小进度(最慢的任务决定整体)
minProgress := 1.0
for _, subTask := range tp.SubTasks {
subTaskProgress := subTask.calculateTaskProgress()
if subTaskProgress < minProgress {
minProgress = subTaskProgress
}
}
return minProgress
}
}
// SetProgressListener 设置进度监听器
func (pm *ProgressManager) SetProgressListener(listener func(overallProgress float64)) {
pm.listener = listener
}
// NotifyListener 通知监听器(如果设置)
func (pm *ProgressManager) NotifyListener() {
if pm.listener != nil {
progress := pm.CalculateOverallProgress()
pm.listener(progress)
}
}
// 使用示例
func main() {
pm := NewProgressManager()
// 设置进度监听器
pm.SetProgressListener(func(progress float64) {
fmt.Printf("总体进度: %.2f%%\n", progress*100)
})
// 添加主任务
task1 := pm.AddTask("data_processing", 0.4, false)
task2 := pm.AddTask("file_upload", 0.3, true)
task3 := pm.AddTask("validation", 0.3, false)
// 为任务1添加串行子任务
subTask1_1 := task1.AddSubTask("data_extraction", 0.4)
subTask1_2 := task1.AddSubTask("data_transformation", 0.6)
// 为任务2添加并行子任务
subTask2_1 := task2.AddSubTask("upload_file1", 0.5)
subTask2_2 := task2.AddSubTask("upload_file2", 0.5)
// 模拟任务执行
go func() {
// 任务1的子任务
for i := 0; i <= 10; i++ {
subTask1_1.ReportProgress(float64(i) / 10)
pm.NotifyListener()
time.Sleep(100 * time.Millisecond)
}
for i := 0; i <= 10; i++ {
subTask1_2.ReportProgress(float64(i) / 10)
pm.NotifyListener()
time.Sleep(150 * time.Millisecond)
}
}()
go func() {
// 任务2的并行子任务
for i := 0; i <= 5; i++ {
subTask2_1.ReportProgress(float64(i) / 5)
subTask2_2.ReportProgress(float64(i) / 5)
pm.NotifyListener()
time.Sleep(200 * time.Millisecond)
}
}()
go func() {
// 任务3
for i := 0; i <= 10; i++ {
task3.ReportProgress(float64(i) / 10)
pm.NotifyListener()
time.Sleep(120 * time.Millisecond)
}
}()
// 等待所有任务完成
time.Sleep(3 * time.Second)
finalProgress := pm.CalculateOverallProgress()
fmt.Printf("最终进度: %.2f%%\n", finalProgress*100)
}
这个实现提供了:
- 权重支持:每个任务可以设置不同的权重来反映其重要性或耗时
- 并行/串行任务处理:并行任务取平均进度,串行任务取最慢进度
- 嵌套任务支持:支持多级子任务
- 线程安全:使用读写锁保护并发访问
- 进度监听:可以设置回调函数实时获取进度更新
对于现成的解决方案,社区中有一些相关库:
github.com/vbauerster/mpb- 多进度条库github.com/cheggaaa/pb- 进度条库github.com/schollz/progressbar- 简单进度条
但这些库主要专注于进度条显示,对于复杂的嵌套任务进度追踪,通常需要根据具体需求定制实现。


