Rust高性能数据表格库lance-table的使用,支持大规模数据存储与快速查询
Rust高性能数据表格库lance-table的使用,支持大规模数据存储与快速查询
lance-table
是 Lance 表格式的内部子 crate。
重要提示:这个 crate 不适用于外部使用。
安装
在项目目录中运行以下 Cargo 命令:
cargo add lance-table
或者在 Cargo.toml 中添加以下行:
lance-table = "0.32.0"
完整示例代码
以下是使用 Lance 表格式的完整示例(使用 lance
主 crate):
use std::sync::Arc;
use tokio; // 确保添加了 tokio 依赖
use lance::{Dataset, WriteParams};
use arrow_schema::{Schema, Field, DataType};
use arrow_array::{RecordBatch, Int32Array, StringArray};
use futures::stream::TryStreamExt; // 用于 try_collect()
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 创建更复杂的 Arrow 模式
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, true), // 允许空值
Field::new("salary", DataType::Float64, false),
]);
// 2. 创建测试数据
let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
let ages = Int32Array::from(vec![Some(25), Some(30), None, Some(35), Some(28)]); // 包含一个空值
let salaries = arrow_array::Float64Array::from(vec![50000.0, 60000.0, 70000.0, 80000.0, 90000.0]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(ids),
Arc::new(names),
Arc::new(ages),
Arc::new(salaries),
],
)?;
// 3. 写入 Lance 格式文件
let write_params = WriteParams {
max_rows_per_file: 2, // 小文件用于演示
..Default::default()
};
let dataset = Dataset::write(
vec![batch],
"employees.lance",
Some(write_params)
).await?;
println!("数据集写入成功");
// 4. 重新打开数据集
let dataset = Dataset::open("employees.lance").await?;
// 5. 执行查询 - 过滤年龄大于28的记录
let scanner = dataset.scan()
.filter("age > 28")?
.project(&["id", "name", "age"])?; // 只选择特定列
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await?
.try_collect()
.await?;
// 6. 打印查询结果
println!("\n查询结果(年龄>28):");
for batch in batches {
println!("{:?}", batch);
}
// 7. 显示数据集统计信息
println!("\n数据集统计:");
println!("版本数量: {}", dataset.versions().len());
println!("文件片段数量: {}", dataset.get_fragments().len());
println!("总行数: {}", dataset.count_rows().await?);
Ok(())
}
依赖配置
在 Cargo.toml 中添加以下依赖:
[dependencies]
lance = "0.32.0"
arrow = "50.0.0"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
示例说明
- 创建了包含4个字段的复杂数据模式(id、name、age、salary)
- age字段允许为空值,演示了如何处理空值数据
- 写入时将最大行数设为2,会生成多个文件片段
- 查询时使用了过滤条件(age > 28)和列投影
- 最后展示了数据集的基本统计信息
注意事项
- 此示例使用了
lance
主 crate 而非lance-table
- 实际生产环境中应考虑更大的文件片段大小
- 错误处理使用了
Box<dyn std::error::Error>
简化示例 - 查询条件支持更复杂的表达式,可参考 Lance 文档
1 回复
Rust高性能数据表格库lance-table的使用
介绍
lance-table 是一个用 Rust 编写的高性能数据表格库,专门设计用于处理大规模数据集并提供快速查询能力。它具有以下特点:
- 支持列式存储,优化分析查询性能
- 内置高效的索引机制,加速数据检索
- 支持内存映射,可处理超过内存大小的数据集
- 提供类似DataFrame的API接口
- 支持多线程并行操作
安装
在Cargo.toml中添加依赖:
[dependencies]
lance-table = "0.1"
基本使用方法
1. 创建表格
use lance_table::{Table, Schema, Field};
// 定义表结构
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("value", DataType::Float64, true),
]);
// 创建表格
let mut table = Table::create("data.lance", schema).unwrap();
2. 插入数据
use lance_table::RecordBatch;
// 创建记录批次
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])),
],
).unwrap();
// 插入数据
table.write(&[batch]).unwrap();
3. 查询数据
// 简单查询
let results = table.scan()
.project(&["id", "name"])
.filter(col("id").gt(lit(1)))
.limit(10)
.collect()
.unwrap();
println!("{:?}", results);
4. 索引与优化查询
// 创建索引
table.create_index("value_index", &["value"]).unwrap();
// 使用索引查询
let query = table.scan()
.with_index("value_index")
.filter(col("value").between(lit(1.0), lit(3.0)))
.collect()
.unwrap();
高级功能
1. 处理大规模数据
// 流式处理大数据集
let mut scanner = table.scan()
.batch_size(1024) // 每批处理1024行
.start()
.unwrap();
while let Some(batch) = scanner.next().unwrap() {
// 处理每个批次
process_batch(batch);
}
2. 更新数据
// 更新符合条件的记录
table.update()
.set("value", lit(0.0))
.where_(col("id").eq(lit(1)))
.execute()
.unwrap();
3. 合并多个表格
let table1 = Table::open("data1.lance").unwrap();
let table2 = Table::open("data2.lance").unwrap();
table1.merge(&table2).unwrap();
性能优化技巧
- 为常用查询列创建索引
- 使用投影减少IO(只选择需要的列)
- 合理设置批次大小(batch_size)
- 对数据进行分区存储
- 使用谓词下推提前过滤数据
完整示例代码
use lance_table::{Table, Schema, Field};
use arrow::array::{Int32Array, StringArray, Float64Array};
use arrow::datatypes::{DataType};
use std::sync::Arc;
fn main() {
// 1. 创建表格
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("product", DataType::Utf8, false),
Field::new("price", DataType::Float64, true),
Field::new("category", DataType::Utf8, false),
]);
let mut table = Table::create("products.lance", schema).unwrap();
// 2. 插入数据
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(StringArray::from(vec!["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"])),
Arc::new(Float64Array::from(vec![999.99, 699.99, 299.99, 199.99, 49.99])),
Arc::new(StringArray::from(vec!["Electronics", "Electronics", "Electronics", "Electronics", "Accessories"])),
],
).unwrap();
table.write(&[batch]).unwrap();
// 3. 创建索引
table.create_index("price_index", &["price"]).unwrap();
// 4. 执行查询
let expensive_items = table.scan()
.with_index("price_index")
.filter(col("price").gt(lit(500.0)))
.filter(col("category").eq(lit("Electronics")))
.collect()
.unwrap();
println!("Expensive electronics: {:?}", expensive_items);
// 5. 更新数据
table.update()
.set("price", col("price") * lit(0.9)) // 打9折
.where_(col("category").eq(lit("Electronics")))
.execute()
.unwrap();
}
lance-table 是处理大规模结构化数据的强大工具,特别适合需要高性能查询和分析的场景。通过合理使用其功能,可以显著提高数据处理的效率。