golang管理可调整大小的上下文感知goroutine池插件库gpool的使用

golang管理可调整大小的上下文感知goroutine池插件库gpool的使用

介绍

gpool是一个通用的、上下文感知的可调整大小的goroutine池,用于限制并发度。一个JobEnqueued到池中,同时最多只有N个job可以并发处理。

主要特点:

  • Job是一个简单的func(){},当调用Enqueue(..)时,该调用会在job开始处理时返回。如果池已满,则会阻塞直到:

    1. 池中有空间可以处理该job
    2. job的context被取消
    3. 池被停止
  • 可以通过Resize()调整池的大小,这是并发安全的操作。增大池会解除阻塞的enqueue调用,缩小池不会影响已经在处理/等待的jobs

  • Enqueuing一个Job会在job开始时返回nil错误,如果池已关闭则返回ErrPoolClosed,或者在等待池时job的context被取消则返回context的错误

  • 使用pool.Stop()停止池会等待所有正在处理的jobs完成后再返回,同时会解除所有阻塞的job enqueues(这些enqueues会返回ErrPoolClosed)

  • StartStopResize(N)都是并发安全的,可以从多个goroutine调用

安装

$ go get github.com/sherifabdlnaby/gpool
import "github.com/sherifabdlnaby/gpool"

使用示例

示例1 - 简单Job入队

func main() {
    concurrency := 2

    // 创建并启动pool
    pool, _ := gpool.NewPool(concurrency)
    defer pool.Stop()

    // 创建JOB
    resultChan1 := make(chan int)
    ctx := context.Background()
    job := func() {
        time.Sleep(2000 * time.Millisecond)
        resultChan1 <- 1337
    }

    // 入队Job
    err1 := pool.Enqueue(ctx, job)
    if err1 != nil {
        log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
        return
    }

    log.Printf("Job Enqueued and started processing")
    log.Printf("Job Done, Received: %v", <-resultChan1)
}

示例2 - 带超时的Job入队

func main() {
  concurrency := 2

  // 创建并启动pool
  pool, _ := gpool.NewPool(concurrency)
  defer pool.Stop()

  // 创建JOB
  resultChan := make(chan int)
  ctx := context.Background()
  job := func() {
    resultChan <- 1337
  }

  // 入队2个Jobs填满pool(除非从resultChan拉取结果否则不会完成)
  _ = pool.Enqueue(ctx, job)
  _ = pool.Enqueue(ctx, job)

  ctxWithTimeout, _ := context.WithTimeout(ctx, 1000 * time.Millisecond)

  // 由于超时只会阻塞1秒
  err1 := pool.Enqueue(ctxWithTimeout, job)

  if err1 != nil {
    log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
  }

  log.Printf("Job 1 Done, Received: %v", <-resultChan)
  log.Printf("Job 2 Done, Received: %v", <-resultChan)
}

示例3 - 完整示例

// 池的大小/并发jobs数
const concurrency = 2

func main() {
  pool, _ := gpool.NewPool(concurrency)
  defer pool.Stop()

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  go func() {
    for i := 0; i < 10; i++ {

      // 小间隔使输出更易读
      time.Sleep(500 * time.Millisecond)

      go func(i int) {
        x := make(chan int, 1)

        log.Printf("Job [%v] Enqueueing", i)

        err := pool.Enqueue(ctx, func() {
          time.Sleep(2000 * time.Millisecond)
          x <- i
        })

        if err != nil {
          log.Printf("Job [%v] was not enqueued. [%s]", i, err.Error())
          return
        }

        log.Printf("Job [%v] Enqueue-ed ", i)

        log.Printf("Job [%v] Receieved, Result: [%v]", i, <-x)
      }(i)
    }
  }()

  // 取消注释以演示ctx取消jobs
  //time.Sleep(100 * time.Millisecond)
  //cancel()

  time.Sleep(5000 * time.Millisecond)

  fmt.Println("Stopping...")

  pool.Stop()

  fmt.Println("Stopped")

  fmt.Println("Sleeping for couple of seconds so canceled job have a chance to print out their status")

  time.Sleep(4000 * time.Millisecond)
}

不同类型的"Enqueues"

  • Enqueue(ctx, job) - 在job开始执行时返回(不是在job完成/返回后)
  • EnqueueAndWait(ctx, job) - 在job开始完成执行时返回
  • TryEnqueue(job) - 如果池已满不会阻塞,在job开始执行时返回true,池满返回false
  • TryEnqueueAndWait(job) - 如果池已满不会阻塞,在job开始完成执行时返回true,池满返回false

