Rust数据库连接库arrow-odbc的使用:通过ODBC高效读写Arrow格式数据

Rust数据库连接库arrow-odbc的使用:通过ODBC高效读写Arrow格式数据

arrow-odbc是一个基于Rust的库,它建立在arrowodbc-api crate之上,可以从ODBC数据源填充Apache Arrow数组,或将Arrow记录批次的内容插入到数据库表中。

关于Arrow

Apache Arrow定义了一种与语言无关的列式内存格式,用于扁平化和层次化数据,组织为在现代硬件(如CPU和GPU)上进行高效分析操作。Arrow内存格式还支持零拷贝读取,实现闪电般快速的数据访问而无需序列化开销。

关于ODBC

ODBC(开放数据库连接)是一个标准,使您能够使用SQL从各种数据源访问数据。

使用方法

以下是内容中提供的从数据库读取数据的示例代码:

use arrow_odbc::OdbcReaderBuilder;
use arrow_odbc::odbc_api as odbc_api;
use odbc_api::{Environment, ConnectionOptions};

const CONNECTION_STRING: &str = "\
    Driver={ODBC Driver 18 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";

fn main() -> Result<(), anyhow::Error> {
    let odbc_environment = Environment::new()?;
    
    // 连接数据库
    let connection = odbc_environment.connect_with_connection_string(
        CONNECTION_STRING,
        ConnectionOptions::default(),
    )?;

    // 此SQL语句不需要任何参数
    let parameters = ();

    // 执行查询并创建结果集
    let cursor = connection
        .execute("SELECT * FROM MyTable", parameters)?
        .expect("SELECT语句必须产生一个游标");

    // 将结果集读取为Arrow批次。使用游标的元信息自动推断Arrow类型
    let arrow_record_batches = OdbcReaderBuilder::new()
        // 每个批次最多使用256 MiB的传输缓冲区
        .with_max_bytes_per_batch(256 * 1024 * 1024)
        .build(cursor)?;

    for batch in arrow_record_batches {
        // ...处理批次...
    }
    Ok(())
}

完整示例代码

下面是一个更完整的示例,展示了如何使用arrow-odbc从数据库读取数据并处理Arrow记录批次:

use arrow_odbc::{odbc_api, OdbcReaderBuilder};
use odbc_api::{Environment, ConnectionOptions};
use arrow::record_batch::RecordBatch;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

const CONNECTION_STRING: &str = "\
    Driver={ODBC Driver 18 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";

fn main() -> Result<(), anyhow::Error> {
    // 1. 创建ODBC环境
    let odbc_environment = Environment::new()?;
    
    // 2. 连接数据库
    let connection = odbc_environment.connect_with_connection_string(
        CONNECTION_STRING,
        ConnectionOptions::default(),
    )?;

    // 3. 创建测试表并插入一些数据
    connection.execute("CREATE TABLE IF NOT EXISTS Employee (id INT, name VARCHAR(50), salary INT)", ())?;
    connection.execute("INSERT INTO Employee VALUES (1, 'Alice', 50000)", ())?;
    connection.execute("INSERT INTO Employee VALUES (2, 'Bob', 60000)", ())?;
    
    // 4. 执行查询
    let cursor = connection
        .execute("SELECT id, name, salary FROM Employee", ())?
        .expect("SELECT语句必须产生一个游标");

    // 5. 创建ODBC读取器并转换为Arrow记录批次
    let record_batch_reader = OdbcReaderBuilder::new()
        .with_max_bytes_per_batch(256 * 1024 * 1024) // 每个批次最多256MB
        .build(cursor)?;

    // 6. 处理每个记录批次
    for batch_result in record_batch_reader {
        let batch = batch_result?;
        
        // 打印批次信息
        println!("处理批次,包含{}行数据", batch.num_rows());
        
        // 打印每列数据和类型
        for i in 0..batch.num_columns() {
            let column = batch.column(i);
            println!("列 {}: 类型={:?}, 值={:?}", 
                    i, 
                    column.data_type(),
                    column);
        }
        
        // 这里可以添加更多处理逻辑,比如:
        // - 将数据写入文件
        // - 进行数据分析
        // - 转换数据格式
    }
    
    // 7. 清理测试数据
    connection.execute("DROP TABLE IF EXISTS Employee", ())?;
    
    Ok(())
}

