Golang迭代操作指南:Map()、泛型Filter()和Reduce()的实现

Golang迭代操作指南:Map()、泛型Filter()和Reduce()的实现 我刚刚在GitHub上发布了go_iter:

GitHub

GitHub - serge-hulne/go_iter: Go迭代工具(用于迭代、映射、过滤、归约流 - 以通道表示)

Go迭代工具(用于迭代、映射、过滤、归约流 - 以通道表示) - GitHub - serge-hulne/go_iter: Go迭代工具(用于迭代、映射、过滤、归约流 -r…

这是一个通用的迭代器工具集,用于在Go中迭代任意类型和任意长度的数据流。

适用于:

  • 处理大型数据文件(自然语言处理等)。
  • 以并发方式处理流。
  • 类似于JS的Rx。
  • 在Go中进行函数式编程。

更多关于Golang迭代操作指南:Map()、泛型Filter()和Reduce()的实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang迭代操作指南:Map()、泛型Filter()和Reduce()的实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个很棒的库!go_iter 通过通道实现了类似函数式编程的流式操作,为 Go 带来了更优雅的迭代处理方式。我来详细说明一下 Map、Filter 和 Reduce 的实现原理,并展示如何使用你的库。

核心实现分析

1. Map() 实现

// 典型的 Map 实现模式
func Map[T any, R any](stream <-chan T, f func(T) R) <-chan R {
    out := make(chan R)
    go func() {
        defer close(out)
        for item := range stream {
            out <- f(item)
        }
    }()
    return out
}

// 使用示例
func main() {
    // 创建输入流
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 5; i++ {
            input <- i
        }
    }()
    
    // Map 操作:平方
    squared := Map(input, func(x int) int {
        return x * x
    })
    
    // 消费结果
    for val := range squared {
        fmt.Println(val) // 输出: 1, 4, 9, 16, 25
    }
}

2. Filter() 泛型实现

// 泛型 Filter 实现
func Filter[T any](stream <-chan T, predicate func(T) bool) <-chan T {
    out := make(chan T)
    go func() {
        defer close(out)
        for item := range stream {
            if predicate(item) {
                out <- item
            }
        }
    }()
    return out
}

// 使用示例
func main() {
    numbers := make(chan int)
    go func() {
        defer close(numbers)
        for i := 1; i <= 10; i++ {
            numbers <- i
        }
    }()
    
    // 过滤偶数
    evens := Filter(numbers, func(x int) bool {
        return x%2 == 0
    })
    
    for val := range evens {
        fmt.Println(val) // 输出: 2, 4, 6, 8, 10
    }
}

3. Reduce() 实现

// Reduce 实现
func Reduce[T any, R any](stream <-chan T, initial R, reducer func(R, T) R) R {
    result := initial
    for item := range stream {
        result = reducer(result, item)
    }
    return result
}

// 使用示例
func main() {
    numbers := make(chan int)
    go func() {
        defer close(numbers)
        for i := 1; i <= 5; i++ {
            numbers <- i
        }
    }()
    
    // 求和
    sum := Reduce(numbers, 0, func(acc, x int) int {
        return acc + x
    })
    
    fmt.Println("Sum:", sum) // 输出: Sum: 15
}

链式操作示例

func main() {
    // 创建数据源
    source := make(chan int)
    go func() {
        defer close(source)
        for i := 1; i <= 100; i++ {
            source <- i
        }
    }()
    
    // 链式操作:过滤 -> 映射 -> 归约
    result := Reduce(
        Map(
            Filter(source, func(x int) bool {
                return x%2 == 0  // 只保留偶数
            }),
            func(x int) int {
                return x * 2     // 每个元素乘以2
            }
        ),
        0,
        func(acc, x int) int {
            return acc + x       // 求和
        }
    )
    
    fmt.Println("Result:", result)
}

并发处理优势

// 并发 Map 处理
func ConcurrentMap[T any, R any](stream <-chan T, f func(T) R, workers int) <-chan R {
    out := make(chan R)
    var wg sync.WaitGroup
    
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range stream {
                out <- f(item)
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

实际应用场景

// 处理大型文件
func ProcessLargeFile(filename string) {
    lines := ReadFileLines(filename) // 返回 chan string
    
    // 流式处理
    processed := Map(lines, func(line string) string {
        return strings.ToUpper(line)
    })
    
    filtered := Filter(processed, func(line string) bool {
        return strings.Contains(line, "ERROR")
    })
    
    // 写入结果
    for line := range filtered {
        fmt.Println(line)
    }
}

// NLP 处理示例
func ProcessTextStream(texts <-chan string) <-chan []string {
    return Map(texts, func(text string) []string {
        // 分词处理
        return strings.Fields(text)
    })
}

go_iter 的这种设计模式特别适合处理:

  1. 大数据流:避免一次性加载所有数据到内存
  2. 实时数据:可以边生产边消费
  3. 并发处理:天然支持流水线并行
  4. 组合操作:可以灵活组合不同的处理阶段

这种基于通道的迭代器模式确实为 Go 的函数式编程提供了很好的基础设施。

回到顶部