golang基于通道的干净可组合并发工具包插件库rill的使用

Golang基于通道的干净可组合并发工具包插件库Rill的使用

Rill是一个为Go语言设计的并发工具包,它通过可组合的通道操作简化了并发程序的构建过程。下面我将详细介绍Rill的使用方法,并提供完整的示例代码。

安装

go get -u github.com/destel/rill

快速开始

以下是一个从API获取用户、激活用户并保存更改的完整示例:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 将用户ID切片转换为通道
    ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

    // 从API读取用户,并发度为3
    users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
        return mockapi.GetUser(ctx, id)
    })

    // 激活用户,并发度为2
    err := rill.ForEach(users, 2, func(u *mockapi.User) error {
        if u.IsActive {
            fmt.Printf("User %d is already active\n", u.ID)
            return nil
        }

        u.IsActive = true
        err := mockapi.SaveUser(ctx, u)
        if err != nil {
            return err
        }

        fmt.Printf("User saved: %+v\n", u)
        return nil
    })

    // 处理错误
    fmt.Println("Error:", err)
}

批处理示例

批处理可以显著提高性能,特别是在处理外部服务或数据库时:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 将用户ID切片转换为通道
    ids := rill.FromSlice([]int{
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
        21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
    }, nil)

    // 将ID分组为每批5个
    idBatches := rill.Batch(ids, 5, -1)

    // 批量从API获取用户,并发度为3
    userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) {
        return mockapi.GetUsers(ctx, ids)
    })

    // 将批处理流转换回平铺的用户流
    users := rill.Unbatch(userBatches)

    // 激活用户,并发度为2
    err := rill.ForEach(users, 2, func(u *mockapi.User) error {
        if u.IsActive {
            fmt.Printf("User %d is already active\n", u.ID)
            return nil
        }

        u.IsActive = true
        err := mockapi.SaveUser(ctx, u)
        if err != nil {
            return err
        }

        fmt.Printf("User saved: %+v\n", u)
        return nil
    })

    // 处理错误
    fmt.Println("Error:", err)
}

实时批处理示例

对于需要处理不可预测速率事件的应用程序,可以使用超时批处理:

func main() {
    // 启动处理更新的后台工作程序
    go updateUserTimestampWorker()

    // 执行一些更新,它们会自动分组为批处理
    UpdateUserTimestamp(1)
    UpdateUserTimestamp(2)
    UpdateUserTimestamp(3)
    UpdateUserTimestamp(4)
    UpdateUserTimestamp(5)
    UpdateUserTimestamp(6)
    UpdateUserTimestamp(7)
    time.Sleep(500 * time.Millisecond) // 模拟稀疏更新
    UpdateUserTimestamp(8)
}

// 这是要更新的用户ID队列
var userIDsToUpdate = make(chan int)

// UpdateUserTimestamp是更新users表中last_active_at列的公共API
func UpdateUserTimestamp(userID int) {
    userIDsToUpdate <- userID
}

// 这是将队列中的更新批量发送到数据库的后台工作程序
func updateUserTimestampWorker() {
    ids := rill.FromChan(userIDsToUpdate, nil)

    // 将ID分组为每批5个,超时100毫秒
    idBatches := rill.Batch(ids, 5, 100*time.Millisecond)

    _ = rill.ForEach(idBatches, 1, func(batch []int) error {
        fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch)
        return nil
    })
}

错误处理和上下文

Rill简化了并发应用中的错误处理:

func main() {
    ctx := context.Background()

    // ID 999不存在,所以获取会在遇到它后停止
    err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
    fmt.Printf("Check result: %v\n", err)
}

// CheckAllUsersExist使用多个并发工作程序检查具有给定ID的所有用户是否存在
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
    // 创建将在函数返回时取消的新上下文
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // 将切片转换为流
    idsStream := rill.FromSlice(ids, nil)

    // 并发获取用户
    users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
        u, err := mockapi.GetUser(ctx, id)
        if err != nil {
            return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
        }

        fmt.Printf("Fetched user %d\n", id)
        return u, nil
    })

    // 返回第一个错误(如果有)并通过上下文取消剩余的获取
    return rill.Err(users)
}

