Rust数据分析框架lance-datafusion的使用,高性能列式存储与SQL查询引擎集成库
Rust数据分析框架lance-datafusion使用指南
Lance简介
Lance是一种专为机器学习工作流优化的现代列式数据格式,主要特点包括:
- 比Parquet快100倍的随机访问性能
- 支持向量搜索,可与OLAP查询结合使用
- 零拷贝自动版本控制
- 丰富的生态系统集成(支持Apache Arrow、Pandas、Polars等)
快速开始
安装Python包
pip install pylance
转换为Lance格式
import lance
import pandas as pd
import pyarrow as pa
# 创建示例数据
df = pd.DataFrame({"a": [5], "b": [10]})
uri = "/tmp/test.parquet"
tbl = pa.Table.from_pandas(df)
pa.dataset.write_dataset(tbl, uri, format='parquet')
# 转换为Lance格式
parquet = pa.dataset.dataset(uri, format='parquet')
lance.write_dataset(parquet, "/tmp/test.lance")
读取Lance数据
dataset = lance.dataset("/tmp/test.lance")
assert isinstance(dataset, pa.dataset.Dataset)
Rust完整示例
基本数据分析
use datafusion::prelude::*;
use lance_datafusion::LanceTable;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 创建DataFusion上下文
let ctx = SessionContext::new();
// 注册Lance表
let table = LanceTable::try_new("/tmp/test.lance").await?;
ctx.register_table("test_table", Arc::new(table))?;
// 执行SQL查询
let df = ctx
.sql("SELECT a, b FROM test_table WHERE a > 3")
.await?;
// 显示结果
df.show().await?;
Ok(())
}
高级向量搜索示例
import lance
from lance.vector import vec_to_table
import numpy as np
import struct
# 处理SIFT数据集
nvecs = 1000000
ndims = 128
with open("sift/sift_base.fvecs", mode="rb") as fobj:
buf = fobj.read()
data = np.array(struct.unpack("<128000000f", buf[4:4+4*nvecs*ndims])).reshape((nvecs, ndims))
dd = dict(zip(range(nvecs), data))
# 写入Lance格式
table = vec_to_table(dd)
uri = "vec_data.lance"
sift1m = lance.write_dataset(table, uri, max_rows_per_group=8192, max_rows_per_file=1024*1024)
# 创建向量索引
sift1m.create_index("vector",
index_type="IVF_PQ",
num_partitions=256,
num_sub_vectors=16)
# 执行向量搜索
import duckdb
dataset = lance.dataset(uri)
sample = duckdb.query("SELECT vector FROM dataset USING SAMPLE 100").to_df()
query_vectors = np.array([np.array(x) for x in sample.vector])
# 获取最近的10个邻居
results = [dataset.to_table(nearest={"column": "vector", "k": 10, "q": q})
for q in query_vectors]
性能对比
格式 | 分析 | 特征工程 | 训练 | 探索 | 基础设施支持 |
---|---|---|---|---|---|
Lance | 快 | 快 | 快 | 快 | 丰富 |
Parquet | 快 | 快 | 一般 | 慢 | 丰富 |
JSON/XML | 慢 | 一般 | 慢 | 快 | 一般 |
主要优势
- 高性能随机访问:比Parquet快100倍
- 向量搜索:毫秒级最近邻搜索
- 嵌套字段支持:高效存储和查询复杂数据结构
- 自动版本控制:无需额外基础设施管理数据版本
1 回复
下面是根据您提供的内容整理的完整示例代码,展示了如何使用lance-datafusion进行数据分析:
完整示例:从Lance文件读取数据并执行复杂查询
use datafusion::prelude::*;
use lance_datafusion::LanceTable;
use datafusion::arrow::array::{Int32Array, Float64Array, StringArray, Date32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;
use tokio;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 1. 准备测试数据并写入Lance文件
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("region", DataType::Utf8, false),
Field::new("product", DataType::Utf8, false),
Field::new("amount", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(Date32Array::from(vec![19358, 19359, 19360, 19361, 19362])), // 2023-01-01 到 2023-01-05
Arc::new(StringArray::from(vec!["North", "South", "East", "West", "North"])),
Arc::new(StringArray::from(vec!["A", "B", "A", "C", "B"])),
Arc::new(Float64Array::from(vec![120.5, 80.3, 210.7, 65.2, 150.0])),
],
)?;
let ctx = SessionContext::new();
let path = "./sales_data.lance";
let options = datafusion::datasource::file_format::file_type::FileType::LANCE.default_writer_options();
// 写入数据到Lance格式
ctx.write_batch(path, &batch, &options).await?;
// 2. 从Lance文件读取数据并执行查询
let table = LanceTable::try_new(path)?;
ctx.register_table("sales", Arc::new(table))?;
// 创建索引优化查询性能
let table_ref = ctx.table("sales").await?;
let table = table_ref.as_any().downcast_ref::<LanceTable>().unwrap();
table.create_index(&["region"], lance::index::IndexType::Scalar).await?;
table.create_index(&["amount"], lance::index::IndexType::Scalar).await?;
// 执行复杂聚合查询
let query = "
SELECT
region,
product,
SUM(amount) as total_sales,
AVG(amount) as avg_sale,
COUNT(*) as transactions
FROM sales
WHERE date BETWEEN CAST('2023-01-01' AS DATE) AND CAST('2023-01-31' AS DATE)
GROUP BY region, product
HAVING SUM(amount) > 100
ORDER BY total_sales DESC
";
let df = ctx.sql(query).await?;
// 显示查询计划和结果
println!("查询执行计划:");
println!("{:?}", df.logical_plan());
println!("\n查询结果:");
df.show().await?;
Ok(())
}
示例说明
-
数据准备:
- 创建包含销售数据的RecordBatch
- 包含字段:id、date、region、product、amount
- 将数据写入Lance格式文件
-
数据查询:
- 从Lance文件创建表
- 在region和amount字段上创建索引
- 执行包含过滤、聚合、分组和排序的复杂SQL查询
-
性能优化:
- 使用WHERE子句实现谓词下推
- 只选择必要的列
- 创建索引加速查询
-
功能展示:
- 日期范围过滤
- 多字段分组
- 聚合函数(SUM, AVG, COUNT)
- HAVING子句过滤
- 结果排序
这个完整示例展示了lance-datafusion的核心功能,包括数据写入、索引创建和复杂查询执行,适合作为实际项目中的参考实现。