Golang中如何精确控制goroutine的数量

Golang中如何精确控制goroutine的数量

func Routines(){
    for {
        go JustAnotherFunc()
    }
}

这是我的当前函数,但我想要限制同时只能有100个协程运行。如果1个协程执行完成,另一个协程才会开始运行,但协程的总数必须保持在100个。

有什么方法可以实现这个需求吗?

5 回复

另外我想和你分享一些有趣的内容。 看看这个 可视化Go并发

更多关于Golang中如何精确控制goroutine的数量的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


尝试使用信号量。

Package semaphore

信号量包

信号量包提供了加权信号量的实现。

const maxRoutines = 100

func Routines() {
var wg sync.WaitGroup
	for {

		for i := 0; i < 100; i++ {
			wg.Add(1)
			go anotherFunc(wg){
				defer wg.Done()
                                // Do work
			}(wg)
		}
		wg.Wait()

	}

}

类似这样的代码吗? 不过也许你可以看看工作模式。这可能是你正在寻找的东西。

// Example provided with help from Jason Waldrip.
// Package work manages a pool of goroutines to perform work.
package work

import "sync"

// Worker must be implemented by types that want to use
// the work pool.
type Worker interface {
	Task()
}

// Pool provides a pool of goroutines that can execute any Worker
// tasks that are submitted.
type Pool struct {
	work chan Worker
	wg   sync.WaitGroup
}

// New creates a new work pool.

此文件已被截断。显示完整内容

使用 Go 通道,通道是线程安全的队列

/*
 * Copyright 2018. bigpigeon. All rights reserved.
 * Use of this source code is governed by a MIT style
 * license that can be found in the LICENSE file.
 */

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Task time.Duration

func (t Task) Run() {
	fmt.Println("sleep ", time.Duration(t))
	time.Sleep(time.Duration(t))
}

func Work(task Task, token chan struct{}) {
	task.Run()
	<-token
}

// max routines
var MaxRoutines = 100

type Routines struct {
	tokens chan struct{}
}

func (r Routines) Call(task Task) {
	// if routines > MaxRoutine , will block
	r.tokens <- struct{}{}
	go Work(task, r.tokens)
}

func main() {
	routines := Routines{tokens: make(chan struct{}, MaxRoutines)}

	for i := 0; true; i++ {
		task := Task(time.Duration(rand.Intn(10)+1) * time.Second)
		routines.Call(task)
		fmt.Println("n ", i)
	}
}

可以使用带缓冲的 channel 来实现 goroutine 数量的精确控制。以下是几种实现方式:

方法一:使用带缓冲的 channel 作为信号量

package main

import (
    "fmt"
    "sync"
    "time"
)

func JustAnotherFunc(id int) {
    fmt.Printf("Goroutine %d started\n", id)
    time.Sleep(time.Second) // 模拟工作
    fmt.Printf("Goroutine %d completed\n", id)
}

func Routines() {
    maxGoroutines := 100
    semaphore := make(chan struct{}, maxGoroutines)
    var wg sync.WaitGroup
    
    for i := 0; ; i++ {
        semaphore <- struct{}{} // 获取信号量,如果缓冲区满则会阻塞
        wg.Add(1)
        
        go func(id int) {
            defer wg.Done()
            defer func() { <-semaphore }() // 释放信号量
            
            JustAnotherFunc(id)
        }(i)
    }
    
    wg.Wait()
}

方法二:使用 worker pool 模式

package main

import (
    "fmt"
    "sync"
    "time"
)

func JustAnotherFunc(id int) {
    fmt.Printf("Goroutine %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Goroutine %d completed\n", id)
}

func Routines() {
    maxGoroutines := 100
    jobs := make(chan int, maxGoroutines)
    var wg sync.WaitGroup
    
    // 启动固定数量的 worker
    for i := 0; i < maxGoroutines; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for jobID := range jobs {
                JustAnotherFunc(jobID)
            }
        }(i)
    }
    
    // 发送任务
    for i := 0; ; i++ {
        jobs <- i
    }
    
    close(jobs)
    wg.Wait()
}

方法三:使用 errgroup 包(需要导入 “golang.org/x/sync/errgroup”)

package main

import (
    "context"
    "fmt"
    "time"
    
    "golang.org/x/sync/errgroup"
)

func JustAnotherFunc(ctx context.Context, id int) error {
    fmt.Printf("Goroutine %d started\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Goroutine %d completed\n", id)
    return nil
}

func Routines() {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(100) // 限制最大并发数为100
    
    for i := 0; ; i++ {
        id := i
        g.Go(func() error {
            return JustAnotherFunc(ctx, id)
        })
    }
    
    if err := g.Wait(); err != nil {
        fmt.Printf("Error: %v\n", err)
    }
}

推荐使用方法一

方法一最为简洁高效,通过带缓冲的 channel 作为计数信号量来控制并发数量。当 channel 缓冲区满时,新的 goroutine 创建会被阻塞,直到有 goroutine 完成工作并释放信号量。

// 简化版本
func Routines() {
    sem := make(chan struct{}, 100)
    
    for i := 0; ; i++ {
        sem <- struct{}{}
        go func(id int) {
            defer func() { <-sem }()
            JustAnotherFunc(id)
        }(i)
    }
}

这种方法确保了在任何时刻运行的 goroutine 数量都不会超过 100 个,且能够自动维持这个数量。

回到顶部