golang实现管道流式处理与并发控制的插件库pipeline的使用
golang实现管道流式处理与并发控制的插件库pipeline的使用
pipeline
这个包提供了Go管道的简化实现,基于《Go并发模式:管道和取消》中概述的概念。
文档
GoDoc文档可在此处查看。
示例用法
下面是一个基本的使用示例:
import "github.com/hyfather/pipeline"
// 创建一个新的管道
p := pipeline.New()
// 添加第一个处理阶段,并发度为10
p.AddStageWithFanOut(myStage, 10)
// 添加第二个处理阶段,并发度为100
p.AddStageWithFanOut(anotherStage, 100)
// 运行管道,传入输入通道
doneChan := p.Run(inChan)
// 等待管道处理完成
<- doneChan
完整示例demo
下面是一个更完整的示例,展示了如何使用pipeline库处理数据:
package main
import (
"fmt"
"github.com/hyfather/pipeline"
)
func main() {
// 创建输入通道
inChan := make(chan interface{}, 100)
// 启动生产者goroutine
go func() {
defer close(inChan)
for i := 0; i < 1000; i++ {
inChan <- i
}
}()
// 创建管道
p := pipeline.New()
// 添加第一个处理阶段 - 乘以2
p.AddStageWithFanOut(func(in interface{}) interface{} {
num := in.(int)
return num * 2
}, 10) // 并发度为10
// 添加第二个处理阶段 - 转换为字符串
p.AddStageWithFanOut(func(in interface{}) interface{} {
num := in.(int)
return fmt.Sprintf("Number: %d", num)
}, 5) // 并发度为5
// 添加第三个处理阶段 - 打印结果
p.AddStageWithFanOut(func(in interface{}) interface{} {
str := in.(string)
fmt.Println(str)
return nil
}, 3) // 并发度为3
// 运行管道
doneChan := p.Run(inChan)
// 等待处理完成
<-doneChan
fmt.Println("Pipeline processing completed")
}
在这个示例中:
- 我们创建了一个输入通道并填充了1000个整数
- 设置了三个处理阶段:
- 第一阶段将每个数字乘以2(并发10)
- 第二阶段将结果转换为字符串(并发5)
- 第三阶段打印结果(并发3)
- 最后等待管道处理完成
更多综合示例可以在项目示例目录中找到。
更多关于golang实现管道流式处理与并发控制的插件库pipeline的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现管道流式处理与并发控制的插件库pipeline的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang Pipeline 流式处理与并发控制
在Go语言中,管道(Pipeline)是一种非常强大的模式,用于实现流式数据处理和并发控制。下面我将介绍如何实现一个简单的pipeline库以及如何使用它。
基本概念
Pipeline模式由一系列处理阶段(stage)组成,每个阶段是一组运行相同函数的goroutine。每个阶段通过channel接收上游数据,处理后发送到下游。
实现示例
1. 基础Pipeline实现
package pipeline
// Stage 表示一个处理阶段
type Stage func(in <-chan interface{}) <-chan interface{}
// Pipe 将多个阶段串联成管道
func Pipe(stages ...Stage) Stage {
return func(in <-chan interface{}) <-chan interface{} {
var out <-chan interface{} = in
for _, stage := range stages {
out = stage(out)
}
return out
}
}
// Run 执行管道
func Run(in <-chan interface{}, stages ...Stage) <-chan interface{} {
return Pipe(stages...)(in)
}
2. 常用Stage生成函数
// Generator 生成数据
func Generator(done <-chan struct{}, values ...interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for _, v := range values {
select {
case out <- v:
case <-done:
return
}
}
}()
return out
}
// Map 映射处理
func Map(done <-chan struct{}, in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for v := range in {
select {
case out <- fn(v):
case <-done:
return
}
}
}()
return out
}
// Filter 过滤
func Filter(done <-chan struct{}, in <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for v := range in {
if fn(v) {
select {
case out <- v:
case <-done:
return
}
}
}
}()
return out
}
// FanOut 扇出(并行处理)
func FanOut(done <-chan struct{}, in <-chan interface{}, fn func(interface{}), workers int) {
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for v := range in {
select {
case <-done:
return
default:
fn(v)
}
}
}()
}
wg.Wait()
}
使用示例
1. 简单数据处理管道
func main() {
done := make(chan struct{})
defer close(done)
// 生成数据
gen := pipeline.Generator(done, 1, 2, 3, 4, 5)
// 处理阶段1: 平方
square := pipeline.Map(done, gen, func(v interface{}) interface{} {
return v.(int) * v.(int)
})
// 处理阶段2: 过滤偶数
even := pipeline.Filter(done, square, func(v interface{}) bool {
return v.(int)%2 == 0
})
// 收集结果
for v := range even {
fmt.Println(v)
}
}
2. 带并行处理的管道
func main() {
done := make(chan struct{})
defer close(done)
// 生成大量数据
var nums []interface{}
for i := 0; i < 1000; i++ {
nums = append(nums, i)
}
gen := pipeline.Generator(done, nums...)
// 并行处理阶段(10个worker)
pipeline.FanOut(done, gen, func(v interface{}) {
// 模拟耗时处理
time.Sleep(10 * time.Millisecond)
fmt.Printf("处理: %d\n", v.(int))
}, 10)
fmt.Println("所有处理完成")
}
3. 组合复杂管道
func main() {
done := make(chan struct{})
defer close(done)
// 定义处理阶段
stage1 := func(in <-chan interface{}) <-chan interface{} {
return pipeline.Map(done, in, func(v interface{}) interface{} {
return v.(int) + 1
})
}
stage2 := func(in <-chan interface{}) <-chan interface{} {
return pipeline.Filter(done, in, func(v interface{}) bool {
return v.(int)%2 == 0
})
}
stage3 := func(in <-chan interface{}) <-chan interface{} {
return pipeline.Map(done, in, func(v interface{}) interface{} {
return v.(int) * v.(int)
})
}
// 构建管道
p := pipeline.Pipe(stage1, stage2, stage3)
// 输入数据
in := pipeline.Generator(done, 1, 2, 3, 4, 5)
// 执行管道
out := p(in)
// 收集结果
for v := range out {
fmt.Println(v)
}
}
高级特性扩展
1. 错误处理
可以扩展Stage类型以支持错误处理:
type Result struct {
Value interface{}
Err error
}
type Stage func(in <-chan Result) <-chan Result
2. 限流控制
// Throttle 限流stage
func Throttle(interval time.Duration) Stage {
return func(in <-chan interface{}) <-chan interface{} {
out := make(chan interface{})
ticker := time.NewTicker(interval)
go func() {
defer close(out)
defer ticker.Stop()
for v := range in {
<-ticker.C
out <- v
}
}()
return out
}
}
总结
Go语言的channel和goroutine天然适合实现管道模式,通过这种模式可以:
- 清晰地组织数据处理流程
- 轻松实现并发控制
- 有效管理资源使用
- 构建可组合、可重用的处理组件
实际项目中,可以根据需要扩展这个基础实现,添加更多功能如错误处理、指标收集、限流控制等。