Rust列式存储库columnar的使用:高性能列式数据存储与高效查询解决方案

Rust列式存储库columnar的使用:高性能列式数据存储与高效查询解决方案

简介

Columnar是一个Rust库,用于将(复杂)结构数组转换为(简单)数组结构。它提供了用于产品(struct)、总和(enum)和列表(Vec)组合器的容器,以及结构递归类型(如树)的示例。

这些容器代表输入类型的序列,但即使每个类型实例可能有许多更多的分配,它们也仅由少量Rust分配(向量或切片)支持。每个容器分配仅包含原始类型,并且可以轻松转换为(正确对齐的)二进制切片&[u8]

容器支持高效的随机读取访问,以及有限的随机写入访问形式。

示例

以下是使用Columnar库的完整示例:

use columnar::Columnar;

// 定义一个带有Columnar派生宏的枚举
#[derive(Columnar)]
enum Group<T> {
    Solo(T),
    Team(Vec<T>),
}

fn main() {
    // 创建并填充一个复杂类型的向量
    let mut roster = Vec::new();
    roster.push(Group::Solo(
        ("Alice".to_string(), 20u64)
    ));
    roster.push(Group::Team(vec![
        ("Bob".to_string(), 21),
        ("Carol".to_string(), 22),
    ]));

    // 转换为列式存储
    let mut columns = Columnar::as_columns(roster.iter());
    
    // 验证转换后的数据与原始数据匹配
    use columnar::Index;
    for (col, row) in columns.into_iter().zip(roster) {
        match (col, row) {
            (GroupReference::Solo(p0), Group::Solo(p1)) => {
                assert_eq!(p0.0, p1.0);
                assert_eq!(p0.1, &p1.1);
            },
            (GroupReference::Team(p0s), Group::Team(p1s)) => {
                assert_eq!(p0s.len(), p1s.len());
                for (p0, p1) in p0s.into_iter().zip(p1s) {
                    assert_eq!(p0.0, p1.0);
                    assert_eq!(p0.1, &p1.1);
                }
            },
            _ => { panic!("Variant mismatch"); }
        }
    }

    // 添加更多数据
    use columnar::Push;
    for index in 0 .. 1024 {
        columns.push(&Group::Team(vec![
            (format!("Brain{}", index), index),
            (format!("Brawn{}", index), index),
        ]));
    }

    // 报告支持columns的固定数量的大型缓冲区
    use columnar::bytes::{AsBytes, FromBytes};
    assert_eq!(columns.as_bytes().count(), 9);
    for (align, bytes) in columns.as_bytes() {
        println!("align: {:?}, bytes.len(): {:?}", align, bytes.len());
    }

    // 零拷贝序列化和反序列化
    fn round_trip<T: AsBytes](container: &T) -> T::Borrowed<'_> {
        let mut bytes_iter = container.as_bytes().map(|(_, bytes)| bytes);
        columnar::bytes::FromBytes::from_bytes(&mut bytes_iter)
    }

    let borrowed = round_trip(&columns);

    // 高效查询
    let solo_values: &[u64] = borrowed.Solo.1;
    let team_values: &[u64] = borrowed.Team.values.1;
    let total = solo_values.iter().sum::<u64>() + team_values.iter().sum::<u64>();
    println!("Present values summed: {:?}", total);
}

优势

  1. 更少、更大的分配:减少分配器的工作量,内存也更容易重用
  2. 更少的内存使用:数据重新打包到连续分配中,没有填充字节
  3. 零拷贝序列化和反序列化:原始类型的切片可以零成本转换为&[u8]
  4. 高效查询:支持对列数据进行高效操作,如求和等

劣势

  1. 可能不提供通过插入类型的访问:例如(S, T)的容器使用(&S, &T)类型访问元素
  2. 可能移除局部性:例如(S, T)的容器分别存储S和T,需要两次内存访问
  3. 可能不支持就地突变:例如Result<S, T>的容器不允许改变存在的变体

实现细节

该库主要描述列式容器组合器和它们的特征。容器组合器反映了上述思想,并代表产品、总和和列表的向量。这些组合器可以嵌套。

库基于两个主要特征:

  1. Push:允许容器描述它接受的类型以及如何吸收它们
  2. Index:允许容器描述当用usize索引时报告的类型

安装

要将columnar添加为库依赖,请运行以下Cargo命令:

cargo add columnar

或在Cargo.toml中添加:

columnar = "0.9.0"

完整示例demo

以下是一个更完整的示例,展示了如何使用columnar库处理复杂数据结构:

use columnar::{Columnar, Push, Index};
use columnar::bytes::{AsBytes, FromBytes};

// 定义一个复杂的数据结构
#[derive(Debug, Columnar)]
struct Person {
    name: String,
    age: u32,
    skills: Vec<String>,
    status: Status,
}

#[derive(Debug, Columnar)]
enum Status {
    Active,
    Inactive,
    OnLeave(u32), // 休假天数
}

