Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架

Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架

datafusion-server crate

这是一个由Rust实现的支持多会话、多种数据源查询的服务器。

主要特性:

  • 使用Tokio生态系统的异步架构
  • 基于Apache Arrow和Apache DataFusion
    • 支持通过SQL查询多种数据源
  • 提供Python插件功能用于数据源连接器和后处理器
  • 使用Arrow Flight gRPC特性实现服务器间的水平扩展架构

系统概览

System Diagram

许可证

采用MIT许可证

支持环境

  • Linux
  • 基于BSD的Unix系统(包括macOS/Mac OSX)
  • 基于SVR的Unix系统
  • Windows(包括WSL2/Cygwin)
  • 以及其他LLVM支持的环境

使用预构建的Docker镜像(目前仅支持amd64架构)

前置要求

  • Docker CE/EE v20+

从GitHub容器仓库拉取镜像

$ docker pull ghcr.io/sal-openlab/datafusion-server/datafusion-server:latest

或者构建不带Python插件的版本:

$ docker pull ghcr.io/sal-openlab/datafusion-server/datafusion-server-without-plugin:latest

运行容器

$ docker run -d --rm \
    -p 4000:4000 \
    -v ./data:/var/datafusion-server/data \
    --name datafusion-server \
    ghcr.io/sal-openlab/datafusion-server/datafusion-server:latest

如果只使用容器中的示例数据,可以省略-v ./data:/var/xapi-server/data

从源代码构建

前置要求

  • Rust工具链1.81+(Edition 2021)
  • 或者使用Rust官方容器镜像

如何运行

$ cargo init server-executor
$ cd server-executor

Cargo.toml示例

[package]
name = "server-executor"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion-server = "0.20.1"
clap = { version = "4.5", features = ["derive"] }

src/main.rs示例

use std::path::PathBuf;

use clap::Parser;
use datafusion_server::settings::Settings;

#[derive(Parser)]
#[clap(author, version, about = "Arrow and other large datasets web server", long_about = None)]
struct Args {
    #[clap(
        long,
        value_parser,
        short = 'f',
        value_name = "FILE",
        help = "Configuration file",
        default_value = "./config.toml"
    )]
    config: PathBuf,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();
    let settings = Settings::new_with_file(&args.config)?;
    datafusion_server::execute(settings)?;
    Ok(())
}

config.toml示例

# Configuration file of datafusion-server

[server]
port = 4000
flight_grpc_port = 50051
base_url = "/"
data_dir = "./data"
plugin_dir = "./plugins"

[session]
default_keep_alive = 3600 # in seconds
upload_limit_size = 20 # MB

[log]
# trace, debug, info, warn, error
level = "debug"

调试构建和运行

$ cargo run

使用Python插件功能

需要Python 3.7+解释器

如何运行

Cargo.toml示例

[dependencies]
datafusion-server = { version = "0.20.1", features = ["plugin"] }

调试构建和运行

$ cargo run

完整示例代码

本地文件查询示例

// 使用reqwest库发送HTTP请求
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    
    let response = client
        .post("http://localhost:4000/dataframe/query")
        .header("Content-Type", "application/json")
        .body(r#"
{
  "dataSources": [
    {
      "format": "csv",
      "name": "sales",
      "location": "file:///superstore.csv",
      "options": {
        "inferSchemaRows": 100,
        "hasHeader": true
      }
    }
  ],
  "query": {
    "sql": "SELECT * FROM sales"
  },
  "response": {
    "format": "json"
  }
}"#)
        .send
        .await?;
    
    println!("Response: {}", response.text().await?);
    Ok(())
}

远程REST API查询示例

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    
    let response = client
        .post("http://localhost:4000/dataframe/query")
        .header("Content-Type", "application/json")
        .header("Accept", "text/csv")
        .body(r#"
{
  "dataSources": [
    {
      "format": "json",
      "name": "population",
      "location": "https://datausa.io/api/data?drilldowns=State&measures=Population",
      "options": {
        "jsonPath": "$.data[*]"
      }
    }
  ],
  "query": {
    "sql": "SELECT * FROM population WHERE \"ID Year\">=2020"
  }
}"#)
        .send()
        .await?;
    
    println!("Response: {}", response.text().await?);
    Ok(())
}

