Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架
Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架
datafusion-server crate
这是一个由Rust实现的支持多会话、多种数据源查询的服务器。
主要特性:
- 使用Tokio生态系统的异步架构
- 基于Apache Arrow和Apache DataFusion
- 支持通过SQL查询多种数据源
- 提供Python插件功能用于数据源连接器和后处理器
- 使用Arrow Flight gRPC特性实现服务器间的水平扩展架构
系统概览
许可证
采用MIT许可证
支持环境
- Linux
- 基于BSD的Unix系统(包括macOS/Mac OSX)
- 基于SVR的Unix系统
- Windows(包括WSL2/Cygwin)
- 以及其他LLVM支持的环境
使用预构建的Docker镜像(目前仅支持amd64架构)
前置要求
- Docker CE/EE v20+
从GitHub容器仓库拉取镜像
$ docker pull ghcr.io/sal-openlab/datafusion-server/datafusion-server:latest
或者构建不带Python插件的版本:
$ docker pull ghcr.io/sal-openlab/datafusion-server/datafusion-server-without-plugin:latest
运行容器
$ docker run -d --rm \
-p 4000:4000 \
-v ./data:/var/datafusion-server/data \
--name datafusion-server \
ghcr.io/sal-openlab/datafusion-server/datafusion-server:latest
如果只使用容器中的示例数据,可以省略-v ./data:/var/xapi-server/data
。
从源代码构建
前置要求
- Rust工具链1.81+(Edition 2021)
- 或者使用Rust官方容器镜像
如何运行
$ cargo init server-executor
$ cd server-executor
Cargo.toml示例
[package]
name = "server-executor"
version = "0.1.0"
edition = "2021"
[dependencies]
datafusion-server = "0.20.1"
clap = { version = "4.5", features = ["derive"] }
src/main.rs示例
use std::path::PathBuf;
use clap::Parser;
use datafusion_server::settings::Settings;
#[derive(Parser)]
#[clap(author, version, about = "Arrow and other large datasets web server", long_about = None)]
struct Args {
#[clap(
long,
value_parser,
short = 'f',
value_name = "FILE",
help = "Configuration file",
default_value = "./config.toml"
)]
config: PathBuf,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let settings = Settings::new_with_file(&args.config)?;
datafusion_server::execute(settings)?;
Ok(())
}
config.toml示例
# Configuration file of datafusion-server
[server]
port = 4000
flight_grpc_port = 50051
base_url = "/"
data_dir = "./data"
plugin_dir = "./plugins"
[session]
default_keep_alive = 3600 # in seconds
upload_limit_size = 20 # MB
[log]
# trace, debug, info, warn, error
level = "debug"
调试构建和运行
$ cargo run
使用Python插件功能
需要Python 3.7+解释器
如何运行
Cargo.toml示例
[dependencies]
datafusion-server = { version = "0.20.1", features = ["plugin"] }
调试构建和运行
$ cargo run
完整示例代码
本地文件查询示例
// 使用reqwest库发送HTTP请求
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client
.post("http://localhost:4000/dataframe/query")
.header("Content-Type", "application/json")
.body(r#"
{
"dataSources": [
{
"format": "csv",
"name": "sales",
"location": "file:///superstore.csv",
"options": {
"inferSchemaRows": 100,
"hasHeader": true
}
}
],
"query": {
"sql": "SELECT * FROM sales"
},
"response": {
"format": "json"
}
}"#)
.send
.await?;
println!("Response: {}", response.text().await?);
Ok(())
}
远程REST API查询示例
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client
.post("http://localhost:4000/dataframe/query")
.header("Content-Type", "application/json")
.header("Accept", "text/csv")
.body(r#"
{
"dataSources": [
{
"format": "json",
"name": "population",
"location": "https://datausa.io/api/data?drilldowns=State&measures=Population",
"options": {
"jsonPath": "$.data[*]"
}
}
],
"query": {
"sql": "SELECT * FROM population WHERE \"ID Year\">=2020"
}
}"#)
.send()
.await?;
println!("Response: {}", response.text().await?);
Ok(())
}
Python数据源连接器插件示例
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client
.post("http://localhost:4000/dataframe/query")
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(r#"
{
"dataSources": [
{
"format": "arrow",
"name": "example",
"location": "excel://example-workbook.xlsx/Sheet1",
"pluginOptions": {
"skipRows": 2
}
}
],
"query": {
"sql": "SELECT * FROM example"
}
}"#)
.send()
.await?;
println!("Response: {}", response.text().await?);
Ok(())
}
使用说明
多数据源SQL查询
- 支持多种数据源格式(Parquet、JSON、ndJSON、CSV等)
- 数据可以从本地文件系统和外部REST服务获取
- 必要时可以使用JSONPath进行处理
- 支持跨多数据源的查询执行
- SQL查询引擎使用Arrow DataFusion
- 响应支持Arrow、JSON和CSV格式
1 回复
Rust数据处理引擎DataFusion Server的使用:高性能SQL查询与分布式计算框架
DataFusion简介
DataFusion是Apache Arrow项目下的一个Rust原生SQL查询引擎和分布式计算框架,它提供了高性能的数据处理能力,支持SQL查询、DataFrame API以及自定义查询优化。
主要特性
- 纯Rust实现,无GC开销
- 基于Apache Arrow内存格式
- 支持SQL和DataFrame API
- 查询优化器
- 可扩展的数据源和UDF
- 支持分布式执行
安装与使用
添加依赖
在Cargo.toml中添加:
[dependencies]
datafusion = "16.0.0"
tokio = { version = "1.0", features = ["full"] }
基本使用示例
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Int32Array, StringArray};
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 创建执行上下文
let ctx = SessionContext::new();
// 创建测试数据
let batch = RecordBatch::try_from_iter(vec![
("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
("name", Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))) as _,
])?;
// 注册数据为内存表
ctx.register_batch("people", batch)?;
// 执行SQL查询
let df = ctx.sql("SELECT id, name FROM people WHERE id > 1").await?;
// 显示结果
df.show().await?;
Ok(())
}
高级功能
连接外部数据源
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// 配置Parquet文件选项
let listing_options = ListingOptions {
file_extension: ".parquet".to_string(),
format: Arc::new(ParquetFormat::default()),
..Default::default()
};
// 注册目录
ctx.register_listing_table(
"my_table",
"path/to/parquet/files",
listing_options,
None,
None,
)
.await?;
// 查询Parquet数据
let df = ctx.sql("SELECT * FROM my_table LIMIT 10").await?;
df.show().await?;
Ok(())
}
使用DataFrame API
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// 创建DataFrame
let df = ctx.read_csv("data.csv", CsvReadOptions::default()).await?;
// 使用DataFrame API进行转换
let results = df
.filter(col("age").gt(lit(30)))?
.select_columns(&["name", "age"])?
.limit(0, Some(5))?
.collect()
.await?;
// 打印结果
println!("{:?}", results);
Ok(())
}
自定义聚合函数
use datafusion::physical_plan::aggregates::{AggregateUDF, AggregateUDFImpl};
use datafusion::physical_plan::expressions::Avg;
use datafusion::logical_expr::{Volatility, Signature, TypeSignature, ReturnTypeFunction};
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// 创建自定义聚合函数
let my_avg = create_my_avg_udf();
// 注册函数
ctx.register_udaf(my_avg);
// 使用自定义函数
let df = ctx.sql("SELECT my_avg(price) FROM products").await?;
df.show().await?;
Ok(())
}
fn create_my_avg_udf() -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let signature = Signature::new(
TypeSignature::Exact(vec![DataType::Float64]),
Volatility::Immutable,
);
AggregateUDF::new(
"my_avg",
&signature,
&return_type,
&Arc::new(|| Ok(Box::new(Avg::new())) as _,
)
}
分布式执行
DataFusion可以与Ballista项目集成实现分布式查询执行:
use ballista::prelude::*;
#[tokio::main]
async fn main() -> Result<(), BallistaError> {
// 创建分布式执行上下文
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);
// 执行分布式查询
let df = ctx.sql("SELECT * FROM large_table WHERE value > 100").await?;
// 收集结果
let results = df.collect().await?;
println!("{:?}", results);
Ok(())
}
性能优化建议
- 分区数据:合理分区大数据集以提高并行度
- 使用列式存储:优先使用Parquet等列式存储格式
- 缓存常用数据:利用
ctx.cache_table
缓存频繁访问的表 - 调整批处理大小:通过配置控制内存使用和并行度
- 使用投影下推:只选择必要的列减少IO
完整示例demo
下面是一个完整的DataFusion使用示例,包含了SQL查询、DataFrame操作和CSV文件读取:
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Int32Array, StringArray};
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 示例1:内存数据查询
println!("=== 内存数据查询示例 ===");
memory_query_example().await?;
// 示例2:CSV文件处理
println!("\n=== CSV文件处理示例 ===");
csv_processing_example().await?;
Ok(())
}
async fn memory_query_example() -> datafusion::error::Result<()> {
// 创建执行上下文
let ctx = SessionContext::new();
// 创建测试数据
let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
let salary_array = Int32Array::from(vec![50000, 60000, 70000, 80000, 90000]);
let batch = RecordBatch::try_from_iter(vec![
("id", Arc::new(id_array) as _),
("name", Arc::new(name_array) as _),
("salary", Arc::new(salary_array) as _),
])?;
// 注册数据为内存表
ctx.register_batch("employees", batch)?;
// 执行SQL查询
let df = ctx.sql(
"SELECT name, salary
FROM employees
WHERE salary > 60000
ORDER BY salary DESC"
).await?;
// 显示结果
df.show().await?;
Ok(())
}
async fn csv_processing_example() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();
// 创建临时CSV文件(实际使用时替换为你的CSV文件路径)
let csv_content = "id,name,age,department\n\
1,Alice,25,Engineering\n\
2,Bob,30,Marketing\n\
3,Charlie,35,Engineering\n\
4,David,40,Sales\n\
5,Eve,45,HR";
let csv_path = "employees.csv";
std::fs::write(csv_path, csv_content)?;
// 读取CSV文件
let df = ctx.read_csv(csv_path, CsvReadOptions::default()).await?;
// 使用DataFrame API进行复杂操作
let results = df
.filter(col("age").gt(lit(30)))?
.select(vec![
col("name"),
col("age"),
col("department"),
(col("age").minus(lit(30))).alias("years_over_30")
])?
.sort(vec![
col("years_over_30").sort(true, true)
])?
.collect()
.await?;
// 打印最终结果
println!("员工年龄超过30岁的信息:");
for batch in results {
println!("{:?}", batch);
}
// 清理临时文件
std::fs::remove_file(csv_path)?;
Ok(())
}
总结
DataFusion Server为Rust生态系统提供了一个高性能、可扩展的数据处理解决方案,特别适合需要与Arrow生态系统集成或构建自定义数据处理管道的场景。它的模块化设计允许开发者根据需要选择组件,从嵌入式查询引擎到完整的分布式计算框架。