Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行

Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行

概述

datafusion-substrait是一个Rust crate,它实现了Apache DataFusion查询计划与Substrait之间的转换功能。Substrait是一个开放标准,用于跨平台序列化和执行查询计划。

安装

在您的项目中运行以下Cargo命令:

cargo add datafusion-substrait

或者在Cargo.toml中添加:

datafusion-substrait = "49.0.0"

示例代码

以下是一个完整的示例,展示如何使用datafusion-substrait将DataFusion查询计划转换为Substrait格式:

use datafusion::prelude::*;
use datafusion_substrait::logical_plan::{consumer, producer};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 2. 注册CSV作为数据源
    ctx.register_csv("example", "example.csv", CsvReadOptions::default()).await?;
    
    // 3. 创建DataFusion查询计划
    let df = ctx.sql("SELECT a, b FROM example WHERE a > 10").await?;
    let plan = df.logical_plan();
    
    // 4. 将DataFusion计划转换为Substrait计划
    let mut substrait_plan = Vec::new();
    producer::to_substrait_plan(plan, &mut substrait_plan)?;
    
    // 5. 将Substrait计划转换回DataFusion计划
    let decoded_plan = consumer::from_substrait_plan(&ctx, &substrait_plan).await?;
    
    // 6. 执行计划
    let results = ctx.execute_logical_plan(decoded_plan).await?;
    
    // 7. 打印结果
    results.show().await?;
    
    Ok(())
}

完整示例

以下是一个更完整的示例,展示了如何从CSV文件读取数据,执行复杂查询,并通过Substrait进行跨平台序列化:

use datafusion::{
    prelude::*,
    datasource::file_format::csv::CsvReadOptions,
    arrow::record_batch::RecordBatch
};
use datafusion_substrait::logical_plan::{consumer, producer};
use std::sync::Arc;
use tempfile::NamedTempFile;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 准备测试CSV数据
    let csv_data = "a,b,c\n1,2,3\n10,20,30\n100,200,300";
    let tmp_file = NamedTempFile::new()?;
    std::fs::write(tmp_file.path(), csv_data)?;
    
    // 2. 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 3. 注册CSV文件作为数据源
    ctx.register_csv(
        "test_data",
        tmp_file.path().to_str().unwrap(),
        CsvReadOptions::default()
    ).await?;
    
    // 4. 创建复杂查询计划
    let df = ctx.sql(
        "SELECT a, b, c*2 as double_c 
         FROM test_data 
         WHERE a > 5 
         ORDER BY b DESC 
         LIMIT 10"
    ).await?;
    
    let plan = df.logical_plan();
    
    // 5. 序列化为Substrait格式
    let mut substrait_plan = Vec::new();
    producer::to_substrait_plan(plan, &mut substrait_plan)?;
    println!("生成Substrait计划,大小: {}字节", substrait_plan.len());
    
    // 6. 反序列化回DataFusion计划
    let decoded_plan = consumer::from_substrait_plan(&ctx, &substrait_plan).await?;
    
    // 7. 执行查询计划
    let results = ctx.execute_logical_plan(decoded_plan).await?;
    
    // 8. 收集并打印结果
    let batches: Vec<RecordBatch> = results.collect().await?;
    for batch in batches {
        println!("{:?}", batch);
    }
    
    Ok(())
}

主要功能

  1. 生产者(producer):将DataFusion逻辑计划转换为Substrait格式
  2. 消费者(consumer):将Substrait格式转换回DataFusion逻辑计划
  3. 跨平台兼容:通过Substrait实现与其他数据处理系统的互操作性

许可证

Apache-2.0

所有者

  • Andy Grove
  • Sutou Kouhei
  • Andrew Lamb
  • xudong.w

1 回复

Rust数据处理库DataFusion与Substrait集成:datafusion-substrait实现高性能查询计划的跨平台序列化与执行

介绍

DataFusion是Rust生态中一个高性能的查询执行框架,而Substrait是一个跨平台的查询计划序列化标准。datafusion-substrait库将这两者结合起来,实现了:

  1. 将DataFusion的查询计划序列化为Substrait格式
  2. 将Substrait格式的查询计划反序列化为DataFusion可执行的计划
  3. 实现跨平台查询计划的交换与执行

这种集成使得DataFusion可以与其他支持Substrait的系统(如Apache Arrow、DuckDB等)无缝交换查询计划,极大提高了系统的互操作性。

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
datafusion = "25.0"  # 或其他最新版本
datafusion-substrait = "0.1"  # 或其他最新版本

基本使用示例

1. 将DataFusion逻辑计划序列化为Substrait

