golang流式ETL数据转换与JSON输出插件库omniparser的使用

Golang流式ETL数据转换与JSON输出插件库omniparser的使用

概述

Omniparser是一个原生Golang ETL解析器,它以流式方式处理各种格式的输入数据(包括CSV、txt、固定长度/宽度、XML、EDI/X12/EDIFACT、JSON和自定义格式),并根据JSON编写的模式将数据转换为所需的JSON输出。

最低Golang版本要求:1.16

主要特性

  • 支持多种输入格式:CSV、txt、固定长度/宽度、XML、EDI/X12/EDIFACT、JSON和自定义格式
  • 流式处理数据,避免内存溢出
  • 使用JSON模式定义数据转换规则
  • 支持自定义函数和自定义文件格式
  • 高性能的时间解析功能
  • 可扩展性架构

快速开始示例

下面是一个使用omniparser处理CSV文件的完整示例:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/jf-tech/omniparser"
	"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)

func main() {
	// 1. 创建omniparser实例
	parser, err := omniparser.NewSchema(
		"example", // schema名称
		[]byte(`{
			"file_format": "csv2",
			"file_declaration": {
				"delimiter": ",",
				"has_header_line": true
			},
			"transform_declarations": {
				"FINAL_OUTPUT": { 
					"object": {
						"name": { "xpath": "record/name" },
						"age": { "xpath": "record/age", "type": "int" },
						"email": { "xpath": "record/email" }
					}
				}
			}
		}`), // schema内容
		fileformat.CSVFileFormat)
	if err != nil {
		log.Fatal(err)
	}

	// 2. 打开输入文件
	file, err := os.Open("input.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// 3. 创建内容读取器
	contentReader, err := parser.NewContentReader("example", file)
	if err != nil {
		log.Fatal(err)
	}

	// 4. 读取并转换数据
	for {
		record, err := contentReader.Read()
		if err != nil {
			if err.Error() == "EOF" {
				break // 正常结束
			}
			log.Fatal(err)
		}
		
		// 5. 输出转换后的JSON
		fmt.Println(string(record.Raw()))
	}
}

假设input.csv内容如下:

name,age,email
John Doe,30,john@example.com
Jane Smith,25,jane@example.com

输出结果将是:

{"name":"John Doe","age":30,"email":"john@example.com"}
{"name":"Jane Smith","age":25,"email":"jane@example.com"}

XML处理示例

下面是一个处理XML文件的示例:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/jf-tech/omniparser"
	"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)

func main() {
	// 1. 创建schema
	parser, err := omniparser.NewSchema(
		"xml_example",
		[]byte(`{
			"file_format": "xml",
			"transform_declarations": {
				"FINAL_OUTPUT": {
					"array": {
						"xpath": "/catalog/book",
						"object": {
							"id": { "xpath": "@id" },
							"title": { "xpath": "title" },
							"author": { "xpath": "author" },
							"price": { "xpath": "price", "type": "float" }
						}
					}
				}
			}
		}`),
		fileformat.XMLFileFormat)
	if err != nil {
		log.Fatal(err)
	}

	// 2. 打开XML文件
	file, err := os.Open("books.xml")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// 3. 创建内容读取器
	contentReader, err := parser.NewContentReader("xml_example", file)
	if err != nil {
		log.Fatal(err)
	}

	// 4. 读取并转换数据
	for {
		record, err := contentReader.Read()
		if err != nil {
			if err.Error() == "EOF" {
				break
			}
			log.Fatal(err)
		}
		fmt.Println(string(record.Raw()))
	}
}

假设books.xml内容如下:

<catalog>
   <book id="bk101">
      <author>Gambardella, Matthew</author>
      <title>XML Developer's Guide</title>
      <price>44.95</price>
   </book>
   <book id="bk102">
      <author>Ralls, Kim</author>
      <title>Midnight Rain</title>
      <price>5.95</price>
   </book>
</catalog>

输出结果将是:

{"id":"bk101","title":"XML Developer's Guide","author":"Gambardella, Matthew","price":44.95}
{"id":"bk102","title":"Midnight Rain","author":"Ralls, Kim","price":5.95}

使用自定义函数

omniparser支持使用JavaScript编写自定义转换函数:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/jf-tech/omniparser"
	"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)

func main() {
	// 1. 创建schema,包含JavaScript自定义函数
	parser, err := omniparser.NewSchema(
		"custom_func_example",
		[]byte(`{
			"file_format": "csv2",
			"file_declaration": {
				"delimiter": ",",
				"has_header_line": true
			},
			"transform_declarations": {
				"FINAL_OUTPUT": {
					"object": {
						"full_name": {
							"custom_func": {
								"name": "javascript",
								"args": [
									{ "xpath": "record/first_name" },
									{ "xpath": "record/last_name" },
									{ "const": " " }
								],
								"script": "return args[0] + args[2] + args[1];"
							}
						},
						"age": { "xpath": "record/age", "type": "int" }
					}
				}
			}
		}`),
		fileformat.CSVFileFormat)
	if err != nil {
		log.Fatal(err)
	}

	// 2. 打开CSV文件
	file, err := os.Open("people.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// 3. 创建内容读取器
	contentReader, err := parser.NewContentReader("custom_func_example", file)
	if err != nil {
		log.Fatal(err)
	}

	// 4. 读取并转换数据
	for {
		record, err := contentReader.Read()
		if err != nil {
			if err.Error() == "EOF" {
				break
			}
			log.Fatal(err)
		}
		fmt.Println(string(record.Raw()))
	}
}

假设people.csv内容如下:

first_name,last_name,age
John,Doe,30
Jane,Smith,25

输出结果将是:

{"full_name":"John Doe","age":30}
{"full_name":"Jane Smith","age":25}

总结

Omniparser是一个功能强大的Golang ETL库,具有以下优势:

  1. 支持多种输入格式
  2. 流式处理,内存效率高
  3. 使用JSON模式定义转换规则,配置灵活
  4. 支持JavaScript自定义函数
  5. 高性能和可扩展性

通过上面的示例,您可以快速开始使用omniparser进行数据转换和处理。更多高级用法和详细文档可以参考项目的官方文档。


更多关于golang流式ETL数据转换与JSON输出插件库omniparser的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang流式ETL数据转换与JSON输出插件库omniparser的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Omniparser: Golang流式ETL数据转换与JSON输出插件库

Omniparser是一个强大的Golang库,专门用于流式ETL(Extract, Transform, Load)数据转换,特别适合处理各种结构化或半结构化数据并将其转换为JSON格式。

主要特性

  1. 流式处理:支持大规模数据的流式处理,内存占用低
  2. 多种输入格式:支持CSV、XML、JSON、固定宽度文本等多种格式
  3. 灵活转换:提供强大的数据转换能力
  4. 高性能:基于Golang的高效实现
  5. 易于扩展:可以自定义输入格式和转换逻辑

基本使用示例

安装

go get github.com/jf-tech/omniparser

简单CSV转JSON示例

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/jf-tech/omniparser"
	"github.com/jf-tech/omniparser/schemahandler"
)

func main() {
	// 定义转换schema
	schema := `
	{
		"settings": {
			"file_declaration": {
				"file_type": "csv",
				"csv": {
					"has_header_line": true,
					"delimiter": ","
				}
			}
		},
		"transforms": {
			"final_output": {
				"object": {
					"id": { "xpath": "id" },
					"name": { "xpath": "name" },
					"age": { "xpath": "age", "type": "int" },
					"active": { "xpath": "active", "type": "bool" }
				}
			}
		}
	}`

	// 示例CSV数据
	csvData := `id,name,age,active
1,John Doe,30,true
2,Jane Smith,25,false
3,Bob Johnson,40,true`

	// 创建转换器
	transform, err := omniparser.NewTransform(
		"test-csv",
		schemahandler.CSV,
		bytes.NewReader([]byte(schema)),
		omniparser.ExtensionFuncs{},
	)
	if err != nil {
		log.Fatal(err)
	}

	// 处理数据
	reader := bytes.NewReader([]byte(csvData))
	ingester, err := transform.NewIngester("test-input", reader)
	if err != nil {
		log.Fatal(err)
	}

	// 读取并转换记录
	for {
		record, err := ingester.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Fatal(err)
		}
		fmt.Println(string(record.Raw()))
	}
}

