golang基于流式编程的数据处理插件库flowgraph的使用

golang基于流式编程的数据处理插件库flowgraph的使用

快速开始

安装flowgraph库并运行测试:

go get -u github.com/vectaport/flowgraph
go test

概述

Flowgraph由通过流互连的hub构成。hub使用goroutine实现,通过select等待传入数据或背压握手信号。数据和握手信号通过流传输,流使用空接口通道(interface{})实现正向数据流,使用空结构体通道(struct{})实现背压。

使用此包的用户完全不需要了解goroutine、channel和select的细节,只需要为flowgraph中的每个hub提供将输入数据转换为输出数据的空接口函数。该库还支持根据需要记录每个数据流和转换的详细程度,用于调试和监控。

该包通过使用背压管理空闲空间,可以构建正确无误的数据流系统,避免死锁和僵局。它还支持循环结构,可以使用通道缓冲在循环内实现与管道结构相同的效率。

完整示例demo

下面是一个使用flowgraph的完整示例,展示了如何创建一个简单的数据处理流程:

package main

import (
	"fmt"
	"github.com/vectaport/flowgraph"
)

func main() {
	// 创建新的flowgraph
	fg := flowgraph.New("simpleExample")

	// 创建hub
	src := fg.AddHub("src", flowgraph.Source)  // 数据源hub
	proc := fg.AddHub("proc", flowgraph.Func)  // 数据处理hub
	sink := fg.AddHub("sink", flowgraph.Sink)  // 数据接收hub

	// 定义数据源函数
	src.SrcFunc = func(n *flowgraph.Hub) {
		for i := 0; i < 5; i++ {
			n.SendData(i)  // 发送数据0-4
		}
		n.SendData(nil)  // 发送nil表示结束
	}

	// 定义处理函数 - 将输入数字乘以2
	proc.Func = func(n *flowgraph.Hub) {
		data := n.RecvData(0)  // 从端口0接收数据
		if data == nil {
			n.SendData(nil)  // 传递结束信号
			return
		}
		num := data.(int)
		n.SendData(num * 2)  // 发送处理后的数据
	}

	// 定义接收函数 - 打印处理后的数据
	sink.SinkFunc = func(n *flowgraph.Hub) {
		data := n.RecvData(0)
		if data == nil {
			return  // 结束处理
		}
		fmt.Println("Received:", data.(int))
	}

	// 连接hub
	fg.Connect(src, 0, proc, 0, 1)  // src端口0连接到proc端口0,缓冲区大小1
	fg.Connect(proc, 0, sink, 0, 1) // proc端口0连接到sink端口0,缓冲区大小1

	// 运行flowgraph
	fg.Run()
}

这个示例创建了一个简单的数据处理流程:

  1. 源hub(src)生成数字0-4
  2. 处理hub(proc)将每个数字乘以2
  3. 接收hub(sink)打印处理后的结果

输出将是:

Received: 0
Received: 2
Received: 4
Received: 6
Received: 8

关键概念说明

  1. Hub类型:

    • Source: 数据源,需要定义SrcFunc
    • Sink: 数据接收端,需要定义SinkFunc
    • Func: 数据处理节点,需要定义Func
  2. 连接hub:

    • 使用fg.Connect()方法连接hub,指定源hub、源端口、目标hub、目标端口和缓冲区大小
  3. 数据处理:

    • 使用n.RecvData(port)接收数据
    • 使用n.SendData(data)发送数据
    • nil表示数据流结束

这个库非常适合构建复杂的数据处理管道,同时保持代码简洁和易于维护。


更多关于golang基于流式编程的数据处理插件库flowgraph的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于流式编程的数据处理插件库flowgraph的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang流式编程库flowgraph使用指南

flowgraph是一个基于Golang的流式数据处理库,它允许开发者以声明式的方式构建数据处理流水线。下面我将详细介绍它的使用方法并提供示例代码。

基本概念

flowgraph的核心概念包括:

  • Node:数据处理节点,每个节点执行特定功能
  • Edge:连接节点的边,代表数据流向
  • Graph:由节点和边组成的数据处理图

安装

go get github.com/vectaport/flowgraph

基本使用示例

示例1:简单数据处理流水线

package main

import (
	"fmt"
	"github.com/vectaport/flowgraph"
)

func main() {
	// 创建新图
	fg := flowgraph.New("simple-example")

	// 创建节点
	src := fg.AddNode("source")
	processor := fg.AddNode("processor")
	sink := fg.AddNode("sink")

	// 定义节点行为
	src.SrcFunc = func() interface{} {
		// 生成数据
		staticData := []int{1, 2, 3, 4, 5}
		for _, v := range staticData {
			return v
		}
		return nil // 结束信号
	}

	processor.ProcFunc = func(data interface{}) interface{} {
		// 处理数据:将整数乘以2
		if num, ok := data.(int); ok {
			return num * 2
		}
		return nil
	}

	sink.SinkFunc = func(data interface{}) {
		// 输出结果
		fmt.Printf("Processed result: %v\n", data)
	}

	// 连接节点
	fg.Connect(src, 0, processor, 0)
	fg.Connect(processor, 0, sink, 0)

	// 运行图
	fg.Run()
}

