golang结构化并发编程工具插件库conc的使用
Golang结构化并发编程工具插件库conc的使用
简介
conc
是Go语言的结构化并发工具包,使常见任务更简单、更安全。
安装:
go get github.com/sourcegraph/conc
主要功能概览
- 使用
conc.WaitGroup
作为更安全的sync.WaitGroup
替代 - 使用
pool.Pool
实现并发限制的任务运行器 - 使用
pool.ResultPool
收集任务结果 - 使用
pool.(Result)?ErrorPool
处理可能失败的任务 - 使用
pool.(Result)?ContextPool
实现失败时取消任务 - 使用
stream.Stream
并行处理有序任务流 - 使用
iter.Map
并发映射切片 - 使用
iter.ForEach
并发迭代切片 - 使用
panics.Catcher
捕获goroutine中的panic
完整示例
基本WaitGroup使用
func main() {
var wg conc.WaitGroup
defer wg.Wait() // 确保等待所有goroutine完成
startTheThing(&wg)
}
func startTheThing(wg *conc.WaitGroup) {
wg.Go(func() {
// 并发任务代码
})
}
带并发限制的任务池
func process(stream chan int) {
p := pool.New().WithMaxGoroutines(10) // 限制10个并发
for elem := range stream {
elem := elem // 重要:创建局部变量副本
p.Go(func() {
handle(elem)
})
}
p.Wait()
}
并发映射切片
func concMap(input []int, f func(*int) int) []int {
return iter.Map(input, f)
}
处理有序流
func mapStream(in chan int, out chan int, f func(int) int) {
s := stream.New().WithMaxGoroutines(10)
for elem := range in {
elem := elem
s.Go(func() stream.Callback {
res := f(elem)
return func() { out <- res }
})
}
s.Wait()
}
主要目标
- 防止goroutine泄漏:所有并发操作都有明确的作用域和所有者
- 优雅处理panic:自动捕获并传播panic信息
- 简化并发代码:减少样板代码,提高可读性
状态说明
当前版本为1.0之前的预览版,在1.0正式发布前可能会有小的API调整。
更多关于golang结构化并发编程工具插件库conc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang结构化并发编程工具插件库conc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang结构化并发编程工具库conc的使用指南
conc
是一个Go语言的并发工具库,旨在提供更安全、更结构化的并发编程方式。它通过提供高级抽象来简化并发模式,减少goroutine泄漏和竞态条件的风险。
主要特性
- 结构化并发:确保所有goroutine都能正确完成或取消
- 任务组:方便地管理一组相关goroutine
- 迭代器:简化并行迭代操作
- 流处理:提供类似管道的并发数据处理
安装
go get github.com/sourcegraph/conc
核心组件及使用示例
1. WaitGroup替代品 - conc.WaitGroup
比标准库的sync.WaitGroup
更安全,能捕获panic并提供更好的错误处理。
package main
import (
"fmt"
"github.com/sourcegraph/conc"
)
func main() {
var wg conc.WaitGroup
defer wg.Wait() // 确保所有任务完成
wg.Go(func() {
fmt.Println("Task 1 running")
})
wg.Go(func() {
fmt.Println("Task 2 running")
})
// 会自动等待所有任务完成
}
2. 并行迭代 - conc.Iterator
package main
import (
"fmt"
"github.com/sourcegraph/conc"
)
func main() {
items := []int{1, 2, 3, 4, 5}
// 并行处理每个元素
conc.Iterator(len(items), func(i int) {
item := items[i]
fmt.Printf("Processing item %d\n", item)
// 处理逻辑...
})
}
3. 结构化并发 - conc.Context
package main
import (
"context"
"fmt"
"time"
"github.com/sourcegraph/conc"
)
func main() {
ctx := context.Background()
concCtx := conc.NewContext(ctx)
concCtx.Go(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default:
fmt.Println("Worker running")
time.Sleep(500 * time.Millisecond)
}
}
})
// 5秒后取消所有goroutine
time.Sleep(5 * time.Second)
concCtx.Cancel()
}
4. 流处理 - conc.Stream
package main
import (
"fmt"
"github.com/sourcegraph/conc"
)
func main() {
// 创建数据源
source := make(chan int)
go func() {
defer close(source)
for i := 0; i < 10; i++ {
source <- i
}
}()
// 创建流处理器
stream := conc.NewStream(source)
// 添加处理阶段 - 平方运算
squared := stream.Map(func(x int) int {
return x * x
})
// 添加处理阶段 - 过滤偶数
filtered := squared.Filter(func(x int) bool {
return x%2 == 0
})
// 收集结果
results := make([]int, 0)
for x := range filtered.Out() {
results = append(results, x)
fmt.Println("Processed:", x)
}
fmt.Println("Final results:", results)
}
5. 错误处理 - conc.WithRecover
package main
import (
"fmt"
"github.com/sourcegraph/conc"
)
func main() {
var wg conc.WaitGroup
wg.Go(conc.WithRecover(func() {
// 这个任务可能会panic
panic("something went wrong")
}))
wg.Go(conc.WithRecover(func() {
fmt.Println("This task will run successfully")
}))
wg.Wait() // 不会因为panic而崩溃
}
最佳实践
- 始终使用
defer wg.Wait()
:确保所有goroutine完成 - 限制并发度:使用
conc.NewPool(workers int)
创建有限数量的worker - 处理错误:使用
conc.WithRecover
捕获panic - 使用context:通过context实现优雅取消
性能考虑
conc
在提供安全性的同时会有轻微的性能开销,适合以下场景:
- 需要结构化并发管理的业务逻辑
- 复杂的并发流程控制
- 需要更好的错误处理和可观测性的场景
对于极高性能要求的底层并发,可能更适合直接使用原生goroutine和channel。
conc
库通过提供更高级的抽象,使Go并发编程更安全、更易维护,特别适合中大型项目中的并发控制需求。