顺序保留(有序扇入)

以下示例展示了如何在并发处理中保留原始顺序:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 要在下载文件中搜索的字符串
    needle := []byte("26")

    // 生成从https://example.com/file-0.txt到https://example.com/file-999.txt的URL流
    // 如果上下文被取消,则停止生成URL
    urls := rill.Generate(func(send func(string), sendErr func(error)) {
        for i := 0; i < 1000 && ctx.Err() == nil; i++ {
            send(fmt.Sprintf("https://example.com/file-%d.txt", i))
        }
    })

    // 下载并处理文件
    // 最多同时下载并保留5个文件在内存中
    matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
        fmt.Println("Downloading:", url)

        content, err := mockapi.DownloadFile(ctx, url)
        if err != nil {
            return false, err
        }

        // 只保留包含needle的文件URL
        return bytes.Contains(content, needle), nil
    })

    // 找到第一个匹配的URL
    firstMatchedUrl, found, err := rill.First(matchedUrls)
    if err != nil {
        fmt.Println("Error:", err)
        return
    }

    // 打印结果
    if found {
        fmt.Println("Found in:", firstMatchedUrl)
    } else {
        fmt.Println("Not found")
    }
}

流合并和FlatMap

以下示例展示了如何使用FlatMap从多个部门合并用户流:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 从部门名称流开始
    departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)

    // 从所有部门并发流式传输用户
    // 最多同时处理3个部门
    users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
        return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
    })

    // 打印合并流中的用户
    err := rill.ForEach(users, 1, func(user *mockapi.User) error {
        fmt.Printf("%+v\n", user)
        return nil
    })
    fmt.Println("Error:", err)
}

// StreamUsers是mockapi.ListUsers的可重用流式包装器
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
    return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
        var currentQuery mockapi.UserQuery
        if query != nil {
            currentQuery = *query
        }

        for page := 0; ; page++ {
            currentQuery.Page = page

            users, err := mockapi.ListUsers(ctx, &currentQuery)
            if err != nil {
                sendErr(err)
                return
            }

            if len(users) == 0 {
                break
            }

            for _, user := range users {
                send(user)
            }
        }
    })
}

Go 1.23迭代器

从Go 1.23开始,可以使用迭代器与Rill集成:

func main() {
    // 将数字切片转换为流
    numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

    // 转换每个数字,并发度为3
    squares := rill.Map(numbers, 3, func(x int) (int, error) {
        return square(x), nil
    })

    // 将流转换为迭代器并使用for-range打印结果
    for val, err := range rill.ToSeq2(squares) {
        if err != nil {
            fmt.Println("Error:", err)
            break // 无论是否提前退出都会执行清理
        }
        fmt.Printf("%+v\n", val)
    }
}

以上示例展示了Rill库的主要功能和用法,它通过简洁的API和强大的组合能力,使得Go语言中的并发编程变得更加简单和高效。


更多关于golang基于通道的干净可组合并发工具包插件库rill的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于通道的干净可组合并发工具包插件库rill的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Rill: Go基于通道的干净可组合并发工具包

Rill是一个轻量级的Go并发工具包,它基于Go的通道(channel)原语,提供了更高级别的抽象来简化并发编程。下面我将介绍Rill的核心概念和使用方法,并提供一些示例代码。

Rill的核心特性

  1. 基于通道的并发模型
  2. 可组合的并发原语
  3. 简洁干净的API设计
  4. 错误处理和取消机制

安装Rill

go get github.com/deref/rill

基本使用示例

1. 创建简单的管道

package main

import (
	"fmt"
	"github.com/deref/rill"
)

