golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用

Golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用

ordered-concurrently是一个用于在Go中实现并行处理并按输入顺序返回结果的库。它可以并发/并行处理队列中的项目,并按队列提供的顺序获取输出。

安装模块

go get github.com/tejzpr/ordered-concurrently/v3

导入模块

import concurrently "github.com/tejzpr/ordered-concurrently/v3"

创建WorkFunction接口实现

// 基于工作函数输入创建类型
type loadWorker int

// 需要执行的工作
// 输入类型应实现WorkFunction接口
func (w loadWorker) Run(ctx context.Context) interface{} {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
	return w * 2
}

示例代码

示例1 - 处理固定数量的输入

func main() {
	max := 10
	inputChan := make(chan concurrently.WorkFunction)
	ctx := context.Background()
	output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
	go func() {
		for work := 0; work < max; work++ {
			inputChan <- loadWorker(work)
		}
		close(inputChan)
	}()
	for out := range output {
		log.Println(out.Value)
	}
}

示例2 - 处理未知数量的输入

func main() {
	inputChan := make(chan concurrently.WorkFunction, 10)
	ctx := context.Background()
	output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})

	ticker := time.NewTicker(100 * time.Millisecond)
	done := make(chan bool)
	wg := &sync.WaitGroup{}
	go func() {
		input := 0
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				inputChan <- loadWorker(input)
				wg.Add(1)
				input++
			default:
			}
		}
	}()

	var res []loadWorker
	go func() {
		for out := range output {
			res = append(res, out.Value.(loadWorker))
			wg.Done()
		}
	}()

	time.Sleep(1600 * time.Millisecond)
	ticker.Stop()
	done <- true
	close(inputChan)
	wg.Wait()

	// 检查输出是否排序
	isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
		return res[i] < res[j]
	})
	if !isSorted {
		log.Println("output is not sorted")
	}
}

这个库非常适合需要并发处理任务但又需要保持结果顺序的场景,比如批量处理数据并保持原始顺序输出。


更多关于golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang按输入顺序并发处理并返回结果的插件库ordered-concurrently的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


ordered-concurrently 库的使用指南

ordered-concurrently 是一个 Go 语言库,用于按照输入顺序并发处理任务并保持输出顺序。这对于需要并行处理但需要保持结果顺序的场景非常有用。

安装

go get github.com/tejzpr/ordered-concurrently/v3

基本概念

该库的核心是 Process 函数,它接受一个输入通道和一个工作函数,返回一个按原始顺序排序的输出通道。

基本用法

package main

import (
	"fmt"
	"time"

	orderedconcurrently "github.com/tejzpr/ordered-concurrently/v3"
)

func main() {
	// 1. 创建输入通道
	inputChan := make(chan orderedconcurrently.WorkFunction)

	// 2. 启动处理过程
	output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{PoolSize: 3, OutChannel: nil})

	// 3. 启动一个goroutine来发送任务
	go func() {
		for i := 0; i < 10; i++ {
			value := i
			inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
				// 模拟耗时操作
				time.Sleep(time.Duration(200-value*10) * time.Millisecond)
				return fmt.Sprintf("处理后的值: %d", value)
			})
		}
		close(inputChan)
	}()

	// 4. 从输出通道接收结果
	for result := range output {
		fmt.Println(result.Value)
	}
}

高级用法

自定义工作池大小

output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{
    PoolSize: 5, // 设置并发工作goroutine数量
})

处理结构体数据

type Task struct {
	ID    int
	Input string
}

func main() {
	inputChan := make(chan orderedconcurrently.WorkFunction)
	output := orderedconcurrently.Process(inputChan, &orderedconcurrently.Options{PoolSize: 2})

	go func() {
		tasks := []Task{
			{1, "任务1"},
			{2, "任务2"},
			{3, "任务3"},
		}
		
		for _, task := range tasks {
			t := task // 创建副本
			inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
				time.Sleep(time.Duration(t.ID) * 100 * time.Millisecond)
				return fmt.Sprintf("%s 已完成", t.Input)
			})
		}
		close(inputChan)
	}()

	for result := range output {
		fmt.Println(result.Value)
	}
}

错误处理

inputChan <- orderedconcurrently.WorkFunction(func() interface{} {
    if someCondition {
        return fmt.Errorf("发生错误")
    }
    return "成功结果"
})

// 接收时检查错误
for result := range output {
    if err, ok := result.Value.(error); ok {
        fmt.Println("错误:", err)
        continue
    }
    fmt.Println("结果:", result.Value)
}

性能考虑

  1. 工作池大小:根据任务类型和CPU核心数调整PoolSize
  2. 通道缓冲:对于大量任务,可以考虑缓冲输入通道
  3. 任务粒度:确保每个任务有足够的工作量以抵消并发开销

实际应用场景

  1. 批量处理API请求但需要保持响应顺序
  2. 并行处理文件但需要按原始顺序输出
  3. 任何需要并行计算但顺序重要的场景

替代方案比较

相比简单的 sync.WaitGroupordered-concurrently 提供了顺序保证。相比自己实现的有序并发处理,它提供了更简洁的API。

注意事项

  1. 确保在所有任务发送后关闭输入通道
  2. 工作函数应该是线程安全的
  3. 处理大量任务时注意内存使用

这个库简化了有序并发处理的实现,让开发者可以专注于业务逻辑而不是并发控制细节。

回到顶部