Rust Cassandra数据库驱动cassandra-cpp-sys的使用,高性能C++绑定库实现Apache Cassandra无缝集成

Rust Cassandra数据库驱动cassandra-cpp-sys的使用,高性能C++绑定库实现Apache Cassandra无缝集成

cassandra-cpp-sys是一个维护中的Rust项目,提供了DataStax cpp驱动器的底层绑定。它主要是自动生成的,最初是tupshin/cassandra-sys-rs的一个分支,但那个项目已不再维护。

开始使用

要让这个包装器工作,你必须首先安装datastax-cpp驱动。按照cpp驱动文档中的步骤进行操作。大多数平台都有预构建的包可用。

确保驱动程序(特别是libcassandra_static.alibcassandra.so)位于你的/usr/local/lib64/目录中。

你可以通过cargo使用这个crate,名称为cassandra-cpp-sys

完整示例代码

以下是一个使用cassandra-cpp-sys连接Cassandra数据库并执行查询的完整示例:

use cassandra_cpp_sys::*;

fn main() {
    unsafe {
        // 创建集群对象
        let cluster = cass_cluster_new();
        
        // 添加联系点
        cass_cluster_set_contact_points(cluster, "127.0.0.1\0".as_ptr() as *const i8);
        
        // 创建会话
        let session = cass_session_new();
        let future = cass_session_connect(session, cluster);
        
        // 等待连接完成
        cass_future_wait(future);
        
        // 检查是否有错误
        if cass_future_error_code(future) != CASS_OK {
            let mut message = std::ptr::null_mut();
            let mut message_length = 0;
            cass_future_error_message(future, &mut message, &mut message_length);
            println!("Connection error: {:?}", std::slice::from_raw_parts(message as *const u8, message_length));
            cass_future_free(future);
            cass_session_free(session);
            cass_cluster_free(cluster);
            return;
        }
        
        // 释放连接future
        cass_future_free(future);
        
        // 创建查询语句
        let query = "SELECT keyspace_name FROM system_schema.keyspaces";
        let statement = cass_statement_new(query.as_ptr() as *const i8, 0);
        
        // 执行查询
        let future = cass_session_execute(session, statement);
        cass_future_wait(future);
        
        // 获取结果
        let result = cass_future_get_result(future);
        let iterator = cass_iterator_from_result(result);
        
        // 遍历结果
        while cass_iterator_next(iterator) != 0 {
            let row = cass_iterator_get_row(iterator);
            let value = cass_row_get_column_by_name(row, "keyspace_name\0".as_ptr() as *const i8);
            
            let mut keyspace_name = std::ptr::null_mut();
            let mut keyspace_name_length = 0;
            cass_value_get_string(value, &mut keyspace_name, &mut keyspace_name_length);
            
            println!("Keyspace: {:?}", std::slice::from_raw_parts(keyspace_name as *const u8, keyspace_name_length));
        }
        
        // 清理资源
        cass_iterator_free(iterator);
        cass_result_free(result);
        cass_future_free(future);
        cass_statement_free(statement);
        cass_session_free(session);
        cass_cluster_free(cluster);
    }
}

注意事项

  1. 这个crate主要提供底层绑定,使用时会到处需要使用unsafe代码块
  2. 建议使用更高级的安全包装器cassandra-cpp而不是直接使用这个sys crate
  3. 编译时需要确保系统已安装DataStax驱动
  4. 默认会搜索/usr/lib/usr/local/lib64/usr/local/lib目录
  5. 可以通过CASSANDRA_SYS_LIB_PATH环境变量指定额外的库搜索路径

自动生成

cassandra.rs文件是通过以下命令自动生成的:

$ bindgen --no-layout-tests --blacklist-type=max_align_t --rustified-enum=.* --output=src/cassandra.rs cassandra.h

自动生成的代码会使用rustfmt进行格式化。


1 回复

Rust Cassandra数据库驱动cassandra-cpp-sys使用指南

简介

cassandra-cpp-sys是Rust语言中一个高性能的Apache Cassandra数据库驱动,它提供了对C++驱动DataStax cpp-driver的Rust绑定。这个库允许Rust开发者直接与Cassandra数据库交互,同时保持了C++驱动的高性能特性。

主要特点

  • 完整的C++驱动功能绑定
  • 异步I/O支持
  • 连接池管理
  • 负载均衡和故障转移
  • 预编译语句支持
  • 批量操作支持