Python数据源连接器插件示例

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    
    let response = client
        .post("http://localhost:4000/dataframe/query")
        .header("Content-Type", "application/json")
        .header("Accept", "application/json")
        .body(r#"
{
  "dataSources": [
    {
      "format": "arrow",
      "name": "example",
      "location": "excel://example-workbook.xlsx/Sheet1",
      "pluginOptions": {
        "skipRows": 2
      }
    }
  ],
  "query": {
    "sql": "SELECT * FROM example"
  }
}"#)
        .send()
        .await?;
    
    println!("Response: {}", response.text().await?);
    Ok(())
}

使用说明

多数据源SQL查询

  • 支持多种数据源格式(Parquet、JSON、ndJSON、CSV等)
  • 数据可以从本地文件系统和外部REST服务获取
    • 必要时可以使用JSONPath进行处理
  • 支持跨多数据源的查询执行
    • SQL查询引擎使用Arrow DataFusion
  • 响应支持Arrow、JSON和CSV格式

1 回复

Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架

DataFusion简介

DataFusion是Apache Arrow项目下的一个Rust原生SQL查询引擎和分布式计算框架,它提供了高性能的数据处理能力,支持SQL查询、DataFrame API以及自定义查询优化。

主要特性

  • 纯Rust实现,无GC开销
  • 基于Apache Arrow内存格式
  • 支持SQL和DataFrame API
  • 查询优化器
  • 可扩展的数据源和UDF
  • 支持分布式执行

安装与使用

添加依赖

在Cargo.toml中添加:

[dependencies]
datafusion = "16.0.0"
tokio = { version = "1.0", features = ["full"] }

基本使用示例

use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Int32Array, StringArray};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // 创建执行上下文
    let ctx = SessionContext::new();
    
    // 创建测试数据
    let batch = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
        ("name", Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))) as _,
    ])?;
    
    // 注册数据为内存表
    ctx.register_batch("people", batch)?;
    
    // 执行SQL查询
    let df = ctx.sql("SELECT id, name FROM people WHERE id > 1").await?;
    
    // 显示结果
    df.show().await?;
    
    Ok(())
}

高级功能

连接外部数据源

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    
    // 配置Parquet文件选项
    let listing_options = ListingOptions {
        file_extension: ".parquet".to_string(),
        format: Arc::new(ParquetFormat::default()),
        ..Default::default()
    };
    
    // 注册目录
    ctx.register_listing_table(
        "my_table",
        "path/to/parquet/files",
        listing_options,
        None,
        None,
    )
    .await?;
    
    // 查询Parquet数据
    let df = ctx.sql("SELECT * FROM my_table LIMIT 10").await?;
    df.show().await?;
    
    Ok(())
}

使用DataFrame API

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    
    // 创建DataFrame
    let df = ctx.read_csv("data.csv", CsvReadOptions::default()).await?;
    
    // 使用DataFrame API进行转换
    let results = df
        .filter(col("age").gt(lit(30)))?
        .select_columns(&["name", "age"])?
        .limit(0, Some(5))?
        .collect()
        .await?;
    
    // 打印结果
    println!("{:?}", results);
    
    Ok(())
}

自定义聚合函数

use datafusion::physical_plan::aggregates::{AggregateUDF, AggregateUDFImpl};
use datafusion::physical_plan::expressions::Avg;
use datafusion::logical_expr::{Volatility, Signature, TypeSignature, ReturnTypeFunction};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    
    // 创建自定义聚合函数
    let my_avg = create_my_avg_udf();
    
    // 注册函数
    ctx.register_udaf(my_avg);
    
    // 使用自定义函数
    let df = ctx.sql("SELECT my_avg(price) FROM products").await?;
    df.show().await?;
    
    Ok(())
}

