Rust Parquet格式处理库parquet-format的使用:高效读写列式存储文件,支持大数据分析场景
Rust Parquet格式处理库parquet-format的使用:高效读写列式存储文件,支持大数据分析场景
parquet-format-rs
Apache Parquet的Rust实现,包含Thrift定义文件和生成的.rs文件。
使用和版本策略
这个crate之前跟踪Parquet格式版本,这使得保持语义化版本保证有时具有挑战性。从crate的3.0.0版本开始,当我们更新Parquet格式时,会使用独立的主版本号。
下表总结了版本映射关系:
parquet-format | parquet-format-rs |
---|---|
2.8.0 | 4.0.* |
2.7.0 | 3.0.* |
2.6.0 | 2.6.* |
2.5.0 | 2.5.* |
2.4.0 | 2.4.* |
更新Parquet格式
- 更新
parquet.thrift
文件 - 运行
./generate_parquet_format.sh
- 提交更改
注意:当更新到新的Parquet格式版本时,应该增加主版本号。
完整示例代码
use parquet_format::FileMetaData;
use std::fs::File;
use std::io::Read;
fn read_parquet_metadata(file_path: &str) -> Result<FileMetaData, std::io::Error> {
// 打开Parquet文件
let mut file = File::open(file_path)?;
// 读取文件末尾的元数据长度(最后4个字节)
file.seek(std::io::SeekFrom::End(-4))?;
let mut len_bytes = [0u8; 4];
file.read_exact(&mut len_bytes)?;
let metadata_len = i32::from_le_bytes(len_bytes) as u64;
// 读取元数据
file.seek(std::io::SeekFrom::End(-4 - (metadata_len as i64)))?;
let mut metadata_bytes = vec![0u8; metadata_len as usize];
file.read_exact(&mut metadata_bytes)?;
// 解析元数据
let mut cursor = std::io::Cursor::new(metadata_bytes);
let metadata = FileMetaData::read_from_in_stream(&mut cursor)?;
Ok(metadata)
}
fn main() {
let file_path = "example.parquet";
match read_parquet_metadata(file_path) {
Ok(metadata) => {
println!("Parquet文件版本: {}", metadata.version);
println!("创建者: {}", metadata.created_by.unwrap_or_default());
println!("行组数量: {}", metadata.row_groups.len());
}
Err(e) => eprintln!("读取Parquet元数据失败: {}", e),
}
}
扩展示例:写入Parquet文件
use parquet_format::{FileMetaData, RowGroup, SchemaElement, Type};
use std::fs::File;
use std::io::Write;
fn write_parquet_file(file_path: &str) -> Result<(), std::io::Error> {
// 创建Schema元素
let schema = vec![
SchemaElement {
name: "id".to_string(),
type_: Some(Type::INT32),
type_length: None,
// 其他字段...
},
SchemaElement {
name: "name".to_string(),
type_: Some(Type::BYTE_ARRAY),
type_length: None,
// 其他字段...
}
];
// 创建行组
let row_group = RowGroup {
columns: vec![], // 这里应该添加列块信息
total_byte_size: 1024,
num_rows: 100,
// 其他字段...
};
// 创建文件元数据
let metadata = FileMetaData {
version: 1,
schema,
num_rows: 100,
row_groups: vec![row_group],
created_by: Some("parquet-format-rs example".to_string()),
// 其他字段...
};
// 打开文件写入
let mut file = File::create(file_path)?;
// 这里应该添加实际数据写入逻辑
// 然后写入元数据
Ok(())
}
fn main() {
let file_path = "output.parquet";
if let Err(e) = write_parquet_file(file_path) {
eprintln!("写入Parquet文件失败: {}", e);
} else {
println!("成功写入Parquet文件: {}", file_path);
}
}
安装
在项目目录中运行以下Cargo命令:
cargo add parquet-format
或者在Cargo.toml中添加以下行:
parquet-format = "4.0.0"
1 回复
Rust Parquet格式处理库parquet-format使用指南
概述
parquet-format是Rust生态中处理Apache Parquet列式存储文件的库,特别适合大数据分析场景。它提供了高效读写Parquet文件的能力,支持Parquet格式的所有特性。
主要特性
- 完整的Parquet格式支持
- 高性能的列式读写操作
- 支持复杂嵌套数据结构
- 与Arrow生态系统良好集成
- 适用于大数据处理场景
安装
在Cargo.toml中添加依赖:
[dependencies]
parquet = "2.0.0"
基本使用方法
1. 写入Parquet文件
use parquet::file::properties::WriterProperties;
use parquet::arrow::ArrowWriter;
use arrow::array::{Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;
fn write_parquet_example() -> Result<(), Box<dyn std::error::Error>> {
// 定义schema
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]);
// 创建数据
let id_array = Int32Array::from(vec![1, 2, 3, 4]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David"]);
// 创建RecordBatch
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(name_array)],
)?;
// 创建文件写入器
let file = std::fs::File::create("data.parquet")?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props))?;
// 写入数据
writer.write(&batch)?;
// 关闭写入器
writer.close()?;
Ok(())
}
2. 读取Parquet文件
use parquet::arrow::ArrowReader;
use parquet::file::reader::SerializedFileReader;
use std::fs::File;
fn read_parquet_example() -> Result<(), Box<dyn std::error::Error>> {
// 打开文件
let file = File::open("data.parquet")?;
let reader = SerializedFileReader::new(file)?;
// 创建Arrow读取器
let mut arrow_reader = ArrowReader::try_new(reader)?;
// 读取所有记录批次
let record_batches = arrow_reader.get_record_reader()?;
// 遍历记录
for maybe_batch in record_batches {
let batch = maybe_batch?;
println!("Batch: {:?}", batch);
}
Ok(())
}
3. 读取特定列
use parquet::file::reader::{FileReader, SerializedFileReader};
fn read_specific_columns() -> Result<(), Box<dyn std::error::Error>> {
let file = File::open("data.parquet")?;
let reader = SerializedFileReader::new(file)?;
// 只读取name列
let iter = reader.get_row_iter(Some(vec![1]))?; // 1是name列的索引
for record in iter {
let name = record.get_string(0)?; // 获取字符串值
println!("Name: {}", name);
}
Ok(())
}
高级用法
1. 自定义写入属性
use parquet::file::properties::WriterProperties;
let props = WriterProperties::builder()
.set_max_row_group_size(10000) // 设置行组大小
.set_compression(parquet::basic::Compression::SNAPPY) // 设置压缩算法
.set_encoding(parquet::basic::Encoding::PLAIN) // 设置编码方式
.build();
2. 处理复杂类型
use arrow::datatypes::{DataType, Field, Schema};
// 定义包含嵌套结构的schema
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new_struct(
"address",
vec![
Field::new("street", DataType::Utf8, false),
Field::new("city", DataType::Utf8, false),
],
false,
),
]);
3. 使用统计信息
use parquet::file::reader::FileReader;
let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();
// 获取第一列的统计信息
if let Some(stats) = metadata.row_group(0).column(0).statistics() {
println!("Min: {:?}", stats.min());
println!("Max: {:?}", stats.max());
}
性能优化建议
- 批量写入:尽量使用大的RecordBatch进行写入,减少小批量的频繁写入
- 合理设置行组大小:通常设置为50,000到1,000,000行之间
- 选择合适压缩算法:SNAPPY在压缩率和速度间有良好平衡
- 列投影:只读取需要的列,减少IO
- 谓词下推:利用统计信息过滤不需要的行组
实际应用场景
- 大数据分析:处理TB级别的数据分析任务
- 数据湖存储:作为数据湖的底层存储格式
- 机器学习:存储特征数据
- 日志存储:高效存储和查询日志数据
parquet-format库结合Rust的高性能特性,能够为大数据处理提供高效、安全的基础设施支持。