golang并发协程数量控制与管理插件库goccm的使用

Golang并发协程数量控制与管理插件库goccm的使用

简介

Golang Concurrency Manager (goccm) 是一个用于限制并发运行的goroutine数量的包。

安装

运行以下命令安装该包:

$ go get -u github.com/zenthangplus/goccm

示例

下面是一个完整的示例,展示如何使用goccm控制并发goroutine数量:

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "time"
)

func main() {
    // 限制最多3个goroutine并发运行
    c := goccm.New(3)

    for i := 1; i <= 10; i++ {

        // 在启动任何goroutine前必须调用此函数
        c.Wait()

        go func(i int) {
            fmt.Printf("Job %d is running\n", i)
            time.Sleep(2 * time.Second)

            // 当一个goroutine完成时必须调用此函数
            // 或者可以在goroutine开头使用 `defer c.Done()`
            c.Done()
        }(i)
    }

    // 必须调用此函数确保所有goroutine都已完成
    // 在关闭主程序前
    c.WaitAllDone()
}

支持的功能列表

package main

import "github.com/zenthangplus/goccm"

func main() {
    // 创建并发管理器
    // 第一个参数是允许并发运行的goroutine最大数量
    c := goccm.New(10)

    // 等待直到有可用的槽位启动新goroutine
    c.Wait()

    // 标记一个goroutine已完成
    c.Done()

    // 等待所有goroutine完成
    c.WaitAllDone()

    // 手动关闭管理器
    c.Close()

    // 返回当前正在运行的goroutine数量
    c.RunningCount()
}

使用说明

  1. 使用goccm.New(n)创建一个并发管理器,n为最大并发数
  2. 在启动goroutine前调用c.Wait()等待可用槽位
  3. 在goroutine完成时调用c.Done()或使用defer c.Done()
  4. 在主程序中调用c.WaitAllDone()确保所有goroutine完成
  5. 可选使用c.Close()手动关闭管理器
  6. 使用c.RunningCount()获取当前运行的goroutine数量

更多关于golang并发协程数量控制与管理插件库goccm的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang并发协程数量控制与管理插件库goccm的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go并发协程数量控制与管理:goccm库使用指南

在Go语言中,goroutine的轻量级特性使得并发编程变得简单,但如果不加以控制,大量goroutine可能会导致资源耗尽。goccm是一个简单易用的goroutine并发控制器,可以帮助我们限制同时运行的goroutine数量。

goccm简介

goccm (Go Concurrency Control Manager) 是一个轻量级的并发控制库,主要功能是限制同时运行的goroutine数量。它适用于需要控制并发度的场景,如批量处理任务、API请求限制等。

安装goccm

go get github.com/korovkin/goccm

基本使用方法

1. 基本示例

package main

import (
	"fmt"
	"time"
	
	"github.com/korovkin/goccm"
)

func main() {
	// 创建一个最大并发数为3的控制器
	c := goccm.New(3)

	for i := 1; i <= 10; i++ {
		// 等待可用槽位
		c.Wait()

		go func(id int) {
			defer c.Done() // 任务完成时通知控制器
			
			fmt.Printf("任务%d开始\n", id)
			time.Sleep(2 * time.Second)
			fmt.Printf("任务%d结束\n", id)
		}(i)
	}

	// 等待所有goroutine完成
	c.WaitAllDone()
	fmt.Println("所有任务完成")
}

2. 带错误处理的示例

package main

import (
	"errors"
	"fmt"
	"time"
	
	"github.com/korovkin/goccm"
)

func worker(id int) error {
	fmt.Printf("Worker %d 开始工作\n", id)
	time.Sleep(time.Second)
	
	if id%4 == 0 {
		return errors.New("模拟错误发生")
	}
	
	fmt.Printf("Worker %d 完成工作\n", id)
	return nil
}

func main() {
	maxConcurrent := 2
	c := goccm.New(maxConcurrent)
	errChan := make(chan error, 10)

	for i := 1; i <= 10; i++ {
		c.Wait()
		
		go func(id int) {
			defer c.Done()
			
			if err := worker(id); err != nil {
				errChan <- err
			}
		}(i)
	}

	// 等待所有goroutine完成
	c.WaitAllDone()
	close(errChan)
	
	// 检查错误
	for err := range errChan {
		fmt.Println("捕获到错误:", err)
	}
	
	fmt.Println("程序结束")
}

