Rust高性能数据表格库lance-table的使用,支持大规模数据存储与快速查询

Rust高性能数据表格库lance-table的使用,支持大规模数据存储与快速查询

lance-table 是 Lance 表格式的内部子 crate。

重要提示:这个 crate 不适用于外部使用

安装

在项目目录中运行以下 Cargo 命令:

cargo add lance-table

或者在 Cargo.toml 中添加以下行:

lance-table = "0.32.0"

完整示例代码

以下是使用 Lance 表格式的完整示例(使用 lance 主 crate):

use std::sync::Arc;
use tokio;  // 确保添加了 tokio 依赖

use lance::{Dataset, WriteParams};
use arrow_schema::{Schema, Field, DataType};
use arrow_array::{RecordBatch, Int32Array, StringArray};
use futures::stream::TryStreamExt;  // 用于 try_collect()

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建更复杂的 Arrow 模式
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, true),  // 允许空值
        Field::new("salary", DataType::Float64, false),
    ]);

    // 2. 创建测试数据
    let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
    let ages = Int32Array::from(vec![Some(25), Some(30), None, Some(35), Some(28)]);  // 包含一个空值
    let salaries = arrow_array::Float64Array::from(vec![50000.0, 60000.0, 70000.0, 80000.0, 90000.0]);
    
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(ids),
            Arc::new(names),
            Arc::new(ages),
            Arc::new(salaries),
        ],
    )?;

    // 3. 写入 Lance 格式文件
    let write_params = WriteParams {
        max_rows_per_file: 2,  // 小文件用于演示
        ..Default::default()
    };
    
    let dataset = Dataset::write(
        vec![batch],
        "employees.lance",
        Some(write_params)
    ).await?;

    println!("数据集写入成功");

    // 4. 重新打开数据集
    let dataset = Dataset::open("employees.lance").await?;
    
    // 5. 执行查询 - 过滤年龄大于28的记录
    let scanner = dataset.scan()
        .filter("age > 28")?
        .project(&["id", "name", "age"])?;  // 只选择特定列

    let batches: Vec<RecordBatch> = scanner
        .try_into_stream()
        .await?
        .try_collect()
        .await?;

    // 6. 打印查询结果
    println!("\n查询结果(年龄>28):");
    for batch in batches {
        println!("{:?}", batch);
    }

    // 7. 显示数据集统计信息
    println!("\n数据集统计:");
    println!("版本数量: {}", dataset.versions().len());
    println!("文件片段数量: {}", dataset.get_fragments().len());
    println!("总行数: {}", dataset.count_rows().await?);

    Ok(())
}

依赖配置

在 Cargo.toml 中添加以下依赖:

[dependencies]
lance = "0.32.0"
arrow = "50.0.0"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"

示例说明

  1. 创建了包含4个字段的复杂数据模式(id、name、age、salary)
  2. age字段允许为空值,演示了如何处理空值数据
  3. 写入时将最大行数设为2,会生成多个文件片段
  4. 查询时使用了过滤条件(age > 28)和列投影
  5. 最后展示了数据集的基本统计信息

注意事项

  1. 此示例使用了 lance 主 crate 而非 lance-table
  2. 实际生产环境中应考虑更大的文件片段大小
  3. 错误处理使用了 Box<dyn std::error::Error> 简化示例
  4. 查询条件支持更复杂的表达式,可参考 Lance 文档

1 回复

Rust高性能数据表格库lance-table的使用

介绍

lance-table 是一个用 Rust 编写的高性能数据表格库,专门设计用于处理大规模数据集并提供快速查询能力。它具有以下特点:

  • 支持列式存储,优化分析查询性能
  • 内置高效的索引机制,加速数据检索
  • 支持内存映射,可处理超过内存大小的数据集
  • 提供类似DataFrame的API接口
  • 支持多线程并行操作

安装

在Cargo.toml中添加依赖:

[dependencies]
lance-table = "0.1"

基本使用方法

1. 创建表格

use lance_table::{Table, Schema, Field};

// 定义表结构
let schema = Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("name", DataType::Utf8, false),
    Field::new("value", DataType::Float64, true),
]);

// 创建表格
let mut table = Table::create("data.lance", schema).unwrap();

2. 插入数据

use lance_table::RecordBatch;

// 创建记录批次
let batch = RecordBatch::try_new(
    schema.clone(),
    vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec!["a", "b", "c"])),
        Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])),
    ],
).unwrap();

// 插入数据
table.write(&[batch]).unwrap();

3. 查询数据

// 简单查询
let results = table.scan()
    .project(&["id", "name"])
    .filter(col("id").gt(lit(1)))
    .limit(10)
    .collect()
    .unwrap();

println!("{:?}", results);

4. 索引与优化查询

// 创建索引
table.create_index("value_index", &["value"]).unwrap();

// 使用索引查询
let query = table.scan()
    .with_index("value_index")
    .filter(col("value").between(lit(1.0), lit(3.0)))
    .collect()
    .unwrap();

高级功能

1. 处理大规模数据

// 流式处理大数据集
let mut scanner = table.scan()
    .batch_size(1024)  // 每批处理1024行
    .start()
    .unwrap();

while let Some(batch) = scanner.next().unwrap() {
    // 处理每个批次
    process_batch(batch);
}

2. 更新数据

// 更新符合条件的记录
table.update()
    .set("value", lit(0.0))
    .where_(col("id").eq(lit(1)))
    .execute()
    .unwrap();

3. 合并多个表格

let table1 = Table::open("data1.lance").unwrap();
let table2 = Table::open("data2.lance").unwrap();

table1.merge(&table2).unwrap();

性能优化技巧

  1. 为常用查询列创建索引
  2. 使用投影减少IO(只选择需要的列)
  3. 合理设置批次大小(batch_size)
  4. 对数据进行分区存储
  5. 使用谓词下推提前过滤数据

完整示例代码

use lance_table::{Table, Schema, Field};
use arrow::array::{Int32Array, StringArray, Float64Array};
use arrow::datatypes::{DataType};
use std::sync::Arc;

fn main() {
    // 1. 创建表格
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("product", DataType::Utf8, false),
        Field::new("price", DataType::Float64, true),
        Field::new("category", DataType::Utf8, false),
    ]);
    
    let mut table = Table::create("products.lance", schema).unwrap();
    
    // 2. 插入数据
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(StringArray::from(vec!["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"])),
            Arc::new(Float64Array::from(vec![999.99, 699.99, 299.99, 199.99, 49.99])),
            Arc::new(StringArray::from(vec!["Electronics", "Electronics", "Electronics", "Electronics", "Accessories"])),
        ],
    ).unwrap();
    
    table.write(&[batch]).unwrap();
    
    // 3. 创建索引
    table.create_index("price_index", &["price"]).unwrap();
    
    // 4. 执行查询
    let expensive_items = table.scan()
        .with_index("price_index")
        .filter(col("price").gt(lit(500.0)))
        .filter(col("category").eq(lit("Electronics")))
        .collect()
        .unwrap();
    
    println!("Expensive electronics: {:?}", expensive_items);
    
    // 5. 更新数据
    table.update()
        .set("price", col("price") * lit(0.9))  // 打9折
        .where_(col("category").eq(lit("Electronics")))
        .execute()
        .unwrap();
}

lance-table 是处理大规模结构化数据的强大工具,特别适合需要高性能查询和分析的场景。通过合理使用其功能,可以显著提高数据处理的效率。

回到顶部