使用方法

1. 添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
cassandra-cpp-sys = "0.15"

2. 基本使用示例

use cassandra_cpp_sys::{Cluster, Session, Statement};

fn main() {
    // 创建集群连接配置
    let cluster = Cluster::new();
    cluster.set_contact_points("127.0.0.1").unwrap();
    
    // 创建会话
    let session = cluster.connect().unwrap();
    
    // 执行简单查询
    let query = "SELECT keyspace_name FROM system_schema.keyspaces";
    let statement = Statement::new(query, 0);
    let result = session.execute(&statement).wait().unwrap();
    
    // 处理结果
    for row in result.iter() {
        let keyspace_name: String = row.get_by_name("keyspace_name").unwrap();
        println!("Keyspace: {}", keyspace_name);
    }
}

3. 参数化查询示例

use cassandra_cpp_sys::{Cluster, Session, Statement};

fn insert_data(session: &Session, id: i32, name: &str) {
    let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
    let mut statement = Statement::new(query, 2);
    statement.bind_int32(0, id).unwrap();
    statement.bind_string(1, name).unwrap();
    
    session.execute(&statement).wait().unwrap();
}

4. 批量操作示例

use cassandra_cpp_sys::{Cluster, Session, Statement, Batch};

fn batch_insert(session: &Session) {
    let mut batch = Batch::new();
    
    // 添加多个语句到批处理
    for i in 0..10 {
        let query = format!("INSERT INTO example.users (id, name) VALUES ({}, 'User{}')", i, i);
        let statement = Statement::new(&query, 0);
        batch.add_statement(&statement).unwrap();
    }
    
    // 执行批处理
    session.execute_batch(&batch).wait().unwrap();
}

5. 异步查询示例

use cassandra_cpp_sys::{Cluster, Session, Statement};
use futures::Future;

fn async_query(session: &Session) {
    let query = "SELECT * FROM example.users";
    let statement = Statement::new(query, 0);
    
    let future = session.execute(&statement)
        .map(|result| {
            for row in result.iter() {
                let id: i32 = row.get_by_name("id").unwrap();
                let name: String = row.get_by_name("name").unwrap();
                println!("User {}: {}", id, name);
            }
        })
        .map_err(|err| {
            eprintln!("Query error: {}", err);
        });
    
    // 在实际应用中,可以将future放入运行时执行
    futures::executor::block_on(future);
}

高级配置

连接池配置

let cluster = Cluster::new();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_num_threads_io(4).unwrap();  // I/O线程数
cluster.set_core_connections_per_host(2).unwrap();  // 每个主机的核心连接数
cluster.set_max_connections_per_host(8).unwrap();  // 每个主机的最大连接数

SSL配置

use cassandra_cpp_sys::{Cluster, Ssl};

let cluster = Cluster::new();
let mut ssl = Ssl::new().unwrap();
ssl.add_trusted_cert("path/to/cert.pem").unwrap();
cluster.set_ssl(&ssl).unwrap();

性能优化建议

  1. 使用预编译语句(Prepared Statement)重复执行相同查询
  2. 合理设置连接池大小
  3. 批量处理写入操作
  4. 异步执行长时间运行的查询
  5. 适当调整一致性级别以满足性能需求

注意事项

  1. 需要安装Cassandra C++驱动依赖
  2. 在Linux系统上可能需要安装libuv开发包
  3. 错误处理非常重要,Cassandra操作可能会因网络问题失败

通过cassandra-cpp-sys,Rust开发者可以充分利用Cassandra数据库的强大功能,同时保持Rust语言的安全性和性能优势。

完整示例代码

下面是一个完整的CRUD示例,展示了如何使用cassandra-cpp-sys进行常见操作:

use cassandra_cpp_sys::{Cluster, Session, Statement, Batch, Consistency};
use futures::Future;

