Rust嵌入式数据库库DuckDB的使用:高性能OLAP分析与轻量级SQL查询引擎

duckdb-rs 是一个用于在 Rust 中使用 DuckDB 的便捷封装库。它尝试暴露一个类似于 rusqlite 的接口。实际上,初始代码甚至这个 README 都是从 rusqlite 分叉而来,因为 DuckDB 也尝试暴露一个与 sqlite3 兼容的 API。

use duckdb::{params, Connection, Result};

// 在你的项目中,我们需要保持 arrow 版本与 DuckDB 中使用的版本相同。
// 你可以选择:
use duckdb::arrow::record_batch::RecordBatch;
// 或者在 Cargo.toml 中使用 * 作为版本;功能可以根据你的需要切换
// arrow = { version = "*", default-features = false, features = ["prettyprint"] }
// 然后你可以:
// use arrow::record_batch::RecordBatch;

use duckdb::arrow::util::pretty::print_batches;

#[derive(Debug)]
struct Person {
    id: i32,
    name: String,
    data: Option<Vec<u8>>,
}

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;

    conn.execute_batch(
        r"CREATE SEQUENCE seq;
          CREATE TABLE person (
                  id              INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
                  name            TEXT NOT NULL,
                  data            BLOB
                  );
        ")?;

    let me = Person {
        id: 0,
        name: "Steven".to_string(),
        data: None,
    };
    conn.execute(
        "INSERT INTO person (name, data) VALUES (?, ?)",
        params![me.name, me.data],
    )?;

    // 按行查询表
    let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
    let person_iter = stmt.query_map([], |row| {
        Ok(Person {
            id: row.get(0)?,
            name: row.get(1)?,
            data: row.get(2)?,
        })
    })?;

    for person in person_iter {
        let p = person.unwrap();
        println!("ID: {}", p.id);
        println!("Found person {:?}", p);
    }

    // 按 arrow 查询表
    let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
    print_batches(&rbs).unwrap();
    Ok(())
}

关于构建 DuckDB 和 libduckdb-sys 的说明

libduckdb-sys 是一个与 duckdb-rs 分开的 crate,它提供了 DuckDB C API 的 Rust 声明。默认情况下,libduckdb-sys 尝试使用 pkg-config 或 MSVC ABI 构建的 Vcpkg 安装来查找系统上已存在的 DuckDB 库。

你可以通过多种方式调整此行为:

  • 如果你使用 bundled 功能,libduckdb-sys 将使用 cc crate 从源代码编译 DuckDB 并链接到该库。此源代码嵌入在 libduckdb-sys crate 中,由于我们仍在开发中,我们将定期更新它。在我们更稳定之后,我们将使用来自 duckdb 的稳定发布版本。 这可能是解决任何构建问题的最简单解决方案。你可以通过在 Cargo.toml 文件中添加以下内容来启用此功能:
cargo add duckdb --features bundled

Cargo.toml 将被更新。

[dependencies]
# 假设使用 DuckDB 版本 0.9.2。
duckdb = { version = "0.9.2", features = ["bundled"] }
  • 当链接到系统上已存在的 DuckDB 库时(因此不使用任何 bundled 功能),你可以设置 DUCKDB_LIB_DIR 环境变量指向包含该库的目录。你也可以设置 DUCKDB_INCLUDE_DIR 变量指向包含 duckdb.h 的目录。
  • 安装 duckdb 开发包通常就是全部所需,但 pkg-config 和 vcpkg 的构建助手有一些额外的配置选项。使用 vcpkg 时的默认设置是动态链接,这必须在构建前通过设置 VCPKGRS_DYNAMIC=1 环境变量来启用。

绑定生成

我们使用 bindgen 从 DuckDB 的 C 头文件生成 Rust 声明。bindgen 建议将此作为使用此功能的库构建过程的一部分。我们短暂尝试过此方法(具体是 duckdb 0.10.0),但它有一些令人烦恼的地方:

  • libduckdb-sys(以及因此的 duckdb)的构建时间急剧增加。
  • 运行 bindgen 需要一个相对较新版本的 Clang,许多系统默认未安装。
  • 运行 bindgen 还需要 DuckDB 头文件存在。