类型映射示例

以下是一个处理不同类型数据的示例:

use arrow_odbc::{odbc_api, OdbcReaderBuilder};
use odbc_api::{Environment, ConnectionOptions};

const CONNECTION_STRING: &str = "\
    Driver={ODBC Driver 18 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";

fn main() -> Result<(), anyhow::Error> {
    let env = Environment::new()?;
    let conn = env.connect_with_connection_string(CONNECTION_STRING, ConnectionOptions::default())?;
    
    // 创建包含各种数据类型的测试表
    conn.execute(
        "CREATE TABLE DataTypesTest (
            id INT PRIMARY KEY,
            name VARCHAR(50),
            price DECIMAL(10,2),
            quantity INT,
            is_available BIT,
            created_at DATETIME,
            binary_data VARBINARY(100)
        )", 
        ()
    )?;
    
    // 插入测试数据
    conn.execute(
        "INSERT INTO DataTypesTest VALUES 
        (1, 'Product A', 19.99, 100, 1, GETDATE(), CAST('test' AS VARBINARY(100)))",
        ()
    )?;
    
    // 查询并处理不同类型的数据
    let cursor = conn.execute("SELECT * FROM DataTypesTest", ())?.unwrap();
    let batches = OdbcReaderBuilder::new().build(cursor)?;
    
    for batch in batches {
        let batch = batch?;
        println!("处理数据类型测试批次:");
        
        for i in 0..batch.num_columns() {
            let col = batch.column(i);
            println!("列 {}: 类型={:?}", i, col.data_type());
            
            // 根据类型进行不同的处理
            match col.data_type() {
                DataType::Int32 => println!("  Int32值: {:?}", col.as_any().downcast_ref::<Int32Array>().unwrap()),
                DataType::Utf8 => println!("  字符串值: {:?}", col.as_any().downcast_ref::<StringArray>().unwrap()),
                // 其他类型处理...
                _ => println!("  其他类型值")
            }
        }
    }
    
    // 清理
    conn.execute("DROP TABLE DataTypesTest", ())?;
    
    Ok(())
}

写入数据示例

以下是一个将Arrow数据写入数据库的示例:

use arrow_odbc::{odbc_api, OdbcWriter};
use odbc_api::{Environment, ConnectionOptions};
use arrow::array::{Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

const CONNECTION_STRING: &str = "\
    Driver={ODBC Driver 18 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";

fn main() -> Result<(), anyhow::Error> {
    // 创建ODBC环境
    let env = Environment::new()?;
    let conn = env.connect_with_connection_string(CONNECTION_STRING, ConnectionOptions::default())?;
    
    // 创建测试表
    conn.execute(
        "CREATE TABLE Products (
            id INT PRIMARY KEY,
            name VARCHAR(50),
            price DECIMAL(10,2)
        )", 
        ()
    )?;
    
    // 创建Arrow数据
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("price", DataType::Float64, false),
    ]);
    
    let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let name_array = Arc::new(StringArray::from(vec!["Laptop", "Phone", "Tablet"]));
    let price_array = Arc::new(arrow::array::Float64Array::from(vec![999.99, 699.99, 399.99]));
    
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![id_array, name_array, price_array]
    )?;
    
    // 写入数据到数据库
    let mut writer = OdbcWriter::new(&conn, "Products", batch.schema())?;
    writer.write(&batch)?;
    writer.finish()?;
    
    println!("数据成功写入数据库");
    
    // 清理
    conn.execute("DROP TABLE Products", ())?;
    
    Ok(())
}

构建说明

要构建arrow-odbc并将其作为Rust项目的一部分编译,您需要链接到ODBC驱动程序管理器。在Windows上,这是系统的一部分,无需额外操作。在Linux和MacOS上,建议安装UnixODBC。

Ubuntu安装

sudo apt-get install unixodbc-dev

MacOS安装

brew install unixodbc

MacOS ARM安装

在ARM架构的MacOS上,brew安装的目录可能不被cargo在链接时找到。可以尝试以下方法:

  • 从源代码安装unixODBC
  • 使用brew安装unixODBC并创建其二进制目录的符号链接

许可证

arrow-odbc使用MIT许可证。


1 回复

