golang并发限制协程池管理插件gowp的使用

golang并发限制协程池管理插件gowp的使用

简介

gowp是一个Golang的worker pool实现,主要特点包括:

  • 限制任务执行的并发数,而不是排队任务的数量
  • 提交任务时不会阻塞,无论有多少任务排队
  • 支持超时设置
  • 支持通过安全队列(queue)管理任务

安装

使用以下命令安装gowp:

$ go get github.com/xxjwxc/gowp

基本使用示例

1. 基本协程池使用

package main

import (
	"fmt"
	"time"

	"github.com/xxjwxc/gowp/workpool"
)

func main() {
	wp := workpool.New(10)     // 设置最大线程数
	for i := 0; i < 20; i++ { // 开启20个请求
		ii := i
		wp.Do(func() error {
			for j := 0; j < 10; j++ { // 每个任务打印0-10的值
				fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
				time.Sleep(1 * time.Second)
			}
			return nil
		})
	}

	wp.Wait()
	fmt.Println("down")
}

2. 支持错误返回

package main

import (
	"fmt"
	"time"

	"github.com/xxjwxc/gowp/workpool"
)

func main() {
	wp := workpool.New(10)             // 设置最大线程数
	for i := 0; i < 20; i++ { 
		ii := i
		wp.Do(func() error {
			for j := 0; j < 10; j++ { 
				fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
				if ii == 1 {
					return errors.Cause(errors.New("my test err")) // 返回错误
				}
				time.Sleep(1 * time.Second)
			}
			return nil
		})
	}

	err := wp.Wait()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("down")
}

3. 非阻塞完成判断

package main

import (
	"fmt"
	"time"

	"github.com/xxjwxc/gowp/workpool"
)

func main() {
	wp := workpool.New(5)              // 设置最大线程数
	for i := 0; i < 10; i++ { 
		wp.Do(func() error {
			for j := 0; j < 5; j++ { 
				time.Sleep(1 * time.Second)
			}
			return nil
		})

		fmt.Println(wp.IsDone()) // 非阻塞判断是否完成
	}
	wp.Wait()
	fmt.Println(wp.IsDone())
	fmt.Println("down")
}

4. 同步等待结果

package main

import (
	"fmt"
	"time"

	"github.com/xxjwxc/gowp/workpool"
)

func main() {
	wp := workpool.New(5) // 设置最大线程数
	for i := 0; i < 10; i++ { 
		ii := i
		wp.DoWait(func() error {
			for j := 0; j < 5; j++ { 
				fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
				time.Sleep(1 * time.Second)
			}
			return nil
		})
	}

	err := wp.Wait()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("down")
}

5. 支持超时退出

package main

import (
	"fmt"
	"time"

	"github.com/xxjwxc/gowp/workpool"
)

func main() {
	wp := workpool.New(5)              // 设置最大线程数
	wp.SetTimeout(time.Millisecond) // 设置超时时间
	for i := 0; i < 10; i++ { 
		ii := i
		wp.DoWait(func() error {
			for j := 0; j < 5; j++ {
				fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
				time.Sleep(1 * time.Second)
			}
			return nil
		})
	}

	err := wp.Wait()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("down")
}

限流器(Limiter)使用

1. 基于缓存的限流器

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/xxjwxc/gowp/limiter"
)

func main() {
	limiter := limiter.NewLimiter(limiter.WithLimit(10), limiter.WithNamespace("test"), limiter.WithTsTimeout(true))
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			token, _ := limiter.Acquire(10) // 获取令牌
			fmt.Println(token)

			time.Sleep(1 * time.Second)
			limiter.Release(token) // 释放令牌
		}()
	}
	wg.Wait()
	fmt.Println("down")
}

2. 基于Redis的限流器

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/xxjwxc/gowp/limiter"
	"github.com/xxjwxc/public/myredis"
)

func main() {
	conf := myredis.InitRedis(myredis.WithAddr("127.0.0.1:6379"), myredis.WithPwd("123456"), myredis.WithGroupName("test"))
	res, err := myredis.NewRedis(conf)
	if err != nil {
		fmt.Println(err)
		return
	}

	limiter := limiter.NewLimiter(limiter.WithRedis(res), limiter.WithLimit(10), limiter.WithNamespace("test"))
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			token, _ := limiter.Acquire(10) 
			fmt.Println(token)

			time.Sleep(1 * time.Second)
			limiter.Release(token) 
		}()
	}
	wg.Wait()
	fmt.Println("down")
}

