Rust Parquet格式处理库parquet-format的使用:高效读写列式存储文件,支持大数据分析场景

Rust Parquet格式处理库parquet-format的使用:高效读写列式存储文件,支持大数据分析场景

parquet-format-rs

Apache Parquet的Rust实现,包含Thrift定义文件和生成的.rs文件。

使用和版本策略

这个crate之前跟踪Parquet格式版本,这使得保持语义化版本保证有时具有挑战性。从crate的3.0.0版本开始,当我们更新Parquet格式时,会使用独立的主版本号。

下表总结了版本映射关系:

parquet-format parquet-format-rs
2.8.0 4.0.*
2.7.0 3.0.*
2.6.0 2.6.*
2.5.0 2.5.*
2.4.0 2.4.*

更新Parquet格式

  1. 更新parquet.thrift文件
  2. 运行./generate_parquet_format.sh
  3. 提交更改

注意:当更新到新的Parquet格式版本时,应该增加主版本号。

完整示例代码

use parquet_format::FileMetaData;
use std::fs::File;
use std::io::Read;

fn read_parquet_metadata(file_path: &str) -> Result<FileMetaData, std::io::Error> {
    // 打开Parquet文件
    let mut file = File::open(file_path)?;
    
    // 读取文件末尾的元数据长度(最后4个字节)
    file.seek(std::io::SeekFrom::End(-4))?;
    let mut len_bytes = [0u8; 4];
    file.read_exact(&mut len_bytes)?;
    let metadata_len = i32::from_le_bytes(len_bytes) as u64;
    
    // 读取元数据
    file.seek(std::io::SeekFrom::End(-4 - (metadata_len as i64)))?;
    let mut metadata_bytes = vec![0u8; metadata_len as usize];
    file.read_exact(&mut metadata_bytes)?;
    
    // 解析元数据
    let mut cursor = std::io::Cursor::new(metadata_bytes);
    let metadata = FileMetaData::read_from_in_stream(&mut cursor)?;
    
    Ok(metadata)
}

fn main() {
    let file_path = "example.parquet";
    match read_parquet_metadata(file_path) {
        Ok(metadata) => {
            println!("Parquet文件版本: {}", metadata.version);
            println!("创建者: {}", metadata.created_by.unwrap_or_default());
            println!("行组数量: {}", metadata.row_groups.len());
        }
        Err(e) => eprintln!("读取Parquet元数据失败: {}", e),
    }
}

扩展示例:写入Parquet文件

use parquet_format::{FileMetaData, RowGroup, SchemaElement, Type};
use std::fs::File;
use std::io::Write;

fn write_parquet_file(file_path: &str) -> Result<(), std::io::Error> {
    // 创建Schema元素
    let schema = vec![
        SchemaElement {
            name: "id".to_string(),
            type_: Some(Type::INT32),
            type_length: None,
            // 其他字段...
        },
        SchemaElement {
            name: "name".to_string(),
            type_: Some(Type::BYTE_ARRAY),
            type_length: None,
            // 其他字段...
        }
    ];

    // 创建行组
    let row_group = RowGroup {
        columns: vec![], // 这里应该添加列块信息
        total_byte_size: 1024,
        num_rows: 100,
        // 其他字段...
    };

    // 创建文件元数据
    let metadata = FileMetaData {
        version: 1,
        schema,
        num_rows: 100,
        row_groups: vec![row_group],
        created_by: Some("parquet-format-rs example".to_string()),
        // 其他字段...
    };

    // 打开文件写入
    let mut file = File::create(file_path)?;
    
    // 这里应该添加实际数据写入逻辑
    // 然后写入元数据
    
    Ok(())
}

fn main() {
    let file_path = "output.parquet";
    if let Err(e) = write_parquet_file(file_path) {
        eprintln!("写入Parquet文件失败: {}", e);
    } else {
        println!("成功写入Parquet文件: {}", file_path);
    }
}

安装

在项目目录中运行以下Cargo命令:

cargo add parquet-format

或者在Cargo.toml中添加以下行:

parquet-format = "4.0.0"