Rust数据库连接库arrow-odbc的使用:通过ODBC高效读写Arrow格式数据

arrow-odbc是一个Rust库,允许通过ODBC接口高效地读写Apache Arrow格式的数据。它提供了在Rust应用程序和ODBC兼容数据库(如SQL Server、PostgreSQL、MySQL等)之间传输Arrow数据的能力。

主要特性

  • 将查询结果直接读取到Arrow记录批处理中
  • 使用Arrow格式高效插入数据到数据库
  • 支持带参数的查询
  • 自动类型映射在Arrow和ODBC类型之间
  • 异步支持(通过tokioasync-std)

安装

在Cargo.toml中添加依赖:

[dependencies]
arrow-odbc = "0.6"
tokio = { version = "1.0", features = ["full"] } # 如果需要异步支持

完整示例代码

下面是一个完整的示例,展示如何使用arrow-odbc进行数据库读写操作:

use std::sync::Arc;
use arrow::{
    array::{Float64Array, Int32Array, StringArray},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use arrow_odbc::{
    odbc_api::{ConnectionOptions, Environment, ParameterCollection},
    OdbcReaderBuilder, OdbcWriter,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 从数据库读取数据到Arrow
    let batches = read_from_db()?;
    println!("读取到 {} 个记录批处理", batches.len());

    // 2. 将数据写入数据库
    write_to_db()?;
    println!("数据写入成功");

    // 3. 参数化查询示例
    let param_batches = parameterized_query()?;
    println!("参数化查询返回 {} 个记录批处理", param_batches.len());

    Ok(())
}

fn read_from_db() -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    // 创建ODBC环境
    let env = Environment::new()?;
    
    // 建立数据库连接
    let connection = env.connect(
        "YourDSN", 
        "username", 
        "password", 
        ConnectionOptions::default()
    )?;
    
    // 执行查询并将结果读取到Arrow记录批处理
    let batches: Vec<RecordBatch> = OdbcReaderBuilder::new()
        .with_connection(connection)
        .with_query("SELECT id, name, value FROM example_table")
        .build()?
        .collect::<Result<Vec<_>, _>>()?;
    
    Ok(batches)
}

fn write_to_db() -> Result<(), Box<dyn std::error::Error>> {
    // 创建示例Arrow数据
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]);
    
    let id_array = Int32Array::from(vec![4, 5, 6]);
    let name_array = StringArray::from(vec!["D", "E", "F"]);
    let value_array = Float64Array::from(vec![40.1, 50.2, 60.3]);
    
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(id_array),
            Arc::new(name_array),
            Arc::new(value_array),
        ],
    )?;
    
    // 创建ODBC环境并连接
    let env = Environment::new()?;
    let connection = env.connect(
        "YourDSN",
        "username",
        "password",
        ConnectionOptions::default(),
    )?;
    
    // 写入数据到数据库表
    OdbcWriter::try_new(
        connection,
        "INSERT INTO example_table (id, name, value) VALUES (?, ?, ?)",
        batch.schema(),
    )?
    .write(&batch)?;
    
    Ok(())
}

fn parameterized_query() -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let env = Environment::new()?;
    let connection = env.connect("YourDSN", "username", "password", ConnectionOptions::default())?;
    
    // 创建参数集合
    let mut params = ParameterCollection::new();
    params.append(3);  // 查询id大于3的记录
    params.append("C"); // 查询name等于"C"的记录
    
    let batches: Vec<RecordBatch> = OdbcReaderBuilder::new()
        .with_connection(connection)
        .with_query("SELECT * FROM example_table WHERE id > ? AND name = ?")
        .with_parameters(params)
        .build()?
        .collect::<Result<Vec<_>, _>>()?;
    
    Ok(batches)
}

性能提示

  1. 批量处理数据:尽量使用批量插入而不是单行插入
  2. 预编译语句:对于重复查询,使用预编译语句
  3. 适当调整批处理大小:根据数据大小调整批处理大小以获得最佳性能
  4. 使用合适的数据类型:确保Arrow和数据库类型正确映射

arrow-odbc为需要在Rust应用程序和ODBC兼容数据库之间高效传输数据的场景提供了强大的解决方案,特别是当处理大量数据时,Arrow格式的高效性能可以显著提高吞吐量。

回到顶部