golang高性能轻量级goroutine工作池插件pond的使用

Golang高性能轻量级goroutine工作池插件pond的使用

pond是一个极简且高性能的Go库,旨在优雅地管理并发任务。它基于Worker Pool模式,允许同时运行大量任务,同时限制同时运行的goroutine数量。

安装

go get -u github.com/alitto/pond/v2

基本使用示例

package main

import (
	"fmt"

	"github.com/alitto/pond/v2"
)

func main() {
	// 创建并发限制为100的工作池
	pool := pond.NewPool(100)

	// 提交1000个任务
	for i := 0; i < 1000; i++ {
		i := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", i)
		})
	}

	// 停止池并等待所有提交的任务完成
	pool.StopAndWait()
}

提交返回错误的任务

// 创建并发限制为100的工作池
pool := pond.NewPool(100)

// 提交返回错误的任务
task := pool.SubmitErr(func() error {
	return errors.New("An error occurred")
})

// 等待任务完成并获取错误
err := task.Wait()

提交返回结果的任务

// 创建接受返回字符串任务的池
pool := pond.NewResultPool[string](10)

// 提交返回字符串的任务
task := pool.Submit(func() (string) {
	return "Hello, World!"
})

// 等待任务完成并获取结果
result, err := task.Wait()
// result = "Hello, World!" 且 err = nil

任务组使用示例

// 创建并发限制为10的工作池
pool := pond.NewPool(10)

// 创建任务组
group := pool.NewGroup()

// 提交一组任务
for i := 0; i < 20; i++ {
	i := i
	group.Submit(func() {
		fmt.Printf("Running group task #%d\n", i)
	})
}

// 等待组内所有任务完成
err := group.Wait()

带上下文的任务组

// 创建并发限制为10的工作池
pool := pond.NewPool(10)

// 创建5秒超时的上下文
timeout, _ := context.WithTimeout(context.Background(), 5*time.Second)

// 创建带上下文的任务组
group := pool.NewGroupContext(timeout)

// 提交一组任务
for i := 0; i < 20; i++ {
	i := i
	group.Submit(func() {
		fmt.Printf("Running group task #%d\n", i)
	})
}

// 等待组内所有任务完成或超时发生,以先到者为准
err := group.Wait()

子池使用示例

// 创建并发限制为10的工作池
pool := pond.NewPool(10)

// 创建子池,使用父池最多5个worker
subpool := pool.NewSubpool(5)

// 向子池提交任务
subpool.Submit(func() {
	fmt.Println("Running task in subpool")
})

// 停止子池并等待所有提交的任务完成
subpool.StopAndWait()

默认池使用示例

// 向默认池提交任务并等待完成
err := pond.SubmitErr(func() error {
	fmt.Println("Running task in default pool")
	return nil
}).Wait()

if err != nil {
	fmt.Printf("Failed to run task: %v", err)
} else {
	fmt.Println("Task completed successfully")
}

有界任务队列

// 创建队列最多10个任务的池
pool := pond.NewPool(1, pond.WithQueueSize(10))

// 尝试向池提交任务
task, ok := pool.TrySubmit(func() {
	// 执行一些工作
})

// 检查任务是否提交成功
if !ok {
	fmt.Println("Task submission failed because the queue is full")
}

动态调整池大小

// 创建5个worker的池
pool := pond.NewPool(5)

// 提交一些任务
for i := 0; i < 20; i++ {
    pool.Submit(func() {
        // 执行一些工作
    })
}

// 将池大小增加到10个worker
pool.Resize(10)

// 提交更多任务,将使用增加后的容量
for i := 0; i < 20; i++ {
    pool.Submit(func() {
        // 执行一些工作
    })
}

// 将池大小减少回5个worker
pool.Resize(5)

监控指标

pond提供了以下监控指标方法:

  • pool.RunningWorkers() int64: 当前运行的worker数量
  • pool.SubmittedTasks() uint64: 自池创建以来提交的任务总数
  • pool.WaitingTasks() uint64: 当前等待执行的任务数
  • pool.SuccessfulTasks() uint64: 成功完成的任务数
  • pool.FailedTasks() uint64: 因panic失败的任务数
  • pool.CompletedTasks() uint64: 已完成的任务总数(成功+失败)
  • pool.DroppedTasks() uint64: 因队列满被丢弃的任务数

pond是一个功能强大且灵活的工作池实现,适用于各种并发场景,特别是需要限制并发量的情况。