高级功能

1. 动态调整并发数

package main

import (
	"fmt"
	"time"
	
	"github.com/korovkin/goccm"
)

func main() {
	c := goccm.New(2) // 初始并发数为2

	// 启动监控goroutine动态调整并发数
	go func() {
		time.Sleep(3 * time.Second)
		fmt.Println("提高并发数到5")
		c.Resize(5)
	}()

	for i := 1; i <= 10; i++ {
		c.Wait()
		
		go func(id int) {
			defer c.Done()
			
			fmt.Printf("任务%d开始\n", id)
			time.Sleep(2 * time.Second)
			fmt.Printf("任务%d结束\n", id)
		}(i)
	}

	c.WaitAllDone()
	fmt.Println("所有任务完成")
}

2. 获取当前运行状态

package main

import (
	"fmt"
	"time"
	
	"github.com/korovkin/goccm"
)

func main() {
	c := goccm.New(3)

	for i := 1; i <= 5; i++ {
		c.Wait()
		
		go func(id int) {
			defer c.Done()
			
			fmt.Printf("任务%d开始 (当前运行中: %d)\n", 
				id, c.RunningCount())
			time.Sleep(time.Second)
			fmt.Printf("任务%d结束\n", id)
		}(i)
	}

	c.WaitAllDone()
	fmt.Printf("最终状态 - 运行中: %d, 最大并发数: %d\n",
		c.RunningCount(), c.GetMax())
}

实际应用场景

1. 限制HTTP请求并发数

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
	
	"github.com/korovkin/goccm"
)

func fetchURL(url string, c *goccm.ConcurrencyManager) {
	defer c.Done()
	
	start := time.Now()
	resp, err := http.Get(url)
	if err != nil {
		fmt.Printf("获取 %s 失败: %v\n", url, err)
		return
	}
	defer resp.Body.Close()
	
	_, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("读取 %s 响应失败: %v\n", url, err)
		return
	}
	
	fmt.Printf("获取 %s 成功, 耗时: %v\n", url, time.Since(start))
}

func main() {
	urls := []string{
		"https://www.google.com",
		"https://www.github.com",
		"https://www.golang.org",
		"https://www.example.com",
		"https://www.stackoverflow.com",
	}

	c := goccm.New(2) // 限制同时最多2个请求

	for _, url := range urls {
		c.Wait()
		go fetchURL(url, c)
	}

	c.WaitAllDone()
	fmt.Println("所有请求完成")
}

2. 批量处理文件

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"path/filepath"
	"time"
	
	"github.com/korovkin/goccm"
)

func processFile(path string, c *goccm.ConcurrencyManager) {
	defer c.Done()
	
	start := time.Now()
	data, err := ioutil.ReadFile(path)
	if err != nil {
		log.Printf("读取文件 %s 失败: %v", path, err)
		return
	}
	
	// 模拟处理过程
	time.Sleep(500 * time.Millisecond)
	fmt.Printf("处理文件 %s (大小: %d bytes), 耗时: %v\n",
		filepath.Base(path), len(data), time.Since(start))
}

func main() {
	files := []string{
		"file1.txt",
		"file2.txt",
		"file3.txt",
		"file4.txt",
		"file5.txt",
	}

	c := goccm.New(3) // 同时最多处理3个文件

	for _, file := range files {
		c.Wait()
		go processFile(file, c)
	}

	c.WaitAllDone()
	fmt.Println("所有文件处理完成")
}

注意事项

  1. 确保每个 Wait() 都有对应的 Done() 调用,否则会导致goroutine泄漏
  2. WaitAllDone() 会阻塞直到所有goroutine完成
  3. 动态调整并发数 (Resize()) 会影响后续的 Wait() 调用
  4. 对于需要返回结果的场景,可以使用channel来收集结果

goccm是一个简单而有效的并发控制工具,适合大多数需要限制goroutine数量的场景。相比更复杂的worker pool实现,goccm提供了更轻量级的解决方案。

回到顶部