golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用
Golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用
cyclicbarrier简介
cyclicbarrier是一个同步器,它允许一组goroutine相互等待,直到所有goroutine都到达一个共同的执行点(也称为屏障)。
该库灵感来源于Java的CyclicBarrier和C#的Barrier。
使用方法
初始化
import "github.com/marusama/cyclicbarrier"
...
b1 := cyclicbarrier.New(10) // 创建一个包含10个参与方的循环屏障
...
b2 := cyclicbarrier.NewWithAction(10, func() error { return nil }) // 创建一个包含10个参与方并定义了屏障动作的循环屏障
等待
b.Await(ctx) // 等待其他参与方
重置
b.Reset() // 重置屏障
简单示例
// 创建一个包含10个参与方的屏障,并定义一个增加计数器的动作
// 每次所有goroutine到达屏障时都会调用这个动作
cnt := 0
b := cyclicbarrier.NewWithAction(10, func() error {
cnt++
return nil
})
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ { // 创建10个goroutine(与屏障参与方数量相同)
wg.Add(1)
go func() {
for j := 0; j < 5; j++ {
// 模拟一些耗时工作,执行5次
time.Sleep(100 * time.Millisecond)
err := b.Await(context.TODO()) // 等待屏障上的其他参与方
// 最后一个到达的goroutine将执行屏障动作
// 然后让所有其他goroutine进入下一轮
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(cnt) // cnt = 5,表示屏障被通过了5次
这个示例展示了cyclicbarrier的基本用法:
- 创建了一个包含10个参与方的屏障
- 定义了屏障动作(计数器增加)
- 启动了10个goroutine,每个执行5次工作循环
- 每次工作后调用Await()等待其他goroutine
- 最后打印计数器值,验证屏障被正确触发了5次
更多关于golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现循环屏障同步机制的并发编程插件库cyclicbarrier的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang循环屏障(CyclicBarrier)并发编程指南
循环屏障(CyclicBarrier)是一种常见的并发同步机制,它允许多个goroutine在某个屏障点等待,直到所有goroutine都到达该点后才能继续执行。下面我将详细介绍如何在Go中实现和使用循环屏障。
1. 基本概念
循环屏障类似于一个集合点,当一组goroutine都到达屏障点时,它们才能继续执行。与一次性屏障不同,循环屏障可以重复使用。
2. 标准库实现
Go标准库中没有直接提供CyclicBarrier,但我们可以使用sync
包中的原语来实现。下面是一个简单的实现:
package cyclicbarrier
import (
"sync"
)
type CyclicBarrier struct {
parties int
count int
cond *sync.Cond
isBroken bool
barrierFunc func()
}
func New(parties int, barrierFunc func()) *CyclicBarrier {
if parties <= 0 {
panic("parties must be positive number")
}
return &CyclicBarrier{
parties: parties,
count: parties,
cond: sync.NewCond(&sync.Mutex{}),
barrierFunc: barrierFunc,
}
}
func (cb *CyclicBarrier) Await() {
cb.cond.L.Lock()
defer cb.cond.L.Unlock()
if cb.isBroken {
panic("barrier is broken")
}
cb.count--
if cb.count > 0 {
// 等待其他goroutine到达
cb.cond.Wait()
} else {
// 最后一个到达的goroutine执行屏障函数并唤醒其他goroutine
cb.count = cb.parties // 重置计数器
if cb.barrierFunc != nil {
cb.barrierFunc()
}
cb.cond.Broadcast()
}
}
func (cb *CyclicBarrier) Reset() {
cb.cond.L.Lock()
defer cb.cond.L.Unlock()
cb.count = cb.parties
cb.isBroken = false
cb.cond.Broadcast()
}
3. 使用示例
下面是一个使用循环屏障的完整示例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const parties = 3
// 创建循环屏障,设置参与方数量和屏障动作
barrier := New(parties, func() {
fmt.Println("\nAll goroutines reached the barrier\n")
})
var wg sync.WaitGroup
wg.Add(parties)
for i := 0; i < parties; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ { // 循环3次
// 模拟工作
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
fmt.Printf("Goroutine %d completed phase %d\n", id, j)
// 等待其他goroutine
barrier.Await()
}
}(i)
}
wg.Wait()
fmt.Println("All goroutines finished")
}
4. 第三方库
如果你不想自己实现,可以使用第三方库如github.com/marusama/cyclicbarrier
:
package main
import (
"fmt"
"time"
"github.com/marusama/cyclicbarrier"
)
func main() {
// 创建循环屏障
b := cyclicbarrier.New(3)
// 启动3个goroutine
for i := 0; i < 3; i++ {
go func(id int) {
for j := 0; j < 3; j++ {
fmt.Printf("Goroutine %d is working on phase %d\n", id, j)
time.Sleep(time.Duration(id*100) * time.Millisecond)
// 等待其他goroutine
err := b.Await()
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Goroutine %d passed barrier %d\n", id, j)
}
}(i)
}
time.Sleep(5 * time.Second)
}
5. 使用场景
循环屏障特别适用于以下场景:
- 并行计算中需要同步多个计算阶段
- 多阶段测试,每个阶段需要所有测试用例都完成才能进入下一阶段
- 模拟多玩家游戏中的回合制同步
6. 注意事项
- 确保屏障的参与方数量设置正确
- 处理可能的超时和中断情况
- 避免死锁,确保所有goroutine最终都能到达屏障点
- 考虑使用带超时的Await方法防止永久阻塞
通过合理使用循环屏障,可以有效地协调多个goroutine的执行流程,实现复杂的并发模式。