golang实现管道流式处理与并发控制的插件库pipeline的使用

golang实现管道流式处理与并发控制的插件库pipeline的使用

GoDoc Build Status cover.run Go Report Card

pipeline

这个包提供了Go管道的简化实现,基于《Go并发模式:管道和取消》中概述的概念。

文档

GoDoc文档可在此处查看。

示例用法

下面是一个基本的使用示例:

import "github.com/hyfather/pipeline"

// 创建一个新的管道
p := pipeline.New()

// 添加第一个处理阶段,并发度为10
p.AddStageWithFanOut(myStage, 10)

// 添加第二个处理阶段,并发度为100
p.AddStageWithFanOut(anotherStage, 100)

// 运行管道,传入输入通道
doneChan := p.Run(inChan)

// 等待管道处理完成
<- doneChan

完整示例demo

下面是一个更完整的示例,展示了如何使用pipeline库处理数据:

package main

import (
	"fmt"
	"github.com/hyfather/pipeline"
)

func main() {
	// 创建输入通道
	inChan := make(chan interface{}, 100)
	
	// 启动生产者goroutine
	go func() {
		defer close(inChan)
		for i := 0; i < 1000; i++ {
			inChan <- i
		}
	}()

	// 创建管道
	p := pipeline.New()
	
	// 添加第一个处理阶段 - 乘以2
	p.AddStageWithFanOut(func(in interface{}) interface{} {
		num := in.(int)
		return num * 2
	}, 10) // 并发度为10
	
	// 添加第二个处理阶段 - 转换为字符串
	p.AddStageWithFanOut(func(in interface{}) interface{} {
		num := in.(int)
		return fmt.Sprintf("Number: %d", num)
	}, 5) // 并发度为5
	
	// 添加第三个处理阶段 - 打印结果
	p.AddStageWithFanOut(func(in interface{}) interface{} {
		str := in.(string)
		fmt.Println(str)
		return nil
	}, 3) // 并发度为3
	
	// 运行管道
	doneChan := p.Run(inChan)
	
	// 等待处理完成
	<-doneChan
	fmt.Println("Pipeline processing completed")
}

在这个示例中:

  1. 我们创建了一个输入通道并填充了1000个整数
  2. 设置了三个处理阶段:
    • 第一阶段将每个数字乘以2(并发10)
    • 第二阶段将结果转换为字符串(并发5)
    • 第三阶段打印结果(并发3)
  3. 最后等待管道处理完成

更多综合示例可以在项目示例目录中找到。


更多关于golang实现管道流式处理与并发控制的插件库pipeline的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现管道流式处理与并发控制的插件库pipeline的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang Pipeline 流式处理与并发控制

在Go语言中,管道(Pipeline)是一种非常强大的模式,用于实现流式数据处理和并发控制。下面我将介绍如何实现一个简单的pipeline库以及如何使用它。

基本概念

Pipeline模式由一系列处理阶段(stage)组成,每个阶段是一组运行相同函数的goroutine。每个阶段通过channel接收上游数据,处理后发送到下游。

实现示例

1. 基础Pipeline实现

package pipeline

// Stage 表示一个处理阶段
type Stage func(in <-chan interface{}) <-chan interface{}

// Pipe 将多个阶段串联成管道
func Pipe(stages ...Stage) Stage {
    return func(in <-chan interface{}) <-chan interface{} {
        var out <-chan interface{} = in
        for _, stage := range stages {
            out = stage(out)
        }
        return out
    }
}

// Run 执行管道
func Run(in <-chan interface{}, stages ...Stage) <-chan interface{} {
    return Pipe(stages...)(in)
}

2. 常用Stage生成函数

// Generator 生成数据
func Generator(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for _, v := range values {
            select {
            case out <- v:
            case <-done:
                return
            }
        }
    }()
    return out
}

// Map 映射处理
func Map(done <-chan struct{}, in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for v := range in {
            select {
            case out <- fn(v):
            case <-done:
                return
            }
        }
    }()
    return out
}

// Filter 过滤
func Filter(done <-chan struct{}, in <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for v := range in {
            if fn(v) {
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

// FanOut 扇出(并行处理)
func FanOut(done <-chan struct{}, in <-chan interface{}, fn func(interface{}), workers int) {
    var wg sync.WaitGroup
    wg.Add(workers)
    
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for v := range in {
                select {
                case <-done:
                    return
                default:
                    fn(v)
                }
            }
        }()
    }
    
    wg.Wait()
}

使用示例

1. 简单数据处理管道

func main() {
    done := make(chan struct{})
    defer close(done)
    
    // 生成数据
    gen := pipeline.Generator(done, 1, 2, 3, 4, 5)
    
    // 处理阶段1: 平方
    square := pipeline.Map(done, gen, func(v interface{}) interface{} {
        return v.(int) * v.(int)
    })
    
    // 处理阶段2: 过滤偶数
    even := pipeline.Filter(done, square, func(v interface{}) bool {
        return v.(int)%2 == 0
    })
    
    // 收集结果
    for v := range even {
        fmt.Println(v)
    }
}

2. 带并行处理的管道

func main() {
    done := make(chan struct{})
    defer close(done)
    
    // 生成大量数据
    var nums []interface{}
    for i := 0; i < 1000; i++ {
        nums = append(nums, i)
    }
    gen := pipeline.Generator(done, nums...)
    
    // 并行处理阶段(10个worker)
    pipeline.FanOut(done, gen, func(v interface{}) {
        // 模拟耗时处理
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("处理: %d\n", v.(int))
    }, 10)
    
    fmt.Println("所有处理完成")
}

3. 组合复杂管道

func main() {
    done := make(chan struct{})
    defer close(done)
    
    // 定义处理阶段
    stage1 := func(in <-chan interface{}) <-chan interface{} {
        return pipeline.Map(done, in, func(v interface{}) interface{} {
            return v.(int) + 1
        })
    }
    
    stage2 := func(in <-chan interface{}) <-chan interface{} {
        return pipeline.Filter(done, in, func(v interface{}) bool {
            return v.(int)%2 == 0
        })
    }
    
    stage3 := func(in <-chan interface{}) <-chan interface{} {
        return pipeline.Map(done, in, func(v interface{}) interface{} {
            return v.(int) * v.(int)
        })
    }
    
    // 构建管道
    p := pipeline.Pipe(stage1, stage2, stage3)
    
    // 输入数据
    in := pipeline.Generator(done, 1, 2, 3, 4, 5)
    
    // 执行管道
    out := p(in)
    
    // 收集结果
    for v := range out {
        fmt.Println(v)
    }
}

高级特性扩展

1. 错误处理

可以扩展Stage类型以支持错误处理:

type Result struct {
    Value interface{}
    Err   error
}

type Stage func(in <-chan Result) <-chan Result

2. 限流控制

// Throttle 限流stage
func Throttle(interval time.Duration) Stage {
    return func(in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        ticker := time.NewTicker(interval)
        go func() {
            defer close(out)
            defer ticker.Stop()
            for v := range in {
                <-ticker.C
                out <- v
            }
        }()
        return out
    }
}

总结

Go语言的channel和goroutine天然适合实现管道模式,通过这种模式可以:

  1. 清晰地组织数据处理流程
  2. 轻松实现并发控制
  3. 有效管理资源使用
  4. 构建可组合、可重用的处理组件

实际项目中,可以根据需要扩展这个基础实现,添加更多功能如错误处理、指标收集、限流控制等。

回到顶部