func main() {
	// 创建源生成器
	source := rill.Generator(func(send func(int) bool) {
		for i := 0; i < 10; i++ {
			if !send(i) {
				return
			}
		}
	})

	// 创建处理阶段 - 平方每个数字
	square := rill.Map(func(x int) int {
		return x * x
	})

	// 创建接收器
	sink := rill.Sink(func(x int) {
		fmt.Println("Received:", x)
	})

	// 组合并运行管道
	pipeline := source.Pipe(square).Pipe(sink)
	pipeline.Run()
}

2. 并发处理模式

package main

import (
	"fmt"
	"time"
	"github.com/deref/rill"
)

func main() {
	// 创建并发工作者池
	workerPool := rill.WorkerPool(3, func(item string) string {
		time.Sleep(100 * time.Millisecond) // 模拟工作负载
		return "processed_" + item
	})

	// 创建源
	source := rill.Generator(func(send func(string) bool) {
		for _, item := range []string{"a", "b", "c", "d", "e"} {
			if !send(item) {
				return
			}
		}
	})

	// 组合管道
	pipeline := source.Pipe(workerPool).Pipe(rill.Sink(func(result string) {
		fmt.Println(result)
	}))

	// 运行并等待完成
	pipeline.Run()
}

3. 错误处理和取消

package main

import (
	"errors"
	"fmt"
	"github.com/deref/rill"
)

func main() {
	// 可能出错的阶段
	errorProneStage := rill.Map(func(x int) (int, error) {
		if x == 3 {
			return 0, errors.New("I don't like 3")
		}
		return x * 2, nil
	})

	// 带错误处理的接收器
	sink := rill.SinkWithError(func(x int) error {
		fmt.Println("Processed:", x)
		return nil
	})

	// 创建管道
	pipeline := rill.Range(1, 5).Pipe(errorProneStage).Pipe(sink)

	// 运行并检查错误
	if err := pipeline.Run(); err != nil {
		fmt.Println("Pipeline failed:", err)
	}
}

高级组合模式

1. 扇出/扇入模式

package main

import (
	"fmt"
	"github.com/deref/rill"
)

func main() {
	// 创建源
	source := rill.Range(1, 10)

	// 创建两个处理阶段
	stage1 := rill.Map(func(x int) int { return x * 2 })
	stage2 := rill.Map(func(x int) int { return x * 3 })

	// 扇出到两个阶段,然后扇入
	fanned := source.Pipe(rill.FanOut(stage1, stage2)).Pipe(rill.FanIn())

	// 收集结果
	var results []int
	collector := rill.Sink(func(x int) {
		results = append(results, x)
	})

	fanned.Pipe(collector).Run()
	fmt.Println("Results:", results)
}

2. 超时控制

package main

import (
	"context"
	"fmt"
	"time"
	"github.com/deref/rill"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// 长时间运行的任务
	longTask := rill.Generator(func(send func(int) bool) {
		for i := 0; ; i++ {
			time.Sleep(500 * time.Millisecond)
			if !send(i) {
				return
			}
		}
	})

	// 带上下文的接收器
	sink := rill.SinkWithContext(func(ctx context.Context, x int) {
		select {
		case <-ctx.Done():
			fmt.Println("Cancelled")
			return
		default:
			fmt.Println("Processing:", x)
		}
	})

	pipeline := longTask.Pipe(sink)
	pipeline.RunWithContext(ctx)
}

Rill的优势

  1. 更清晰的代码结构:将并发逻辑分解为可组合的阶段
  2. 内置错误处理:自动传播错误,简化错误处理逻辑
  3. 资源管理:自动清理goroutine,避免泄漏
  4. 灵活的并发控制:轻松实现各种并发模式

Rill特别适合需要构建复杂数据处理管道的场景,如ETL流程、网络服务中间件、并发任务处理等。它的设计哲学是保持简单和可组合性,同时不隐藏Go的并发原语。

希望这些示例能帮助你开始使用Rill构建更干净、更可维护的并发Go程序!

回到顶部