fn main() {
    // 创建人员数据
    let mut people = vec![
        Person {
            name: "Alice".to_string(),
            age: 30,
            skills: vec!["Rust".to_string(), "Python".to_string()],
            status: Status::Active,
        },
        Person {
            name: "Bob".to_string(),
            age: 25,
            skills: vec!["Java".to_string()],
            status: Status::OnLeave(10),
        },
    ];

    // 转换为列式存储
    let mut columns = Columnar::as_columns(people.iter());
    
    // 添加更多数据
    for i in 0..100 {
        columns.push(&Person {
            name: format!("Person{}", i),
            age: 20 + (i % 10),
            skills: if i % 2 == 0 {
                vec!["SkillA".to_string(), "SkillB".to_string()]
            } else {
                vec!["SkillC".to_string()]
            },
            status: match i % 3 {
                0 => Status::Active,
                1 => Status::Inactive,
                _ => Status::OnLeave(5),
            },
        });
    }

    // 序列化
    let bytes: Vec<_> = columns.as_bytes().collect();
    println!("Serialized into {} buffers", bytes.len());

    // 反序列化
    let mut bytes_iter = bytes.iter().map(|(_, bytes)| *bytes);
    let deserialized = <Columnar<Person> as FromBytes>::from_bytes(&mut bytes_iter);
    
    // 查询示例
    let total_age: u32 = deserialized.age.iter().sum();
    let active_count = deserialized.status.Active.len();
    println!("Total age: {}, Active count: {}", total_age, active_count);

    // 访问特定字段
    println!("First person's name: {}", deserialized.name[0]);
    println!("First person's skills: {:?}", deserialized.skills.values[0]);
}

这个完整示例展示了:

  1. 定义复杂的数据结构
  2. 转换为列式存储
  3. 批量添加数据
  4. 序列化和反序列化
  5. 高效查询和访问特定字段
  6. 处理嵌套的枚举和向量类型

1 回复

Rust列式存储库columnar的使用:高性能列式数据存储与高效查询解决方案

简介

columnar是Rust中一个高性能的列式存储库,专为高效数据存储和快速查询而设计。列式存储与传统的行式存储不同,它将数据按列而非按行存储,特别适合分析型工作负载和需要处理大量数据的场景。

主要特性

  • 内存高效的列式存储格式
  • 支持多种数据类型(整数、浮点数、字符串等)
  • 内置压缩功能减少内存占用
  • 快速列扫描和过滤能力
  • 支持并行处理

安装

在Cargo.toml中添加依赖:

[dependencies]
columnar = "0.5"

完整示例代码

下面是一个完整的columnar使用示例,包含数据创建、查询、压缩和序列化:

use columnar::{Columnar, ColumnBuilder, Compression};
use serde::{Serialize, Deserialize};
use std::fs::File;
use std::io::{BufWriter, BufReader};

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Employee {
    id: i32,
    name: String,
    salary: f64,
    department: String,
}

fn main() -> std::io::Result<()> {
    // 1. 创建列式存储
    let mut columnar = Columnar::new();
    
    // 2. 添加基本数据列
    let mut id_builder = ColumnBuilder::<i32>::new();
    let mut name_builder = ColumnBuilder::<String>::new();
    let mut salary_builder = ColumnBuilder::<f64>::new();
    let mut dept_builder = ColumnBuilder::<String>::new();
    
    // 添加员工数据
    id_builder.push(101);
    name_builder.push("Alice".to_string());
    salary_builder.push(85000.0);
    dept_builder.push("Engineering".to_string());
    
    id_builder.push(102);
    name_builder.push("Bob".to_string());
    salary_builder.push(75000.0);
    dept_builder.push("Marketing".to_string());
    
    id_builder.push(103);
    name_builder.push("Charlie".to_string());
    salary_builder.push(90000.0);
    dept_builder.push("Engineering".to_string());
    
    // 构建列
    columnar.add_column("id", id_builder.build());
    columnar.add_column("name", name_builder.build());
    columnar.add_column_with_compression("salary", salary_builder.build(), Compression::Lz4);
    columnar.add_column("department", dept_builder.build());
    
    // 3. 添加自定义类型列
    let mut emp_builder = ColumnBuilder::<Employee>::new();
    emp_builder.push(Employee {
        id: 101,
        name: "Alice".to_string(),
        salary: 85000.0,
        department: "Engineering".to_string(),
    });
    emp_builder.push(Employee {
        id: 102,
        name: "Bob".to_string(),
        salary: 75000.0,
        department: "Marketing".to_string(),
    });
    columnar.add_column("employees", emp_builder.build());
    
    // 4. 查询示例
    // 获取各部门平均薪资
    let salary_col = columnar.column::<f64>("salary").unwrap();
    let dept_col = columnar.column::<String>("department").unwrap();
    
    let mut dept_salaries = std::collections::HashMap::new();
    for (salary, dept) in salary_col.iter().zip(dept_col.iter()) {
        let entry = dept_salaries.entry(dept).or_insert((0.0, 0));
        entry.0 += salary;
        entry.1 += 1;
    }
    
    println!("各部门平均薪资:");
    for (dept, (total, count)) in dept_salaries {
        println!("{}: {:.2}", dept, total / count as f64);
    }
    
    // 5. 序列化到文件
    let file = File::create("employees.col")?;
    let writer = BufWriter::new(file);
    columnar.serialize_into(writer)?;
    
    // 6. 从文件反序列化
    let file = File::open("employees.col")?;
    let reader = BufReader::new(file);
    let loaded = Columnar::deserialize_from(reader)?;
    
    // 7. 验证加载的数据
    let loaded_emp_col = loaded.column::<Employee>("employees").unwrap();
    println!("\n加载的员工数据:");
    for emp in loaded_emp_col.iter() {
        println!("{:?}", emp);
    }
    
    Ok(())
}

代码说明

  1. 数据创建:示例中创建了基本数据类型列(i32, String, f64)和自定义结构体(Employee)列
  2. 压缩使用:对薪资列使用了LZ4压缩
  3. 查询操作:计算了各部门的平均薪资
  4. 序列化:将整个列式存储序列化到文件
  5. 反序列化:从文件加载数据并验证

性能提示

  1. 批量添加数据比单条添加更高效
  2. 对频繁查询的列考虑使用字典编码
  3. 对于分析型查询,只加载需要的列
  4. 合理使用压缩可以减少内存占用,但会增加CPU开销

columnar库非常适合需要处理大量数据并进行复杂查询的场景,如数据分析、日志处理和时序数据处理等。

回到顶部