处理XML数据示例

func xmlExample() {
	schema := `
	{
		"settings": {
			"file_declaration": {
				"file_type": "xml"
			}
		},
		"transforms": {
			"final_output": {
				"object": {
					"id": { "xpath": "/person/id/text()" },
					"name": { "xpath": "/person/name/text()" },
					"age": { "xpath": "/person/age/text()", "type": "int" }
				}
			}
		}
	}`

	xmlData := `
	<person>
		<id>101</id>
		<name>Alice</name>
		<age>28</age>
	</person>`

	transform, err := omniparser.NewTransform(
		"test-xml",
		schemahandler.XML,
		bytes.NewReader([]byte(schema)),
		omniparser.ExtensionFuncs{},
	)
	if err != nil {
		log.Fatal(err)
	}

	reader := bytes.NewReader([]byte(xmlData))
	ingester, err := transform.NewIngester("test-input", reader)
	if err != nil {
		log.Fatal(err)
	}

	record, err := ingester.Read()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(record.Raw()))
}

高级功能

自定义转换函数

func customTransformExample() {
	schema := `
	{
		"settings": {
			"file_declaration": {
				"file_type": "csv",
				"csv": {
					"has_header_line": true,
					"delimiter": ","
				}
			}
		},
		"transforms": {
			"final_output": {
				"object": {
					"id": { "xpath": "id" },
					"full_name": { 
						"custom_func": {
							"name": "concat",
							"args": [
								{ "const": "Name: " },
								{ "xpath": "name" }
							]
						}
					}
				}
			}
		}
	}`

	// 注册自定义函数
	extFuncs := omniparser.ExtensionFuncs{
		"concat": func(args ...interface{}) (interface{}, error) {
			var result string
			for _, arg := range args {
				result += fmt.Sprintf("%v", arg)
			}
			return result, nil
		},
	}

	// 创建转换器时传入自定义函数
	transform, err := omniparser.NewTransform(
		"test-custom",
		schemahandler.CSV,
		bytes.NewReader([]byte(schema)),
		extFuncs,
	)
	// ...其余处理逻辑与前面示例类似
}

