golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用

Golang 将 SQL 模式转换为 AVRO 模式并查询记录的插件库 avro 的使用

功能概述

github.com/khezen/avro 这个包的主要目的是简化在 Go 中使用 AVRO 强类型的功能。

主要功能模块

  1. github.com/khezen/avro - AVRO 模式的序列化/反序列化
  2. github.com/khezen/avro/sqlavro - SQL 表到 AVRO 模式的转换和查询
  3. github.com/khezen/avro/redshiftavro - 从 AVRO 模式生成 Redshift 创建语句

什么是 AVRO

Apache AVRO 是一个数据序列化系统,它依赖于 JSON 模式。它提供:

  • 丰富的数据结构
  • 紧凑、快速的二进制数据格式
  • 用于存储持久数据的容器文件
  • 远程过程调用 (RPC)

AVRO 二进制编码数据与其模式一起提供,因此是完全自描述的。

使用示例

SQL 表转换为 AVRO 模式

package main

import (
  "database/sql"
  "encoding/json"
  "fmt"

  "github.com/khezen/avro/sqlavro"
)

func main() {
  db, err := sql.Open("mysql", "root@/blog")
  if err != nil {
    panic(err)
  }
  defer db.Close()
  _, err = db.Exec(
    `CREATE TABLE posts(
      ID INT NOT NULL,
      title VARCHAR(128) NOT NULL,
      body LONGBLOB NOT NULL,
      content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
      post_date DATETIME NOT NULL,
      update_date DATETIME,
      reading_time_minutes DECIMAL(3,1),
      PRIMARY KEY(ID)
    )`,
  )
  if err != nil {
    panic(err)
  }
  schemas, err := sqlavro.SQLDatabase2AVRO(db, "blog")
  if err != nil {
    panic(err)
  }
  schemasBytes, err := json.Marshal(schemas)
  if err != nil {
    panic(err)
  }
  fmt.Println(string(schemasBytes))
}

转换后的 AVRO 模式示例:

[
    {
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
    }
]

从 SQL 查询记录并转换为 AVRO 或 CSV 二进制

package main

import (
	"database/sql"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/khezen/avro"
	"github.com/khezen/avro/sqlavro"
)

func main() {
	db, err := sql.Open("mysql", "root@/blog")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec(
		`CREATE TABLE posts(
			ID INT NOT NULL,
			title VARCHAR(128) NOT NULL,
			body LONGBLOB NOT NULL,
			content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
			post_date DATETIME NOT NULL,
			update_date DATETIME,
			reading_time_minutes DECIMAL(3,1),
			PRIMARY KEY(ID)
		)`,
	)
	if err != nil {
		panic(err)
	}
	_, err = db.Exec(
		`INSERT INTO posts(ID,title,body,content_type,post_date,update_date,reading_time_minutes)
		 VALUES (?,?,?,?,?,?,?)`,
		42,
		"lorem ispum",
		[]byte(`Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.`),
		"text/markdown; charset=UTF-8",
		"2009-04-10 00:00:00",
		"2009-04-10 00:00:00",
		"4.2",
	)
	if err != nil {
		panic(err)
	}
	schema, err := sqlavro.SQLTable2AVRO(db, "blog", "posts")
	if err != nil {
		panic(err)
	}
	limit := 1000
	order := avro.Ascending
	from, err := time.Parse("2006-02-01 15:04:05", "2009-04-10 00:00:00")
	if err != nil {
		panic(err)
	}
	avroBytes, updatedCriteria, err := sqlavro.Query(sqlavro.QueryConfig{
		DB:     db,
		DBName: "blog",
		Schema: schema,
		Limit:  limit,
		Criteria: []sqlavro.Criterion{
			*sqlavro.NewCriterionDateTime("post_date", &from, order),
		},
		Output: "avro",
	})
	if err != nil {
		panic(err)
	}
	err = ioutil.WriteFile("/tmp/blog_posts.avro", avroBytes, 0644)
	if err != nil {
		panic(err)
	}
	fmt.Println(updatedCriteria)
}

从 AVRO 模式生成 Redshift 创建语句

package main

import (
	"encoding/json"
	"fmt"

	"github.com/khezen/avro"
	"github.com/khezen/avro/redshiftavro"
)

