Rust数据分析框架lance-datafusion的使用,高性能列式存储与SQL查询引擎集成库

Rust数据分析框架lance-datafusion使用指南

Lance简介

Lance是一种专为机器学习工作流优化的现代列式数据格式,主要特点包括:

  1. 比Parquet快100倍的随机访问性能
  2. 支持向量搜索,可与OLAP查询结合使用
  3. 零拷贝自动版本控制
  4. 丰富的生态系统集成(支持Apache Arrow、Pandas、Polars等)

快速开始

安装Python包

pip install pylance

转换为Lance格式

import lance
import pandas as pd
import pyarrow as pa

# 创建示例数据
df = pd.DataFrame({"a": [5], "b": [10]})
uri = "/tmp/test.parquet"
tbl = pa.Table.from_pandas(df)
pa.dataset.write_dataset(tbl, uri, format='parquet')

# 转换为Lance格式
parquet = pa.dataset.dataset(uri, format='parquet')
lance.write_dataset(parquet, "/tmp/test.lance")

读取Lance数据

dataset = lance.dataset("/tmp/test.lance")
assert isinstance(dataset, pa.dataset.Dataset)

Rust完整示例

基本数据分析

use datafusion::prelude::*;
use lance_datafusion::LanceTable;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 注册Lance表
    let table = LanceTable::try_new("/tmp/test.lance").await?;
    ctx.register_table("test_table", Arc::new(table))?;
    
    // 执行SQL查询
    let df = ctx
        .sql("SELECT a, b FROM test_table WHERE a > 3")
        .await?;
        
    // 显示结果
    df.show().await?;
    
    Ok(())
}

高级向量搜索示例

import lance
from lance.vector import vec_to_table
import numpy as np
import struct

# 处理SIFT数据集
nvecs = 1000000
ndims = 128
with open("sift/sift_base.fvecs", mode="rb") as fobj:
    buf = fobj.read()
    data = np.array(struct.unpack("<128000000f", buf[4:4+4*nvecs*ndims])).reshape((nvecs, ndims))
    dd = dict(zip(range(nvecs), data))

# 写入Lance格式
table = vec_to_table(dd)
uri = "vec_data.lance"
sift1m = lance.write_dataset(table, uri, max_rows_per_group=8192, max_rows_per_file=1024*1024)

# 创建向量索引
sift1m.create_index("vector",
                   index_type="IVF_PQ",
                   num_partitions=256,
                   num_sub_vectors=16)

# 执行向量搜索
import duckdb
dataset = lance.dataset(uri)
sample = duckdb.query("SELECT vector FROM dataset USING SAMPLE 100").to_df()
query_vectors = np.array([np.array(x) for x in sample.vector])

# 获取最近的10个邻居
results = [dataset.to_table(nearest={"column": "vector", "k": 10, "q": q})
           for q in query_vectors]

性能对比

格式 分析 特征工程 训练 探索 基础设施支持
Lance 丰富
Parquet 一般 丰富
JSON/XML 一般 一般

主要优势

  1. 高性能随机访问:比Parquet快100倍
  2. 向量搜索:毫秒级最近邻搜索
  3. 嵌套字段支持:高效存储和查询复杂数据结构
  4. 自动版本控制:无需额外基础设施管理数据版本

1 回复

下面是根据您提供的内容整理的完整示例代码,展示了如何使用lance-datafusion进行数据分析:

完整示例:从Lance文件读取数据并执行复杂查询

use datafusion::prelude::*;
use lance_datafusion::LanceTable;
use datafusion::arrow::array::{Int32Array, Float64Array, StringArray, Date32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;
use tokio;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // 1. 准备测试数据并写入Lance文件
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("date", DataType::Date32, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("product", DataType::Utf8, false),
        Field::new("amount", DataType::Float64, false),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(Date32Array::from(vec![19358, 19359, 19360, 19361, 19362])), // 2023-01-01 到 2023-01-05
            Arc::new(StringArray::from(vec!["North", "South", "East", "West", "North"])),
            Arc::new(StringArray::from(vec!["A", "B", "A", "C", "B"])),
            Arc::new(Float64Array::from(vec![120.5, 80.3, 210.7, 65.2, 150.0])),
        ],
    )?;

    let ctx = SessionContext::new();
    let path = "./sales_data.lance";
    let options = datafusion::datasource::file_format::file_type::FileType::LANCE.default_writer_options();
    
    // 写入数据到Lance格式
    ctx.write_batch(path, &batch, &options).await?;

    // 2. 从Lance文件读取数据并执行查询
    let table = LanceTable::try_new(path)?;
    ctx.register_table("sales", Arc::new(table))?;

    // 创建索引优化查询性能
    let table_ref = ctx.table("sales").await?;
    let table = table_ref.as_any().downcast_ref::<LanceTable>().unwrap();
    table.create_index(&["region"], lance::index::IndexType::Scalar).await?;
    table.create_index(&["amount"], lance::index::IndexType::Scalar).await?;

    // 执行复杂聚合查询
    let query = "
        SELECT 
            region,
            product,
            SUM(amount) as total_sales,
            AVG(amount) as avg_sale,
            COUNT(*) as transactions
        FROM sales
        WHERE date BETWEEN CAST('2023-01-01' AS DATE) AND CAST('2023-01-31' AS DATE)
        GROUP BY region, product
        HAVING SUM(amount) > 100
        ORDER BY total_sales DESC
    ";
    
    let df = ctx.sql(query).await?;
    
    // 显示查询计划和结果
    println!("查询执行计划:");
    println!("{:?}", df.logical_plan());
    
    println!("\n查询结果:");
    df.show().await?;

    Ok(())
}

示例说明

  1. 数据准备

    • 创建包含销售数据的RecordBatch
    • 包含字段:id、date、region、product、amount
    • 将数据写入Lance格式文件
  2. 数据查询

    • 从Lance文件创建表
    • 在region和amount字段上创建索引
    • 执行包含过滤、聚合、分组和排序的复杂SQL查询
  3. 性能优化

    • 使用WHERE子句实现谓词下推
    • 只选择必要的列
    • 创建索引加速查询
  4. 功能展示

    • 日期范围过滤
    • 多字段分组
    • 聚合函数(SUM, AVG, COUNT)
    • HAVING子句过滤
    • 结果排序

这个完整示例展示了lance-datafusion的核心功能,包括数据写入、索引创建和复杂查询执行,适合作为实际项目中的参考实现。

回到顶部