golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用

Golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用

介绍

go-workerpool是一个受Java线程池启发的Golang工作池库,旨在控制大量的Goroutine并发。它提供了高效的方式来管理并发任务,避免资源耗尽。

安装

安装该库的最简单方法是运行:

go get github.com/zenthangplus/go-workerpool

基本示例

package main

import (
	"fmt"
	"github.com/zenthangplus/go-workerpool"
)

func main() {
	// 初始化具有3个工作线程的线程池
	pool := workerpool.NewFixedSize(3)

	// 启动线程池
	pool.Start()

	// Submit会阻塞直到池队列中有可用槽位
	// 提交一个可识别的工作,ID将随机生成(使用UUID)
	pool.Submit(workerpool.NewIdentifiableJob(func() {
		// 执行耗时任务
	}))
	
	// 使用NewCustomIdentifierJob如果你不想随机生成ID
	pool.Submit(workerpool.NewCustomIdentifierJob("custom-id", func() { 
		// 执行耗时任务
	}))
	
	// 或者提交一个没有标识符的简单函数
	pool.SubmitFunc(func() { // Submit(FuncJob(func() {}))的简化方式
		// 执行耗时任务
	})

	// SubmitConfidently以自信模式提交工作
	// 当池队列满时,此函数将返回ErrPoolFull
	err := pool.SubmitConfidently(workerpool.NewIdentifiableJob(func() {
		// 执行耗时任务
	}))
	if err == workerpool.ErrPoolFull {
		fmt.Println("池已满")
	}
}

高级用法

package main

import (
	"fmt"
	"github.com/zenthangplus/go-workerpool"
)

func main() {
	// 初始化固定大小的线程池,例如:3个工作线程并发运行
	pool := workerpool.NewFixedSize(3)

	// 或者使用自定义选项初始化固定大小的线程池
	pool = workerpool.NewFixedSize(3,
		// 自定义模式
		workerpool.WithMode(workerpool.FixedSize),
		
		// 自定义工作线程数量
		workerpool.WithNumberWorkers(5),
		
		// 自定义容量
		workerpool.WithCapacity(6),
		
		// 自定义日志函数
		workerpool.WithLogFunc(func(msgFormat string, args ...interface{}) {
			fmt.Printf(msgFormat+"\n", args...)
		}),
	)

	// 启动线程池
	pool.Start()
	
	// 初始化一个ID随机生成的功能性工作
	job1 := workerpool.NewIdentifiableJob(func() {})

	// 初始化一个预定义ID的功能性工作
	job2 := workerpool.NewCustomIdentifierJob("test-an-id", func() {})

	// 以正常模式提交工作,它将阻塞直到池中有可用槽位
	pool.Submit(job1)
	
	// 或者提交一个简单函数
	pool.SubmitFunc(func() {})
	
	// 以自信模式提交工作,当池满时将返回ErrPoolFull
	err := pool.SubmitConfidently(job2)
	if err != nil {
		fmt.Print(err)
	}
}

// CompressDirJob
// 你可以通过实现`Job`接口来创建自定义工作
type CompressDirJob struct {
	directory string
}

func NewCompressDirJob(directory string) *CompressDirJob {
	return &CompressDirJob{directory: directory}
}

func (c CompressDirJob) Id() string {
	return "directory-" + c.directory
}

func (c CompressDirJob) Exec() {
	// 执行目录压缩
}

主要功能

  1. 固定大小线程池:可以控制并发Goroutine的数量
  2. 多种提交方式
    • Submit():阻塞直到有可用槽位
    • SubmitConfidently():立即返回,池满时返回错误
    • SubmitFunc():提交简单函数的便捷方法
  3. 工作标识:可以为每个工作分配唯一ID
  4. 自定义工作:通过实现Job接口创建自定义工作类型
  5. 可配置选项:可以自定义工作线程数量、队列容量、日志函数等

这个库非常适合需要控制Goroutine并发数量的场景,避免资源耗尽,同时保持高效的并发处理能力。


