Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行
Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行
概述
datafusion-substrait是一个Rust crate,它实现了Apache DataFusion查询计划与Substrait之间的转换功能。Substrait是一个开放标准,用于跨平台序列化和执行查询计划。
安装
在您的项目中运行以下Cargo命令:
cargo add datafusion-substrait
或者在Cargo.toml中添加:
datafusion-substrait = "49.0.0"
示例代码
以下是一个完整的示例,展示如何使用datafusion-substrait将DataFusion查询计划转换为Substrait格式:
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::{consumer, producer};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 创建DataFusion上下文
let ctx = SessionContext::new();
// 2. 注册CSV作为数据源
ctx.register_csv("example", "example.csv", CsvReadOptions::default()).await?;
// 3. 创建DataFusion查询计划
let df = ctx.sql("SELECT a, b FROM example WHERE a > 10").await?;
let plan = df.logical_plan();
// 4. 将DataFusion计划转换为Substrait计划
let mut substrait_plan = Vec::new();
producer::to_substrait_plan(plan, &mut substrait_plan)?;
// 5. 将Substrait计划转换回DataFusion计划
let decoded_plan = consumer::from_substrait_plan(&ctx, &substrait_plan).await?;
// 6. 执行计划
let results = ctx.execute_logical_plan(decoded_plan).await?;
// 7. 打印结果
results.show().await?;
Ok(())
}
完整示例
以下是一个更完整的示例,展示了如何从CSV文件读取数据,执行复杂查询,并通过Substrait进行跨平台序列化:
use datafusion::{
prelude::*,
datasource::file_format::csv::CsvReadOptions,
arrow::record_batch::RecordBatch
};
use datafusion_substrait::logical_plan::{consumer, producer};
use std::sync::Arc;
use tempfile::NamedTempFile;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 准备测试CSV数据
let csv_data = "a,b,c\n1,2,3\n10,20,30\n100,200,300";
let tmp_file = NamedTempFile::new()?;
std::fs::write(tmp_file.path(), csv_data)?;
// 2. 创建DataFusion上下文
let ctx = SessionContext::new();
// 3. 注册CSV文件作为数据源
ctx.register_csv(
"test_data",
tmp_file.path().to_str().unwrap(),
CsvReadOptions::default()
).await?;
// 4. 创建复杂查询计划
let df = ctx.sql(
"SELECT a, b, c*2 as double_c
FROM test_data
WHERE a > 5
ORDER BY b DESC
LIMIT 10"
).await?;
let plan = df.logical_plan();
// 5. 序列化为Substrait格式
let mut substrait_plan = Vec::new();
producer::to_substrait_plan(plan, &mut substrait_plan)?;
println!("生成Substrait计划,大小: {}字节", substrait_plan.len());
// 6. 反序列化回DataFusion计划
let decoded_plan = consumer::from_substrait_plan(&ctx, &substrait_plan).await?;
// 7. 执行查询计划
let results = ctx.execute_logical_plan(decoded_plan).await?;
// 8. 收集并打印结果
let batches: Vec<RecordBatch> = results.collect().await?;
for batch in batches {
println!("{:?}", batch);
}
Ok(())
}
主要功能
- 生产者(producer):将DataFusion逻辑计划转换为Substrait格式
- 消费者(consumer):将Substrait格式转换回DataFusion逻辑计划
- 跨平台兼容:通过Substrait实现与其他数据处理系统的互操作性
许可证
Apache-2.0
所有者
- Andy Grove
- Sutou Kouhei
- Andrew Lamb
- xudong.w
1 回复
Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行
介绍
DataFusion是Rust生态中一个高性能的查询执行框架,而Substrait是一个跨平台的查询计划序列化标准。datafusion-substrait
库将这两者结合起来,实现了:
- 将DataFusion的查询计划序列化为Substrait格式
- 将Substrait格式的查询计划反序列化为DataFusion可执行的计划
- 实现跨平台查询计划的交换与执行
这种集成使得DataFusion可以与其他支持Substrait的系统(如Apache Arrow、DuckDB等)无缝交换查询计划,极大提高了系统的互操作性。
使用方法
添加依赖
首先在Cargo.toml中添加依赖:
[dependencies]
datafusion = "25.0" # 或其他最新版本
datafusion-substrait = "0.1" # 或其他最新版本
基本使用示例
1. 将DataFusion逻辑计划序列化为Substrait
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建DataFusion上下文
let ctx = SessionContext::new();
// 注册数据源
ctx.register_csv("example", "path/to/file.csv", CsvReadOptions::default()).await?;
// 创建逻辑计划
let df = ctx.sql("SELECT a, b FROM example WHERE c > 10").await?;
let plan = df.logical_plan();
// 序列化为Substrait
let mut buf = Vec::new();
to_substrait_plan(&mut buf, plan)?;
// buf现在包含Substrait格式的查询计划
println!("Serialized Substrait plan size: {} bytes", buf.len());
Ok(())
}
2. 从Substrait反序列化为DataFusion计划
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 假设我们有一个Substrait格式的查询计划
let substrait_plan: &[u8] = /* 从文件或网络获取的Substrait数据 */;
// 创建DataFusion上下文
let ctx = SessionContext::new();
// 注册数据源(必须与Substrait计划中引用的数据源匹配)
ctx.register_csv("example", "path/to/file.csv", CsvReadOptions::default()).await?;
// 反序列化Substrait计划
let plan = from_substrait_plan(&ctx, substrait_plan).await?;
// 执行计划
let df = DataFrame::new(ctx.state(), plan);
let results = df.collect().await?;
// 处理结果...
Ok(())
}
高级功能
跨平台查询计划交换
// 假设我们从一个Python系统接收了Substrait计划
let python_generated_plan: &[u8] = receive_from_python();
// 在Rust中执行
let ctx = SessionContext::new();
ctx.register_parquet("sales", "path/to/sales.parquet", Default::default()).await?;
let plan = from_substrait_plan(&ctx, python_generated_plan).await?;
let df = DataFrame::new(ctx.state(), plan);
let results = df.collect().await?;
查询计划持久化
use std::fs::File;
use std::io::Write;
// 序列化并保存查询计划
let plan = ctx.sql("SELECT * FROM large_table WHERE value > 100").await?.logical_plan();
let mut buf = Vec::new();
to_substrait_plan(&mut buf, &plan)?;
let mut file = File::create("complex_query.substrait")?;
file.write_all(&buf)?;
// 之后可以重新加载并执行
let mut file = File::open("complex_query.substrait")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let reloaded_plan = from_substrait_plan(&ctx, &buf).await?;
完整示例代码
下面是一个完整的示例,展示如何使用datafusion-substrait
进行查询计划的序列化和反序列化:
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use std::fs::File;
use std::io::Write;
use std::io::Read;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// =============== 第一部分:序列化DataFusion计划到Substrait ===============
println!("=== 开始序列化DataFusion计划到Substrait ===");
// 创建DataFusion上下文
let ctx = SessionContext::new();
// 注册CSV数据源
ctx.register_csv(
"sales",
"examples/data/sales.csv",
CsvReadOptions::default()
).await?;
// 创建逻辑计划
let df = ctx.sql(
"SELECT region, SUM(amount) as total_sales
FROM sales
WHERE amount > 100
GROUP BY region"
).await?;
let plan = df.logical_plan();
// 序列化为Substrait格式
let mut substrait_data = Vec::new();
to_substrait_plan(&mut substrait_data, plan)?;
println!("成功序列化计划,大小: {}字节", substrait_data.len());
// 保存到文件
let mut file = File::create("sales_query.substrait")?;
file.write_all(&substrait_data)?;
println!("已保存Substrait计划到文件: sales_query.substrait");
// =============== 第二部分:从Substrait反序列化为DataFusion计划 ===============
println!("\n=== 开始从Substrait反序列化为DataFusion计划 ===");
// 读取保存的Substrait计划
let mut file = File::open("sales_query.substrait")?;
let mut substrait_data = Vec::new();
file.read_to_end(&mut substrait_data)?;
// 创建新的上下文(模拟不同会话)
let new_ctx = SessionContext::new();
// 必须注册相同的表
new_ctx.register_csv(
"sales",
"examples/data/sales.csv",
CsvReadOptions::default()
).await?;
// 反序列化
let plan = from_substrait_plan(&new_ctx, &substrait_data).await?;
println!("成功反序列化Substrait计划");
// 执行查询
let df = DataFrame::new(new_ctx.state(), plan);
let results = df.collect().await?;
// 打印结果
println!("\n查询结果:");
for batch in results {
println!("{:?}", batch);
}
Ok(())
}
注意事项
- 确保DataFusion和datafusion-substrait版本兼容
- 序列化和反序列化时,所有引用的表和数据源必须已注册
- 某些高级DataFusion特性可能不完全支持Substrait序列化
- 性能考虑:对于非常复杂的查询计划,序列化/反序列化可能成为瓶颈
总结
datafusion-substrait
为DataFusion提供了强大的跨平台查询计划交换能力,使得Rust数据处理系统可以更好地与其他数据生态系统集成。通过标准化的Substrait格式,查询计划可以在不同系统间自由流动,同时保持高性能执行。