golang结构化并发编程简化插件库flowmatic的使用

Golang结构化并发编程简化插件库flowmatic的使用

Flowmatic是一个通用的Go库,提供了一种结构化的并发编程方法。它让你可以轻松管理并发任务,方式简单但有效且灵活。

特性

  • 具有简单的API,比使用channel/waitgroup/mutex更具可读性
  • 处理各种并发问题,如异构任务组、对切片执行同构任务以及动态工作生成
  • 聚合错误
  • 正确跨goroutine边界传播panic
  • 提供上下文取消的帮助函数
  • 依赖少
  • 测试覆盖率高

如何使用Flowmatic

执行异构任务

Flowmatic解决了管理并行执行多个独立任务的问题。例如,你想向三个不同的下游API发送数据。如果任何发送失败,你想返回一个错误。使用传统的Go并发,这会很快变得复杂难以管理。Flowmatic让它变得简单。

使用flowmatic.Do执行异构任务:

err := flowmatic.Do(
    func() error {
        return doThingA(),
    },
    func() error {
        return doThingB(),
    },
    func() error {
        return doThingC(),
    })

要在第一个错误时取消任务的上下文,使用flowmatic.All。要在第一个成功时取消任务的上下文,使用flowmatic.Race

// 创建变量保存响应
var pageA, pageB, pageC string
// 竞争请求看谁先响应
err := flowmatic.Race(ctx,
    func(ctx context.Context) error {
        var err error
        pageA, err = request(ctx, "A")
        return err
    },
    func(ctx context.Context) error {
        var err error
        pageB, err = request(ctx, "B")
        return err
    },
    func(ctx context.Context) error {
        var err error
        pageC, err = request(ctx, "C")
        return err
    },
)

执行同构任务

flowmatic.Each在需要对切片中的每个项目使用工作池执行相同任务时很有用:

things := []someType{thingA, thingB, thingC}

err := flowmatic.Each(numWorkers, things,
    func(thing someType) error {
        foo := thing.Frobincate()
        return foo.DoSomething()
    })

使用flowmatic.Map将输入切片映射到输出切片:

func Google(ctx context.Context, query string) ([]Result, error) {
    searches := []Search{Web, Image, Video}
    return flowmatic.Map(ctx, flowmatic.MaxProcs, searches,
        func(ctx context.Context, search Search) (Result, error) {
            return search(ctx, query)
        })
}

管理生成新任务的任务

对于可能创建更多工作的任务,使用flowmatic.ManageTasks。创建一个将被串行执行的manager,并让它保存结果并检查任务的输出以决定是否有更多工作要做。

// 任务获取页面并提取URL
task := func(u string) ([]string, error) {
    page, err := getURL(ctx, u)
    if err != nil {
        return nil, err
    }
    return getLinks(page), nil
}

// 从页面到链接的映射
// 不需要锁,因为只有manager会访问它
results := map[string][]string{}
var managerErr error

// manager跟踪哪些页面已被访问和结果图
manager := func(req string, links []string, err error) ([]string, bool) {
    // 在第一个错误后停止执行
    if err != nil {
        managerErr = err
        return nil, false
    }
    // 在map中保存最终结果
    results[req] = urls

    // 检查要抓取的新页面
    var newpages []string
    for _, link := range links {
        if _, ok := results[link]; ok {
            // 已见过,尝试下一个链接
            continue
        }
        // 添加到新页面列表
        newpages = append(newpages, link)
        // 向map添加占位符以防止重复抓取
        results[link] = nil
    }
    return newpages, true
}

// 使用与GOMAXPROCS一样多的工作者处理任务
flowmatic.ManageTasks(flowmatic.MaxProcs, task, manager, "http://example.com/")
// 检查是否有错误
if managerErr != nil {
    fmt.Println("error", managerErr)
}

使用TaskPool的高级模式

对于非常高级的用途,flowmatic.TaskPool消除了管理工作池的样板代码。