1 回复

Rust Parquet格式处理库parquet-format使用指南

概述

parquet-format是Rust生态中处理Apache Parquet列式存储文件的库,特别适合大数据分析场景。它提供了高效读写Parquet文件的能力,支持Parquet格式的所有特性。

主要特性

  • 完整的Parquet格式支持
  • 高性能的列式读写操作
  • 支持复杂嵌套数据结构
  • 与Arrow生态系统良好集成
  • 适用于大数据处理场景

安装

在Cargo.toml中添加依赖:

[dependencies]
parquet = "2.0.0"

基本使用方法

1. 写入Parquet文件

use parquet::file::properties::WriterProperties;
use parquet::arrow::ArrowWriter;
use arrow::array::{Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

fn write_parquet_example() -> Result<(), Box<dyn std::error::Error>> {
    // 定义schema
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    
    // 创建数据
    let id_array = Int32Array::from(vec![1, 2, 3, 4]);
    let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David"]);
    
    // 创建RecordBatch
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(id_array), Arc::new(name_array)],
    )?;
    
    // 创建文件写入器
    let file = std::fs::File::create("data.parquet")?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props))?;
    
    // 写入数据
    writer.write(&batch)?;
    
    // 关闭写入器
    writer.close()?;
    
    Ok(())
}

2. 读取Parquet文件

use parquet::arrow::ArrowReader;
use parquet::file::reader::SerializedFileReader;
use std::fs::File;

fn read_parquet_example() -> Result<(), Box<dyn std::error::Error>> {
    // 打开文件
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    
    // 创建Arrow读取器
    let mut arrow_reader = ArrowReader::try_new(reader)?;
    
    // 读取所有记录批次
    let record_batches = arrow_reader.get_record_reader()?;
    
    // 遍历记录
    for maybe_batch in record_batches {
        let batch = maybe_batch?;
        println!("Batch: {:?}", batch);
    }
    
    Ok(())
}

3. 读取特定列

use parquet::file::reader::{FileReader, SerializedFileReader};

fn read_specific_columns() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    
    // 只读取name列
    let iter = reader.get_row_iter(Some(vec![1]))?; // 1是name列的索引
    
    for record in iter {
        let name = record.get_string(0)?; // 获取字符串值
        println!("Name: {}", name);
    }
    
    Ok(())
}

高级用法

1. 自定义写入属性

use parquet::file::properties::WriterProperties;

let props = WriterProperties::builder()
    .set_max_row_group_size(10000) // 设置行组大小
    .set_compression(parquet::basic::Compression::SNAPPY) // 设置压缩算法
    .set_encoding(parquet::basic::Encoding::PLAIN) // 设置编码方式
    .build();

2. 处理复杂类型

use arrow::datatypes::{DataType, Field, Schema};

// 定义包含嵌套结构的schema
let schema = Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new_struct(
        "address",
        vec![
            Field::new("street", DataType::Utf8, false),
            Field::new("city", DataType::Utf8, false),
        ],
        false,
    ),
]);

3. 使用统计信息

use parquet::file::reader::FileReader;

let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();

// 获取第一列的统计信息
if let Some(stats) = metadata.row_group(0).column(0).statistics() {
    println!("Min: {:?}", stats.min());
    println!("Max: {:?}", stats.max());
}

性能优化建议

  1. 批量写入:尽量使用大的RecordBatch进行写入,减少小批量的频繁写入
  2. 合理设置行组大小:通常设置为50,000到1,000,000行之间
  3. 选择合适压缩算法:SNAPPY在压缩率和速度间有良好平衡
  4. 列投影:只读取需要的列,减少IO
  5. 谓词下推:利用统计信息过滤不需要的行组

实际应用场景

  1. 大数据分析:处理TB级别的数据分析任务
  2. 数据湖存储:作为数据湖的底层存储格式
  3. 机器学习:存储特征数据
  4. 日志存储:高效存储和查询日志数据

parquet-format库结合Rust的高性能特性,能够为大数据处理提供高效、安全的基础设施支持。

回到顶部