Rust数据存储与检索库lance-core的使用,高性能列式存储引擎lance-core助力高效数据处理
Rust数据存储与检索库lance-core的使用,高性能列式存储引擎lance-core助力高效数据处理
Lance是一个为机器学习工作流优化的现代列式数据格式,具有以下特点:
- 构建搜索引擎和特征存储
- 需要高性能IO和洗牌的大规模ML训练
- 存储、查询和检查机器人学或大型二进制数据(如图像、点云等)的深度嵌套数据
关键特性
- 高性能随机访问:比Parquet快100倍,同时不牺牲扫描性能
- 向量搜索:毫秒级找到最近邻,结合OLAP查询与向量搜索
- 零拷贝自动版本控制:无需额外基础设施即可管理数据版本
- 生态系统集成:支持Apache Arrow、Pandas、Polars、DuckDB、Ray等
完整Rust示例代码
以下是一个增强版的完整Rust示例,展示如何创建、写入、读取和查询Lance格式的数据,包含向量搜索功能:
use std::sync::Arc;
use lance::{
dataset::{WriteParams, Dataset},
index::{IndexType, MetricType}
};
use arrow_array::{
RecordBatch,
Int32Array,
Float32Array,
FixedSizeListArray,
ArrayRef
};
use arrow_schema::{Schema, Field, DataType};
use futures::TryStreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 创建包含向量数据的测试数据
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, false)),
128
),
false
),
]);
// 创建ID数组
let ids = Int32Array::from((0..1000).collect::<Vec<i32>>());
// 创建随机向量数据 (1000个128维向量)
let mut rng = rand::thread_rng();
let mut vectors = Vec::new();
for _ in 0..1000 {
let vec: Vec<f32> = (0..128).map(|_| rng.gen()).collect();
vectors.extend(vec);
}
let vector_values = Float32Array::from(vectors);
let vector_array = FixedSizeListArray::try_new_from_values(vector_values, 128)?;
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(ids) as ArrayRef,
Arc::new(vector_array) as ArrayRef,
],
)?;
// 2. 写入Lance数据集
let path = "vector_data.lance";
let write_params = WriteParams {
max_rows_per_file: 500,
max_rows_per_group: 100,
..Default::default()
};
Dataset::write(vec![batch], path, Some(write_params)).await?;
// 3. 读取Lance数据集
let dataset = Dataset::open(path).await?;
println!("数据集有 {} 行", dataset.count_rows().await?);
// 4. 创建向量索引
dataset.create_index(
&["vector"],
"vector_index",
IndexType::IVF_PQ,
MetricType::L2,
Some(256), // IVF分区数
Some(16), // PQ子向量数
false // 是否替换现有索引
).await?;
// 5. 执行向量搜索
// 创建查询向量
let query_vec: Vec<f32> = (0..128).map(|_| rng.gen()).collect();
// 搜索最近邻
let results = dataset.scan()
.nearest("vector", &query_vec, 5)?
.try_into_stream()
.await?
.try_collect::<Vec<_>>()
.await?;
println!("找到 {} 个最近邻结果", results.len());
// 6. 执行SQL风格查询
let query = dataset.scan()
.filter("id > 500")?
.limit(10)?
.try_into_stream()
.await?
.try_collect::<Vec<_>>()
.await?;
println!("查询结果: {:?}", query);
Ok(())
}
示例说明
-
数据创建:
- 创建包含1000条记录的数据集
- 每条记录包含一个ID和一个128维的随机向量
-
数据写入:
- 使用WriteParams控制文件大小和行组大小
- 将数据写入Lance格式文件
-
索引创建:
- 使用IVF_PQ算法创建向量索引
- IVF(倒排文件)分区数为256
- PQ(乘积量化)子向量数为16
-
向量搜索:
- 对随机查询向量执行最近邻搜索
- 返回最相似的5个结果
-
SQL风格查询:
- 使用filter和limit进行条件查询
性能优化建议
-
批次大小:
- 根据数据特性调整max_rows_per_file和max_rows_per_group
- 典型值在10,000到1,000,000行之间
-
索引参数:
- 大数据集增加IVF分区数(如1024)
- 高维数据增加PQ子向量数(如32或64)
-
并行处理:
- Lance支持多线程读写
- 可以使用Rayon等库并行化数据处理
Lance Logo:
性能对比图:
1 回复
Rust数据存储与检索库lance-core的使用指南
概述
lance-core是一个高性能的列式存储引擎,专为高效数据处理而设计。它提供了快速的数据存储和检索能力,特别适合大规模数据集的处理场景。
主要特性
- 列式存储格式,优化查询性能
- 支持高效的数据压缩
- 内置索引加速数据检索
- 线程安全的设计
- 低内存开销
安装方法
在Cargo.toml中添加依赖:
[dependencies]
lance-core = "0.4"
完整示例demo
下面是一个完整的示例,展示了如何使用lance-core进行数据存储、检索和更新:
use std::sync::Arc;
use lance_core::datatypes::{Schema, Field, DataType};
use lance_core::io::{FileWriter, FileReader, FileUpdater};
use lance_core::query::Query;
use lance_core::index::IndexBuilder;
use arrow_array::{RecordBatch, Int32Array, StringArray};
use arrow_schema::{SchemaRef, Schema as ArrowSchema};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 创建schema
let schema = SchemaRef::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).into(),
Field::new("name", DataType::Utf8, false).into(),
]));
// 2. 创建RecordBatch数据
let ids = Int32Array::from(vec![1, 2, 3, 4]);
let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David"]);
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(ids),
Arc::new(names),
])?;
// 3. 写入文件
let mut writer = FileWriter::try_new("data.lance", schema)?;
writer.write(&[batch])?;
writer.finish()?;
// 4. 读取数据
let reader = FileReader::try_new("data.lance")?;
let batches = reader.read_all()?;
println!("读取所有数据:");
for batch in batches {
println!("Batch: {:?}", batch);
}
// 5. 查询数据
let query = Query::new()
.filter("id > 2")
.limit(2);
println!("\n查询id>2的数据:");
let results = reader.query(&query)?;
for batch in results {
println!("Query results: {:?}", batch);
}
// 6. 创建索引
let index_builder = IndexBuilder::new("id_index")
.column("id")
.index_type("IVF_PQ");
reader.create_index(index_builder)?;
// 7. 使用索引查询
let indexed_query = Query::new()
.filter("id == 3")
.use_index(true);
println!("\n使用索引查询id=3的数据:");
let indexed_results = reader.query(&indexed_query)?;
for batch in indexed_results {
println!("Indexed query results: {:?}", batch);
}
// 8. 更新数据
let mut updater = FileUpdater::new("data.lance")?;
updater.update("id = 3", "name", "Charlie Updated")?;
updater.commit()?;
println!("\n更新后的数据:");
let updated_reader = FileReader::try_new("data.lance")?;
let updated_batches = updated_reader.read_all()?;
for batch in updated_batches {
println!("Batch: {:?}", batch);
}
Ok(())
}
最佳实践
- 批量写入:尽量使用大的RecordBatch进行写入,减少IO次数
- 合理分片:大数据集可以分成多个文件存储
- 列选择:查询时只选择需要的列,减少数据传输
- 索引策略:为常用查询条件创建合适的索引
性能对比
与传统的行式存储相比,lance-core在分析型查询上通常有10-100倍的性能提升,特别是在只需要查询部分列的场景下。
总结
lance-core为Rust生态提供了一个高性能的列式存储解决方案,特别适合数据分析、机器学习等需要高效处理大规模数据的场景。通过合理使用其列式存储特性和索引功能,可以显著提升数据处理效率。