在Golang中实现类似Java的惰性泛型流式编程框架

在Golang中实现类似Java的惰性泛型流式编程框架 除了Go语言,我也使用Java、Kotlin和Rust进行编程。我对Java的流式编程框架和Rust的迭代器非常着迷,因为我认为它们为程序员提供了处理数据的高级抽象。

遗憾的是,Go语言并不支持这种特性(我认为部分原因是Go语言本身也不支持泛型)。

因此,我自己实现了一个,在运行时进行类型检查。虽然这损失了一些运行时性能和静态类型检查,但它确实可以工作,并且API与Java非常相似。

import r "reflect"

func Test_Typical_SliceStream(t *testing.T) {
	target := []int{1, 4, 9, 16, 36}
	slice, err := SliceStream([]string{"1", "2", "3", "4", "55", "6"}).
		Filter(func(it string) bool { return len(it) < 2 }).
		Map(func(it string) int {
			i, err := strconv.Atoi(it)
			if err != nil {
				i = 0
			}
			return i * i
		}).
		Collect()
	if err != nil {
		panic(err)
	}
	if !r.DeepEqual(target, slice) {
		panic(fmt.Sprintf("target slice is: %v, while get: %v\n", target, slice))
	}
}

此外,它的高扩展性允许你使用反射定义自己的对象解析器。 并且它支持将ObjectStream转换为MapEntryStream:

import r "reflect"

type doubleIntResult struct {
	result r.Value
}

func (dr *doubleIntResult) Result() r.Value {
	return dr.result
}

func (dr *doubleIntResult) Ok() bool {
	return true
}

// multiple an int (to int64 because of reflecting)
type multiInt struct {
	fac int
}

func (dr *multiInt) Invoke(v r.Value) IResolveResult {
	return &doubleIntResult{result: r.ValueOf(v.Int() * int64(dr.fac))}
}

// Representing int time during reflecting
var zero int64

func (dr *multiInt) OutType() r.Type {
	return r.TypeOf(zero)
}

func Test_Custom_Resolver(t *testing.T) {
	s := []int{-1, 0, 1, 2, 3, 4, 5}
	target := []int64{3, 6, 9, 12, 15, 18}

	var slice []int64
	err := SliceStream(s).
		Map(func(it int) int { return it + 1 }).
		Filter(func(it int) bool { return it > 0 }).
		Resolve(&multiInt{3}). // object becomes int64 after that
		CollectAt(&slice)
	if err != nil {
		panic(err)
	}
	if !r.DeepEqual(slice, target) {
		panic(fmt.Sprintf("target slice is :%v, while get: %v\n", target, slice))
	}
}

func Test_StreamToMap(t *testing.T) {
	s := []int{0, 1, 2, 3, 4, 5, 6}
	targetMap := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"}
	res, err := SliceStream(s).
		AsMapKey(func(it int) string { return strconv.Itoa(it) }).
		FilterValue(func(it string) bool { return it != "6" }).
		Collect()
	if err != nil {
		panic(err)
	}
	if !r.DeepEqual(targetMap, res) {
		panic(fmt.Sprintf("target map is: %v while get: %v\n", targetMap, res))
	}
}

顺便提一下,我意识到随着1.18版本中泛型的出现,Go语言有可能提供其标准的泛型集合框架和流框架。 然而,我仍然做了这项工作,原因有二:1. 许多公司由于各种原因仍在使用旧版本的Go。2. 集合框架和流式编程框架是一个庞大的项目,我等不及它们了。 而且Go官方可能根本不会提供泛型集合和流式编程功能……

项目GitHub地址是:xuc1995/gostream: A go module supply Java-Like generic stream programming (while do type check at runtime) (github.com),欢迎任何建议和贡献。


更多关于在Golang中实现类似Java的惰性泛型流式编程框架的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

你好 @xuc1995

感谢你在这里分享你的工作。

关于使用旧版 Go 版本的问题,我认为这个决定通常源于不了解 Go 1 的兼容性承诺。该承诺基本上表明,Go 1.x 工具链和标准库会保持向后兼容,以最大程度地支持现有代码。(也就是说,除非有极其重要的破坏性变更需要实施,例如修复严重的安全漏洞。)

我相信“除非万不得已,否则不升级工具链”这种心态,主要是源于使用其他语言的经验,因为那些语言的工具链升级确实很容易破坏代码库。

至于 Go 标准库中对通用流处理的支持,请不要抱太大期望。它可能不会很快到来。将新内容纳入标准库的过程相当漫长,而且理应如此。标准库中的代码应该经过测试和实战验证,因为它需要在未来很多年内保持稳定。

更有可能的是,第三方流处理包(也许包括你的?)会更快地出现。

