golang流式ETL数据转换与JSON输出插件库omniparser的使用
Golang流式ETL数据转换与JSON输出插件库omniparser的使用
概述
Omniparser是一个原生Golang ETL解析器,它以流式方式处理各种格式的输入数据(包括CSV、txt、固定长度/宽度、XML、EDI/X12/EDIFACT、JSON和自定义格式),并根据JSON编写的模式将数据转换为所需的JSON输出。
最低Golang版本要求:1.16
主要特性
- 支持多种输入格式:CSV、txt、固定长度/宽度、XML、EDI/X12/EDIFACT、JSON和自定义格式
- 流式处理数据,避免内存溢出
- 使用JSON模式定义数据转换规则
- 支持自定义函数和自定义文件格式
- 高性能的时间解析功能
- 可扩展性架构
快速开始示例
下面是一个使用omniparser处理CSV文件的完整示例:
package main
import (
"fmt"
"log"
"os"
"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)
func main() {
// 1. 创建omniparser实例
parser, err := omniparser.NewSchema(
"example", // schema名称
[]byte(`{
"file_format": "csv2",
"file_declaration": {
"delimiter": ",",
"has_header_line": true
},
"transform_declarations": {
"FINAL_OUTPUT": {
"object": {
"name": { "xpath": "record/name" },
"age": { "xpath": "record/age", "type": "int" },
"email": { "xpath": "record/email" }
}
}
}
}`), // schema内容
fileformat.CSVFileFormat)
if err != nil {
log.Fatal(err)
}
// 2. 打开输入文件
file, err := os.Open("input.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 3. 创建内容读取器
contentReader, err := parser.NewContentReader("example", file)
if err != nil {
log.Fatal(err)
}
// 4. 读取并转换数据
for {
record, err := contentReader.Read()
if err != nil {
if err.Error() == "EOF" {
break // 正常结束
}
log.Fatal(err)
}
// 5. 输出转换后的JSON
fmt.Println(string(record.Raw()))
}
}
假设input.csv
内容如下:
name,age,email
John Doe,30,john@example.com
Jane Smith,25,jane@example.com
输出结果将是:
{"name":"John Doe","age":30,"email":"john@example.com"}
{"name":"Jane Smith","age":25,"email":"jane@example.com"}
XML处理示例
下面是一个处理XML文件的示例:
package main
import (
"fmt"
"log"
"os"
"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)
func main() {
// 1. 创建schema
parser, err := omniparser.NewSchema(
"xml_example",
[]byte(`{
"file_format": "xml",
"transform_declarations": {
"FINAL_OUTPUT": {
"array": {
"xpath": "/catalog/book",
"object": {
"id": { "xpath": "@id" },
"title": { "xpath": "title" },
"author": { "xpath": "author" },
"price": { "xpath": "price", "type": "float" }
}
}
}
}
}`),
fileformat.XMLFileFormat)
if err != nil {
log.Fatal(err)
}
// 2. 打开XML文件
file, err := os.Open("books.xml")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 3. 创建内容读取器
contentReader, err := parser.NewContentReader("xml_example", file)
if err != nil {
log.Fatal(err)
}
// 4. 读取并转换数据
for {
record, err := contentReader.Read()
if err != nil {
if err.Error() == "EOF" {
break
}
log.Fatal(err)
}
fmt.Println(string(record.Raw()))
}
}
假设books.xml
内容如下:
<catalog>
<book id="bk101">
<author>Gambardella, Matthew</author>
<title>XML Developer's Guide</title>
<price>44.95</price>
</book>
<book id="bk102">
<author>Ralls, Kim</author>
<title>Midnight Rain</title>
<price>5.95</price>
</book>
</catalog>
输出结果将是:
{"id":"bk101","title":"XML Developer's Guide","author":"Gambardella, Matthew","price":44.95}
{"id":"bk102","title":"Midnight Rain","author":"Ralls, Kim","price":5.95}
使用自定义函数
omniparser支持使用JavaScript编写自定义转换函数:
package main
import (
"fmt"
"log"
"os"
"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/extensions/omniv21/fileformat"
)
func main() {
// 1. 创建schema,包含JavaScript自定义函数
parser, err := omniparser.NewSchema(
"custom_func_example",
[]byte(`{
"file_format": "csv2",
"file_declaration": {
"delimiter": ",",
"has_header_line": true
},
"transform_declarations": {
"FINAL_OUTPUT": {
"object": {
"full_name": {
"custom_func": {
"name": "javascript",
"args": [
{ "xpath": "record/first_name" },
{ "xpath": "record/last_name" },
{ "const": " " }
],
"script": "return args[0] + args[2] + args[1];"
}
},
"age": { "xpath": "record/age", "type": "int" }
}
}
}
}`),
fileformat.CSVFileFormat)
if err != nil {
log.Fatal(err)
}
// 2. 打开CSV文件
file, err := os.Open("people.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 3. 创建内容读取器
contentReader, err := parser.NewContentReader("custom_func_example", file)
if err != nil {
log.Fatal(err)
}
// 4. 读取并转换数据
for {
record, err := contentReader.Read()
if err != nil {
if err.Error() == "EOF" {
break
}
log.Fatal(err)
}
fmt.Println(string(record.Raw()))
}
}
假设people.csv
内容如下:
first_name,last_name,age
John,Doe,30
Jane,Smith,25
输出结果将是:
{"full_name":"John Doe","age":30}
{"full_name":"Jane Smith","age":25}
总结
Omniparser是一个功能强大的Golang ETL库,具有以下优势:
- 支持多种输入格式
- 流式处理,内存效率高
- 使用JSON模式定义转换规则,配置灵活
- 支持JavaScript自定义函数
- 高性能和可扩展性
通过上面的示例,您可以快速开始使用omniparser进行数据转换和处理。更多高级用法和详细文档可以参考项目的官方文档。
更多关于golang流式ETL数据转换与JSON输出插件库omniparser的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang流式ETL数据转换与JSON输出插件库omniparser的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Omniparser: Golang流式ETL数据转换与JSON输出插件库
Omniparser是一个强大的Golang库,专门用于流式ETL(Extract, Transform, Load)数据转换,特别适合处理各种结构化或半结构化数据并将其转换为JSON格式。
主要特性
- 流式处理:支持大规模数据的流式处理,内存占用低
- 多种输入格式:支持CSV、XML、JSON、固定宽度文本等多种格式
- 灵活转换:提供强大的数据转换能力
- 高性能:基于Golang的高效实现
- 易于扩展:可以自定义输入格式和转换逻辑
基本使用示例
安装
go get github.com/jf-tech/omniparser
简单CSV转JSON示例
package main
import (
"bytes"
"fmt"
"log"
"github.com/jf-tech/omniparser"
"github.com/jf-tech/omniparser/schemahandler"
)
func main() {
// 定义转换schema
schema := `
{
"settings": {
"file_declaration": {
"file_type": "csv",
"csv": {
"has_header_line": true,
"delimiter": ","
}
}
},
"transforms": {
"final_output": {
"object": {
"id": { "xpath": "id" },
"name": { "xpath": "name" },
"age": { "xpath": "age", "type": "int" },
"active": { "xpath": "active", "type": "bool" }
}
}
}
}`
// 示例CSV数据
csvData := `id,name,age,active
1,John Doe,30,true
2,Jane Smith,25,false
3,Bob Johnson,40,true`
// 创建转换器
transform, err := omniparser.NewTransform(
"test-csv",
schemahandler.CSV,
bytes.NewReader([]byte(schema)),
omniparser.ExtensionFuncs{},
)
if err != nil {
log.Fatal(err)
}
// 处理数据
reader := bytes.NewReader([]byte(csvData))
ingester, err := transform.NewIngester("test-input", reader)
if err != nil {
log.Fatal(err)
}
// 读取并转换记录
for {
record, err := ingester.Read()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(string(record.Raw()))
}
}
处理XML数据示例
func xmlExample() {
schema := `
{
"settings": {
"file_declaration": {
"file_type": "xml"
}
},
"transforms": {
"final_output": {
"object": {
"id": { "xpath": "/person/id/text()" },
"name": { "xpath": "/person/name/text()" },
"age": { "xpath": "/person/age/text()", "type": "int" }
}
}
}
}`
xmlData := `
<person>
<id>101</id>
<name>Alice</name>
<age>28</age>
</person>`
transform, err := omniparser.NewTransform(
"test-xml",
schemahandler.XML,
bytes.NewReader([]byte(schema)),
omniparser.ExtensionFuncs{},
)
if err != nil {
log.Fatal(err)
}
reader := bytes.NewReader([]byte(xmlData))
ingester, err := transform.NewIngester("test-input", reader)
if err != nil {
log.Fatal(err)
}
record, err := ingester.Read()
if err != nil {
log.Fatal(err)
}
fmt.Println(string(record.Raw()))
}
高级功能
自定义转换函数
func customTransformExample() {
schema := `
{
"settings": {
"file_declaration": {
"file_type": "csv",
"csv": {
"has_header_line": true,
"delimiter": ","
}
}
},
"transforms": {
"final_output": {
"object": {
"id": { "xpath": "id" },
"full_name": {
"custom_func": {
"name": "concat",
"args": [
{ "const": "Name: " },
{ "xpath": "name" }
]
}
}
}
}
}
}`
// 注册自定义函数
extFuncs := omniparser.ExtensionFuncs{
"concat": func(args ...interface{}) (interface{}, error) {
var result string
for _, arg := range args {
result += fmt.Sprintf("%v", arg)
}
return result, nil
},
}
// 创建转换器时传入自定义函数
transform, err := omniparser.NewTransform(
"test-custom",
schemahandler.CSV,
bytes.NewReader([]byte(schema)),
extFuncs,
)
// ...其余处理逻辑与前面示例类似
}
处理嵌套结构
func nestedStructureExample() {
schema := `
{
"settings": {
"file_declaration": {
"file_type": "json"
}
},
"transforms": {
"final_output": {
"object": {
"user_id": { "xpath": "id" },
"profile": {
"object": {
"name": { "xpath": "name" },
"address": {
"object": {
"city": { "xpath": "address/city" },
"zip": { "xpath": "address/zip" }
}
}
}
},
"orders": {
"array": {
"xpath": "orders",
"object": {
"order_id": { "xpath": "id" },
"amount": { "xpath": "amount", "type": "float" }
}
}
}
}
}
}
}`
jsonData := `{
"id": "user123",
"name": "John Doe",
"address": {
"city": "New York",
"zip": "10001"
},
"orders": [
{ "id": "order1", "amount": "100.50" },
{ "id": "order2", "amount": "200.75" }
]
}`
// ...创建转换器和处理数据的代码与前面示例类似
}
性能优化技巧
- 重用转换器:创建Transform实例有一定开销,应尽可能重用
- 批量处理:对于大量小文件,考虑合并处理
- 合理使用缓存:对于频繁访问的外部数据,考虑添加缓存层
- 并行处理:对于独立的数据记录,可以使用goroutine并行处理
总结
Omniparser是一个功能强大且灵活的Golang ETL库,特别适合需要将各种格式数据转换为JSON的场景。它的流式处理特性使其能够高效处理大规模数据集,而丰富的转换功能则能满足复杂的数据处理需求。通过自定义函数和灵活的schema设计,几乎可以处理任何数据转换任务。
对于更复杂的使用场景,建议参考官方文档和示例代码,以充分利用Omniparser的全部功能。