因此,我们尝试通过提供预生成的 DuckDB 绑定来避免在构建时运行 bindgen

如果你使用 bundled 功能,你将获得捆绑版本 DuckDB 的预生成绑定。如果你想在构建时运行 bindgen 以生成自己的绑定,请使用 buildtime_bindgen Cargo 功能。

贡献

参见 Contributing.md

清单

  • 运行 cargo +nightly fmt 以确保你的 Rust 代码正确格式化。
  • 运行 cargo clippy --fix --allow-dirty --all-targets --workspace --all-features -- -D warnings 以修复所有 clippy 问题。
  • 确保 cargo test --all-targets --workspace --features "modern-full extensions-full" 报告无失败。

待办事项

  • [x] 重构 ErrorCode 部分,它是从 rusqlite 借用的,我们应该有自己的
  • [ ] 支持更多类型
  • [x] 更新 duckdb.h
  • [x] 调整代码示例和文档
  • [x] 删除未使用的代码/函数
  • [x] 添加 CI
  • [x] 发布到 crate

许可证

DuckDB 和 libduckdb-sys 可在 MIT 许可证下获得。有关更多信息,请参阅 LICENSE 文件。

完整示例代码:

use duckdb::{params, Connection, Result};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::arrow::util::pretty::print_batches;

#[derive(Debug)]
struct Person {
    id: i32,
    name: String,
    data: Option<Vec<u8>>,
}

fn main() -> Result<()> {
    // 创建内存数据库连接
    let conn = Connection::open_in_memory()?;

    // 创建序列和表
    conn.execute_batch(
        r"CREATE SEQUENCE seq;
          CREATE TABLE person (
                  id              INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
                  name            TEXT NOT NULL,
                  data            BLOB
                  );
        ")?;

    // 插入数据
    let me = Person {
        id: 0,
        name: "Steven".to_string(),
        data: None,
    };
    conn.execute(
        "INSERT INTO person (name, data) VALUES (?, ?)",
        params![me.name, me.data],
    )?;

    // 行式查询
    let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
    let person_iter = stmt.query_map([], |row| {
        Ok(Person {
            id: row.get(0)?,
            name: row.get(1)?,
            data: row.get(2)?,
        })
    })?;

    println!("行式查询结果:");
    for person in person_iter {
        let p = person.unwrap();
        println!("ID: {}", p.id);
        println!("姓名: {}", p.name);
        println!("数据: {:?}", p.data);
        println!("---");
    }

    // Arrow格式查询
    println!("\nArrow格式查询结果:");
    let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
    print_batches(&rbs).unwrap();

    Ok(())
}

1 回复

Rust嵌入式数据库库DuckDB的使用:高性能OLAP分析与轻量级SQL查询引擎

简介

DuckDB是一个高性能的嵌入式分析型数据库(OLAP),专为快速分析查询而设计。它采用列式存储架构,支持标准SQL语法,无需独立服务器即可在应用程序中直接运行。DuckDB特别适合数据分析、科学计算和需要高性能SQL查询的场景。

主要特性

  • 嵌入式设计:无需单独部署数据库服务器
  • 高性能OLAP:针对分析型工作负载优化
  • 完全支持SQL:包括窗口函数、CTE等高级功能
  • 零外部依赖:单个可执行文件或库
  • 跨平台支持:Windows、Linux、macOS

安装方法

在Cargo.toml中添加依赖:

[dependencies]
duckdb = "0.8"

基本使用方法

1. 连接数据库

use duckdb::{Connection, Result};

fn main() -> Result<()> {
    // 创建内存数据库
    let conn = Connection::open_in_memory()?;
    
    // 或者创建文件数据库
    let conn = Connection::open("my_database.db")?;
    
    Ok(())
}

2. 创建表和插入数据

use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    // 创建表
    conn.execute_batch(
        "CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER);
         CREATE TABLE orders (id INTEGER, user_id INTEGER, amount DOUBLE);"
    )?;
    
    // 插入数据
    conn.execute(
        "INSERT INTO users VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)",
        [],
    )?;
    
    conn.execute(
        "INSERT INTO orders VALUES (1, 1, 100.0), (2, 1, 50.0), (3, 2, 200.0)",
        [],
    )?;
    
    Ok(())
}