更多关于golang并发限制协程池管理插件gowp的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang并发限制协程池管理插件gowp的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


GoWP - Golang 并发协程池管理插件

GoWP 是一个轻量级的 Golang 并发协程池管理库,它可以帮助你控制并发 goroutine 的数量,避免无限制创建 goroutine 导致的资源耗尽问题。

基本特性

  • 限制最大并发 goroutine 数量
  • 任务队列管理
  • 支持同步/异步任务提交
  • 任务超时控制
  • 简单易用的 API 接口

安装

go get github.com/xxjwxc/gowp

基本使用示例

1. 创建协程池

package main

import (
	"fmt"
	"time"
	
	"github.com/xxjwxc/gowp"
)

func main() {
	// 创建一个最大并发数为3的协程池
	wp := gowp.New(3)
	defer wp.Wait() // 等待所有任务完成
	
	for i := 0; i < 10; i++ {
		idx := i
		wp.Do(func() error {
			fmt.Printf("Task %d started\n", idx)
			time.Sleep(time.Second) // 模拟耗时任务
			fmt.Printf("Task %d done\n", idx)
			return nil
		})
	}
}

2. 带错误处理的协程池

func main() {
	wp := gowp.New(3)
	defer wp.Wait()
	
	results := make(chan int, 10)
	
	for i := 0; i < 10; i++ {
		idx := i
		wp.Do(func() error {
			if idx%2 == 0 {
				return fmt.Errorf("error on task %d", idx)
			}
			results <- idx
			return nil
		})
	}
	
	// 处理结果和错误
	go func() {
		for err := range wp.ErrChan() {
			fmt.Println("Error:", err)
		}
	}()
	
	for result := range results {
		fmt.Println("Success:", result)
	}
}

3. 带超时控制的协程池

func main() {
	// 创建协程池,设置最大并发数和超时时间
	wp := gowp.New(3, gowp.WithTimeout(time.Second*2))
	defer wp.Wait()
	
	for i := 0; i < 5; i++ {
		idx := i
		wp.Do(func() error {
			if idx == 3 {
				time.Sleep(time.Second * 3) // 这个任务会超时
			} else {
				time.Sleep(time.Second)
			}
			fmt.Printf("Task %d completed\n", idx)
			return nil
		})
	}
}

高级功能

1. 动态调整协程池大小

func main() {
	wp := gowp.New(3)
	defer wp.Wait()
	
	// 动态调整协程池大小
	wp.Resize(5) // 扩大协程池
	wp.Resize(2) // 缩小协程池
	
	for i := 0; i < 10; i++ {
		idx := i
		wp.Do(func() error {
			fmt.Printf("Task %d running\n", idx)
			time.Sleep(time.Second)
			return nil
		})
	}
}

2. 获取协程池状态

func main() {
	wp := gowp.New(3)
	defer wp.Wait()
	
	for i := 0; i < 10; i++ {
		idx := i
		wp.Do(func() error {
			time.Sleep(time.Second)
			return nil
		})
		
		// 获取协程池状态
		fmt.Printf("Running: %d, Waiting: %d\n", 
			wp.Running(), wp.Waiting())
	}
}

最佳实践

  1. 合理设置并发数:根据任务类型和系统资源设置合适的并发数,通常设置为 CPU 核心数的 2-4 倍。

  2. 及时处理错误:使用 ErrChan() 方法获取错误通道,及时处理任务中的错误。

  3. 资源清理:使用 defer wp.Wait() 确保所有任务完成后再退出程序。

  4. 避免任务阻塞:长时间运行的任务应该支持取消或超时机制。

  5. 监控协程池状态:通过 Running()Waiting() 方法监控协程池状态,必要时动态调整。

性能考虑

  • GoWP 使用 channel 实现任务队列,性能开销小
  • 协程复用减少了 goroutine 创建销毁的开销
  • 适合 IO 密集型任务,对于 CPU 密集型任务效果有限

GoWP 是一个简单但功能完备的协程池实现,适合大多数需要控制并发数的场景。对于更复杂的需求,可以考虑使用更强大的工作池实现如 antstunny

回到顶部