func main() {
	schemaBytes := []byte(`
	{
        "type": "record",
        "namespace": "blog",
        "name": "posts",
        "fields": [
            {
                "name": "ID",
                "type": "int"
            },
            {
                "name": "title",
                "type": "string"
            },
            {
                "name": "body",
                "type": "bytes"
            },
            {
                "name": "content_type",
                "type": [
                    "string",
                    "null"
                ],
                "default": "text/markdown; charset=UTF-8"
            },
            {
                "name": "post_date",
                "type": {
                    "type": "int",
                    "doc":"datetime",
                    "logicalType": "timestamp"
                }
            },
            {
                "name": "update_date",
                "type": [
                    "null",
                    {
                        "type": "int",
                        "doc":"datetime",
                        "logicalType": "timestamp"
                    }
                ]
            },
            {
                "name": "reading_time_minutes",
                "type": [
                    "null",
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 3,
                        "scale": 1
                    }
                ]
            }
        ]
	}`)
	var anySchema avro.AnySchema
	err := json.Unmarshal(schemaBytes, &anySchema)
	if err != nil {
		panic(err)
	}
	schema := anySchema.Schema().(*avro.RecordSchema)
	cfg := redshiftavro.CreateConfig{
		Schema:      *schema,
		SortKeys:    []string{"post_date", "title"},
		IfNotExists: true,
	}
	statement, err := redshiftavro.CreateTableStatement(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(statement)
}

生成的 Redshift SQL 语句:

CREATE TABLE IF NOT EXISTS posts(
	ID INTEGER ENCODE LZO NOT NULL,
	title VARCHAR(65535) ENCODE RAW NOT NULL,
	body VARCHAR(65535) ENCODE ZSTD NOT NULL,
	content_type VARCHAR(65535) ENCODE ZSTD NULL,
	post_date TIMESTAMP WITHOUT TIME ZONE ENCODE RAW NOT NULL,
	update_date TIMESTAMP WITHOUT TIME ZONE ENCODE LZO NULL,
	reading_time_minutes DECIMAL(3,1) ENCODE RAW NULL
)
SORTKEY(
	post_date,
	title
)

类型映射

Avro Go SQL
null nil NULL
bytes []byte BLOB,MEDIUMBLOB,LONGBLOB
fixed []byte CHAR,NCHAR
string,enum string VARCHAR,NVARCHAR,TEXT,TINYTEXT,MEDIUMTEXT,LONGTEXT,ENUM,SET
float float32 FLOAT
double float64 DOUBLE
long int64 BIGINT
int int32 TINYINT,SMALLINT,INT,YEAR
decimal *big.Rat DECIMAL
time int32 TIME
timestamp int32 TIMESTAMP,DATETIME
date time.Time DATE
array []interface{} N/A
map,record map[string]interface{} N/A
union 见下文 任何可为空的类型

注意事项

  • 当记录字段包含别名时,查询中使用第一个别名而不是字段名称

更多关于golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Go将SQL模式转换为Avro模式并查询记录

在Go语言生态中,有几个库可以帮助你将SQL数据库模式转换为Avro模式并处理Avro格式的数据记录。下面我将介绍如何使用这些库完成这个任务。

主要使用的库

  1. github.com/linkedin/goavro - 主流的Avro编解码库
  2. github.com/viant/goavro - 另一个Avro处理库
  3. github.com/khezen/avro - 提供SQL到Avro模式转换

安装依赖

go get github.com/linkedin/goavro
go get github.com/viant/goavro
go get github.com/khezen/avro

SQL模式转Avro模式示例

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"

	"github.com/khezen/avro"
	_ "github.com/lib/pq" // PostgreSQL驱动
)

func main() {
	// 连接数据库
	db, err := sql.Open("postgres", "user=postgres dbname=test sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 获取表结构
	tableName := "users"
	columns, err := avro.GetColumns(db, "public", tableName)
	if err != nil {
		log.Fatal(err)
	}

	// 转换为Avro模式
	schema, err := avro.NewSchema(tableName, "com.example", columns)
	if err != nil {
		log.Fatal(err)
	}

	// 输出Avro模式
	schemaJSON, _ := json.MarshalIndent(schema, "", "  ")
	fmt.Printf("Avro Schema:\n%s\n", schemaJSON)
}

查询记录并编码为Avro

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/linkedin/goavro"
	_ "github.com/lib/pq"
)

func main() {
	// 连接数据库
	db, err := sql.Open("postgres", "user=postgres dbname=test sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 定义Avro模式 (可以从上一步生成)
	avroSchema := `{
		"type": "record",
		"name": "users",
		"namespace": "com.example",
		"fields": [
			{"name": "id", "type": "int"},
			{"name": "name", "type": "string"},
			{"name": "email", "type": "string"},
			{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
		]
	}`

	// 创建Avro编解码器
	codec, err := goavro.NewCodec(avroSchema)
	if err != nil {
		log.Fatal(err)
	}

	// 查询数据
	rows, err := db.Query("SELECT id, name, email, created_at FROM users LIMIT 10")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	// 创建输出文件
	file, err := os.Create("users.avro")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// 创建OCF写入器 (Avro Object Container File)
	ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
		W:      file,
		Codec:  codec,
		Schema: avroSchema,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 处理查询结果
	var records []map[string]interface{}
	for rows.Next() {
		var id int
		var name, email string
		var createdAt sql.NullTime

		if err := rows.Scan(&id, &name, &email, &createdAt); err != nil {
			log.Fatal(err)
		}

		record := map[string]interface{}{
			"id":         id,
			"name":       name,
			"email":      email,
			"created_at": goavro.Timestamp(createdAt.Time.UnixMilli()),
		}
		records = append(records, record)
	}

	// 写入Avro文件
	if err := ocfWriter.Append(records); err != nil {
		log.Fatal(err)
	}

	fmt.Println("Successfully wrote records to users.avro")
}

读取Avro文件并解码

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/linkedin/goavro"
)

func main() {
	// 打开Avro文件
	file, err := os.Open("users.avro")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// 创建OCF读取器
	ocfReader, err := goavro.NewOCFReader(file)
	if err != nil {
		log.Fatal(err)
	}

	// 读取并解码记录
	for ocfReader.Scan() {
		record, err := ocfReader.Read()
		if err != nil {
			log.Fatal(err)
		}

		// 打印记录
		fmt.Printf("%+v\n", record)
	}

	if err := ocfReader.Err(); err != nil {
		log.Fatal(err)
	}
}

注意事项

  1. 类型映射需要特别注意,SQL类型和Avro类型不完全一致
  2. 对于NULL值处理,Avro需要明确的union类型
  3. 时间戳类型需要特殊处理,如上面的goavro.Timestamp
  4. 对于大型数据集,考虑流式处理而非一次性加载所有记录

替代方案

如果你需要更完整的解决方案,可以考虑:

  1. Apache Kafka的Schema Registry
  2. Debezium等CDC工具
  3. 自定义转换器处理复杂类型映射

以上代码提供了基本的SQL到Avro转换和查询记录处理的框架,你可以根据实际需求进行调整和扩展。

回到顶部