处理嵌套结构

func nestedStructureExample() {
	schema := `
	{
		"settings": {
			"file_declaration": {
				"file_type": "json"
			}
		},
		"transforms": {
			"final_output": {
				"object": {
					"user_id": { "xpath": "id" },
					"profile": {
						"object": {
							"name": { "xpath": "name" },
							"address": {
								"object": {
									"city": { "xpath": "address/city" },
									"zip": { "xpath": "address/zip" }
								}
							}
						}
					},
					"orders": {
						"array": {
							"xpath": "orders",
							"object": {
								"order_id": { "xpath": "id" },
								"amount": { "xpath": "amount", "type": "float" }
							}
						}
					}
				}
			}
		}
	}`

	jsonData := `{
		"id": "user123",
		"name": "John Doe",
		"address": {
			"city": "New York",
			"zip": "10001"
		},
		"orders": [
			{ "id": "order1", "amount": "100.50" },
			{ "id": "order2", "amount": "200.75" }
		]
	}`

	// ...创建转换器和处理数据的代码与前面示例类似
}

性能优化技巧

  1. 重用转换器:创建Transform实例有一定开销,应尽可能重用
  2. 批量处理:对于大量小文件,考虑合并处理
  3. 合理使用缓存:对于频繁访问的外部数据,考虑添加缓存层
  4. 并行处理:对于独立的数据记录,可以使用goroutine并行处理

总结

Omniparser是一个功能强大且灵活的Golang ETL库,特别适合需要将各种格式数据转换为JSON的场景。它的流式处理特性使其能够高效处理大规模数据集,而丰富的转换功能则能满足复杂的数据处理需求。通过自定义函数和灵活的schema设计,几乎可以处理任何数据转换任务。

对于更复杂的使用场景,建议参考官方文档和示例代码,以充分利用Omniparser的全部功能。

回到顶部