fn main() {
    // 1. 创建集群连接并配置
    let cluster = Cluster::new();
    cluster.set_contact_points("127.0.0.1").unwrap();
    cluster.set_core_connections_per_host(2).unwrap();
    cluster.set_max_connections_per_host(4).unwrap();
    
    // 2. 创建会话
    let session = match cluster.connect() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to connect to Cassandra: {}", e);
            return;
        }
    };
    
    // 3. 创建keyspace和表
    create_schema(&session);
    
    // 4. 插入数据
    insert_user(&session, 1, "Alice");
    insert_user(&session, 2, "Bob");
    
    // 5. 批量插入
    batch_insert_users(&session);
    
    // 6. 查询数据
    query_users(&session);
    
    // 7. 更新数据
    update_user(&session, 1, "Alice Smith");
    
    // 8. 删除数据
    delete_user(&session, 2);
    
    // 9. 异步查询
    async_query_users(&session);
}

fn create_schema(session: &Session) {
    let queries = vec![
        "CREATE KEYSPACE IF NOT EXISTS example WITH replication = \
         {'class': 'SimpleStrategy', 'replication_factor': '1'}",
        "CREATE TABLE IF NOT EXISTS example.users (id int PRIMARY KEY, name text)"
    ];
    
    for query in queries {
        let statement = Statement::new(query, 0);
        if let Err(e) = session.execute(&statement).wait() {
            eprintln!("Failed to execute schema query: {}", e);
        }
    }
}

fn insert_user(session: &Session, id: i32, name: &str) {
    let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
    let mut statement = Statement::new(query, 2);
    statement.set_consistency(Consistency::ONE).unwrap();
    statement.bind_int32(0, id).unwrap();
    statement.bind_string(1, name).unwrap();
    
    if let Err(e) = session.execute(&statement).wait() {
        eprintln!("Failed to insert user: {}", e);
    }
}

fn batch_insert_users(session: &Session) {
    let mut batch = Batch::new();
    batch.set_consistency(Consistency::ONE).unwrap();
    
    for i in 3..=5 {
        let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
        let mut statement = Statement::new(query, 2);
        statement.bind_int32(0, i).unwrap();
        statement.bind_string(1, &format!("User{}", i)).unwrap();
        
        if let Err(e) = batch.add_statement(&statement) {
            eprintln!("Failed to add statement to batch: {}", e);
        }
    }
    
    if let Err(e) = session.execute_batch(&batch).wait() {
        eprintln!("Failed to execute batch: {}", e);
    }
}

fn query_users(session: &Session) {
    let query = "SELECT id, name FROM example.users";
    let mut statement = Statement::new(query, 0);
    statement.set_consistency(Consistency::ONE).unwrap();
    
    match session.execute(&statement).wait() {
        Ok(result) => {
            for row in result.iter() {
                let id: i32 = row.get_by_name("id").unwrap();
                let name: String = row.get_by_name("name").unwrap();
                println!("User {}: {}", id, name);
            }
        }
        Err(e) => eprintln!("Query failed: {}", e),
    }
}

fn update_user(session: &Session, id: i32, new_name: &str) {
    let query = "UPDATE example.users SET name = ? WHERE id = ?";
    let mut statement = Statement::new(query, 2);
    statement.bind_string(0, new_name).unwrap();
    statement.bind_int32(1, id).unwrap();
    
    if let Err(e) = session.execute(&statement).wait() {
        eprintln!("Failed to update user: {}", e);
    }
}

fn delete_user(session: &Session, id: i32) {
    let query = "DELETE FROM example.users WHERE id = ?";
    let mut statement = Statement::new(query, 1);
    statement.bind_int32(0, id).unwrap();
    
    if let Err(e) = session.execute(&statement).wait() {
        eprintln!("Failed to delete user: {}", e);
    }
}

fn async_query_users(session: &Session) {
    let query = "SELECT id, name FROM example.users";
    let statement = Statement::new(query, 0);
    
    let future = session.execute(&statement)
        .map(|result| {
            println!("\nAsync query results:");
            for row in result.iter() {
                let id: i32 = row.get_by_name("id").unwrap();
                let name: String = row.get_by_name("name").unwrap();
                println!("User {}: {}", id, name);
            }
        })
        .map_err(|err| eprintln!("Async query error: {}", err));
    
    futures::executor::block_on(future);
}

这个完整示例展示了:

  1. 集群连接配置
  2. 创建keyspace和表
  3. 基本的CRUD操作
  4. 批量操作
  5. 异步查询
  6. 错误处理

要运行此示例,请确保:

  1. 已安装Cassandra C++驱动
  2. 本地运行着Cassandra服务(默认127.0.0.1:9042)
  3. 在Cargo.toml中添加了cassandra-cpp-sys依赖
回到顶部