3. 执行查询

use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    // 简单查询
    let mut stmt = conn.prepare("SELECT name, age FROM users WHERE age > ?")?;
    let mut rows = stmt.query([25])?;
    
    while let Some(row) = rows.next()? {
        let name: String = row.get(0)?;
        let age: i32 = row.get(1)?;
        println!("Name: {}, Age: {}", name, age);
    }
    
    // 聚合查询
    let mut stmt = conn.prepare(
        "SELECT u.name, SUM(o.amount) as total 
         FROM users u 
         JOIN orders o ON u.id = o.user_id 
         GROUP BY u.name"
    )?;
    
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let name: String = row.get(0)?;
        let total: f64 = row.get(1)?;
        println!("{} spent: ${:.2}", name, total);
    }
    
    Ok(())
}

4. 批量数据操作

use duckdb::{Connection, Result, params};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    conn.execute("CREATE TABLE products (id INTEGER, name VARCHAR, price DOUBLE)", [])?;
    
    // 批量插入
    let products = vec![
        (1, "Laptop", 999.99),
        (2, "Mouse", 25.50),
        (3, "Keyboard", 75.00),
    ];
    
    for (id, name, price) in products {
        conn.execute(
            "INSERT INTO products VALUES (?, ?, ?)",
            params![id, name, price],
        )?;
    }
    
    Ok(())
}

5. 使用预编译语句

use duckdb::{Connection, Result, params};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    // 准备预编译语句
    let mut stmt = conn.prepare("INSERT INTO logs (level, message) VALUES (?, ?)")?;
    
    // 多次执行相同语句
    stmt.execute(params!["INFO", "Application started"])?;
    stmt.execute(params!["WARN", "Low memory warning"])?;
    stmt.execute(params!["ERROR", "Database connection failed"])?;
    
    Ok(())
}

高级功能示例

窗口函数使用

use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    conn.execute_batch(
        "CREATE TABLE sales (
            region VARCHAR, 
            product VARCHAR, 
            amount DOUBLE,
            quarter INTEGER
        )"
    )?;
    
    // 使用窗口函数进行排名
    let mut stmt = conn.prepare(
        "SELECT 
            region, 
            product, 
            amount,
            RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank
         FROM sales"
    )?;
    
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let region: String = row.get(0)?;
        let product: String = row.get(1)?;
        let amount: f64 = row.get(2)?;
        let rank: i64 = row.get(3)?;
        println!("Region: {}, Product: {}, Amount: {}, Rank: {}", 
                region, product, amount, rank);
    }
    
    Ok(())
}

完整示例demo

use duckdb::{Connection, Result, params};