// MD5All读取根目录下的所有文件
// 并返回从文件路径到文件内容MD5总和的映射
// 如果目录遍历失败或任何读取操作失败,
// MD5All返回一个错误
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    // 创建20个digester的池
    in, out := flowmatic.TaskPool(20, digest)

    m := make(map[string][md5.Size]byte)
    // 打开两个goroutine:
    // 一个用于通过遍历文件系统读取文件名
    // 一个用于记录digester的结果到map中
    err := flowmatic.All(ctx,
        func(ctx context.Context) error {
            return walkFilesystem(ctx, root, in)
        },
        func(ctx context.Context) error {
            for r := range out {
                if r.Err != nil {
                    return r.Err
                }
                m[r.In] = *r.Out
            }
            return nil
        },
    )

    return m, err
}

关于panic的注意事项

在Go中,如果goroutine中出现panic且未被恢复,则整个进程将关闭。Flowmatic通过捕获工作goroutine中发生的panic并在父goroutine中重新传播它来解决这个问题,因此可以在适当的级别捕获和记录panic。


更多关于golang结构化并发编程简化插件库flowmatic的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang结构化并发编程简化插件库flowmatic的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang结构化并发编程与flowmatic库使用指南

什么是结构化并发

结构化并发是一种编程范式,它强调并发任务应该具有明确的生命周期和清晰的父子关系。在Go语言中,我们通常使用goroutine和channel来实现并发,但原生goroutine缺乏结构化特性,容易导致"goroutine泄漏"等问题。

flowmatic库简介

flowmatic是一个简化Go结构化并发编程的轻量级库,它提供了更高级的抽象来管理goroutine的生命周期。下面我将介绍flowmatic的核心功能和使用方法。

基本用法示例

1. 任务组(TaskGroup)

package main

import (
	"fmt"
	"time"
	
	"github.com/carlmjohnson/flowmatic"
)

func worker(id int) error {
	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
	return nil
}

func main() {
	// 创建并运行3个worker
	err := flowmatic.Do(
		func() error { return worker(1) },
		func() error { return worker(2) },
		func() error { return worker(3) },
	)
	
	if err != nil {
		fmt.Println("Error:", err)
	}
}

2. 带错误处理的并发任务

func main() {
	err := flowmatic.Do(
		func() error { return worker(1) },
		func() error { return fmt.Errorf("模拟错误") },
		func() error { return worker(3) },
	)
	
	// 会返回第一个遇到的错误
	fmt.Println("结果:", err) // 输出: 结果: 模拟错误
}

3. 限制并发数

func main() {
	tasks := make([]func() error, 10)
	for i := range tasks {
		id := i + 1
		tasks[i] = func() error { return worker(id) }
	}
	
	// 最多同时运行2个任务
	err := flowmatic.Do(flowmatic.MaxConcurrency(2, tasks...))
	if err != nil {
		fmt.Println("Error:", err)
	}
}

高级用法

1. 管道模式(Pipeline)

func double(in int) (int, error) {
	return in * 2, nil
}

func addFive(in int) (int, error) {
	return in + 5, nil
}

func main() {
	result, err := flowmatic.Pipe(
		10,
		double,
		addFive,
		double,
	)
	
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	
	fmt.Println("Result:", result) // 输出: Result: 50
}

2. 超时控制

func slowTask() error {
	time.Sleep(2 * time.Second)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	
	err := flowmatic.DoContext(ctx,
		func() error { return slowTask() },
		func() error { return worker(2) },
	)
	
	if err != nil {
		fmt.Println("Error:", err) // 会输出context deadline exceeded错误
	}
}

最佳实践

  1. 明确任务边界:每个任务函数应该完成一个明确的子任务
  2. 合理设置并发数:根据资源情况限制最大并发数
  3. 错误处理:确保任务函数能返回适当的错误
  4. 上下文传递:长时间运行的任务应该支持context取消

与原生goroutine对比

特性 原生goroutine flowmatic
生命周期管理 手动 自动
错误传播 复杂 简单
并发控制 需要额外代码 内置支持
结构化
资源清理 容易泄漏 自动清理

flowmatic通过提供更高级的抽象,使得并发代码更易于编写和维护,特别适合需要管理多个并发任务的场景。

回到顶部