基准测试

$ go test -bench=. -cpu=2 -benchmem

基准测试结果显示了不同池大小和批量作业数量下的性能表现。

许可证

MIT License
Copyright © 2019 Sherif Abdel-Naby

贡献

欢迎提交PR。


更多关于golang管理可调整大小的上下文感知goroutine池插件库gpool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang管理可调整大小的上下文感知goroutine池插件库gpool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用gpool管理可调整大小的上下文感知goroutine池

gpool是一个Go语言库,用于管理可动态调整大小的、上下文感知的goroutine池。下面我将详细介绍如何使用这个库。

基本概念

gpool提供以下核心功能:

  • 动态调整goroutine池大小
  • 上下文感知的任务执行
  • 任务超时控制
  • 优雅关闭

安装

go get github.com/snail007/gpool

基本使用示例

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/snail007/gpool"
)

func main() {
	// 创建一个最小5个、最大10个worker的goroutine池
	pool := gpool.NewPool(5, 10)
	defer pool.Release() // 释放资源

	// 提交任务
	for i := 0; i < 20; i++ {
		count := i
		pool.Submit(context.Background(), func() {
			fmt.Printf("执行任务 %d\n", count)
			time.Sleep(1 * time.Second)
		})
	}

	// 等待所有任务完成
	pool.Wait()
}

高级功能

1. 带上下文的任务

func main() {
	pool := gpool.NewPool(2, 5)
	defer pool.Release()

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err := pool.Submit(ctx, func() {
		time.Sleep(1 * time.Second)
		fmt.Println("这个任务会因为超时而被取消")
	})

	if err != nil {
		fmt.Println("提交任务失败:", err)
	}

	pool.Wait()
}

2. 动态调整池大小

func main() {
	pool := gpool.NewPool(2, 10)
	defer pool.Release()

	// 调整最小worker数
	pool.TuneMin(5)
	
	// 调整最大worker数
	pool.TuneMax(15)
	
	// 获取当前worker数
	fmt.Println("当前worker数:", pool.Running())
}

3. 带返回值的任务

func main() {
	pool := gpool.NewPool(2, 5)
	defer pool.Release()

	future, err := pool.SubmitWithResult(context.Background(), func() interface{} {
		time.Sleep(1 * time.Second)
		return "任务结果"
	})

	if err != nil {
		fmt.Println("提交任务失败:", err)
		return
	}

	result, err := future.Get()
	if err != nil {
		fmt.Println("获取结果失败:", err)
		return
	}

	fmt.Println("任务结果:", result)
}

4. 优雅关闭

func main() {
	pool := gpool.NewPool(2, 5)

	// 提交一些任务
	for i := 0; i < 10; i++ {
		count := i
		pool.Submit(context.Background(), func() {
			time.Sleep(time.Duration(count) * 100 * time.Millisecond)
			fmt.Printf("完成任务 %d\n", count)
		})
	}

	// 优雅关闭,等待所有任务完成
	pool.Release()
	
	// 尝试提交新任务会返回错误
	err := pool.Submit(context.Background(), func() {
		fmt.Println("这个任务不会被执")
	})
	fmt.Println("提交任务错误:", err)
}

最佳实践

  1. 合理设置池大小

    • 计算密集型任务:worker数≈CPU核心数
    • I/O密集型任务:可以设置更大的worker数
  2. 总是使用context

    • 为任务设置合理的超时时间
    • 支持调用链取消
  3. 错误处理

    • 检查Submit返回的错误
    • 处理任务中的panic
  4. 资源清理

    • 使用defer pool.Release()
    • 或者在程序退出前调用Release()

性能考虑

  1. 避免在任务中创建大量临时对象,这可能导致GC压力
  2. 对于非常短的任务,直接使用goroutine可能更高效
  3. 监控池的使用情况,适时调整大小

替代方案比较

gpool相比标准库的sync.WaitGroup或简单的channel模式提供了更多功能:

  • 动态调整大小
  • 上下文感知
  • 任务超时控制
  • 优雅关闭

但相比更重量级的框架如antstunny,gpool更加轻量简洁。

希望这个指南能帮助你有效地使用gpool管理goroutine池。根据你的具体需求调整池大小和任务提交策略,可以显著提高程序的并发性能和资源利用率。

回到顶部