Golang中如何精确控制goroutine的数量
Golang中如何精确控制goroutine的数量
func Routines(){
for {
go JustAnotherFunc()
}
}
这是我的当前函数,但我想要限制同时只能有100个协程运行。如果1个协程执行完成,另一个协程才会开始运行,但协程的总数必须保持在100个。
有什么方法可以实现这个需求吗?
5 回复
另外我想和你分享一些有趣的内容。 看看这个 可视化Go并发
更多关于Golang中如何精确控制goroutine的数量的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
const maxRoutines = 100
func Routines() {
var wg sync.WaitGroup
for {
for i := 0; i < 100; i++ {
wg.Add(1)
go anotherFunc(wg){
defer wg.Done()
// Do work
}(wg)
}
wg.Wait()
}
}
类似这样的代码吗? 不过也许你可以看看工作模式。这可能是你正在寻找的东西。
// Example provided with help from Jason Waldrip.
// Package work manages a pool of goroutines to perform work.
package work
import "sync"
// Worker must be implemented by types that want to use
// the work pool.
type Worker interface {
Task()
}
// Pool provides a pool of goroutines that can execute any Worker
// tasks that are submitted.
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// New creates a new work pool.
此文件已被截断。显示完整内容
使用 Go 通道,通道是线程安全的队列
/*
* Copyright 2018. bigpigeon. All rights reserved.
* Use of this source code is governed by a MIT style
* license that can be found in the LICENSE file.
*/
package main
import (
"fmt"
"math/rand"
"time"
)
type Task time.Duration
func (t Task) Run() {
fmt.Println("sleep ", time.Duration(t))
time.Sleep(time.Duration(t))
}
func Work(task Task, token chan struct{}) {
task.Run()
<-token
}
// max routines
var MaxRoutines = 100
type Routines struct {
tokens chan struct{}
}
func (r Routines) Call(task Task) {
// if routines > MaxRoutine , will block
r.tokens <- struct{}{}
go Work(task, r.tokens)
}
func main() {
routines := Routines{tokens: make(chan struct{}, MaxRoutines)}
for i := 0; true; i++ {
task := Task(time.Duration(rand.Intn(10)+1) * time.Second)
routines.Call(task)
fmt.Println("n ", i)
}
}
可以使用带缓冲的 channel 来实现 goroutine 数量的精确控制。以下是几种实现方式:
方法一:使用带缓冲的 channel 作为信号量
package main
import (
"fmt"
"sync"
"time"
)
func JustAnotherFunc(id int) {
fmt.Printf("Goroutine %d started\n", id)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Goroutine %d completed\n", id)
}
func Routines() {
maxGoroutines := 100
semaphore := make(chan struct{}, maxGoroutines)
var wg sync.WaitGroup
for i := 0; ; i++ {
semaphore <- struct{}{} // 获取信号量,如果缓冲区满则会阻塞
wg.Add(1)
go func(id int) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
JustAnotherFunc(id)
}(i)
}
wg.Wait()
}
方法二:使用 worker pool 模式
package main
import (
"fmt"
"sync"
"time"
)
func JustAnotherFunc(id int) {
fmt.Printf("Goroutine %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Goroutine %d completed\n", id)
}
func Routines() {
maxGoroutines := 100
jobs := make(chan int, maxGoroutines)
var wg sync.WaitGroup
// 启动固定数量的 worker
for i := 0; i < maxGoroutines; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for jobID := range jobs {
JustAnotherFunc(jobID)
}
}(i)
}
// 发送任务
for i := 0; ; i++ {
jobs <- i
}
close(jobs)
wg.Wait()
}
方法三:使用 errgroup 包(需要导入 “golang.org/x/sync/errgroup”)
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func JustAnotherFunc(ctx context.Context, id int) error {
fmt.Printf("Goroutine %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Goroutine %d completed\n", id)
return nil
}
func Routines() {
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(100) // 限制最大并发数为100
for i := 0; ; i++ {
id := i
g.Go(func() error {
return JustAnotherFunc(ctx, id)
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Error: %v\n", err)
}
}
推荐使用方法一
方法一最为简洁高效,通过带缓冲的 channel 作为计数信号量来控制并发数量。当 channel 缓冲区满时,新的 goroutine 创建会被阻塞,直到有 goroutine 完成工作并释放信号量。
// 简化版本
func Routines() {
sem := make(chan struct{}, 100)
for i := 0; ; i++ {
sem <- struct{}{}
go func(id int) {
defer func() { <-sem }()
JustAnotherFunc(id)
}(i)
}
}
这种方法确保了在任何时刻运行的 goroutine 数量都不会超过 100 个,且能够自动维持这个数量。

