golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用
Golang 将 SQL 模式转换为 AVRO 模式并查询记录的插件库 avro 的使用
功能概述
github.com/khezen/avro
这个包的主要目的是简化在 Go 中使用 AVRO 强类型的功能。
主要功能模块
github.com/khezen/avro
- AVRO 模式的序列化/反序列化github.com/khezen/avro/sqlavro
- SQL 表到 AVRO 模式的转换和查询github.com/khezen/avro/redshiftavro
- 从 AVRO 模式生成 Redshift 创建语句
什么是 AVRO
Apache AVRO 是一个数据序列化系统,它依赖于 JSON 模式。它提供:
- 丰富的数据结构
- 紧凑、快速的二进制数据格式
- 用于存储持久数据的容器文件
- 远程过程调用 (RPC)
AVRO 二进制编码数据与其模式一起提供,因此是完全自描述的。
使用示例
SQL 表转换为 AVRO 模式
package main
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/khezen/avro/sqlavro"
)
func main() {
db, err := sql.Open("mysql", "root@/blog")
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec(
`CREATE TABLE posts(
ID INT NOT NULL,
title VARCHAR(128) NOT NULL,
body LONGBLOB NOT NULL,
content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
post_date DATETIME NOT NULL,
update_date DATETIME,
reading_time_minutes DECIMAL(3,1),
PRIMARY KEY(ID)
)`,
)
if err != nil {
panic(err)
}
schemas, err := sqlavro.SQLDatabase2AVRO(db, "blog")
if err != nil {
panic(err)
}
schemasBytes, err := json.Marshal(schemas)
if err != nil {
panic(err)
}
fmt.Println(string(schemasBytes))
}
转换后的 AVRO 模式示例:
[
{
"type": "record",
"namespace": "blog",
"name": "posts",
"fields": [
{
"name": "ID",
"type": "int"
},
{
"name": "title",
"type": "string"
},
{
"name": "body",
"type": "bytes"
},
{
"name": "content_type",
"type": [
"string",
"null"
],
"default": "text/markdown; charset=UTF-8"
},
{
"name": "post_date",
"type": {
"type": "int",
"doc":"datetime",
"logicalType": "timestamp"
}
},
{
"name": "update_date",
"type": [
"null",
{
"type": "int",
"doc":"datetime",
"logicalType": "timestamp"
}
]
},
{
"name": "reading_time_minutes",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 3,
"scale": 1
}
]
}
]
}
]
从 SQL 查询记录并转换为 AVRO 或 CSV 二进制
package main
import (
"database/sql"
"fmt"
"io/ioutil"
"time"
"github.com/khezen/avro"
"github.com/khezen/avro/sqlavro"
)
func main() {
db, err := sql.Open("mysql", "root@/blog")
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec(
`CREATE TABLE posts(
ID INT NOT NULL,
title VARCHAR(128) NOT NULL,
body LONGBLOB NOT NULL,
content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
post_date DATETIME NOT NULL,
update_date DATETIME,
reading_time_minutes DECIMAL(3,1),
PRIMARY KEY(ID)
)`,
)
if err != nil {
panic(err)
}
_, err = db.Exec(
`INSERT INTO posts(ID,title,body,content_type,post_date,update_date,reading_time_minutes)
VALUES (?,?,?,?,?,?,?)`,
42,
"lorem ispum",
[]byte(`Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.`),
"text/markdown; charset=UTF-8",
"2009-04-10 00:00:00",
"2009-04-10 00:00:00",
"4.2",
)
if err != nil {
panic(err)
}
schema, err := sqlavro.SQLTable2AVRO(db, "blog", "posts")
if err != nil {
panic(err)
}
limit := 1000
order := avro.Ascending
from, err := time.Parse("2006-02-01 15:04:05", "2009-04-10 00:00:00")
if err != nil {
panic(err)
}
avroBytes, updatedCriteria, err := sqlavro.Query(sqlavro.QueryConfig{
DB: db,
DBName: "blog",
Schema: schema,
Limit: limit,
Criteria: []sqlavro.Criterion{
*sqlavro.NewCriterionDateTime("post_date", &from, order),
},
Output: "avro",
})
if err != nil {
panic(err)
}
err = ioutil.WriteFile("/tmp/blog_posts.avro", avroBytes, 0644)
if err != nil {
panic(err)
}
fmt.Println(updatedCriteria)
}
从 AVRO 模式生成 Redshift 创建语句
package main
import (
"encoding/json"
"fmt"
"github.com/khezen/avro"
"github.com/khezen/avro/redshiftavro"
)
func main() {
schemaBytes := []byte(`
{
"type": "record",
"namespace": "blog",
"name": "posts",
"fields": [
{
"name": "ID",
"type": "int"
},
{
"name": "title",
"type": "string"
},
{
"name": "body",
"type": "bytes"
},
{
"name": "content_type",
"type": [
"string",
"null"
],
"default": "text/markdown; charset=UTF-8"
},
{
"name": "post_date",
"type": {
"type": "int",
"doc":"datetime",
"logicalType": "timestamp"
}
},
{
"name": "update_date",
"type": [
"null",
{
"type": "int",
"doc":"datetime",
"logicalType": "timestamp"
}
]
},
{
"name": "reading_time_minutes",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 3,
"scale": 1
}
]
}
]
}`)
var anySchema avro.AnySchema
err := json.Unmarshal(schemaBytes, &anySchema)
if err != nil {
panic(err)
}
schema := anySchema.Schema().(*avro.RecordSchema)
cfg := redshiftavro.CreateConfig{
Schema: *schema,
SortKeys: []string{"post_date", "title"},
IfNotExists: true,
}
statement, err := redshiftavro.CreateTableStatement(cfg)
if err != nil {
panic(err)
}
fmt.Println(statement)
}
生成的 Redshift SQL 语句:
CREATE TABLE IF NOT EXISTS posts(
ID INTEGER ENCODE LZO NOT NULL,
title VARCHAR(65535) ENCODE RAW NOT NULL,
body VARCHAR(65535) ENCODE ZSTD NOT NULL,
content_type VARCHAR(65535) ENCODE ZSTD NULL,
post_date TIMESTAMP WITHOUT TIME ZONE ENCODE RAW NOT NULL,
update_date TIMESTAMP WITHOUT TIME ZONE ENCODE LZO NULL,
reading_time_minutes DECIMAL(3,1) ENCODE RAW NULL
)
SORTKEY(
post_date,
title
)
类型映射
Avro | Go | SQL |
---|---|---|
null |
nil |
NULL |
bytes |
[]byte |
BLOB ,MEDIUMBLOB ,LONGBLOB |
fixed |
[]byte |
CHAR ,NCHAR |
string ,enum |
string |
VARCHAR ,NVARCHAR ,TEXT ,TINYTEXT ,MEDIUMTEXT ,LONGTEXT ,ENUM ,SET |
float |
float32 |
FLOAT |
double |
float64 |
DOUBLE |
long |
int64 |
BIGINT |
int |
int32 |
TINYINT ,SMALLINT ,INT ,YEAR |
decimal |
*big.Rat |
DECIMAL |
time |
int32 |
TIME |
timestamp |
int32 |
TIMESTAMP ,DATETIME |
date |
time.Time |
DATE |
array |
[]interface{} |
N/A |
map ,record |
map[string]interface{} |
N/A |
union |
见下文 | 任何可为空的类型 |
注意事项
- 当记录字段包含别名时,查询中使用第一个别名而不是字段名称
更多关于golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang将SQL模式转换为AVRO模式并查询记录的插件库avro的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Go将SQL模式转换为Avro模式并查询记录
在Go语言生态中,有几个库可以帮助你将SQL数据库模式转换为Avro模式并处理Avro格式的数据记录。下面我将介绍如何使用这些库完成这个任务。
主要使用的库
github.com/linkedin/goavro
- 主流的Avro编解码库github.com/viant/goavro
- 另一个Avro处理库github.com/khezen/avro
- 提供SQL到Avro模式转换
安装依赖
go get github.com/linkedin/goavro
go get github.com/viant/goavro
go get github.com/khezen/avro
SQL模式转Avro模式示例
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"github.com/khezen/avro"
_ "github.com/lib/pq" // PostgreSQL驱动
)
func main() {
// 连接数据库
db, err := sql.Open("postgres", "user=postgres dbname=test sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 获取表结构
tableName := "users"
columns, err := avro.GetColumns(db, "public", tableName)
if err != nil {
log.Fatal(err)
}
// 转换为Avro模式
schema, err := avro.NewSchema(tableName, "com.example", columns)
if err != nil {
log.Fatal(err)
}
// 输出Avro模式
schemaJSON, _ := json.MarshalIndent(schema, "", " ")
fmt.Printf("Avro Schema:\n%s\n", schemaJSON)
}
查询记录并编码为Avro
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"github.com/linkedin/goavro"
_ "github.com/lib/pq"
)
func main() {
// 连接数据库
db, err := sql.Open("postgres", "user=postgres dbname=test sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 定义Avro模式 (可以从上一步生成)
avroSchema := `{
"type": "record",
"name": "users",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}`
// 创建Avro编解码器
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
log.Fatal(err)
}
// 查询数据
rows, err := db.Query("SELECT id, name, email, created_at FROM users LIMIT 10")
if err != nil {
log.Fatal(err)
}
defer rows.Close()
// 创建输出文件
file, err := os.Create("users.avro")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 创建OCF写入器 (Avro Object Container File)
ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: file,
Codec: codec,
Schema: avroSchema,
})
if err != nil {
log.Fatal(err)
}
// 处理查询结果
var records []map[string]interface{}
for rows.Next() {
var id int
var name, email string
var createdAt sql.NullTime
if err := rows.Scan(&id, &name, &email, &createdAt); err != nil {
log.Fatal(err)
}
record := map[string]interface{}{
"id": id,
"name": name,
"email": email,
"created_at": goavro.Timestamp(createdAt.Time.UnixMilli()),
}
records = append(records, record)
}
// 写入Avro文件
if err := ocfWriter.Append(records); err != nil {
log.Fatal(err)
}
fmt.Println("Successfully wrote records to users.avro")
}
读取Avro文件并解码
package main
import (
"fmt"
"log"
"os"
"github.com/linkedin/goavro"
)
func main() {
// 打开Avro文件
file, err := os.Open("users.avro")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 创建OCF读取器
ocfReader, err := goavro.NewOCFReader(file)
if err != nil {
log.Fatal(err)
}
// 读取并解码记录
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil {
log.Fatal(err)
}
// 打印记录
fmt.Printf("%+v\n", record)
}
if err := ocfReader.Err(); err != nil {
log.Fatal(err)
}
}
注意事项
- 类型映射需要特别注意,SQL类型和Avro类型不完全一致
- 对于NULL值处理,Avro需要明确的union类型
- 时间戳类型需要特殊处理,如上面的
goavro.Timestamp
- 对于大型数据集,考虑流式处理而非一次性加载所有记录
替代方案
如果你需要更完整的解决方案,可以考虑:
- Apache Kafka的Schema Registry
- Debezium等CDC工具
- 自定义转换器处理复杂类型映射
以上代码提供了基本的SQL到Avro转换和查询记录处理的框架,你可以根据实际需求进行调整和扩展。