Golang中如何使用Channel控制Goroutine
Golang中如何使用Channel控制Goroutine 你好,
我有大量可以并行处理的任务,但希望在任何时候最多只允许“n”个任务同时运行。任务可能随时添加。我考虑过创建两个函数,通过通道来递增和递减一个计数器变量作为控制手段,但说实话,我对此有点困惑。对于这类问题,是否有既定的模式?
谢谢 Cam
@christophberger 哈哈,说得很好,谢谢
更多关于Golang中如何使用Channel控制Goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
看起来不错,在 Playground 里运行得很好!
以下是我的实现:Go Playground - The Go 编程语言
- 我选择了“缓冲通道”方法来设置上限。
- 在启动一个新的 goroutine 之前,我尝试向通道写入。当通道已满时,写入操作会阻塞,直到另一个即将结束的 goroutine 从通道中读取一个值。
- 通道项类型是“空结构体”,用于向读取者表明没有实际数据发送到通道。你也可以使用 bool 或 int。
- 我使用 WaitGroup 让主 goroutine 等待所有其他 goroutine。
事实上,我不喜欢互斥锁 😊 在更复杂的同步场景中,很容易弄错时机,导致难以追踪的竞态条件或死锁。通道是互斥锁的一个便捷抽象(实际上,通道在内部使用了互斥锁),并且更容易推理。
你好 @tranman,
欢迎来到论坛。
我会用以下方式来处理这样的任务。
首先,我只有在有工作需要完成时才会启动一个 goroutine,并且该 goroutine 会在工作完成后立即结束运行。这样可以避免管理一个 goroutine 的“工作池”。
然后,我可以使用各种方法来跟踪和限制同时运行的 goroutine 数量。
例如:
-
创建一个大小为 n 的缓冲通道。在启动一个 goroutine 之前,尝试向该通道发送一个值。如果通道已满,则启动操作必须等待另一个 goroutine 退出。当一个 goroutine 完成时,它会从该通道读取一个项目。
-
使用一个
sync.Pool。预先填充 n 个项目。只要pool.Get()能返回,新的 goroutine 就可以启动。完成的 goroutine 调用pool.Put()。 -
或者使用一个简单的整数,并通过互斥锁管理对其的并发访问。(以避免数据竞争。)
我不确定是否存在一个特定的、可以被认为是既定的模式来处理这种情况。所有这些方法的主要关键在于,计数必须以线程安全的方式进行。
感谢您的回复。经过一番折腾,我最终得到了以下代码。它似乎能完成工作 🙂
谢谢 Cam
import (
"log"
"math/rand"
"strconv"
"sync"
"time"
)
type Job struct {
Mu sync.Mutex
GoRoutineCount int
}
var job Job
func (j *Job) runThread(ch chan int, done chan bool) {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
log.Printf("%-20s %s\n", "No of Active GoRoutines", strconv.Itoa(j.GoRoutineCount))
<-ch
j.Mu.Lock()
j.GoRoutineCount--
j.Mu.Unlock()
if j.GoRoutineCount == 0 {
done <- true
}
}
func main() {
chBuffered := make(chan int, 8)
noOfJobs := 25
chDone := make(chan bool)
go func() {
for i := 0; i < noOfJobs; i++ {
job.Mu.Lock()
job.GoRoutineCount++
job.Mu.Unlock()
chBuffered <- i
go job.runThread(chBuffered, chDone)
}
close(chBuffered)
}()
<-chDone
}
在Golang中,使用带缓冲的channel是控制并发goroutine数量的标准模式。以下是两种常见实现方式:
方案1:使用带缓冲的channel作为信号量
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, task string, wg *sync.WaitGroup, sem chan struct{}) {
defer wg.Done()
// 获取信号量(如果channel已满则阻塞)
sem <- struct{}{}
defer func() { <-sem }() // 释放信号量
fmt.Printf("Worker %d processing: %s\n", id, task)
time.Sleep(time.Second) // 模拟任务处理
fmt.Printf("Worker %d completed: %s\n", id, task)
}
func main() {
const maxConcurrent = 3 // 最大并发数
const totalTasks = 10
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i := 0; i < totalTasks; i++ {
wg.Add(1)
go worker(i, fmt.Sprintf("task-%d", i), &wg, sem)
}
wg.Wait()
fmt.Println("All tasks completed")
}
方案2:使用worker pool模式(更适用于动态添加任务)
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Name string
}
func workerPool(maxWorkers int, tasks <-chan Task, wg *sync.WaitGroup) {
var workerWg sync.WaitGroup
for i := 0; i < maxWorkers; i++ {
workerWg.Add(1)
go func(workerID int) {
defer workerWg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing: %s\n", workerID, task.Name)
time.Sleep(time.Second) // 模拟任务处理
fmt.Printf("Worker %d completed: %s\n", workerID, task.Name)
wg.Done()
}
}(i)
}
workerWg.Wait()
}
func main() {
const maxWorkers = 3
taskChan := make(chan Task, 100) // 缓冲任务队列
var wg sync.WaitGroup
// 启动worker pool
go workerPool(maxWorkers, taskChan, &wg)
// 动态添加任务
for i := 0; i < 10; i++ {
wg.Add(1)
taskChan <- Task{ID: i, Name: fmt.Sprintf("task-%d", i)}
}
// 可以继续动态添加任务
time.Sleep(500 * time.Millisecond)
for i := 10; i < 15; i++ {
wg.Add(1)
taskChan <- Task{ID: i, Name: fmt.Sprintf("task-%d", i)}
}
close(taskChan) // 关闭channel表示不再添加新任务
wg.Wait()
fmt.Println("All tasks completed")
}
方案3:使用errgroup.Group(第三方库,更简洁)
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
func main() {
const maxConcurrent = 3
g, ctx := errgroup.WithContext(context.Background())
sem := semaphore.NewWeighted(int64(maxConcurrent))
for i := 0; i < 10; i++ {
taskID := i
g.Go(func() error {
// 获取信号量
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
fmt.Printf("Processing task %d\n", taskID)
time.Sleep(time.Second)
fmt.Printf("Completed task %d\n", taskID)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Error: %v\n", err)
}
fmt.Println("All tasks completed")
}
第一种方案最简单直接,使用带缓冲的channel作为计数信号量。第二种方案worker pool模式更适合需要动态添加任务的场景。第三种方案使用了标准库外的扩展包,提供了更高级的抽象。

