Golang中如何在一个goroutine满足条件时停止所有goroutines

Golang中如何在一个goroutine满足条件时停止所有goroutines 我正在学习 Go 语言中并发的工作原理以及如何使用它。 我现在面临的挑战是,当其中一个 goroutine 满足给定条件时,如何停止正在运行的 goroutine。

以下是一个简单的示例(这可能不是完成任务的最佳方式,但对我来说足够理解如何使用 goroutine,我希望 :D)

假设你有一个包含 N 个切片的切片(N 是一个相当大的数字),每个内部切片包含 M 个整数(其中 M 可以是 1 到 1000+ 之间的任意数字)。你现在想检查这些内部切片中是否包含一个给定的值(比如 2)。

为了利用并发,你可以并行地遍历外部切片的元素,然后遍历整数值以检查是否存在 2。为了避免等待内部迭代完成,你可以使用一个 stop 通道来通知 goroutine 已经找到了 2

以下是我为较小数据集编写的代码:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// main function
func main() {
	rand.Seed(time.Now().UnixNano())
	min := 1
	max := 50

	sofs := make([][]int, 50)

	for i := 0; i < 50; i++ {
		new_length := rand.Intn(max-min+1) + min

		sofs[i] = make([]int, new_length)

		for j := 0; j < new_length; j++ {
			sofs[i][j] = rand.Intn(max-min+1) + min
		}
	}

	// A channel
	achan := make(chan bool, len(sofs))
	var wg sync.WaitGroup

	stopCh := make(chan struct{})

	for _, x := range sofs {
		wg.Add(1)
		go func(s []int) {
			defer wg.Done()
			for l, vs := range s {
				select {
				case <-stopCh:
				default:
					if vs == 2 {
						close(stopCh)
						achan <- true
						fmt.Println(s)
						fmt.Println(l)
						return
					}
				}
			}
		}(x)
	}

	wg.Wait()
	close(achan)

	for v := range achan {
		if v {
			fmt.Println("2 is in sofs!")
			break
		}
	}

}

虽然它可以工作(如果随机数在某个内部切片中生成了 2),但内部迭代

for l, vs := range s {
				select {
				case <-stopCh:
				default:
					if vs == 2 {
						close(stopCh)
						achan <- true
						fmt.Println(s)
						fmt.Println(l)
						return
					}
				}
			}

需要先完成任务(vs == 2?)才能检查是否已向 stopCh 发送了停止信号。这意味着,如果任务不是简单的(? vs == 2),而是一个更耗时的任务,你可以用以下代码模拟:

for l, vs := range s {
				select {
				case <-stopCh:
				default:
					if vs == 2 {
                        time.Sleep(1000 * time.Second)
						close(stopCh)
						achan <- true
						fmt.Println(s)
						fmt.Println(l)
						return
					}
				}
			}

你会浪费时间…… 有没有办法在不等待 goroutine 检查 stopCh 的情况下终止它们?


更多关于Golang中如何在一个goroutine满足条件时停止所有goroutines的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

感谢提供这么棒的信息。

更多关于Golang中如何在一个goroutine满足条件时停止所有goroutines的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你的 goroutine 需要更频繁地检查是否已被取消。没有其他办法。

@clbanning 提到的 context 取消机制做的事情是一样的:你的代码和许多库函数会不时检查 context 是否已被取消并返回。这并不是即时返回的,在最后两次连续检查之间,资源确实会被消耗。

在Web服务器和数据库微服务中,取消Go协程是常见操作,有时会使用截止时间(Deadline)或传递给Go协程的context.Context变量来实现。实际上,标准库中添加了“context”包就是为了满足这类取消需求。(是的,你可能需要在Go协程中使用defer来协助清理工作,但这正是defer的用途所在。)

你好 @Pisistrato

在我看来,强制终止 goroutine 似乎不是一个好主意。在不可预测的时间点中断一个操作可能会导致数据损坏或更糟的情况。

换句话说,要被中断的代码应该意识到这种中断,并且应该有机会进行清理,留下一个一致的状态。

如果 select 语句中的某个 case 是一个真正长时间运行的操作,可以尝试将该操作拆分为独立的步骤,每个步骤都有确定的结果(也许还需要一个回滚策略)。这样,在代码逐步完成给定任务的同时,select 块可以更频繁地检查停止信号。

并且,首先进行一些基准测试,看看代码是否真的需要改进。过早的优化可能浪费时间,并导致代码更难阅读和维护。

在Go中,当需要在一个goroutine满足条件时停止所有goroutines,可以使用context.Context来实现更优雅的取消机制。以下是改进后的代码示例:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	min := 1
	max := 50

	sofs := make([][]int, 50)

	for i := 0; i < 50; i++ {
		newLength := rand.Intn(max-min+1) + min
		sofs[i] = make([]int, newLength)

		for j := 0; j < newLength; j++ {
			sofs[i][j] = rand.Intn(max-min+1) + min
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	foundChan := make(chan bool, 1)

	for _, x := range sofs {
		wg.Add(1)
		go func(s []int) {
			defer wg.Done()
			for l, vs := range s {
				select {
				case <-ctx.Done():
					return
				default:
					if vs == 2 {
						select {
						case foundChan <- true:
							cancel()
							fmt.Printf("Found 2 at slice %v, position %d\n", s, l)
						default:
						}
						return
					}
				}
			}
		}(x)
	}

	wg.Wait()
	close(foundChan)

	if <-foundChan {
		fmt.Println("2 is in sofs!")
	}
}

对于耗时任务的场景,可以使用带超时的context:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func processValue(ctx context.Context, value int) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(100 * time.Millisecond):
		return value == 2
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())
	min := 1
	max := 50

	sofs := make([][]int, 50)

	for i := 0; i < 50; i++ {
		newLength := rand.Intn(max-min+1) + min
		sofs[i] = make([]int, newLength)

		for j := 0; j < newLength; j++ {
			sofs[i][j] = rand.Intn(max-min+1) + min
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	foundChan := make(chan bool, 1)

	for _, x := range sofs {
		wg.Add(1)
		go func(s []int) {
			defer wg.Done()
			for l, vs := range s {
				select {
				case <-ctx.Done():
					return
				default:
					if processValue(ctx, vs) {
						select {
						case foundChan <- true:
							cancel()
							fmt.Printf("Found 2 at slice %v, position %d\n", s, l)
						default:
						}
						return
					}
				}
			}
		}(x)
	}

	wg.Wait()
	close(foundChan)

	if <-foundChan {
		fmt.Println("2 is in sofs!")
	}
}

使用context.ContextWithCancel创建可取消的上下文,当某个goroutine找到目标值时调用cancel(),其他goroutine通过检查ctx.Done()通道来立即退出,避免了不必要的计算。

回到顶部