更多关于golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-WorkerPool: 高效控制Goroutine并发数量的线程池管理

在Go语言中,虽然goroutine非常轻量级,但在实际开发中我们仍然需要控制并发数量,以避免资源耗尽或系统过载。下面我将介绍如何使用go-workerpool这个库来高效管理goroutine并发。

安装go-workerpool

go get github.com/gammazero/workerpool

基本使用方法

package main

import (
	"fmt"
	"time"
	
	"github.com/gammazero/workerpool"
)

func main() {
	// 创建一个最大并发数为3的worker pool
	wp := workerpool.New(3)
	
	// 提交5个任务
	for i := 0; i < 5; i++ {
		id := i
		wp.Submit(func() {
			fmt.Printf("任务 %d 开始\n", id)
			time.Sleep(1 * time.Second) // 模拟耗时操作
			fmt.Printf("任务 %d 完成\n", id)
		})
	}
	
	// 等待所有任务完成
	wp.StopWait()
	fmt.Println("所有任务完成")
}

高级功能

1. 动态调整worker数量

func main() {
	wp := workerpool.New(2)
	
	// 动态增加worker数量
	wp.SetSize(5)
	
	// 提交10个任务
	for i := 0; i < 10; i++ {
		id := i
		wp.Submit(func() {
			fmt.Printf("处理任务 %d\n", id)
			time.Sleep(500 * time.Millisecond)
		})
	}
	
	// 完成后减少worker数量
	wp.SetSize(2)
	wp.StopWait()
}

2. 带超时的任务提交

func main() {
	wp := workerpool.New(2)
	
	// 提交带超时的任务
	err := wp.SubmitWithTimeout(100*time.Millisecond, func() {
		time.Sleep(200 * time.Millisecond) // 这个任务会超时
		fmt.Println("这个任务会超时")
	})
	
	if err != nil {
		fmt.Println("任务提交超时:", err)
	}
	
	wp.StopWait()
}

3. 获取当前worker数量

func main() {
	wp := workerpool.New(5)
	
	fmt.Println("当前worker数量:", wp.Size())
	
	wp.SetSize(3)
	fmt.Println("调整后worker数量:", wp.Size())
	
	wp.StopWait()
}

实际应用示例

package main

import (
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
	
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(10) // 限制并发请求数为10
	var totalRequests int32
	
	// 模拟100个HTTP请求
	for i := 0; i < 100; i++ {
		url := fmt.Sprintf("http://example.com/page%d", i)
		wp.Submit(func() {
			resp, err := http.Get(url)
			if err != nil {
				fmt.Printf("请求 %s 失败: %v\n", url, err)
				return
			}
			defer resp.Body.Close()
			
			atomic.AddInt32(&totalRequests, 1)
			fmt.Printf("成功请求 %s, 状态码: %d\n", url, resp.StatusCode)
		})
	}
	
	wp.StopWait()
	fmt.Printf("总共完成 %d 个请求\n", totalRequests)
}

性能考虑

  1. worker数量选择

    • CPU密集型任务:通常设置为CPU核心数
    • IO密集型任务:可以设置更高的数量,如CPU核心数的2-5倍
  2. 任务队列

    • go-workerpool使用无界队列,适合任务量不确定但需要控制并发数的场景
    • 如果内存有限,可以考虑有界队列的实现
  3. 错误处理

    • 确保任务函数内部有适当的错误处理
    • 可以使用channel收集任务执行结果和错误

替代方案

除了go-workerpool,还有其他一些goroutine池实现:

  1. ants:高性能goroutine池
  2. tunny:另一个goroutine池实现
  3. 自实现简单池
func SimplePool(size int, tasks <-chan func()) {
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks {
				task()
			}
		}()
	}
	wg.Wait()
}

总结

go-workerpool是一个简单易用但功能强大的goroutine池实现,特别适合需要控制并发数量的场景。它提供了动态调整worker数量、带超时的任务提交等实用功能,能够帮助开发者更好地管理系统资源。

回到顶部