golang简单高效的异步工作池插件库worker-pool的使用

Golang简单高效的异步工作池插件库worker-pool的使用

关于worker-pool

Worker pool是一种用于实现任务并发执行的软件设计模式。它维护多个等待分配任务的工作线程以实现并发执行。通过维护一个工作线程池,该模型提高了性能并避免了执行延迟。可以根据可用的计算资源调整可用工作线程的数量。

基本示例

下面是一个使用worker-pool的基本示例代码:

package main

import (
    "fmt"
    "sync"

    "github.com/vardius/worker-pool/v2"
)

func main() {
	var wg sync.WaitGroup

	poolSize := 1      // 池大小
	jobsAmount := 3    // 任务数量
	workersAmount := 2 // 工作线程数量

	// 创建新的工作池
	pool := workerpool.New(poolSize)
	out := make(chan int, jobsAmount)
	
	// 定义工作函数
	worker := func(i int) {
        defer wg.Done()
        out <- i
    }

	// 添加工作线程到池中
	for i := 1; i <= workersAmount; i++ {
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	wg.Add(jobsAmount)

	// 分配任务
	for i := 0; i < jobsAmount; i++ {
		if err := pool.Delegate(i); err != nil {
			panic(err)
		}
	}

	go func() {
		// 所有任务完成后停止工作线程
		wg.Wait()
		close(out)
		pool.Stop() // 停止会从池中移除所有工作线程,如需继续工作需重新添加
	}()

	sum := 0
	for n := range out {
		sum += n
	}

	fmt.Println(sum)
	// 输出:
	// 3
}

性能基准测试

在3.3 GHz Intel Core i7 CPU和16 GB 2133 MHz LPDDR3内存上的基准测试结果:

goos: darwin
goarch: amd64
pkg: github.com/vardius/worker-pool/v2
BenchmarkWorker1-4                	 3944299	       284 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker1Parallel-4        	 7394715	       138 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorker100-4              	 1657569	       693 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorker100Parallel-4      	 3673483	       368 ns/op	      48 B/op	       2 allocs/op
BenchmarkWorkerNumCPU-4           	 2590293	       445 ns/op	      56 B/op	       3 allocs/op
BenchmarkWorkerNumCPUParallel-4   	 3591553	       298 ns/op	      48 B/op	       2 allocs/op
PASS
ok  	github.com/vardius/worker-pool/v2	9.511s

使用说明

  1. 首先创建一个工作池实例:pool := workerpool.New(poolSize)
  2. 定义工作函数,它接收一个参数并执行具体任务
  3. 使用AddWorker方法将工作函数添加到池中
  4. 使用Delegate方法分配任务给工作线程
  5. 任务完成后使用Stop方法停止工作池

这个库提供了简单高效的方式来管理并发任务,适合需要处理大量异步任务的场景。


更多关于golang简单高效的异步工作池插件库worker-pool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang简单高效的异步工作池插件库worker-pool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang异步工作池worker-pool使用指南

worker-pool是一个简单高效的Golang异步工作池插件库,它可以帮助你管理并发任务,控制goroutine数量,避免资源耗尽。下面我将详细介绍如何使用这个库。

安装

首先安装worker-pool库:

go get github.com/gammazero/workerpool

基本使用

1. 创建和关闭工作池

package main

import (
	"fmt"
	"time"
	
	"github.com/gammazero/workerpool"
)

func main() {
	// 创建一个最多有5个worker的工作池
	wp := workerpool.New(5)
	
	// 确保在工作完成后关闭工作池
	defer wp.Stop()
	
	// 提交任务到工作池
	for i := 0; i < 10; i++ {
		num := i
		wp.Submit(func() {
			fmt.Printf("开始处理任务 %d\n", num)
			time.Sleep(1 * time.Second) // 模拟耗时操作
			fmt.Printf("完成任务 %d\n", num)
		})
	}
	
	// 等待所有任务完成
	wp.StopWait()
}

2. 控制并发数量

worker-pool会自动管理goroutine数量,不会超过你设置的worker数量:

func main() {
	// 限制最多2个并发
	wp := workerpool.New(2)
	defer wp.Stop()
	
	for i := 0; i < 5; i++ {
		id := i
		wp.Submit(func() {
			fmt.Printf("Worker %d 开始\n", id)
			time.Sleep(2 * time.Second)
			fmt.Printf("Worker %d 结束\n", id)
		})
	}
	
	wp.StopWait()
}

3. 处理任务返回值

如果需要获取任务执行结果,可以使用channel:

func main() {
	wp := workerpool.New(3)
	defer wp.Stop()
	
	resultChan := make(chan int, 10)
	
	for i := 0; i < 10; i++ {
		num := i
		wp.Submit(func() {
			// 模拟计算
			res := num * num
			resultChan <- res
		})
	}
	
	go func() {
		wp.StopWait()
		close(resultChan)
	}()
	
	// 收集结果
	for res := range resultChan {
		fmt.Println("结果:", res)
	}
}

高级功能

1. 动态调整worker数量

func main() {
	wp := workerpool.New(2)
	defer wp.Stop()
	
	// 动态调整worker数量
	wp.SetSize(4) // 增加到4个worker
	
	// 提交更多任务...
	
	wp.SetSize(1) // 减少到1个worker
}

2. 等待特定任务完成

func main() {
	wp := workerpool.New(3)
	defer wp.Stop()
	
	done := make(chan struct{})
	
	wp.Submit(func() {
		defer close(done)
		// 执行重要任务
		time.Sleep(2 * time.Second)
		fmt.Println("重要任务完成")
	})
	
	// 提交其他任务...
	
	// 等待重要任务完成
	<-done
}

3. 错误处理

func main() {
	wp := workerpool.New(4)
	defer wp.Stop()
	
	errChan := make(chan error, 10)
	
	for i := 0; i < 10; i++ {
		num := i
		wp.Submit(func() {
			if num == 3 {
				errChan <- fmt.Errorf("任务 %d 失败", num)
				return
			}
			fmt.Printf("任务 %d 成功\n", num)
		})
	}
	
	go func() {
		wp.StopWait()
		close(errChan)
	}()
	
	for err := range errChan {
		fmt.Println("错误:", err)
	}
}

性能考虑

  1. worker数量设置:通常设置为CPU核心数的2-4倍,具体取决于任务类型
  2. 任务粒度:任务不应太细粒度,否则调度开销可能超过实际工作
  3. 内存使用:大量任务时注意内存使用,可考虑限制队列大小

与其他库的比较

worker-pool相比标准库的sync.WaitGroup提供了更高级的控制:

  1. 自动限制并发goroutine数量
  2. 内置任务队列
  3. 动态调整worker数量
  4. 更简洁的API

相比其他工作池实现,worker-pool的特点是:

  • 轻量级
  • 无外部依赖
  • 良好的文档和测试覆盖率

总结

worker-pool是一个简单但功能强大的Golang异步工作池实现,适合需要控制并发数量的场景。通过合理设置worker数量,可以避免资源耗尽,同时保持较高的吞吐量。

以上示例展示了worker-pool的基本和高级用法,你可以根据实际需求进行调整和扩展。

回到顶部