golang并发协程数量控制与管理插件库goccm的使用
Golang并发协程数量控制与管理插件库goccm的使用
简介
Golang Concurrency Manager (goccm) 是一个用于限制并发运行的goroutine数量的包。
安装
运行以下命令安装该包:
$ go get -u github.com/zenthangplus/goccm
示例
下面是一个完整的示例,展示如何使用goccm控制并发goroutine数量:
package main
import (
"fmt"
"github.com/zenthangplus/goccm"
"time"
)
func main() {
// 限制最多3个goroutine并发运行
c := goccm.New(3)
for i := 1; i <= 10; i++ {
// 在启动任何goroutine前必须调用此函数
c.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)
// 当一个goroutine完成时必须调用此函数
// 或者可以在goroutine开头使用 `defer c.Done()`
c.Done()
}(i)
}
// 必须调用此函数确保所有goroutine都已完成
// 在关闭主程序前
c.WaitAllDone()
}
支持的功能列表
package main
import "github.com/zenthangplus/goccm"
func main() {
// 创建并发管理器
// 第一个参数是允许并发运行的goroutine最大数量
c := goccm.New(10)
// 等待直到有可用的槽位启动新goroutine
c.Wait()
// 标记一个goroutine已完成
c.Done()
// 等待所有goroutine完成
c.WaitAllDone()
// 手动关闭管理器
c.Close()
// 返回当前正在运行的goroutine数量
c.RunningCount()
}
使用说明
- 使用
goccm.New(n)
创建一个并发管理器,n为最大并发数 - 在启动goroutine前调用
c.Wait()
等待可用槽位 - 在goroutine完成时调用
c.Done()
或使用defer c.Done()
- 在主程序中调用
c.WaitAllDone()
确保所有goroutine完成 - 可选使用
c.Close()
手动关闭管理器 - 使用
c.RunningCount()
获取当前运行的goroutine数量
更多关于golang并发协程数量控制与管理插件库goccm的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang并发协程数量控制与管理插件库goccm的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go并发协程数量控制与管理:goccm库使用指南
在Go语言中,goroutine的轻量级特性使得并发编程变得简单,但如果不加以控制,大量goroutine可能会导致资源耗尽。goccm是一个简单易用的goroutine并发控制器,可以帮助我们限制同时运行的goroutine数量。
goccm简介
goccm (Go Concurrency Control Manager) 是一个轻量级的并发控制库,主要功能是限制同时运行的goroutine数量。它适用于需要控制并发度的场景,如批量处理任务、API请求限制等。
安装goccm
go get github.com/korovkin/goccm
基本使用方法
1. 基本示例
package main
import (
"fmt"
"time"
"github.com/korovkin/goccm"
)
func main() {
// 创建一个最大并发数为3的控制器
c := goccm.New(3)
for i := 1; i <= 10; i++ {
// 等待可用槽位
c.Wait()
go func(id int) {
defer c.Done() // 任务完成时通知控制器
fmt.Printf("任务%d开始\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("任务%d结束\n", id)
}(i)
}
// 等待所有goroutine完成
c.WaitAllDone()
fmt.Println("所有任务完成")
}
2. 带错误处理的示例
package main
import (
"errors"
"fmt"
"time"
"github.com/korovkin/goccm"
)
func worker(id int) error {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
if id%4 == 0 {
return errors.New("模拟错误发生")
}
fmt.Printf("Worker %d 完成工作\n", id)
return nil
}
func main() {
maxConcurrent := 2
c := goccm.New(maxConcurrent)
errChan := make(chan error, 10)
for i := 1; i <= 10; i++ {
c.Wait()
go func(id int) {
defer c.Done()
if err := worker(id); err != nil {
errChan <- err
}
}(i)
}
// 等待所有goroutine完成
c.WaitAllDone()
close(errChan)
// 检查错误
for err := range errChan {
fmt.Println("捕获到错误:", err)
}
fmt.Println("程序结束")
}
高级功能
1. 动态调整并发数
package main
import (
"fmt"
"time"
"github.com/korovkin/goccm"
)
func main() {
c := goccm.New(2) // 初始并发数为2
// 启动监控goroutine动态调整并发数
go func() {
time.Sleep(3 * time.Second)
fmt.Println("提高并发数到5")
c.Resize(5)
}()
for i := 1; i <= 10; i++ {
c.Wait()
go func(id int) {
defer c.Done()
fmt.Printf("任务%d开始\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("任务%d结束\n", id)
}(i)
}
c.WaitAllDone()
fmt.Println("所有任务完成")
}
2. 获取当前运行状态
package main
import (
"fmt"
"time"
"github.com/korovkin/goccm"
)
func main() {
c := goccm.New(3)
for i := 1; i <= 5; i++ {
c.Wait()
go func(id int) {
defer c.Done()
fmt.Printf("任务%d开始 (当前运行中: %d)\n",
id, c.RunningCount())
time.Sleep(time.Second)
fmt.Printf("任务%d结束\n", id)
}(i)
}
c.WaitAllDone()
fmt.Printf("最终状态 - 运行中: %d, 最大并发数: %d\n",
c.RunningCount(), c.GetMax())
}
实际应用场景
1. 限制HTTP请求并发数
package main
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/korovkin/goccm"
)
func fetchURL(url string, c *goccm.ConcurrencyManager) {
defer c.Done()
start := time.Now()
resp, err := http.Get(url)
if err != nil {
fmt.Printf("获取 %s 失败: %v\n", url, err)
return
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("读取 %s 响应失败: %v\n", url, err)
return
}
fmt.Printf("获取 %s 成功, 耗时: %v\n", url, time.Since(start))
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.golang.org",
"https://www.example.com",
"https://www.stackoverflow.com",
}
c := goccm.New(2) // 限制同时最多2个请求
for _, url := range urls {
c.Wait()
go fetchURL(url, c)
}
c.WaitAllDone()
fmt.Println("所有请求完成")
}
2. 批量处理文件
package main
import (
"fmt"
"io/ioutil"
"log"
"path/filepath"
"time"
"github.com/korovkin/goccm"
)
func processFile(path string, c *goccm.ConcurrencyManager) {
defer c.Done()
start := time.Now()
data, err := ioutil.ReadFile(path)
if err != nil {
log.Printf("读取文件 %s 失败: %v", path, err)
return
}
// 模拟处理过程
time.Sleep(500 * time.Millisecond)
fmt.Printf("处理文件 %s (大小: %d bytes), 耗时: %v\n",
filepath.Base(path), len(data), time.Since(start))
}
func main() {
files := []string{
"file1.txt",
"file2.txt",
"file3.txt",
"file4.txt",
"file5.txt",
}
c := goccm.New(3) // 同时最多处理3个文件
for _, file := range files {
c.Wait()
go processFile(file, c)
}
c.WaitAllDone()
fmt.Println("所有文件处理完成")
}
注意事项
- 确保每个
Wait()
都有对应的Done()
调用,否则会导致goroutine泄漏 WaitAllDone()
会阻塞直到所有goroutine完成- 动态调整并发数 (
Resize()
) 会影响后续的Wait()
调用 - 对于需要返回结果的场景,可以使用channel来收集结果
goccm是一个简单而有效的并发控制工具,适合大多数需要限制goroutine数量的场景。相比更复杂的worker pool实现,goccm提供了更轻量级的解决方案。