Rust嵌入式数据库库DuckDB的使用:高性能OLAP分析与轻量级SQL查询引擎
duckdb-rs 是一个用于在 Rust 中使用 DuckDB 的便捷封装库。它尝试暴露一个类似于 rusqlite 的接口。实际上,初始代码甚至这个 README 都是从 rusqlite 分叉而来,因为 DuckDB 也尝试暴露一个与 sqlite3 兼容的 API。
use duckdb::{params, Connection, Result};
// 在你的项目中,我们需要保持 arrow 版本与 DuckDB 中使用的版本相同。
// 你可以选择:
use duckdb::arrow::record_batch::RecordBatch;
// 或者在 Cargo.toml 中使用 * 作为版本;功能可以根据你的需要切换
// arrow = { version = "*", default-features = false, features = ["prettyprint"] }
// 然后你可以:
// use arrow::record_batch::RecordBatch;
use duckdb::arrow::util::pretty::print_batches;
#[derive(Debug)]
struct Person {
id: i32,
name: String,
data: Option<Vec<u8>>,
}
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
conn.execute_batch(
r"CREATE SEQUENCE seq;
CREATE TABLE person (
id INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
name TEXT NOT NULL,
data BLOB
);
")?;
let me = Person {
id: 0,
name: "Steven".to_string(),
data: None,
};
conn.execute(
"INSERT INTO person (name, data) VALUES (?, ?)",
params![me.name, me.data],
)?;
// 按行查询表
let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
let person_iter = stmt.query_map([], |row| {
Ok(Person {
id: row.get(0)?,
name: row.get(1)?,
data: row.get(2)?,
})
})?;
for person in person_iter {
let p = person.unwrap();
println!("ID: {}", p.id);
println!("Found person {:?}", p);
}
// 按 arrow 查询表
let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
print_batches(&rbs).unwrap();
Ok(())
}
关于构建 DuckDB 和 libduckdb-sys 的说明
libduckdb-sys
是一个与 duckdb-rs
分开的 crate,它提供了 DuckDB C API 的 Rust 声明。默认情况下,libduckdb-sys
尝试使用 pkg-config 或 MSVC ABI 构建的 Vcpkg 安装来查找系统上已存在的 DuckDB 库。
你可以通过多种方式调整此行为:
- 如果你使用
bundled
功能,libduckdb-sys
将使用 cc crate 从源代码编译 DuckDB 并链接到该库。此源代码嵌入在libduckdb-sys
crate 中,由于我们仍在开发中,我们将定期更新它。在我们更稳定之后,我们将使用来自 duckdb 的稳定发布版本。 这可能是解决任何构建问题的最简单解决方案。你可以通过在Cargo.toml
文件中添加以下内容来启用此功能:
cargo add duckdb --features bundled
Cargo.toml
将被更新。
[dependencies]
# 假设使用 DuckDB 版本 0.9.2。
duckdb = { version = "0.9.2", features = ["bundled"] }
- 当链接到系统上已存在的 DuckDB 库时(因此不使用任何
bundled
功能),你可以设置DUCKDB_LIB_DIR
环境变量指向包含该库的目录。你也可以设置DUCKDB_INCLUDE_DIR
变量指向包含duckdb.h
的目录。 - 安装 duckdb 开发包通常就是全部所需,但 pkg-config 和 vcpkg 的构建助手有一些额外的配置选项。使用 vcpkg 时的默认设置是动态链接,这必须在构建前通过设置
VCPKGRS_DYNAMIC=1
环境变量来启用。
绑定生成
我们使用 bindgen 从 DuckDB 的 C 头文件生成 Rust 声明。bindgen 建议将此作为使用此功能的库构建过程的一部分。我们短暂尝试过此方法(具体是 duckdb
0.10.0),但它有一些令人烦恼的地方:
libduckdb-sys
(以及因此的duckdb
)的构建时间急剧增加。- 运行
bindgen
需要一个相对较新版本的 Clang,许多系统默认未安装。 - 运行
bindgen
还需要 DuckDB 头文件存在。
因此,我们尝试通过提供预生成的 DuckDB 绑定来避免在构建时运行 bindgen
。
如果你使用 bundled
功能,你将获得捆绑版本 DuckDB 的预生成绑定。如果你想在构建时运行 bindgen
以生成自己的绑定,请使用 buildtime_bindgen
Cargo 功能。
贡献
清单
- 运行
cargo +nightly fmt
以确保你的 Rust 代码正确格式化。 - 运行
cargo clippy --fix --allow-dirty --all-targets --workspace --all-features -- -D warnings
以修复所有 clippy 问题。 - 确保
cargo test --all-targets --workspace --features "modern-full extensions-full"
报告无失败。
待办事项
- [x] 重构 ErrorCode 部分,它是从 rusqlite 借用的,我们应该有自己的
- [ ] 支持更多类型
- [x] 更新 duckdb.h
- [x] 调整代码示例和文档
- [x] 删除未使用的代码/函数
- [x] 添加 CI
- [x] 发布到 crate
许可证
DuckDB 和 libduckdb-sys 可在 MIT 许可证下获得。有关更多信息,请参阅 LICENSE 文件。
完整示例代码:
use duckdb::{params, Connection, Result};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::arrow::util::pretty::print_batches;
#[derive(Debug)]
struct Person {
id: i32,
name: String,
data: Option<Vec<u8>>,
}
fn main() -> Result<()> {
// 创建内存数据库连接
let conn = Connection::open_in_memory()?;
// 创建序列和表
conn.execute_batch(
r"CREATE SEQUENCE seq;
CREATE TABLE person (
id INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
name TEXT NOT NULL,
data BLOB
);
")?;
// 插入数据
let me = Person {
id: 0,
name: "Steven".to_string(),
data: None,
};
conn.execute(
"INSERT INTO person (name, data) VALUES (?, ?)",
params![me.name, me.data],
)?;
// 行式查询
let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
let person_iter = stmt.query_map([], |row| {
Ok(Person {
id: row.get(0)?,
name: row.get(1)?,
data: row.get(2)?,
})
})?;
println!("行式查询结果:");
for person in person_iter {
let p = person.unwrap();
println!("ID: {}", p.id);
println!("姓名: {}", p.name);
println!("数据: {:?}", p.data);
println!("---");
}
// Arrow格式查询
println!("\nArrow格式查询结果:");
let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
print_batches(&rbs).unwrap();
Ok(())
}
Rust嵌入式数据库库DuckDB的使用:高性能OLAP分析与轻量级SQL查询引擎
简介
DuckDB是一个高性能的嵌入式分析型数据库(OLAP),专为快速分析查询而设计。它采用列式存储架构,支持标准SQL语法,无需独立服务器即可在应用程序中直接运行。DuckDB特别适合数据分析、科学计算和需要高性能SQL查询的场景。
主要特性
- 嵌入式设计:无需单独部署数据库服务器
- 高性能OLAP:针对分析型工作负载优化
- 完全支持SQL:包括窗口函数、CTE等高级功能
- 零外部依赖:单个可执行文件或库
- 跨平台支持:Windows、Linux、macOS
安装方法
在Cargo.toml中添加依赖:
[dependencies]
duckdb = "0.8"
基本使用方法
1. 连接数据库
use duckdb::{Connection, Result};
fn main() -> Result<()> {
// 创建内存数据库
let conn = Connection::open_in_memory()?;
// 或者创建文件数据库
let conn = Connection::open("my_database.db")?;
Ok(())
}
2. 创建表和插入数据
use duckdb::{Connection, Result};
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
// 创建表
conn.execute_batch(
"CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER);
CREATE TABLE orders (id INTEGER, user_id INTEGER, amount DOUBLE);"
)?;
// 插入数据
conn.execute(
"INSERT INTO users VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)",
[],
)?;
conn.execute(
"INSERT INTO orders VALUES (1, 1, 100.0), (2, 1, 50.0), (3, 2, 200.0)",
[],
)?;
Ok(())
}
3. 执行查询
use duckdb::{Connection, Result};
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
// 简单查询
let mut stmt = conn.prepare("SELECT name, age FROM users WHERE age > ?")?;
let mut rows = stmt.query([25])?;
while let Some(row) = rows.next()? {
let name: String = row.get(0)?;
let age: i32 = row.get(1)?;
println!("Name: {}, Age: {}", name, age);
}
// 聚合查询
let mut stmt = conn.prepare(
"SELECT u.name, SUM(o.amount) as total
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.name"
)?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let name: String = row.get(0)?;
let total: f64 = row.get(1)?;
println!("{} spent: ${:.2}", name, total);
}
Ok(())
}
4. 批量数据操作
use duckdb::{Connection, Result, params};
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
conn.execute("CREATE TABLE products (id INTEGER, name VARCHAR, price DOUBLE)", [])?;
// 批量插入
let products = vec![
(1, "Laptop", 999.99),
(2, "Mouse", 25.50),
(3, "Keyboard", 75.00),
];
for (id, name, price) in products {
conn.execute(
"INSERT INTO products VALUES (?, ?, ?)",
params![id, name, price],
)?;
}
Ok(())
}
5. 使用预编译语句
use duckdb::{Connection, Result, params};
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
// 准备预编译语句
let mut stmt = conn.prepare("INSERT INTO logs (level, message) VALUES (?, ?)")?;
// 多次执行相同语句
stmt.execute(params!["INFO", "Application started"])?;
stmt.execute(params!["WARN", "Low memory warning"])?;
stmt.execute(params!["ERROR", "Database connection failed"])?;
Ok(())
}
高级功能示例
窗口函数使用
use duckdb::{Connection, Result};
fn main() -> Result<()> {
let conn = Connection::open_in_memory()?;
conn.execute_batch(
"CREATE TABLE sales (
region VARCHAR,
product VARCHAR,
amount DOUBLE,
quarter INTEGER
)"
)?;
// 使用窗口函数进行排名
let mut stmt = conn.prepare(
"SELECT
region,
product,
amount,
RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank
FROM sales"
)?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let region: String = row.get(0)?;
let product: String = row.get(1)?;
let amount: f64 = row.get(2)?;
let rank: i64 = row.get(3)?;
println!("Region: {}, Product: {}, Amount: {}, Rank: {}",
region, product, amount, rank);
}
Ok(())
}
完整示例demo
use duckdb::{Connection, Result, params};
fn main() -> Result<()> {
// 创建内存数据库连接
let conn = Connection::open_in_memory()?;
// 创建用户表
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name VARCHAR NOT NULL,
age INTEGER,
email VARCHAR
)"
)?;
// 创建订单表
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY,
user_id INTEGER,
product_name VARCHAR,
amount DOUBLE,
order_date DATE,
FOREIGN KEY (user_id) REFERENCES users(id)
)"
)?;
// 批量插入用户数据
let users = vec![
(1, "Alice", 30, "alice@example.com"),
(2, "Bob", 25, "bob@example.com"),
(3, "Charlie", 35, "charlie@example.com"),
(4, "Diana", 28, "diana@example.com"),
];
for (id, name, age, email) in users {
conn.execute(
"INSERT OR IGNORE INTO users (id, name, age, email) VALUES (?, ?, ?, ?)",
params![id, name, age, email],
)?;
}
// 批量插入订单数据
let orders = vec![
(1, 1, "Laptop", 1200.0, "2023-01-15"),
(2, 1, "Mouse", 25.5, "2023-01-16"),
(3, 2, "Keyboard", 75.0, "2023-01-17"),
(4, 3, "Monitor", 300.0, "2023-01-18"),
(5, 4, "Headphones", 150.0, "2023-01-19"),
(6, 2, "Tablet", 500.0, "2023-01-20"),
];
for (id, user_id, product_name, amount, order_date) in orders {
conn.execute(
"INSERT OR IGNORE INTO orders (id, user_id, product_name, amount, order_date) VALUES (?, ?, ?, ?, ?)",
params![id, user_id, product_name, amount, order_date],
)?;
}
// 示例1: 简单查询 - 查找年龄大于28的用户
println!("=== 年龄大于28的用户 ===");
let mut stmt = conn.prepare("SELECT id, name, age, email FROM users WHERE age > ?")?;
let mut rows = stmt.query([28])?;
while let Some(row) = rows.next()? {
let id: i32 = row.get(0)?;
let name: String = row.get(1)?;
let age: i32 = row.get(2)?;
let email: String = row.get(3)?;
println!("ID: {}, Name: {}, Age: {}, Email: {}", id, name, age, email);
}
// 示例2: 聚合查询 - 每个用户的总消费金额
println!("\n=== 用户消费统计 ===");
let mut stmt = conn.prepare(
"SELECT u.name, u.email, SUM(o.amount) as total_spent, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name, u.email
ORDER BY total_spent DESC"
)?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let name: String = row.get(0)?;
let email: String = row.get(1)?;
let total_spent: f64 = row.get(2).unwrap_or(0.0);
let order_count: i64 = row.get(3).unwrap_or(0);
println!("{} ({}) - 总消费: ${:.2}, 订单数: {}", name, email, total_spent, order_count);
}
// 示例3: 窗口函数 - 按产品分类的销售排名
println!("\n=== 产品销售排名 ===");
let mut stmt = conn.prepare(
"SELECT
product_name,
amount,
order_date,
RANK() OVER (PARTITION BY product_name ORDER BY amount DESC) as amount_rank,
RANK() OVER (ORDER BY order_date) as date_rank
FROM orders
ORDER BY product_name, amount DESC"
)?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let product_name: String = row.get(0)?;
let amount: f64 = row.get(1)?;
let order_date: String = row.get(2)?;
let amount_rank: i64 = row.get(3)?;
let date_rank: i64 = row.get(4)?;
println!("产品: {}, 金额: ${:.2}, 日期: {}, 金额排名: {}, 日期排名: {}",
product_name, amount, order_date, amount_rank, date_rank);
}
// 示例4: 使用事务进行批量更新
println!("\n=== 批量更新示例 ===");
let transaction = conn.transaction()?;
// 批量更新用户年龄
let age_updates = vec![(1, 31), (2, 26), (3, 36)];
let mut stmt = transaction.prepare("UPDATE users SET age = ? WHERE id = ?")?;
for (id, new_age) in age_updates {
stmt.execute(params![new_age, id])?;
println!("已更新用户ID {} 的年龄为 {}", id, new_age);
}
// 提交事务
transaction.commit()?;
println!("事务提交成功");
// 验证更新结果
println!("\n=== 更新后的用户年龄 ===");
let mut stmt = conn.prepare("SELECT id, name, age FROM users ORDER BY id")?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let id: i32 = row.get(0)?;
let name: String = row.get(1)?;
let age: i32 = row.get(2)?;
println!("ID: {}, Name: {}, Age: {}", id, name, age);
}
Ok(())
}
性能优化建议
- 使用预编译语句:重复查询时使用预编译语句提高性能
- 批量操作:使用事务进行批量数据操作
- 合理使用索引:为经常查询的列创建索引
- 内存管理:对于大型数据集,合理配置内存使用
注意事项
- DuckDB主要针对读密集型工作负载优化
- 在写入密集型场景中,考虑使用事务来提升性能
- 内存数据库重启后数据会丢失,重要数据应持久化到文件
DuckDB为Rust开发者提供了一个强大而轻量级的分析数据库解决方案,特别适合需要高性能SQL查询的嵌入式应用场景。