golang高效goroutine管理与取消插件库pool的使用

Golang高效Goroutine管理与取消插件库Pool的使用

包介绍

Pool包实现了一个有限消费者goroutine或无限goroutine池,用于更简单的goroutine处理和取消。

Project status

特性

  • 使用极其简单,不对您的使用方式做任何假设
  • 从返回错误的消费者goroutine自动恢复

Pool v3相比v2的优势

  • 对象不再是接口,减少了未来可能出现的破坏性变更
  • 现在有两种完全可互换的Pool类型:有限工作池和无限池
  • 工作单元使用更简单,现在可以使用work.Wait()而不是<-work.Done

安装

使用go get安装:

go get gopkg.in/go-playground/pool.v3

然后在代码中导入:

import "gopkg.in/go-playground/pool.v3"

重要提示

  • 建议从调用函数中取消池或批处理,而不是在工作单元内部取消
  • 批处理时不要忘记调用batch.QueueComplete(),否则会导致死锁
  • 在阻塞操作后(如等待连接池中的连接)检查WorkUnit.IsCancelled()是您的责任

使用示例

有限池和无限池具有相同的签名并且完全可互换。

单个工作单元示例

package main

import (
	"fmt"
	"time"

	"gopkg.in/go-playground/pool.v3"
)

func main() {
	// 创建限制10个goroutine的池
	p := pool.NewLimited(10)
	defer p.Close()

	// 排队获取用户信息
	user := p.Queue(getUser(13))
	// 排队获取其他信息
	other := p.Queue(getOtherInfo(13))

	// 等待用户信息任务完成
	user.Wait()
	if err := user.Error(); err != nil {
		// 处理错误
	}

	// 使用返回的用户信息
	username := user.Value().(string)
	fmt.Println(username)

	// 等待其他信息任务完成
	other.Wait()
	if err := other.Error(); err != nil {
		// 处理错误
	}

	// 使用返回的其他信息
	otherInfo := other.Value().(string)
	fmt.Println(otherInfo)
}

// 获取用户信息的工作函数
func getUser(id int) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// 模拟等待TCP连接建立或从连接池获取连接
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// 返回值不会被使用
			return nil, nil
		}

		// 准备处理...
		return "Joeybloggs", nil
	}
}

// 获取其他信息的工作函数
func getOtherInfo(id int) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// 模拟等待TCP连接建立或从连接池获取连接
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// 返回值不会被使用
			return nil, nil
		}

		// 准备处理...
		return "Other Info", nil
	}
}

批处理工作示例

package main

import (
	"fmt"
	"time"

	"gopkg.in/go-playground/pool.v3"
)

func main() {
	// 创建限制10个goroutine的池
	p := pool.NewLimited(10)
	defer p.Close()

	// 创建批处理
	batch := p.Batch()

	// 为了最大速度,在另一个goroutine中排队
	// 这不是必须的,但在所有项目排队完成前不能开始读取结果
	go func() {
		for i := 0; i < 10; i++ {
			batch.Queue(sendEmail("email content"))
		}

		// 不要忘记这个,否则goroutine会死锁
		// 如果调用Cancel(),它会在内部调用QueueComplete()
		batch.QueueComplete()
	}()

	// 处理结果
	for email := range batch.Results() {
		if err := email.Error(); err != nil {
			// 处理错误
			// 可能需要调用batch.Cancel()
		}

		// 使用返回值
		fmt.Println(email.Value().(bool))
	}
}

// 发送邮件的工作函数
func sendEmail(email string) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// 模拟等待TCP连接建立或从连接池获取连接
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// 返回值不会被使用
			return nil, nil
		}

		// 准备处理...
		return true, nil // 一切正常,发送nil,如果有错误则发送错误
	}
}

性能基准

在MacBook Pro (Retina, 15-inch, Late 2013) 2.6 GHz Intel Core i7 16 GB 1600 MHz DDR3上使用Go 1.6.2运行测试。

一些基准测试结果:

  • BenchmarkLimitedSmallRun 在1.002492008秒内完成了10秒的处理
  • BenchmarkLimitedSmallCancel 运行20个作业,在第6个作业取消,运行时间为0秒
  • BenchmarkLimitedLargeCancel 运行1000个作业,在第6个作业取消,运行时间为0秒
  • BenchmarkLimitedOverconsumeLargeRun 使用25个工作线程运行100个作业,耗时4.027153081秒

