golang轻松编排goroutines的并发控制插件库go-floc的使用
Golang轻松编排goroutines的并发控制插件库go-floc的使用
简介
go-floc是一个用于轻松编排goroutines并发控制的Golang库。它的目标是让并行运行goroutines和同步它们的过程变得简单。
主要特性
- 易于使用的函数式接口
- 简单的并行化和任务同步
- 与直接使用goroutines和同步原语相比,尽可能少的开销
- 通过一个入口点和一个出口点提供更好的执行控制
安装要求
该包需要Go v1.12或更高版本。
安装命令:
go get github.com/workanator/go-floc/v3
核心概念
Flow(流程)
Flow是整个流程,可以通过floc.Flow
控制。Flow可以在执行的任何点被取消或完成。Flow只有一个入口点和一个出口点。
// 设计任务
flow := run.Sequence(do, something, here, ...)
// 入口点:运行任务
result, data, err := floc.Run(flow)
// 出口点:检查任务结果
if err != nil {
// 处理错误
} else if result.IsCompleted() {
// 处理成功情况
} else {
// 处理其他情况
}
Job(任务)
Job是Flow中最小的执行单元。Job函数的原型是floc.Job
。每个Job都可以通过floc.Context
读写数据,并通过floc.Control
控制流程。
func ValidateContentLength(ctx floc.Context, ctrl floc.Control) error {
request := ctx.Value("request").(http.Request)
// 如果请求体太大,取消流程并返回错误
if request.ContentLength > MaxContentLength {
return errors.New("content is too big")
}
return nil
}
完整示例
下面是一个计算文本统计信息的完整示例:
const Text = `Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed
do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim
veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum
dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident,
sunt in culpa qui officia deserunt mollit anim id est laborum.`
type myKey int
const keyStatistics myKey = 1
var sanitizeWordRe = regexp.MustCompile(`\W`)
type Statistics struct {
Words []string
Characters int
Occurrence map[string]int
}
// 分割单词并清理
SplitToWords := func(ctx floc.Context, ctrl floc.Control) error {
statistics := ctx.Value(keyStatistics).(*Statistics)
statistics.Words = strings.Split(Text, " ")
for i, word := range statistics.Words {
statistics.Words[i] = sanitizeWordRe.ReplaceAllString(word, "")
}
return nil
}
// 计算并汇总每个单词的字符数
CountCharacters := func(ctx floc.Context, ctrl floc.Control) error {
statistics := ctx.Value(keyStatistics).(*Statistics)
for _, word := range statistics.Words {
statistics.Characters += len(word)
}
return nil
}
// 计算唯一单词数量
CountUniqueWords := func(ctx floc.Context, ctrl floc.Control) error {
statistics := ctx.Value(keyStatistics).(*Statistics)
statistics.Occurrence = make(map[string]int)
for _, word := range statistics.Words {
statistics.Occurrence[word] = statistics.Occurrence[word] + 1
}
return nil
}
// 打印结果
PrintResult := func(ctx floc.Context, ctrl floc.Control) error {
statistics := ctx.Value(keyStatistics).(*Statistics)
fmt.Printf("Words Total : %d\n", len(statistics.Words))
fmt.Printf("Unique Word Count : %d\n", len(statistics.Occurrence))
fmt.Printf("Character Count : %d\n", statistics.Characters)
return nil
}
// 设计流程并运行
flow := run.Sequence(
SplitToWords,
run.Parallel(
CountCharacters,
CountUniqueWords,
),
PrintResult,
)
ctx := floc.NewContext()
ctx.AddValue(keyStatistics, new(Statistics))
ctrl := floc.NewControl(ctx)
_, _, err := floc.RunWith(ctx, ctrl, flow)
if err != nil {
panic(err)
}
// 输出:
// Words Total : 64
// Unique Word Count : 60
// Character Count : 370
这个示例展示了如何使用go-floc来编排多个goroutine,包括顺序执行和并行执行任务,并通过上下文共享数据。
更多关于golang轻松编排goroutines的并发控制插件库go-floc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻松编排goroutines的并发控制插件库go-floc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-Floc: 轻松编排Goroutines的并发控制库
Go-Floc 是一个用于编排和控制goroutines并发流程的Go语言库,它提供了一种声明式的方式来管理并发任务。下面我将详细介绍它的使用方法和示例代码。
基本概念
Go-Floc 的核心概念包括:
- Flow - 表示一组任务的执行流程
- Control - 控制流程的执行和取消
- State - 在任务间共享的状态
安装
go get github.com/workanator/go-floc
基本使用示例
1. 简单顺序执行
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
// 创建流程
flow := run.Sequence(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Task 1")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Task 2")
},
)
// 运行流程
floc.Run(flow)
}
2. 并行执行任务
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.Parallel(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Parallel Task 1")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Parallel Task 2")
},
)
floc.Run(flow)
}
3. 带状态的任务
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.Sequence(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 修改共享状态
state := data.(map[string]interface{})
state["count"] = 1
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 读取共享状态
state := data.(map[string]interface{})
fmt.Printf("Current count: %d\n", state["count"])
},
)
// 初始化共享状态
state := make(map[string]interface{})
floc.RunWith(flow, state)
}
4. 条件执行
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/pred"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.If(
pred.True(), // 条件判断
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Condition is true")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Condition is false")
},
)
floc.Run(flow)
}
5. 循环执行
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/pred"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.Loop(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
state := data.(map[string]interface{})
count := state["count"].(int)
fmt.Printf("Iteration %d\n", count)
state["count"] = count + 1
},
pred.LessThan(5), // 循环条件
)
// 初始化状态
state := map[string]interface{}{"count": 0}
floc.RunWith(flow, state)
}
6. 超时控制
package main
import (
"fmt"
"time"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.Sequence(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 设置超时
deadline := time.Now().Add(2 * time.Second)
ctx.Update(deadline)
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 模拟长时间运行任务
select {
case <-time.After(3 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task canceled due to timeout")
ctrl.Cancel(nil)
}
},
)
floc.Run(flow)
}
高级特性
1. 组合流程
package main
import (
"fmt"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
subFlow1 := run.Sequence(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Sub-task 1")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Sub-task 2")
},
)
subFlow2 := run.Parallel(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Parallel sub-task 1")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Parallel sub-task 2")
},
)
mainFlow := run.Sequence(subFlow1, subFlow2)
floc.Run(mainFlow)
}
2. 错误处理
package main
import (
"fmt"
"errors"
"github.com/workanator/go-floc"
"github.com/workanator/go-floc/run"
)
func main() {
flow := run.Sequence(
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
fmt.Println("Task 1")
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 模拟错误
ctrl.Fail(errors.New("something went wrong"))
},
func(ctx floc.Context, ctrl floc.Control, data interface{}) {
// 这个任务不会执行
fmt.Println("Task 3")
},
)
result, _, _ := floc.Run(flow)
if result.IsFailed() {
fmt.Printf("Flow failed: %v\n", result.Error())
}
}
总结
Go-Floc 提供了强大的goroutine编排能力,主要特点包括:
- 声明式API设计,代码可读性高
- 支持顺序、并行、条件、循环等流程控制
- 内置状态管理和错误处理机制
- 支持超时控制和流程取消
通过组合这些基本构建块,你可以创建复杂的并发流程,而无需手动管理goroutine的生命周期和同步问题。
对于更复杂的场景,你还可以探索Go-Floc提供的其他功能,如后台任务、延迟执行、事件触发等高级特性。