Golang中扇出模式的实际用途是什么

Golang中扇出模式的实际用途是什么 如果我没理解错的话,扇出模式是指多个 Go 协程从同一个通道读取数据的模式。我认为这样做的目的是为了更快地从通道接收值,但我不太确定。另外,能否请您介绍一下扇入模式及其目的和用例?

4 回复

其用途

在CPU核心之间分配负载。

更多关于Golang中扇出模式的实际用途是什么的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


首先,如果我说错了,希望有人能在这里纠正我,因为我还没有深入研究通道及其相关模式。

据我理解,一个潜在目的是能够建立某种广播系统——例如,如果你想向多个监听器广播消息,而不仅仅是一个。这可以有多种用例,分配工作是其中之一。

例如,在我的模拟实验案例中,我希望多个"聚合"实例在它们包含的"代理"死亡时接收消息(然后执行一些操作,比如停止监听来自该死亡代理的进一步消息,检查聚合是否也需要销毁,等等)。

另一个用例可能是聊天系统,其中消息可以广播给多个客户端。

是的,扇出是一种向多个客户端发送消息的模式。如果你想在 Go 中模拟这种情况,必须为每个客户端使用一个通道,否则第一个"空闲"的客户端会读取消息并将其从通道中移除。如下所示:

package main

import (
	"fmt"
	"time"
)

func client(name string, c <-chan string) {
	for {
		message := <-c
		fmt.Println(name, "got", message)
	}
}

func main() {
	n := 10
	c := make(chan string, 1)

	for i := 1; i <= n; i++ {
		name := fmt.Sprintf("Client %d", i)
		go client(name, c)
	}

	c <- "Message 1"
	c <- "Message 2"

	for {
		// 只是休眠,否则程序会在 goroutine 运行前结束
		time.Sleep(time.Second)
	}
}

结果:

$ go run main2.go
Client 5 got Message 2
Client 2 got Message 1

但是将所有通道保存在一个切片中并遍历它们,可以让所有客户端都收到消息

package main

import (
	"fmt"
	"time"
)

func client(name string, c <-chan string) {
	for {
		message := <-c
		fmt.Println(name, "got", message)
	}
}

func main() {
	n := 10
	clientChannels := []chan string{}
	for i := 1; i <= n; i++ {
		c := make(chan string, 1)
		clientChannels = append(clientChannels, c)
		name := fmt.Sprintf("Client %d", i)
		go client(name, c)
	}
	for _, c := range clientChannels {
		c <- "Message 1"
		c <- "Message 2"
	}

	for {
		// 只是休眠,否则程序会在 goroutine 运行前结束
		time.Sleep(time.Second)
	}
}

结果:

 go run main.go
Client 3 got Message 1
Client 3 got Message 2
Client 4 got Message 1
Client 4 got Message 2
Client 7 got Message 1
Client 7 got Message 2
Client 5 got Message 1
Client 5 got Message 2
Client 6 got Message 1
Client 6 got Message 2
Client 2 got Message 1
Client 2 got Message 2
Client 9 got Message 1
Client 9 got Message 2
Client 10 got Message 1
Client 10 got Message 2
Client 1 got Message 1
Client 8 got Message 1
Client 8 got Message 2
Client 1 got Message 2

在Go语言中,扇出模式(fan-out)和扇入模式(fan-in)是并发编程中处理数据流的常见模式,它们分别用于分发和聚合数据。

扇出模式(Fan-out)

扇出模式指的是多个goroutine从同一个通道读取数据,目的是并行处理数据以提高吞吐量。这通常用于工作池场景,其中多个worker goroutine并发处理来自单个输入通道的任务。

实际用途示例

  • 处理来自消息队列(如Kafka或RabbitMQ)的事件,多个消费者并行处理消息。
  • 并行执行CPU密集型任务,如图像处理或数据转换。

代码示例

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
    }
}

func main() {
    jobs := make(chan int, 100)
    var wg sync.WaitGroup

    // 启动3个worker goroutines(扇出)
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go worker(w, jobs, &wg)
    }

    // 发送任务到通道
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait()
}

扇入模式(Fan-in)

扇入模式指的是多个通道的数据被合并到一个通道中,目的是聚合来自多个源的数据流。这通常用于收集多个goroutine的结果。

实际用途示例

  • 合并多个API调用的响应。
  • 聚合来自不同数据源的日志或指标。

代码示例

package main

import (
    "fmt"
    "sync"
)

func producer(id int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 3; i++ {
            ch <- id*10 + i
        }
    }()
    return ch
}

func fanIn(inputs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // 为每个输入通道启动一个goroutine
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(input)
    }

    // 等待所有输入通道关闭后关闭输出通道
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    // 创建3个生产者通道
    p1 := producer(1)
    p2 := producer(2)
    p3 := producer(3)

    // 扇入合并
    result := fanIn(p1, p2, p3)

    // 消费合并后的数据
    for n := range result {
        fmt.Println(n)
    }
}

扇出模式通过并行处理提高性能,扇入模式通过数据聚合简化消费逻辑。在实际应用中,这两种模式经常结合使用,例如在管道处理架构中,多个worker处理数据后通过扇入模式合并结果。

回到顶部