Rust数据存储与检索库lance-core的使用,高性能列式存储引擎lance-core助力高效数据处理

Rust数据存储与检索库lance-core的使用,高性能列式存储引擎lance-core助力高效数据处理

Lance是一个为机器学习工作流优化的现代列式数据格式,具有以下特点:

  1. 构建搜索引擎和特征存储
  2. 需要高性能IO和洗牌的大规模ML训练
  3. 存储、查询和检查机器人学或大型二进制数据(如图像、点云等)的深度嵌套数据

关键特性

  • 高性能随机访问:比Parquet快100倍,同时不牺牲扫描性能
  • 向量搜索:毫秒级找到最近邻,结合OLAP查询与向量搜索
  • 零拷贝自动版本控制:无需额外基础设施即可管理数据版本
  • 生态系统集成:支持Apache Arrow、Pandas、Polars、DuckDB、Ray等

完整Rust示例代码

以下是一个增强版的完整Rust示例,展示如何创建、写入、读取和查询Lance格式的数据,包含向量搜索功能:

use std::sync::Arc;
use lance::{
    dataset::{WriteParams, Dataset},
    index::{IndexType, MetricType}
};
use arrow_array::{
    RecordBatch, 
    Int32Array, 
    Float32Array,
    FixedSizeListArray,
    ArrayRef
};
use arrow_schema::{Schema, Field, DataType};
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建包含向量数据的测试数据
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("vector", 
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, false)), 
                128
            ), 
            false
        ),
    ]);
    
    // 创建ID数组
    let ids = Int32Array::from((0..1000).collect::<Vec<i32>>());
    
    // 创建随机向量数据 (1000个128维向量)
    let mut rng = rand::thread_rng();
    let mut vectors = Vec::new();
    for _ in 0..1000 {
        let vec: Vec<f32> = (0..128).map(|_| rng.gen()).collect();
        vectors.extend(vec);
    }
    
    let vector_values = Float32Array::from(vectors);
    let vector_array = FixedSizeListArray::try_new_from_values(vector_values, 128)?;
    
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(ids) as ArrayRef,
            Arc::new(vector_array) as ArrayRef,
        ],
    )?;

    // 2. 写入Lance数据集
    let path = "vector_data.lance";
    let write_params = WriteParams {
        max_rows_per_file: 500,
        max_rows_per_group: 100,
        ..Default::default()
    };
    
    Dataset::write(vec![batch], path, Some(write_params)).await?;
    
    // 3. 读取Lance数据集
    let dataset = Dataset::open(path).await?;
    println!("数据集有 {} 行", dataset.count_rows().await?);
    
    // 4. 创建向量索引
    dataset.create_index(
        &["vector"],
        "vector_index",
        IndexType::IVF_PQ,
        MetricType::L2,
        Some(256),  // IVF分区数
        Some(16),   // PQ子向量数
        false       // 是否替换现有索引
    ).await?;
    
    // 5. 执行向量搜索
    // 创建查询向量
    let query_vec: Vec<f32> = (0..128).map(|_| rng.gen()).collect();
    
    // 搜索最近邻
    let results = dataset.scan()
        .nearest("vector", &query_vec, 5)?
        .try_into_stream()
        .await?
        .try_collect::<Vec<_>>()
        .await?;
    
    println!("找到 {} 个最近邻结果", results.len());
    
    // 6. 执行SQL风格查询
    let query = dataset.scan()
        .filter("id > 500")?
        .limit(10)?
        .try_into_stream()
        .await?
        .try_collect::<Vec<_>>()
        .await?;
    
    println!("查询结果: {:?}", query);
    
    Ok(())
}

示例说明

  1. 数据创建

    • 创建包含1000条记录的数据集
    • 每条记录包含一个ID和一个128维的随机向量
  2. 数据写入

    • 使用WriteParams控制文件大小和行组大小
    • 将数据写入Lance格式文件
  3. 索引创建

    • 使用IVF_PQ算法创建向量索引
    • IVF(倒排文件)分区数为256
    • PQ(乘积量化)子向量数为16
  4. 向量搜索

    • 对随机查询向量执行最近邻搜索
    • 返回最相似的5个结果
  5. SQL风格查询

    • 使用filter和limit进行条件查询