示例2:多阶段数据处理

package main

import (
	"fmt"
	"github.com/vectaport/flowgraph"
	"strings"
)

func main() {
	fg := flowgraph.New("multi-stage")

	// 创建节点
	generator := fg.AddNode("generator")
	filter := fg.AddNode("filter")
	transformer := fg.AddNode("transformer")
	aggregator := fg.AddNode("aggregator")
	printer := fg.AddNode("printer")

	// 定义节点行为
	words := []string{"apple", "banana", "cherry", "date", "elderberry", "fig"}

	generator.SrcFunc = func() interface{} {
		if len(words) == 0 {
			return nil
		}
		word := words[0]
		words = words[1:]
		return word
	}

	filter.ProcFunc = func(data interface{}) interface{} {
		word := data.(string)
		if len(word) > 4 { // 只保留长度大于4的单词
			return word
		}
		return nil // 过滤掉
	}

	transformer.ProcFunc = func(data interface{}) interface{} {
		word := data.(string)
		return strings.ToUpper(word)
	}

	aggregator.ProcFunc = func(data interface{}) interface{} {
		word := data.(string)
		// 简单聚合:计算字母ASCII码之和
		sum := 0
		for _, c := range word {
			sum += int(c)
		}
		return fmt.Sprintf("%s:%d", word, sum)
	}

	printer.SinkFunc = func(data interface{}) {
		fmt.Println(data)
	}

	// 构建图
	fg.Connect(generator, 0, filter, 0)
	fg.Connect(filter, 0, transformer, 0)
	fg.Connect(transformer, 0, aggregator, 0)
	fg.Connect(aggregator, 0, printer, 0)

	// 运行
	fg.Run()
}

高级特性

并行处理

package main

import (
	"fmt"
	"github.com/vectaport/flowgraph"
	"sync"
	"time"
)

func main() {
	fg := flowgraph.New("parallel")

	src := fg.AddNode("source")
	workers := make([]*flowgraph.Node, 3)
	var wg sync.WaitGroup

	// 数据源
	src.SrcFunc = func() interface{} {
		for i := 0; i < 10; i++ {
			time.Sleep(100 * time.Millisecond)
			return i
		}
		return nil
	}

	// 创建3个工作节点
	for i := range workers {
		workers[i] = fg.AddNode(fmt.Sprintf("worker-%d", i))
		workers[i].ProcFunc = func(data interface{}) interface{} {
			num := data.(int)
			time.Sleep(500 * time.Millisecond) // 模拟耗时处理
			return num * num
		}
		fg.Connect(src, 0, workers[i], 0)
	}

	// 结果收集器
	collector := fg.AddNode("collector")
	collector.SinkFunc = func(data interface{}) {
		fmt.Printf("Result: %v\n", data)
		wg.Done()
	}

	// 连接所有worker到collector
	for _, worker := range workers {
		fg.Connect(worker, 0, collector, 0)
	}

	wg.Add(10) // 预期处理10个数据项
	fg.Run()
	wg.Wait()
}

错误处理

package main

import (
	"errors"
	"fmt"
	"github.com/vectaport/flowgraph"
)

func main() {
	fg := flowgraph.New("error-handling")

	src := fg.AddNode("source")
	processor := fg.AddNode("processor")
	sink := fg.AddNode("sink")

	src.SrcFunc = func() interface{} {
		return "data"
	}

	processor.ProcFunc = func(data interface{}) interface{} {
		str, ok := data.(string)
		if !ok {
			return errors.New("invalid data type")
		}
		if str == "error" {
			return errors.New("simulated error")
		}
		return str + "-processed"
	}

	sink.SinkFunc = func(data interface{}) {
		switch v := data.(type) {
		case error:
			fmt.Printf("Error occurred: %v\n", v)
		default:
			fmt.Printf("Success: %v\n", v)
		}
	}

	fg.Connect(src, 0, processor, 0)
	fg.Connect(processor, 0, sink, 0)

	fg.Run()
}

最佳实践

  1. 节点职责单一:每个节点应该只做一件事情
  2. 错误处理:在关键节点添加错误处理逻辑
  3. 性能考虑:对于计算密集型任务,考虑使用并行节点
  4. 资源清理:对于需要清理的资源,可以在节点中添加关闭逻辑

flowgraph提供了一种优雅的方式来构建复杂的数据处理流水线,特别适合ETL、数据转换和实时数据处理等场景。通过组合简单的节点,可以构建出强大的数据处理系统。

回到顶部