fn main() -> Result<()> {
    // 创建内存数据库连接
    let conn = Connection::open_in_memory()?;
    
    // 创建用户表
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY, 
            name VARCHAR NOT NULL, 
            age INTEGER,
            email VARCHAR
        )"
    )?;
    
    // 创建订单表
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY,
            user_id INTEGER,
            product_name VARCHAR,
            amount DOUBLE,
            order_date DATE,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )"
    )?;
    
    // 批量插入用户数据
    let users = vec![
        (1, "Alice", 30, "alice@example.com"),
        (2, "Bob", 25, "bob@example.com"),
        (3, "Charlie", 35, "charlie@example.com"),
        (4, "Diana", 28, "diana@example.com"),
    ];
    
    for (id, name, age, email) in users {
        conn.execute(
            "INSERT OR IGNORE INTO users (id, name, age, email) VALUES (?, ?, ?, ?)",
            params![id, name, age, email],
        )?;
    }
    
    // 批量插入订单数据
    let orders = vec![
        (1, 1, "Laptop", 1200.0, "2023-01-15"),
        (2, 1, "Mouse", 25.5, "2023-01-16"),
        (3, 2, "Keyboard", 75.0, "2023-01-17"),
        (4, 3, "Monitor", 300.0, "2023-01-18"),
        (5, 4, "Headphones", 150.0, "2023-01-19"),
        (6, 2, "Tablet", 500.0, "2023-01-20"),
    ];
    
    for (id, user_id, product_name, amount, order_date) in orders {
        conn.execute(
            "INSERT OR IGNORE INTO orders (id, user_id, product_name, amount, order_date) VALUES (?, ?, ?, ?, ?)",
            params![id, user_id, product_name, amount, order_date],
        )?;
    }
    
    // 示例1: 简单查询 - 查找年龄大于28的用户
    println!("=== 年龄大于28的用户 ===");
    let mut stmt = conn.prepare("SELECT id, name, age, email FROM users WHERE age > ?")?;
    let mut rows = stmt.query([28])?;
    
    while let Some(row) = rows.next()? {
        let id: i32 = row.get(0)?;
        let name: String = row.get(1)?;
        let age: i32 = row.get(2)?;
        let email: String = row.get(3)?;
        println!("ID: {}, Name: {}, Age: {}, Email: {}", id, name, age, email);
    }
    
    // 示例2: 聚合查询 - 每个用户的总消费金额
    println!("\n=== 用户消费统计 ===");
    let mut stmt = conn.prepare(
        "SELECT u.name, u.email, SUM(o.amount) as total_spent, COUNT(o.id) as order_count
         FROM users u 
         LEFT JOIN orders o ON u.id = o.user_id 
         GROUP BY u.id, u.name, u.email
         ORDER BY total_spent DESC"
    )?;
    
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let name: String = row.get(0)?;
        let email: String = row.get(1)?;
        let total_spent: f64 = row.get(2).unwrap_or(0.0);
        let order_count: i64 = row.get(3).unwrap_or(0);
        println!("{} ({}) - 总消费: ${:.2}, 订单数: {}", name, email, total_spent, order_count);
    }
    
    // 示例3: 窗口函数 - 按产品分类的销售排名
    println!("\n=== 产品销售排名 ===");
    let mut stmt = conn.prepare(
        "SELECT 
            product_name,
            amount,
            order_date,
            RANK() OVER (PARTITION BY product_name ORDER BY amount DESC) as amount_rank,
            RANK() OVER (ORDER BY order_date) as date_rank
         FROM orders
         ORDER BY product_name, amount DESC"
    )?;
    
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let product_name: String = row.get(0)?;
        let amount: f64 = row.get(1)?;
        let order_date: String = row.get(2)?;
        let amount_rank: i64 = row.get(3)?;
        let date_rank: i64 = row.get(4)?;
        println!("产品: {}, 金额: ${:.2}, 日期: {}, 金额排名: {}, 日期排名: {}", 
                product_name, amount, order_date, amount_rank, date_rank);
    }
    
    // 示例4: 使用事务进行批量更新
    println!("\n=== 批量更新示例 ===");
    let transaction = conn.transaction()?;
    
    // 批量更新用户年龄
    let age_updates = vec![(1, 31), (2, 26), (3, 36)];
    
    let mut stmt = transaction.prepare("UPDATE users SET age = ? WHERE id = ?")?;
    for (id, new_age) in age_updates {
        stmt.execute(params![new_age, id])?;
        println!("已更新用户ID {} 的年龄为 {}", id, new_age);
    }
    
    // 提交事务
    transaction.commit()?;
    println!("事务提交成功");
    
    // 验证更新结果
    println!("\n=== 更新后的用户年龄 ===");
    let mut stmt = conn.prepare("SELECT id, name, age FROM users ORDER BY id")?;
    let mut rows = stmt.query([])?;
    
    while let Some(row) = rows.next()? {
        let id: i32 = row.get(0)?;
        let name: String = row.get(1)?;
        let age: i32 = row.get(2)?;
        println!("ID: {}, Name: {}, Age: {}", id, name, age);
    }
    
    Ok(())
}

性能优化建议

  1. 使用预编译语句:重复查询时使用预编译语句提高性能
  2. 批量操作:使用事务进行批量数据操作
  3. 合理使用索引:为经常查询的列创建索引
  4. 内存管理:对于大型数据集,合理配置内存使用

注意事项

  • DuckDB主要针对读密集型工作负载优化
  • 在写入密集型场景中,考虑使用事务来提升性能
  • 内存数据库重启后数据会丢失,重要数据应持久化到文件

DuckDB为Rust开发者提供了一个强大而轻量级的分析数据库解决方案,特别适合需要高性能SQL查询的嵌入式应用场景。

回到顶部