性能优化建议

  1. 批次大小

    • 根据数据特性调整max_rows_per_file和max_rows_per_group
    • 典型值在10,000到1,000,000行之间
  2. 索引参数

    • 大数据集增加IVF分区数(如1024)
    • 高维数据增加PQ子向量数(如32或64)
  3. 并行处理

    • Lance支持多线程读写
    • 可以使用Rayon等库并行化数据处理

Lance Logo:
Lance Logo

性能对比图:
lance_perf.png


1 回复

Rust数据存储与检索库lance-core的使用指南

概述

lance-core是一个高性能的列式存储引擎,专为高效数据处理而设计。它提供了快速的数据存储和检索能力,特别适合大规模数据集的处理场景。

主要特性

  • 列式存储格式,优化查询性能
  • 支持高效的数据压缩
  • 内置索引加速数据检索
  • 线程安全的设计
  • 低内存开销

安装方法

在Cargo.toml中添加依赖:

[dependencies]
lance-core = "0.4"

完整示例demo

下面是一个完整的示例,展示了如何使用lance-core进行数据存储、检索和更新:

use std::sync::Arc;
use lance_core::datatypes::{Schema, Field, DataType};
use lance_core::io::{FileWriter, FileReader, FileUpdater};
use lance_core::query::Query;
use lance_core::index::IndexBuilder;
use arrow_array::{RecordBatch, Int32Array, StringArray};
use arrow_schema::{SchemaRef, Schema as ArrowSchema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建schema
    let schema = SchemaRef::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false).into(),
        Field::new("name", DataType::Utf8, false).into(),
    ]));

    // 2. 创建RecordBatch数据
    let ids = Int32Array::from(vec![1, 2, 3, 4]);
    let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David"]);
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(ids),
        Arc::new(names),
    ])?;

    // 3. 写入文件
    let mut writer = FileWriter::try_new("data.lance", schema)?;
    writer.write(&[batch])?;
    writer.finish()?;

    // 4. 读取数据
    let reader = FileReader::try_new("data.lance")?;
    let batches = reader.read_all()?;
    
    println!("读取所有数据:");
    for batch in batches {
        println!("Batch: {:?}", batch);
    }

    // 5. 查询数据
    let query = Query::new()
        .filter("id > 2")
        .limit(2);

    println!("\n查询id>2的数据:");
    let results = reader.query(&query)?;
    for batch in results {
        println!("Query results: {:?}", batch);
    }

    // 6. 创建索引
    let index_builder = IndexBuilder::new("id_index")
        .column("id")
        .index_type("IVF_PQ");

    reader.create_index(index_builder)?;

    // 7. 使用索引查询
    let indexed_query = Query::new()
        .filter("id == 3")
        .use_index(true);

    println!("\n使用索引查询id=3的数据:");
    let indexed_results = reader.query(&indexed_query)?;
    for batch in indexed_results {
        println!("Indexed query results: {:?}", batch);
    }

    // 8. 更新数据
    let mut updater = FileUpdater::new("data.lance")?;
    updater.update("id = 3", "name", "Charlie Updated")?;
    updater.commit()?;

    println!("\n更新后的数据:");
    let updated_reader = FileReader::try_new("data.lance")?;
    let updated_batches = updated_reader.read_all()?;
    for batch in updated_batches {
        println!("Batch: {:?}", batch);
    }

    Ok(())
}

最佳实践

  1. 批量写入:尽量使用大的RecordBatch进行写入,减少IO次数
  2. 合理分片:大数据集可以分成多个文件存储
  3. 列选择:查询时只选择需要的列,减少数据传输
  4. 索引策略:为常用查询条件创建合适的索引

性能对比

与传统的行式存储相比,lance-core在分析型查询上通常有10-100倍的性能提升,特别是在只需要查询部分列的场景下。

总结

lance-core为Rust生态提供了一个高性能的列式存储解决方案,特别适合数据分析、机器学习等需要高效处理大规模数据的场景。通过合理使用其列式存储特性和索引功能,可以显著提升数据处理效率。

回到顶部