Golang无缓冲通道 - range用法、done信号与close操作

Golang无缓冲通道 - range用法、done信号与close操作 我正在阅读Katherine所著的《Go并发编程》的源代码。

kat-co/concurrency-in-go-src/blob/master/concurrency-patterns-in-go/pipelines/best-practices-for-constructing-pipelines/fig-pipelines-chan-stream-processing.go

package main

import (
	"fmt"
)

func main() {
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream

通过编辑代码,我发现done并不是用来关闭intStreamaddStream或其他通道的。

一旦integers的范围被迭代完毕,intStream就会自行关闭。

那么,对一个无缓冲通道进行range操作意味着什么? 在无缓冲通道被关闭之前,这个range循环永远不会完全迭代完毕吗?

如果是这样的话,这里的done就完全没有被使用。因为intStream是自行关闭的?我希望有人能帮我确认我的理解是否正确。

https://play.golang.org/p/JSnwit_p6Ls

谢谢


更多关于Golang无缓冲通道 - range用法、done信号与close操作的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang无缓冲通道 - range用法、done信号与close操作的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中,对无缓冲通道进行range操作会持续从通道接收值,直到通道被关闭。你的理解基本正确,但有几个关键点需要澄清:

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan interface{})
	
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int) // 无缓冲通道
		go func() {
			defer close(intStream) // 确保通道最终被关闭
			for _, i := range integers {
				select {
				case <-done: // done信号用于提前终止
					fmt.Println("generator: received done signal")
					return
				case intStream <- i:
					fmt.Printf("generator: sent %d\n", i)
				}
			}
			fmt.Println("generator: finished sending all integers")
		}()
		return intStream
	}

	// 消费者使用range迭代无缓冲通道
	consumer := func(done <-chan interface{}, intStream <-chan int) {
		for num := range intStream { // range会阻塞直到通道关闭
			select {
			case <-done:
				fmt.Println("consumer: received done signal")
				return
			default:
				fmt.Printf("consumer: received %d\n", num)
				time.Sleep(100 * time.Millisecond) // 模拟处理时间
			}
		}
		fmt.Println("consumer: channel closed")
	}

	// 测试正常完成
	fmt.Println("=== Test 1: Normal completion ===")
	intStream := generator(done, 1, 2, 3, 4, 5)
	go consumer(done, intStream)
	time.Sleep(1 * time.Second)

	// 测试提前终止
	fmt.Println("\n=== Test 2: Early termination ===")
	done2 := make(chan interface{})
	intStream2 := generator(done2, 1, 2, 3, 4, 5)
	
	go func() {
		time.Sleep(250 * time.Millisecond) // 发送部分数据后终止
		close(done2)
	}()
	
	consumer(done2, intStream2)
}

关键点说明:

  1. range与无缓冲通道

    for num := range intStream {
        // 这个循环会一直执行,直到intStream被关闭
        // 每次迭代都会阻塞等待新数据
    }
    
  2. done信号的作用

    • 在generator中:允许提前终止数据生成,而不是等待所有整数发送完毕
    • 在consumer中:允许提前停止消费,而不是等待通道关闭
  3. 通道关闭的时机

    defer close(intStream) // 确保goroutine退出时关闭通道
    

    通道关闭发生在:

    • 所有数据发送完毕(正常完成)
    • 收到done信号提前返回(提前终止)
  4. 实际使用场景示例

func main() {
	done := make(chan interface{})
	
	// 模拟长时间运行的任务
	go func() {
		time.Sleep(2 * time.Second)
		close(done) // 2秒后发送终止信号
	}()

	// 生成无限序列
	generator := func(done <-chan interface{}) <-chan int {
		ch := make(chan int)
		go func() {
			defer close(ch)
			i := 0
			for {
				select {
				case <-done:
					fmt.Println("Stopping infinite generator")
					return
				case ch <- i:
					i++
				}
			}
		}()
		return ch
	}

	ch := generator(done)
	
	// 消费者会在done信号发出后停止
	for num := range ch {
		fmt.Printf("Received: %d\n", num)
		if num > 5 {
			// 如果没有done机制,这个break只会停止当前goroutine
			// generator goroutine会继续运行,造成goroutine泄漏
			break
		}
	}
}

在你的代码中,done信号确实提供了提前终止的机制。如果没有done,即使消费者停止读取,generator goroutine也会继续运行直到所有数据发送完毕(可能会阻塞)。done信号使得整个流水线可以优雅地提前终止。

回到顶部