golang基于通道的干净可组合并发工具包插件库rill的使用
Golang基于通道的干净可组合并发工具包插件库Rill的使用
Rill是一个为Go语言设计的并发工具包,它通过可组合的通道操作简化了并发程序的构建过程。下面我将详细介绍Rill的使用方法,并提供完整的示例代码。
安装
go get -u github.com/destel/rill
快速开始
以下是一个从API获取用户、激活用户并保存更改的完整示例:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 将用户ID切片转换为通道
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// 从API读取用户,并发度为3
users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
return mockapi.GetUser(ctx, id)
})
// 激活用户,并发度为2
err := rill.ForEach(users, 2, func(u *mockapi.User) error {
if u.IsActive {
fmt.Printf("User %d is already active\n", u.ID)
return nil
}
u.IsActive = true
err := mockapi.SaveUser(ctx, u)
if err != nil {
return err
}
fmt.Printf("User saved: %+v\n", u)
return nil
})
// 处理错误
fmt.Println("Error:", err)
}
批处理示例
批处理可以显著提高性能,特别是在处理外部服务或数据库时:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 将用户ID切片转换为通道
ids := rill.FromSlice([]int{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
}, nil)
// 将ID分组为每批5个
idBatches := rill.Batch(ids, 5, -1)
// 批量从API获取用户,并发度为3
userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) {
return mockapi.GetUsers(ctx, ids)
})
// 将批处理流转换回平铺的用户流
users := rill.Unbatch(userBatches)
// 激活用户,并发度为2
err := rill.ForEach(users, 2, func(u *mockapi.User) error {
if u.IsActive {
fmt.Printf("User %d is already active\n", u.ID)
return nil
}
u.IsActive = true
err := mockapi.SaveUser(ctx, u)
if err != nil {
return err
}
fmt.Printf("User saved: %+v\n", u)
return nil
})
// 处理错误
fmt.Println("Error:", err)
}
实时批处理示例
对于需要处理不可预测速率事件的应用程序,可以使用超时批处理:
func main() {
// 启动处理更新的后台工作程序
go updateUserTimestampWorker()
// 执行一些更新,它们会自动分组为批处理
UpdateUserTimestamp(1)
UpdateUserTimestamp(2)
UpdateUserTimestamp(3)
UpdateUserTimestamp(4)
UpdateUserTimestamp(5)
UpdateUserTimestamp(6)
UpdateUserTimestamp(7)
time.Sleep(500 * time.Millisecond) // 模拟稀疏更新
UpdateUserTimestamp(8)
}
// 这是要更新的用户ID队列
var userIDsToUpdate = make(chan int)
// UpdateUserTimestamp是更新users表中last_active_at列的公共API
func UpdateUserTimestamp(userID int) {
userIDsToUpdate <- userID
}
// 这是将队列中的更新批量发送到数据库的后台工作程序
func updateUserTimestampWorker() {
ids := rill.FromChan(userIDsToUpdate, nil)
// 将ID分组为每批5个,超时100毫秒
idBatches := rill.Batch(ids, 5, 100*time.Millisecond)
_ = rill.ForEach(idBatches, 1, func(batch []int) error {
fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch)
return nil
})
}
错误处理和上下文
Rill简化了并发应用中的错误处理:
func main() {
ctx := context.Background()
// ID 999不存在,所以获取会在遇到它后停止
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
fmt.Printf("Check result: %v\n", err)
}
// CheckAllUsersExist使用多个并发工作程序检查具有给定ID的所有用户是否存在
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
// 创建将在函数返回时取消的新上下文
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 将切片转换为流
idsStream := rill.FromSlice(ids, nil)
// 并发获取用户
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
u, err := mockapi.GetUser(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
}
fmt.Printf("Fetched user %d\n", id)
return u, nil
})
// 返回第一个错误(如果有)并通过上下文取消剩余的获取
return rill.Err(users)
}
顺序保留(有序扇入)
以下示例展示了如何在并发处理中保留原始顺序:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 要在下载文件中搜索的字符串
needle := []byte("26")
// 生成从https://example.com/file-0.txt到https://example.com/file-999.txt的URL流
// 如果上下文被取消,则停止生成URL
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
}
})
// 下载并处理文件
// 最多同时下载并保留5个文件在内存中
matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
fmt.Println("Downloading:", url)
content, err := mockapi.DownloadFile(ctx, url)
if err != nil {
return false, err
}
// 只保留包含needle的文件URL
return bytes.Contains(content, needle), nil
})
// 找到第一个匹配的URL
firstMatchedUrl, found, err := rill.First(matchedUrls)
if err != nil {
fmt.Println("Error:", err)
return
}
// 打印结果
if found {
fmt.Println("Found in:", firstMatchedUrl)
} else {
fmt.Println("Not found")
}
}
流合并和FlatMap
以下示例展示了如何使用FlatMap从多个部门合并用户流:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 从部门名称流开始
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)
// 从所有部门并发流式传输用户
// 最多同时处理3个部门
users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
})
// 打印合并流中的用户
err := rill.ForEach(users, 1, func(user *mockapi.User) error {
fmt.Printf("%+v\n", user)
return nil
})
fmt.Println("Error:", err)
}
// StreamUsers是mockapi.ListUsers的可重用流式包装器
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
var currentQuery mockapi.UserQuery
if query != nil {
currentQuery = *query
}
for page := 0; ; page++ {
currentQuery.Page = page
users, err := mockapi.ListUsers(ctx, ¤tQuery)
if err != nil {
sendErr(err)
return
}
if len(users) == 0 {
break
}
for _, user := range users {
send(user)
}
}
})
}
Go 1.23迭代器
从Go 1.23开始,可以使用迭代器与Rill集成:
func main() {
// 将数字切片转换为流
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// 转换每个数字,并发度为3
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})
// 将流转换为迭代器并使用for-range打印结果
for val, err := range rill.ToSeq2(squares) {
if err != nil {
fmt.Println("Error:", err)
break // 无论是否提前退出都会执行清理
}
fmt.Printf("%+v\n", val)
}
}
以上示例展示了Rill库的主要功能和用法,它通过简洁的API和强大的组合能力,使得Go语言中的并发编程变得更加简单和高效。
更多关于golang基于通道的干净可组合并发工具包插件库rill的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于通道的干净可组合并发工具包插件库rill的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Rill: Go基于通道的干净可组合并发工具包
Rill是一个轻量级的Go并发工具包,它基于Go的通道(channel)原语,提供了更高级别的抽象来简化并发编程。下面我将介绍Rill的核心概念和使用方法,并提供一些示例代码。
Rill的核心特性
- 基于通道的并发模型
- 可组合的并发原语
- 简洁干净的API设计
- 错误处理和取消机制
安装Rill
go get github.com/deref/rill
基本使用示例
1. 创建简单的管道
package main
import (
"fmt"
"github.com/deref/rill"
)
func main() {
// 创建源生成器
source := rill.Generator(func(send func(int) bool) {
for i := 0; i < 10; i++ {
if !send(i) {
return
}
}
})
// 创建处理阶段 - 平方每个数字
square := rill.Map(func(x int) int {
return x * x
})
// 创建接收器
sink := rill.Sink(func(x int) {
fmt.Println("Received:", x)
})
// 组合并运行管道
pipeline := source.Pipe(square).Pipe(sink)
pipeline.Run()
}
2. 并发处理模式
package main
import (
"fmt"
"time"
"github.com/deref/rill"
)
func main() {
// 创建并发工作者池
workerPool := rill.WorkerPool(3, func(item string) string {
time.Sleep(100 * time.Millisecond) // 模拟工作负载
return "processed_" + item
})
// 创建源
source := rill.Generator(func(send func(string) bool) {
for _, item := range []string{"a", "b", "c", "d", "e"} {
if !send(item) {
return
}
}
})
// 组合管道
pipeline := source.Pipe(workerPool).Pipe(rill.Sink(func(result string) {
fmt.Println(result)
}))
// 运行并等待完成
pipeline.Run()
}
3. 错误处理和取消
package main
import (
"errors"
"fmt"
"github.com/deref/rill"
)
func main() {
// 可能出错的阶段
errorProneStage := rill.Map(func(x int) (int, error) {
if x == 3 {
return 0, errors.New("I don't like 3")
}
return x * 2, nil
})
// 带错误处理的接收器
sink := rill.SinkWithError(func(x int) error {
fmt.Println("Processed:", x)
return nil
})
// 创建管道
pipeline := rill.Range(1, 5).Pipe(errorProneStage).Pipe(sink)
// 运行并检查错误
if err := pipeline.Run(); err != nil {
fmt.Println("Pipeline failed:", err)
}
}
高级组合模式
1. 扇出/扇入模式
package main
import (
"fmt"
"github.com/deref/rill"
)
func main() {
// 创建源
source := rill.Range(1, 10)
// 创建两个处理阶段
stage1 := rill.Map(func(x int) int { return x * 2 })
stage2 := rill.Map(func(x int) int { return x * 3 })
// 扇出到两个阶段,然后扇入
fanned := source.Pipe(rill.FanOut(stage1, stage2)).Pipe(rill.FanIn())
// 收集结果
var results []int
collector := rill.Sink(func(x int) {
results = append(results, x)
})
fanned.Pipe(collector).Run()
fmt.Println("Results:", results)
}
2. 超时控制
package main
import (
"context"
"fmt"
"time"
"github.com/deref/rill"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// 长时间运行的任务
longTask := rill.Generator(func(send func(int) bool) {
for i := 0; ; i++ {
time.Sleep(500 * time.Millisecond)
if !send(i) {
return
}
}
})
// 带上下文的接收器
sink := rill.SinkWithContext(func(ctx context.Context, x int) {
select {
case <-ctx.Done():
fmt.Println("Cancelled")
return
default:
fmt.Println("Processing:", x)
}
})
pipeline := longTask.Pipe(sink)
pipeline.RunWithContext(ctx)
}
Rill的优势
- 更清晰的代码结构:将并发逻辑分解为可组合的阶段
- 内置错误处理:自动传播错误,简化错误处理逻辑
- 资源管理:自动清理goroutine,避免泄漏
- 灵活的并发控制:轻松实现各种并发模式
Rill特别适合需要构建复杂数据处理管道的场景,如ETL流程、网络服务中间件、并发任务处理等。它的设计哲学是保持简单和可组合性,同时不隐藏Go的并发原语。
希望这些示例能帮助你开始使用Rill构建更干净、更可维护的并发Go程序!