golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用
Golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用
ordered-concurrently是一个用于在Go中实现并行处理并按输入顺序返回结果的库。它可以并发/并行处理队列中的项目,并按队列提供的顺序获取输出。
安装模块
go get github.com/tejzpr/ordered-concurrently/v3
导入模块
import concurrently "github.com/tejzpr/ordered-concurrently/v3"
创建WorkFunction接口实现
// 基于工作函数输入创建类型
type loadWorker int
// 需要执行的工作
// 输入类型应实现WorkFunction接口
func (w loadWorker) Run(ctx context.Context) interface{} {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
return w * 2
}
示例代码
示例1 - 处理固定数量的输入
func main() {
max := 10
inputChan := make(chan concurrently.WorkFunction)
ctx := context.Background()
output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
go func() {
for work := 0; work < max; work++ {
inputChan <- loadWorker(work)
}
close(inputChan)
}()
for out := range output {
log.Println(out.Value)
}
}
示例2 - 处理未知数量的输入
func main() {
inputChan := make(chan concurrently.WorkFunction, 10)
ctx := context.Background()
output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
ticker := time.NewTicker(100 * time.Millisecond)
done := make(chan bool)
wg := &sync.WaitGroup{}
go func() {
input := 0
for {
select {
case <-done:
return
case <-ticker.C:
inputChan <- loadWorker(input)
wg.Add(1)
input++
default:
}
}
}()
var res []loadWorker
go func() {
for out := range output {
res = append(res, out.Value.(loadWorker))
wg.Done()
}
}()
time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
close(inputChan)
wg.Wait()
// 检查输出是否排序
isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
return res[i] < res[j]
})
if !isSorted {
log.Println("output is not sorted")
}
}
这个库非常适合需要并发处理任务但又需要保持结果顺序的场景,比如批量处理数据并保持原始顺序输出。
更多关于golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
ordered-concurrently 库的使用指南
ordered-concurrently
是一个 Go 语言库,用于按照输入顺序并发处理任务并保持输出顺序。这对于需要并行处理但需要保持结果顺序的场景非常有用。
安装
go get github.com/tejzpr/ordered-concurrently/v3
基本概念
该库的核心是 Process
函数,它接受一个输入通道和一个工作函数,返回一个按原始顺序排序的输出通道。
基本用法
package main
import (
"fmt"
"time"
orderedconcurrently "github.com/tejzpr/ordered-concurrently/v3"
)
func main() {
// 1. 创建输入通道
inputChan := make(chan orderedconcurrently.WorkFunction)
// 2. 启动处理过程
output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{PoolSize: 3, OutChannel: nil})
// 3. 启动一个goroutine来发送任务
go func() {
for i := 0; i < 10; i++ {
value := i
inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
// 模拟耗时操作
time.Sleep(time.Duration(200-value*10) * time.Millisecond)
return fmt.Sprintf("处理后的值: %d", value)
})
}
close(inputChan)
}()
// 4. 从输出通道接收结果
for result := range output {
fmt.Println(result.Value)
}
}
高级用法
自定义工作池大小
output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{
PoolSize: 5, // 设置并发工作goroutine数量
})
处理结构体数据
type Task struct {
ID int
Input string
}
func main() {
inputChan := make(chan orderedconcurrently.WorkFunction)
output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{PoolSize: 2})
go func() {
tasks := []Task{
{1, "任务1"},
{2, "任务2"},
{3, "任务3"},
}
for _, task := range tasks {
t := task // 创建副本
inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
time.Sleep(time.Duration(t.ID) * 100 * time.Millisecond)
return fmt.Sprintf("%s 已完成", t.Input)
})
}
close(inputChan)
}()
for result := range output {
fmt.Println(result.Value)
}
}
错误处理
inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
if someCondition {
return fmt.Errorf("发生错误")
}
return "成功结果"
})
// 接收时检查错误
for result := range output {
if err, ok := result.Value.(error); ok {
fmt.Println("错误:", err)
continue
}
fmt.Println("结果:", result.Value)
}
性能考虑
- 工作池大小:根据任务类型和CPU核心数调整PoolSize
- 通道缓冲:对于大量任务,可以考虑缓冲输入通道
- 任务粒度:确保每个任务有足够的工作量以抵消并发开销
实际应用场景
- 批量处理API请求但需要保持响应顺序
- 并行处理文件但需要按原始顺序输出
- 任何需要并行计算但顺序重要的场景
替代方案比较
相比简单的 sync.WaitGroup
,ordered-concurrently
提供了顺序保证。相比自己实现的有序并发处理,它提供了更简洁的API。
注意事项
- 确保在所有任务发送后关闭输入通道
- 工作函数应该是线程安全的
- 处理大量任务时注意内存使用
这个库简化了有序并发处理的实现,让开发者可以专注于业务逻辑而不是并发控制细节。