更多关于golang高性能轻量级goroutine工作池插件pond的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能轻量级goroutine工作池插件pond的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高性能轻量级Goroutine工作池Pond使用指南

Pond是一个轻量级、高性能的Goroutine工作池库,专为Go语言设计。它提供了简单易用的API来管理并发任务,可以有效控制资源使用,避免无限制创建Goroutine带来的问题。

基本特性

  • 轻量级实现,核心代码精简
  • 高性能任务调度
  • 动态调整工作池大小
  • 任务超时控制
  • 优雅关闭支持
  • 任务优先级支持

安装

go get github.com/alitto/pond

基本使用示例

package main

import (
	"fmt"
	"time"
	
	"github.com/alitto/pond"
)

func main() {
	// 创建一个工作池
	// 参数:worker数量(2),任务队列容量(100)
	pool := pond.New(2, 100)
	
	// 提交任务到工作池
	for i := 0; i < 10; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
			time.Sleep(1 * time.Second)
		})
	}
	
	// 等待所有任务完成
	pool.StopAndWait()
	fmt.Println("All tasks completed")
}

高级功能示例

1. 带返回值的任务

pool := pond.New(2, 100)
future := pool.Submit(func() interface{} {
	time.Sleep(1 * time.Second)
	return 42
})

// 获取任务结果
result := future.Value()
fmt.Printf("Task result: %v\n", result)

2. 任务超时控制

pool := pond.New(2, 100, pond.IdleTimeout(5*time.Second))

future := pool.Submit(func() interface{} {
	time.Sleep(10 * time.Second) // 长时间运行的任务
	return "done"
})

// 等待结果,带超时
result, err := future.Get(2 * time.Second)
if err == pond.ErrTimeout {
	fmt.Println("Task timed out")
} else {
	fmt.Printf("Task result: %v\n", result)
}

3. 动态调整工作池大小

pool := pond.New(2, 100)

// 扩大工作池
pool.SetMaxWorkers(4)

// 缩小工作池
pool.SetMaxWorkers(1)

4. 批量提交任务

pool := pond.New(4, 100)

tasks := make([]func(), 0, 10)
for i := 0; i < 10; i++ {
	n := i
	tasks = append(tasks, func() {
		fmt.Printf("Running batch task #%d\n", n)
		time.Sleep(500 * time.Millisecond)
	})
}

// 批量提交
pool.SubmitGroup(tasks)
pool.StopAndWait()

5. 优雅关闭

pool := pond.New(4, 100, pond.Strategy(pond.Eager()))

// 提交一些任务
for i := 0; i < 20; i++ {
	n := i
	pool.Submit(func() {
		fmt.Printf("Task %d running\n", n)
		time.Sleep(1 * time.Second)
	})
}

// 优雅关闭,等待正在运行的任务完成
pool.Stop()

性能优化建议

  1. 合理设置工作池大小:根据CPU核心数和任务类型调整

    • CPU密集型任务:接近CPU核心数
    • IO密集型任务:可以设置更大值
  2. 使用适当的策略

    • pond.Eager():立即创建worker直到达到最大值
    • pond.Lazy():按需创建worker
  3. 监控工作池状态

    fmt.Printf("Running workers: %d\n", pool.RunningWorkers())
    fmt.Printf("Idle workers: %d\n", pool.IdleWorkers())
    fmt.Printf("Waiting tasks: %d\n", pool.WaitingTasks())
    

与其他工作池库对比

  1. 与标准库相比

    • 避免无限制创建Goroutine
    • 提供更丰富的控制功能
  2. 与其他工作池库(如ants)相比

    • 更轻量级
    • API更简洁
    • 动态调整能力更强

总结

Pond是一个非常适合需要控制并发度的场景的工作池库,它的轻量级设计和高性能特性使其成为许多Go项目的首选。通过合理配置,可以在资源控制和性能之间取得良好平衡。

// 完整示例:处理HTTP请求的工作池
func main() {
	pool := pond.New(100, 1000)
	defer pool.StopAndWait()
	
	http.HandleFunc("/process", func(w http.ResponseWriter, r *http.Request) {
		// 将耗时任务提交到工作池
		future := pool.Submit(func() interface{} {
			return processRequest(r)
		})
		
		result := future.Value().(string)
		w.Write([]byte(result))
	})
	
	http.ListenAndServe(":8080", nil)
}

func processRequest(r *http.Request) string {
	// 模拟耗时处理
	time.Sleep(100 * time.Millisecond)
	return "Processed: " + r.URL.Query().Get("id")
}
回到顶部