Golang中如何使用Channel控制Goroutine

Golang中如何使用Channel控制Goroutine 你好,

我有大量可以并行处理的任务,但希望在任何时候最多只允许“n”个任务同时运行。任务可能随时添加。我考虑过创建两个函数,通过通道来递增和递减一个计数器变量作为控制手段,但说实话,我对此有点困惑。对于这类问题,是否有既定的模式?

谢谢 Cam

5 回复

@christophberger 哈哈,说得很好,谢谢

更多关于Golang中如何使用Channel控制Goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


看起来不错,在 Playground 里运行得很好!

以下是我的实现:Go Playground - The Go 编程语言

  • 我选择了“缓冲通道”方法来设置上限。
  • 在启动一个新的 goroutine 之前,我尝试向通道写入。当通道已满时,写入操作会阻塞,直到另一个即将结束的 goroutine 从通道中读取一个值。
  • 通道项类型是“空结构体”,用于向读取者表明没有实际数据发送到通道。你也可以使用 bool 或 int。
  • 我使用 WaitGroup 让主 goroutine 等待所有其他 goroutine。

事实上,我不喜欢互斥锁 😊 在更复杂的同步场景中,很容易弄错时机,导致难以追踪的竞态条件或死锁。通道是互斥锁的一个便捷抽象(实际上,通道在内部使用了互斥锁),并且更容易推理。

你好 @tranman

欢迎来到论坛。

我会用以下方式来处理这样的任务。

首先,我只有在有工作需要完成时才会启动一个 goroutine,并且该 goroutine 会在工作完成后立即结束运行。这样可以避免管理一个 goroutine 的“工作池”。

然后,我可以使用各种方法来跟踪和限制同时运行的 goroutine 数量。

例如:

  1. 创建一个大小为 n 的缓冲通道。在启动一个 goroutine 之前,尝试向该通道发送一个值。如果通道已满,则启动操作必须等待另一个 goroutine 退出。当一个 goroutine 完成时,它会从该通道读取一个项目。

  2. 使用一个 sync.Pool。预先填充 n 个项目。只要 pool.Get() 能返回,新的 goroutine 就可以启动。完成的 goroutine 调用 pool.Put()

  3. 或者使用一个简单的整数,并通过互斥锁管理对其的并发访问。(以避免数据竞争。)

我不确定是否存在一个特定的、可以被认为是既定的模式来处理这种情况。所有这些方法的主要关键在于,计数必须以线程安全的方式进行。

@christophberger

感谢您的回复。经过一番折腾,我最终得到了以下代码。它似乎能完成工作 🙂

谢谢 Cam

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

type Job struct {
	Mu             sync.Mutex
	GoRoutineCount int
}

var job Job

func (j *Job) runThread(ch chan int, done chan bool) {
	time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
	log.Printf("%-20s %s\n", "No of Active GoRoutines", strconv.Itoa(j.GoRoutineCount))
	<-ch
	j.Mu.Lock()
	j.GoRoutineCount--
	j.Mu.Unlock()
	if j.GoRoutineCount == 0 {
		done <- true
	}
}
func main() {
	chBuffered := make(chan int, 8)
	noOfJobs := 25
	chDone := make(chan bool)
	go func() {
		for i := 0; i < noOfJobs; i++ {
			job.Mu.Lock()
			job.GoRoutineCount++
			job.Mu.Unlock()
			chBuffered <- i
			go job.runThread(chBuffered, chDone)
		}
		close(chBuffered)
	}()
	<-chDone
}

在Golang中,使用带缓冲的channel是控制并发goroutine数量的标准模式。以下是两种常见实现方式:

方案1:使用带缓冲的channel作为信号量

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, task string, wg *sync.WaitGroup, sem chan struct{}) {
	defer wg.Done()
	
	// 获取信号量(如果channel已满则阻塞)
	sem <- struct{}{}
	defer func() { <-sem }() // 释放信号量
	
	fmt.Printf("Worker %d processing: %s\n", id, task)
	time.Sleep(time.Second) // 模拟任务处理
	fmt.Printf("Worker %d completed: %s\n", id, task)
}

func main() {
	const maxConcurrent = 3 // 最大并发数
	const totalTasks = 10
	
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	
	for i := 0; i < totalTasks; i++ {
		wg.Add(1)
		go worker(i, fmt.Sprintf("task-%d", i), &wg, sem)
	}
	
	wg.Wait()
	fmt.Println("All tasks completed")
}

方案2:使用worker pool模式(更适用于动态添加任务)

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID   int
	Name string
}

func workerPool(maxWorkers int, tasks <-chan Task, wg *sync.WaitGroup) {
	var workerWg sync.WaitGroup
	
	for i := 0; i < maxWorkers; i++ {
		workerWg.Add(1)
		go func(workerID int) {
			defer workerWg.Done()
			for task := range tasks {
				fmt.Printf("Worker %d processing: %s\n", workerID, task.Name)
				time.Sleep(time.Second) // 模拟任务处理
				fmt.Printf("Worker %d completed: %s\n", workerID, task.Name)
				wg.Done()
			}
		}(i)
	}
	
	workerWg.Wait()
}

func main() {
	const maxWorkers = 3
	taskChan := make(chan Task, 100) // 缓冲任务队列
	var wg sync.WaitGroup
	
	// 启动worker pool
	go workerPool(maxWorkers, taskChan, &wg)
	
	// 动态添加任务
	for i := 0; i < 10; i++ {
		wg.Add(1)
		taskChan <- Task{ID: i, Name: fmt.Sprintf("task-%d", i)}
	}
	
	// 可以继续动态添加任务
	time.Sleep(500 * time.Millisecond)
	for i := 10; i < 15; i++ {
		wg.Add(1)
		taskChan <- Task{ID: i, Name: fmt.Sprintf("task-%d", i)}
	}
	
	close(taskChan) // 关闭channel表示不再添加新任务
	wg.Wait()
	fmt.Println("All tasks completed")
}

方案3:使用errgroup.Group(第三方库,更简洁)

package main

import (
	"context"
	"fmt"
	"time"
	
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	const maxConcurrent = 3
	
	g, ctx := errgroup.WithContext(context.Background())
	sem := semaphore.NewWeighted(int64(maxConcurrent))
	
	for i := 0; i < 10; i++ {
		taskID := i
		g.Go(func() error {
			// 获取信号量
			if err := sem.Acquire(ctx, 1); err != nil {
				return err
			}
			defer sem.Release(1)
			
			fmt.Printf("Processing task %d\n", taskID)
			time.Sleep(time.Second)
			fmt.Printf("Completed task %d\n", taskID)
			return nil
		})
	}
	
	if err := g.Wait(); err != nil {
		fmt.Printf("Error: %v\n", err)
	}
	fmt.Println("All tasks completed")
}

第一种方案最简单直接,使用带缓冲的channel作为计数信号量。第二种方案worker pool模式更适合需要动态添加任务的场景。第三种方案使用了标准库外的扩展包,提供了更高级的抽象。

回到顶部