Golang实现多生产者与多消费者模式的最佳实践

Golang实现多生产者与多消费者模式的最佳实践 我一直在编写多消费者和多生产者模式的代码,但我的代码出现了死锁错误。

我创建了一个匿名函数,以便消费者函数中的“for…range”不会永远等待。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	noOfProducer := 10
	noOfConsumer := 10
	data := make(chan int64)
	var wg sync.WaitGroup

	// producer
	var ops int64
	for i := 0; i < noOfProducer; i++ {
		wg.Add(1)
		go func() {
			for c := 0; c < 100; c++ {
				atomic.AddInt64(&ops, 1)
				data <- atomic.LoadInt64(&ops)
			}
			wg.Done()
		}()
	}

	go func() {
		wg.Wait()
		close(data)
	}()

	// consumer
	for i := 0; i < noOfConsumer; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for data := range data {
				fmt.Printf("Value of i = %d Printed by consumer %d\n", data, i)
			}
		}(i)
	}
	wg.Wait()
}

非常感谢任何帮助。


更多关于Golang实现多生产者与多消费者模式的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

感谢您的快速回复;这个解释解决了问题。

更多关于Golang实现多生产者与多消费者模式的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


消费者例程在数据通道关闭后调用 defer wg.Done()。但是通道在所有例程完成后,在 wg.Wait() 之后才关闭。看到这些条件是如何相互阻塞的了吗?

这里应该使用两个不同的 sync.WaitGroup,一个用于生产者,另一个用于消费者。wg.Wait() 无法判断它应该等待消费者完成,还是等待生产者完成。

这里 是一个包含所讨论更改的有效代码。

func main() {
    fmt.Println("hello world")
}

你的代码存在两个关键问题:1)sync.WaitGroup被重复使用导致计数混乱;2)生产者goroutine完成后过早关闭通道。以下是修正后的实现:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	noOfProducer := 10
	noOfConsumer := 10
	data := make(chan int64)
	var producerWg sync.WaitGroup
	var consumerWg sync.WaitGroup

	// 生产者
	var ops int64
	for i := 0; i < noOfProducer; i++ {
		producerWg.Add(1)
		go func() {
			defer producerWg.Done()
			for c := 0; c < 100; c++ {
				val := atomic.AddInt64(&ops, 1)
				data <- val
			}
		}()
	}

	// 消费者
	for i := 0; i < noOfConsumer; i++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			for val := range data {
				fmt.Printf("Value %d printed by consumer %d\n", val, id)
			}
		}(i)
	}

	// 等待所有生产者完成,然后关闭通道
	go func() {
		producerWg.Wait()
		close(data)
	}()

	// 等待所有消费者完成
	consumerWg.Wait()
}

更简洁的版本使用单个WaitGroup但正确管理计数:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	noOfProducer := 10
	noOfConsumer := 10
	data := make(chan int64)
	var wg sync.WaitGroup

	// 启动生产者
	var ops int64
	wg.Add(noOfProducer)
	for i := 0; i < noOfProducer; i++ {
		go func() {
			defer wg.Done()
			for c := 0; c < 100; c++ {
				val := atomic.AddInt64(&ops, 1)
				data <- val
			}
		}()
	}

	// 启动消费者
	wg.Add(noOfConsumer)
	for i := 0; i < noOfConsumer; i++ {
		go func(id int) {
			defer wg.Done()
			for val := range data {
				fmt.Printf("Value %d printed by consumer %d\n", val, id)
			}
		}(i)
	}

	// 等待生产者完成并关闭通道
	go func() {
		wg.Wait()
		close(data)
	}()

	// 等待消费者完成
	wg.Wait()
}

关键修正点:

  1. 使用独立的WaitGroup或正确管理单个WaitGroup的计数
  2. 确保所有生产者完成后才关闭通道
  3. 消费者在通道关闭后自动退出range循环
  4. 使用defer wg.Done()确保goroutine结束时计数减少
回到顶部