fn create_my_avg_udf() -> AggregateUDF {
    let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
    let signature = Signature::new(
        TypeSignature::Exact(vec![DataType::Float64]),
        Volatility::Immutable,
    );
    
    AggregateUDF::new(
        "my_avg",
        &signature,
        &return_type,
        &Arc::new(|| Ok(Box::new(Avg::new())) as _,
    )
}

分布式执行

DataFusion可以与Ballista项目集成实现分布式查询执行:

use ballista::prelude::*;

#[tokio::main]
async fn main() -> Result<(), BallistaError> {
    // 创建分布式执行上下文
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;
    
    let ctx = BallistaContext::remote("localhost", 50050, &config);
    
    // 执行分布式查询
    let df = ctx.sql("SELECT * FROM large_table WHERE value > 100").await?;
    
    // 收集结果
    let results = df.collect().await?;
    
    println!("{:?}", results);
    
    Ok(())
}

性能优化建议

  1. 分区数据:合理分区大数据集以提高并行度
  2. 使用列式存储:优先使用Parquet等列式存储格式
  3. 缓存常用数据:利用ctx.cache_table缓存频繁访问的表
  4. 调整批处理大小:通过配置控制内存使用和并行度
  5. 使用投影下推:只选择必要的列减少IO

完整示例demo

下面是一个完整的DataFusion使用示例,包含了SQL查询、DataFrame操作和CSV文件读取:

use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Int32Array, StringArray};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // 示例1:内存数据查询
    println!("=== 内存数据查询示例 ===");
    memory_query_example().await?;
    
    // 示例2:CSV文件处理
    println!("\n=== CSV文件处理示例 ===");
    csv_processing_example().await?;
    
    Ok(())
}

async fn memory_query_example() -> datafusion::error::Result<()> {
    // 创建执行上下文
    let ctx = SessionContext::new();
    
    // 创建测试数据
    let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
    let salary_array = Int32Array::from(vec![50000, 60000, 70000, 80000, 90000]);
    
    let batch = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(id_array) as _),
        ("name", Arc::new(name_array) as _),
        ("salary", Arc::new(salary_array) as _),
    ])?;
    
    // 注册数据为内存表
    ctx.register_batch("employees", batch)?;
    
    // 执行SQL查询
    let df = ctx.sql(
        "SELECT name, salary 
         FROM employees 
         WHERE salary > 60000 
         ORDER BY salary DESC"
    ).await?;
    
    // 显示结果
    df.show().await?;
    
    Ok(())
}

async fn csv_processing_example() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    
    // 创建临时CSV文件(实际使用时替换为你的CSV文件路径)
    let csv_content = "id,name,age,department\n\
                      1,Alice,25,Engineering\n\
                      2,Bob,30,Marketing\n\
                      3,Charlie,35,Engineering\n\
                      4,David,40,Sales\n\
                      5,Eve,45,HR";
    
    let csv_path = "employees.csv";
    std::fs::write(csv_path, csv_content)?;
    
    // 读取CSV文件
    let df = ctx.read_csv(csv_path, CsvReadOptions::default()).await?;
    
    // 使用DataFrame API进行复杂操作
    let results = df
        .filter(col("age").gt(lit(30)))?
        .select(vec![
            col("name"),
            col("age"),
            col("department"),
            (col("age").minus(lit(30))).alias("years_over_30")
        ])?
        .sort(vec![
            col("years_over_30").sort(true, true)
        ])?
        .collect()
        .await?;
    
    // 打印最终结果
    println!("员工年龄超过30岁的信息:");
    for batch in results {
        println!("{:?}", batch);
    }
    
    // 清理临时文件
    std::fs::remove_file(csv_path)?;
    
    Ok(())
}

总结

DataFusion Server为Rust生态系统提供了一个高性能、可扩展的数据处理解决方案,特别适合需要与Arrow生态系统集成或构建自定义数据处理管道的场景。它的模块化设计允许开发者根据需要选择组件,从嵌入式查询引擎到完整的分布式计算框架。

回到顶部