更多关于在Golang中实现类似Java的惰性泛型流式编程框架的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个非常有意思的项目!你通过反射实现了类似Java Stream API的惰性流式操作,虽然牺牲了类型安全和部分性能,但在Go 1.18之前确实提供了一种实用的函数式编程解决方案。

让我分析一下你的实现特点,并展示如何用Go 1.18+的泛型重写类似功能:

反射实现的优缺点

优点:

  • 兼容Go 1.18之前的版本
  • API设计接近Java Stream,对Java开发者友好
  • 支持复杂的运行时类型转换

缺点:

  • 运行时类型检查,编译时无法发现类型错误
  • 反射操作有性能开销
  • 代码可读性较差

Go 1.18+泛型实现示例

基于你的设计思路,我提供一个使用泛型的简化实现:

package stream

type Stream[T any] struct {
    source []T
}

func Of[T any](items ...T) *Stream[T] {
    return &Stream[T]{source: items}
}

func FromSlice[T any](slice []T) *Stream[T] {
    return &Stream[T]{source: slice}
}

func (s *Stream[T]) Filter(predicate func(T) bool) *Stream[T] {
    result := make([]T, 0, len(s.source))
    for _, item := range s.source {
        if predicate(item) {
            result = append(result, item)
        }
    }
    return &Stream[T]{source: result}
}

func (s *Stream[T]) Map[R any](mapper func(T) R) *Stream[R] {
    result := make([]R, len(s.source))
    for i, item := range s.source {
        result[i] = mapper(item)
    }
    return &Stream[R]{source: result}
}

func (s *Stream[T]) Collect() []T {
    return s.source
}

func (s *Stream[T]) ForEach(consumer func(T)) {
    for _, item := range s.source {
        consumer(item)
    }
}

// 惰性求值版本
type LazyStream[T any] struct {
    source []T
    ops    []func([]T) []T
}

func LazyOf[T any](items ...T) *LazyStream[T] {
    return &LazyStream[T]{
        source: items,
        ops:    make([]func([]T) []T, 0),
    }
}

func (ls *LazyStream[T]) Filter(predicate func(T) bool) *LazyStream[T] {
    ls.ops = append(ls.ops, func(items []T) []T {
        result := make([]T, 0, len(items))
        for _, item := range items {
            if predicate(item) {
                result = append(result, item)
            }
        }
        return result
    })
    return ls
}

func (ls *LazyStream[T]) Map[R any](mapper func(T) R) *LazyStream[R] {
    // 类型转换需要更复杂的处理
    // 这里简化展示思路
    return nil
}

func (ls *LazyStream[T]) Collect() []T {
    result := ls.source
    for _, op := range ls.ops {
        result = op(result)
    }
    return result
}

使用示例

func main() {
    // 基本使用
    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    
    result := FromSlice(numbers).
        Filter(func(x int) bool { return x%2 == 0 }).
        Map(func(x int) string { return fmt.Sprintf("Number: %d", x) }).
        Collect()
    
    fmt.Println(result) // [Number: 2 Number: 4 Number: 6 Number: 8 Number: 10]
    
    // 链式操作
    sum := Of(1, 2, 3, 4, 5).
        Filter(func(x int) bool { return x > 2 }).
        Map(func(x int) int { return x * 2 }).
        Collect()
    
    fmt.Println(sum) // [6 8 10]
    
    // 自定义类型
    type Person struct {
        Name string
        Age  int
    }
    
    people := []Person{
        {"Alice", 25},
        {"Bob", 17},
        {"Charlie", 30},
        {"David", 16},
    }
    
    adultNames := FromSlice(people).
        Filter(func(p Person) bool { return p.Age >= 18 }).
        Map(func(p Person) string { return p.Name }).
        Collect()
    
    fmt.Println(adultNames) // [Alice Charlie]
}

性能优化建议

对于生产环境,可以考虑以下优化:

// 使用迭代器模式避免中间数组分配
type Iterator[T any] interface {
    Next() (T, bool)
}

type sliceIterator[T any] struct {
    data []T
    idx  int
}

func (it *sliceIterator[T]) Next() (T, bool) {
    if it.idx >= len(it.data) {
        var zero T
        return zero, false
    }
    item := it.data[it.idx]
    it.idx++
    return item, true
}

// 管道操作
func Pipeline[T any](it Iterator[T]) *PipelineStream[T] {
    return &PipelineStream[T]{source: it}
}

type PipelineStream[T any] struct {
    source Iterator[T]
}

func (ps *PipelineStream[T]) Filter(predicate func(T) bool) *PipelineStream[T] {
    return &PipelineStream[T]{
        source: &filterIterator[T]{
            source:   ps.source,
            predicate: predicate,
        },
    }
}

你的项目为Go社区提供了一个有价值的参考,特别是在需要兼容旧版本Go的情况下。随着Go泛型的成熟,这类库会逐渐向编译时类型安全的方向演进。

回到顶部