许可证

根据MIT许可证分发,更多详情请参见代码中的许可证文件。


更多关于golang高效goroutine管理与取消插件库pool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效goroutine管理与取消插件库pool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高效Goroutine管理与取消:Pool库使用指南

在Go语言中,goroutine的轻量级特性使得并发编程变得简单,但如果不加以管理,可能会导致goroutine泄漏或资源耗尽问题。本文将介绍如何使用pool库来高效管理goroutine的生命周期和取消机制。

为什么需要goroutine池

  1. 控制并发数量,防止资源耗尽
  2. 复用goroutine,减少创建销毁开销
  3. 统一管理goroutine生命周期
  4. 实现优雅的取消和超时机制

常用goroutine池库

Go生态中有几个优秀的goroutine池实现:

  • ants:高性能goroutine池
  • tunny:工作池实现
  • pond:轻量级工作池
  • go-playground/pool:功能丰富的池实现

使用go-playground/pool示例

1. 基本使用

package main

import (
	"context"
	"fmt"
	"time"
	
	"github.com/go-playground/pool/v3"
)

func main() {
	// 创建一个最大容量为5的goroutine池
	p := pool.NewLimited(5)
	defer p.Close() // 确保关闭池

	// 提交任务到池中
	batch := p.Batch()
	
	// 提交10个任务
	for i := 0; i < 10; i++ {
		count := i
		batch.Queue(func(wu pool.WorkUnit) (interface{}, error) {
			if wu.IsCancelled() {
				return nil, nil
			}
			
			time.Sleep(1 * time.Second)
			fmt.Printf("Task %d completed\n", count)
			return fmt.Sprintf("Result %d", count), nil
		})
	}
	
	// 等待所有任务完成
	batch.QueueComplete()
	results := batch.Results()
	
	for result := range results {
		if result.Error() != nil {
			fmt.Printf("Error: %v\n", result.Error())
			continue
		}
		fmt.Printf("Success: %v\n", result.Value())
	}
}

2. 带取消和超时的池

package main

import (
	"context"
	"fmt"
	"time"
	
	"github.com/go-playground/pool/v3"
)

func main() {
	p := pool.NewLimited(3)
	defer p.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	batch := p.BatchWithContext(ctx)

	for i := 0; i < 10; i++ {
		count := i
		batch.Queue(func(wu pool.WorkUnit) (interface{}, error) {
			select {
			case <-time.After(1 * time.Second):
				if wu.IsCancelled() {
					return nil, nil
				}
				fmt.Printf("Task %d completed\n", count)
				return count, nil
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		})
	}

	batch.QueueComplete()
	results := batch.Results()

	for result := range results {
		if result.Error() != nil {
			fmt.Printf("Error for task: %v\n", result.Error())
			continue
		}
		fmt.Printf("Success: %v\n", result.Value())
	}
}

使用ants库示例

ants是另一个高性能的goroutine池实现:

package main

import (
	"fmt"
	"sync"
	"time"
	
	"github.com/panjf2000/ants/v2"
)

func taskFunc(i int) {
	time.Sleep(time.Second)
	fmt.Printf("Task %d executed\n", i)
}

func main() {
	defer ants.Release() // 释放池

	var wg sync.WaitGroup
	
	// 创建容量为5的池
	p, _ := ants.NewPool(5)
	defer p.Release()

	for i := 0; i < 10; i++ {
		wg.Add(1)
		count := i
		_ = p.Submit(func() {
			defer wg.Done()
			taskFunc(count)
		})
	}

	wg.Wait()
	fmt.Printf("Running goroutines: %d\n", p.Running())
}

最佳实践

  1. 合理设置池大小:通常设置为CPU核心数的2-5倍
  2. 及时释放资源:使用defer确保池被关闭
  3. 处理取消信号:检查IsCancelled()或context取消
  4. 错误处理:妥善处理任务返回的错误
  5. 监控池状态:定期检查池的运行状况

性能考虑

  1. 对于CPU密集型任务,池大小应接近CPU核心数
  2. 对于IO密集型任务,可以设置更大的池
  3. 避免在任务中创建新的goroutine
  4. 使用sync.Pool复用任务中的临时对象

通过合理使用goroutine池,可以显著提高Go程序的稳定性和性能,同时避免资源泄漏问题。

回到顶部