use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 注册数据源
    ctx.register_csv("example", "path/to/file.csv", CsvReadOptions::default()).await?;
    
    // 创建逻辑计划
    let df = ctx.sql("SELECT a, b FROM example WHERE c > 10").await?;
    let plan = df.logical_plan();
    
    // 序列化为Substrait
    let mut buf = Vec::new();
    to_substrait_plan(&mut buf, plan)?;
    
    // buf现在包含Substrait格式的查询计划
    println!("Serialized Substrait plan size: {} bytes", buf.len());
    
    Ok(())
}

2. 从Substrait反序列化为DataFusion计划

use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 假设我们有一个Substrait格式的查询计划
    let substrait_plan: &[u8] = /* 从文件或网络获取的Substrait数据 */;
    
    // 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 注册数据源(必须与Substrait计划中引用的数据源匹配)
    ctx.register_csv("example", "path/to/file.csv", CsvReadOptions::default()).await?;
    
    // 反序列化Substrait计划
    let plan = from_substrait_plan(&ctx, substrait_plan).await?;
    
    // 执行计划
    let df = DataFrame::new(ctx.state(), plan);
    let results = df.collect().await?;
    
    // 处理结果...
    
    Ok(())
}

高级功能

跨平台查询计划交换

// 假设我们从一个Python系统接收了Substrait计划
let python_generated_plan: &[u8] = receive_from_python();

// 在Rust中执行
let ctx = SessionContext::new();
ctx.register_parquet("sales", "path/to/sales.parquet", Default::default()).await?;

let plan = from_substrait_plan(&ctx, python_generated_plan).await?;
let df = DataFrame::new(ctx.state(), plan);
let results = df.collect().await?;

查询计划持久化

use std::fs::File;
use std::io::Write;

// 序列化并保存查询计划
let plan = ctx.sql("SELECT * FROM large_table WHERE value > 100").await?.logical_plan();
let mut buf = Vec::new();
to_substrait_plan(&mut buf, &plan)?;

let mut file = File::create("complex_query.substrait")?;
file.write_all(&buf)?;

// 之后可以重新加载并执行
let mut file = File::open("complex_query.substrait")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;

let reloaded_plan = from_substrait_plan(&ctx, &buf).await?;

完整示例代码

下面是一个完整的示例,展示如何使用datafusion-substrait进行查询计划的序列化和反序列化:

use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use std::fs::File;
use std::io::Write;
use std::io::Read;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // =============== 第一部分:序列化DataFusion计划到Substrait ===============
    println!("=== 开始序列化DataFusion计划到Substrait ===");
    
    // 创建DataFusion上下文
    let ctx = SessionContext::new();
    
    // 注册CSV数据源
    ctx.register_csv(
        "sales", 
        "examples/data/sales.csv", 
        CsvReadOptions::default()
    ).await?;
    
    // 创建逻辑计划
    let df = ctx.sql(
        "SELECT region, SUM(amount) as total_sales 
         FROM sales 
         WHERE amount > 100 
         GROUP BY region"
    ).await?;
    
    let plan = df.logical_plan();
    
    // 序列化为Substrait格式
    let mut substrait_data = Vec::new();
    to_substrait_plan(&mut substrait_data, plan)?;
    println!("成功序列化计划,大小: {}字节", substrait_data.len());
    
    // 保存到文件
    let mut file = File::create("sales_query.substrait")?;
    file.write_all(&substrait_data)?;
    println!("已保存Substrait计划到文件: sales_query.substrait");
    
    // =============== 第二部分:从Substrait反序列化为DataFusion计划 ===============
    println!("\n=== 开始从Substrait反序列化为DataFusion计划 ===");
    
    // 读取保存的Substrait计划
    let mut file = File::open("sales_query.substrait")?;
    let mut substrait_data = Vec::new();
    file.read_to_end(&mut substrait_data)?;
    
    // 创建新的上下文(模拟不同会话)
    let new_ctx = SessionContext::new();
    
    // 必须注册相同的表
    new_ctx.register_csv(
        "sales", 
        "examples/data/sales.csv", 
        CsvReadOptions::default()
    ).await?;
    
    // 反序列化
    let plan = from_substrait_plan(&new_ctx, &substrait_data).await?;
    println!("成功反序列化Substrait计划");
    
    // 执行查询
    let df = DataFrame::new(new_ctx.state(), plan);
    let results = df.collect().await?;
    
    // 打印结果
    println!("\n查询结果:");
    for batch in results {
        println!("{:?}", batch);
    }
    
    Ok(())
}

注意事项

  1. 确保DataFusion和datafusion-substrait版本兼容
  2. 序列化和反序列化时,所有引用的表和数据源必须已注册
  3. 某些高级DataFusion特性可能不完全支持Substrait序列化
  4. 性能考虑:对于非常复杂的查询计划,序列化/反序列化可能成为瓶颈

总结

datafusion-substrait为DataFusion提供了强大的跨平台查询计划交换能力,使得Rust数据处理系统可以更好地与其他数据生态系统集成。通过标准化的Substrait格式,查询计划可以在不同系统间自由流动,同时保持高性能执行。

回到顶部