golang结构化并发编程简化插件库flowmatic的使用
Golang结构化并发编程简化插件库flowmatic的使用
Flowmatic是一个通用的Go库,提供了一种结构化的并发编程方法。它让你可以轻松管理并发任务,方式简单但有效且灵活。
特性
- 具有简单的API,比使用channel/waitgroup/mutex更具可读性
- 处理各种并发问题,如异构任务组、对切片执行同构任务以及动态工作生成
- 聚合错误
- 正确跨goroutine边界传播panic
- 提供上下文取消的帮助函数
- 依赖少
- 测试覆盖率高
如何使用Flowmatic
执行异构任务
Flowmatic解决了管理并行执行多个独立任务的问题。例如,你想向三个不同的下游API发送数据。如果任何发送失败,你想返回一个错误。使用传统的Go并发,这会很快变得复杂难以管理。Flowmatic让它变得简单。
使用flowmatic.Do
执行异构任务:
err := flowmatic.Do(
func() error {
return doThingA(),
},
func() error {
return doThingB(),
},
func() error {
return doThingC(),
})
要在第一个错误时取消任务的上下文,使用flowmatic.All
。要在第一个成功时取消任务的上下文,使用flowmatic.Race
。
// 创建变量保存响应
var pageA, pageB, pageC string
// 竞争请求看谁先响应
err := flowmatic.Race(ctx,
func(ctx context.Context) error {
var err error
pageA, err = request(ctx, "A")
return err
},
func(ctx context.Context) error {
var err error
pageB, err = request(ctx, "B")
return err
},
func(ctx context.Context) error {
var err error
pageC, err = request(ctx, "C")
return err
},
)
执行同构任务
flowmatic.Each
在需要对切片中的每个项目使用工作池执行相同任务时很有用:
things := []someType{thingA, thingB, thingC}
err := flowmatic.Each(numWorkers, things,
func(thing someType) error {
foo := thing.Frobincate()
return foo.DoSomething()
})
使用flowmatic.Map
将输入切片映射到输出切片:
func Google(ctx context.Context, query string) ([]Result, error) {
searches := []Search{Web, Image, Video}
return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
func(ctx context.Context, search Search) (Result, error) {
return search(ctx, query)
})
}
管理生成新任务的任务
对于可能创建更多工作的任务,使用flowmatic.ManageTasks
。创建一个将被串行执行的manager,并让它保存结果并检查任务的输出以决定是否有更多工作要做。
// 任务获取页面并提取URL
task := func(u string) ([]string, error) {
page, err := getURL(ctx, u)
if err != nil {
return nil, err
}
return getLinks(page), nil
}
// 从页面到链接的映射
// 不需要锁,因为只有manager会访问它
results := map[string][]string{}
var managerErr error
// manager跟踪哪些页面已被访问和结果图
manager := func(req string, links []string, err error) ([]string, bool) {
// 在第一个错误后停止执行
if err != nil {
managerErr = err
return nil, false
}
// 在map中保存最终结果
results[req] = urls
// 检查要抓取的新页面
var newpages []string
for _, link := range links {
if _, ok := results[link]; ok {
// 已见过,尝试下一个链接
continue
}
// 添加到新页面列表
newpages = append(newpages, link)
// 向map添加占位符以防止重复抓取
results[link] = nil
}
return newpages, true
}
// 使用与GOMAXPROCS一样多的工作者处理任务
flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "http://example.com/")
// 检查是否有错误
if managerErr != nil {
fmt.Println("error", managerErr)
}
使用TaskPool的高级模式
对于非常高级的用途,flowmatic.TaskPool
消除了管理工作池的样板代码。
// MD5All读取根目录下的所有文件
// 并返回从文件路径到文件内容MD5总和的映射
// 如果目录遍历失败或任何读取操作失败,
// MD5All返回一个错误
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
// 创建20个digester的池
in, out := flowmatic.TaskPool(20, digest)
m := make(map[string][md5.Size]byte)
// 打开两个goroutine:
// 一个用于通过遍历文件系统读取文件名
// 一个用于记录digester的结果到map中
err := flowmatic.All(ctx,
func(ctx context.Context) error {
return walkFilesystem(ctx, root, in)
},
func(ctx context.Context) error {
for r := range out {
if r.Err != nil {
return r.Err
}
m[r.In] = *r.Out
}
return nil
},
)
return m, err
}
关于panic的注意事项
在Go中,如果goroutine中出现panic且未被恢复,则整个进程将关闭。Flowmatic通过捕获工作goroutine中发生的panic并在父goroutine中重新传播它来解决这个问题,因此可以在适当的级别捕获和记录panic。
更多关于golang结构化并发编程简化插件库flowmatic的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang结构化并发编程简化插件库flowmatic的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang结构化并发编程与flowmatic库使用指南
什么是结构化并发
结构化并发是一种编程范式,它强调并发任务应该具有明确的生命周期和清晰的父子关系。在Go语言中,我们通常使用goroutine和channel来实现并发,但原生goroutine缺乏结构化特性,容易导致"goroutine泄漏"等问题。
flowmatic库简介
flowmatic是一个简化Go结构化并发编程的轻量级库,它提供了更高级的抽象来管理goroutine的生命周期。下面我将介绍flowmatic的核心功能和使用方法。
基本用法示例
1. 任务组(TaskGroup)
package main
import (
"fmt"
"time"
"github.com/carlmjohnson/flowmatic"
)
func worker(id int) error {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
return nil
}
func main() {
// 创建并运行3个worker
err := flowmatic.Do(
func() error { return worker(1) },
func() error { return worker(2) },
func() error { return worker(3) },
)
if err != nil {
fmt.Println("Error:", err)
}
}
2. 带错误处理的并发任务
func main() {
err := flowmatic.Do(
func() error { return worker(1) },
func() error { return fmt.Errorf("模拟错误") },
func() error { return worker(3) },
)
// 会返回第一个遇到的错误
fmt.Println("结果:", err) // 输出: 结果: 模拟错误
}
3. 限制并发数
func main() {
tasks := make([]func() error, 10)
for i := range tasks {
id := i + 1
tasks[i] = func() error { return worker(id) }
}
// 最多同时运行2个任务
err := flowmatic.Do(flowmatic.MaxConcurrency(2, tasks...))
if err != nil {
fmt.Println("Error:", err)
}
}
高级用法
1. 管道模式(Pipeline)
func double(in int) (int, error) {
return in * 2, nil
}
func addFive(in int) (int, error) {
return in + 5, nil
}
func main() {
result, err := flowmatic.Pipe(
10,
double,
addFive,
double,
)
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Result:", result) // 输出: Result: 50
}
2. 超时控制
func slowTask() error {
time.Sleep(2 * time.Second)
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err := flowmatic.DoContext(ctx,
func() error { return slowTask() },
func() error { return worker(2) },
)
if err != nil {
fmt.Println("Error:", err) // 会输出context deadline exceeded错误
}
}
最佳实践
- 明确任务边界:每个任务函数应该完成一个明确的子任务
- 合理设置并发数:根据资源情况限制最大并发数
- 错误处理:确保任务函数能返回适当的错误
- 上下文传递:长时间运行的任务应该支持context取消
与原生goroutine对比
特性 | 原生goroutine | flowmatic |
---|---|---|
生命周期管理 | 手动 | 自动 |
错误传播 | 复杂 | 简单 |
并发控制 | 需要额外代码 | 内置支持 |
结构化 | 无 | 有 |
资源清理 | 容易泄漏 | 自动清理 |
flowmatic通过提供更高级的抽象,使得并发代码更易于编写和维护,特别适合需要管理多个并发任务的场景。