golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用

Golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用

cyclicbarrier简介

cyclicbarrier是一个同步器,它允许一组goroutine相互等待,直到所有goroutine都到达一个共同的执行点(也称为屏障)。

该库灵感来源于Java的CyclicBarrier和C#的Barrier。

使用方法

初始化

import "github.com/marusama/cyclicbarrier"
...
b1 := cyclicbarrier.New(10) // 创建一个包含10个参与方的循环屏障
...
b2 := cyclicbarrier.NewWithAction(10, func() error { return nil }) // 创建一个包含10个参与方并定义了屏障动作的循环屏障

等待

b.Await(ctx)    // 等待其他参与方

重置

b.Reset()       // 重置屏障

简单示例

// 创建一个包含10个参与方的屏障,并定义一个增加计数器的动作
// 每次所有goroutine到达屏障时都会调用这个动作
cnt := 0
b := cyclicbarrier.NewWithAction(10, func() error {
    cnt++
    return nil
})

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {           // 创建10个goroutine(与屏障参与方数量相同)
    wg.Add(1)
    go func() {
        for j := 0; j < 5; j++ {
            
            // 模拟一些耗时工作,执行5次
            time.Sleep(100 * time.Millisecond)                     
            
            err := b.Await(context.TODO()) // 等待屏障上的其他参与方
                                           // 最后一个到达的goroutine将执行屏障动作
                                           // 然后让所有其他goroutine进入下一轮
            if err != nil {
                panic(err)
            }
        }
        wg.Done()
    }()
}

wg.Wait()
fmt.Println(cnt)                    // cnt = 5,表示屏障被通过了5次

这个示例展示了cyclicbarrier的基本用法:

  1. 创建了一个包含10个参与方的屏障
  2. 定义了屏障动作(计数器增加)
  3. 启动了10个goroutine,每个执行5次工作循环
  4. 每次工作后调用Await()等待其他goroutine
  5. 最后打印计数器值,验证屏障被正确触发了5次

更多关于golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang循环屏障(CyclicBarrier)并发编程指南

循环屏障(CyclicBarrier)是一种常见的并发同步机制,它允许多个goroutine在某个屏障点等待,直到所有goroutine都到达该点后才能继续执行。下面我将详细介绍如何在Go中实现和使用循环屏障。

1. 基本概念

循环屏障类似于一个集合点,当一组goroutine都到达屏障点时,它们才能继续执行。与一次性屏障不同,循环屏障可以重复使用。

2. 标准库实现

Go标准库中没有直接提供CyclicBarrier,但我们可以使用sync包中的原语来实现。下面是一个简单的实现:

package cyclicbarrier

import (
	"sync"
)

type CyclicBarrier struct {
	parties      int
	count       int
	cond        *sync.Cond
	isBroken    bool
	barrierFunc func()
}

func New(parties int, barrierFunc func()) *CyclicBarrier {
	if parties <= 0 {
		panic("parties must be positive number")
	}
	return &CyclicBarrier{
		parties:      parties,
		count:        parties,
		cond:        sync.NewCond(&sync.Mutex{}),
		barrierFunc: barrierFunc,
	}
}

func (cb *CyclicBarrier) Await() {
	cb.cond.L.Lock()
	defer cb.cond.L.Unlock()

	if cb.isBroken {
		panic("barrier is broken")
	}

	cb.count--
	if cb.count > 0 {
		// 等待其他goroutine到达
		cb.cond.Wait()
	} else {
		// 最后一个到达的goroutine执行屏障函数并唤醒其他goroutine
		cb.count = cb.parties // 重置计数器
		if cb.barrierFunc != nil {
			cb.barrierFunc()
		}
		cb.cond.Broadcast()
	}
}

func (cb *CyclicBarrier) Reset() {
	cb.cond.L.Lock()
	defer cb.cond.L.Unlock()
	
	cb.count = cb.parties
	cb.isBroken = false
	cb.cond.Broadcast()
}

3. 使用示例

下面是一个使用循环屏障的完整示例:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	const parties = 3
	
	// 创建循环屏障,设置参与方数量和屏障动作
	barrier := New(parties, func() {
		fmt.Println("\nAll goroutines reached the barrier\n")
	})

	var wg sync.WaitGroup
	wg.Add(parties)

	for i := 0; i < parties; i++ {
		go func(id int) {
			defer wg.Done()
			
			for j := 0; j < 3; j++ { // 循环3次
				// 模拟工作
				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
				fmt.Printf("Goroutine %d completed phase %d\n", id, j)
				
				// 等待其他goroutine
				barrier.Await()
			}
		}(i)
	}

	wg.Wait()
	fmt.Println("All goroutines finished")
}

4. 第三方库

如果你不想自己实现,可以使用第三方库如github.com/marusama/cyclicbarrier

package main

import (
	"fmt"
	"time"

	"github.com/marusama/cyclicbarrier"
)

func main() {
	// 创建循环屏障
	b := cyclicbarrier.New(3)

	// 启动3个goroutine
	for i := 0; i < 3; i++ {
		go func(id int) {
			for j := 0; j < 3; j++ {
				fmt.Printf("Goroutine %d is working on phase %d\n", id, j)
				time.Sleep(time.Duration(id*100) * time.Millisecond)
				// 等待其他goroutine
				err := b.Await()
				if err != nil {
					fmt.Println(err)
					return
				}
				fmt.Printf("Goroutine %d passed barrier %d\n", id, j)
			}
		}(i)
	}

	time.Sleep(5 * time.Second)
}

5. 使用场景

循环屏障特别适用于以下场景:

  1. 并行计算中需要同步多个计算阶段
  2. 多阶段测试,每个阶段需要所有测试用例都完成才能进入下一阶段
  3. 模拟多玩家游戏中的回合制同步

6. 注意事项

  1. 确保屏障的参与方数量设置正确
  2. 处理可能的超时和中断情况
  3. 避免死锁,确保所有goroutine最终都能到达屏障点
  4. 考虑使用带超时的Await方法防止永久阻塞

通过合理使用循环屏障,可以有效地协调多个goroutine的执行流程,实现复杂的并发模式。

回到顶部