Rust Cassandra数据库驱动cassandra-cpp-sys的使用,高性能C++绑定库实现Apache Cassandra无缝集成
Rust Cassandra数据库驱动cassandra-cpp-sys的使用,高性能C++绑定库实现Apache Cassandra无缝集成
cassandra-cpp-sys是一个维护中的Rust项目,提供了DataStax cpp驱动器的底层绑定。它主要是自动生成的,最初是tupshin/cassandra-sys-rs的一个分支,但那个项目已不再维护。
开始使用
要让这个包装器工作,你必须首先安装datastax-cpp驱动。按照cpp驱动文档中的步骤进行操作。大多数平台都有预构建的包可用。
确保驱动程序(特别是libcassandra_static.a
和libcassandra.so
)位于你的/usr/local/lib64/
目录中。
你可以通过cargo使用这个crate,名称为cassandra-cpp-sys
。
完整示例代码
以下是一个使用cassandra-cpp-sys连接Cassandra数据库并执行查询的完整示例:
use cassandra_cpp_sys::*;
fn main() {
unsafe {
// 创建集群对象
let cluster = cass_cluster_new();
// 添加联系点
cass_cluster_set_contact_points(cluster, "127.0.0.1\0".as_ptr() as *const i8);
// 创建会话
let session = cass_session_new();
let future = cass_session_connect(session, cluster);
// 等待连接完成
cass_future_wait(future);
// 检查是否有错误
if cass_future_error_code(future) != CASS_OK {
let mut message = std::ptr::null_mut();
let mut message_length = 0;
cass_future_error_message(future, &mut message, &mut message_length);
println!("Connection error: {:?}", std::slice::from_raw_parts(message as *const u8, message_length));
cass_future_free(future);
cass_session_free(session);
cass_cluster_free(cluster);
return;
}
// 释放连接future
cass_future_free(future);
// 创建查询语句
let query = "SELECT keyspace_name FROM system_schema.keyspaces";
let statement = cass_statement_new(query.as_ptr() as *const i8, 0);
// 执行查询
let future = cass_session_execute(session, statement);
cass_future_wait(future);
// 获取结果
let result = cass_future_get_result(future);
let iterator = cass_iterator_from_result(result);
// 遍历结果
while cass_iterator_next(iterator) != 0 {
let row = cass_iterator_get_row(iterator);
let value = cass_row_get_column_by_name(row, "keyspace_name\0".as_ptr() as *const i8);
let mut keyspace_name = std::ptr::null_mut();
let mut keyspace_name_length = 0;
cass_value_get_string(value, &mut keyspace_name, &mut keyspace_name_length);
println!("Keyspace: {:?}", std::slice::from_raw_parts(keyspace_name as *const u8, keyspace_name_length));
}
// 清理资源
cass_iterator_free(iterator);
cass_result_free(result);
cass_future_free(future);
cass_statement_free(statement);
cass_session_free(session);
cass_cluster_free(cluster);
}
}
注意事项
- 这个crate主要提供底层绑定,使用时会到处需要使用
unsafe
代码块 - 建议使用更高级的安全包装器cassandra-cpp而不是直接使用这个sys crate
- 编译时需要确保系统已安装DataStax驱动
- 默认会搜索
/usr/lib
、/usr/local/lib64
和/usr/local/lib
目录 - 可以通过
CASSANDRA_SYS_LIB_PATH
环境变量指定额外的库搜索路径
自动生成
cassandra.rs
文件是通过以下命令自动生成的:
$ bindgen --no-layout-tests --blacklist-type=max_align_t --rustified-enum=.* --output=src/cassandra.rs cassandra.h
自动生成的代码会使用rustfmt
进行格式化。
Rust Cassandra数据库驱动cassandra-cpp-sys使用指南
简介
cassandra-cpp-sys
是Rust语言中一个高性能的Apache Cassandra数据库驱动,它提供了对C++驱动DataStax cpp-driver
的Rust绑定。这个库允许Rust开发者直接与Cassandra数据库交互,同时保持了C++驱动的高性能特性。
主要特点
- 完整的C++驱动功能绑定
- 异步I/O支持
- 连接池管理
- 负载均衡和故障转移
- 预编译语句支持
- 批量操作支持
使用方法
1. 添加依赖
首先在Cargo.toml
中添加依赖:
[dependencies]
cassandra-cpp-sys = "0.15"
2. 基本使用示例
use cassandra_cpp_sys::{Cluster, Session, Statement};
fn main() {
// 创建集群连接配置
let cluster = Cluster::new();
cluster.set_contact_points("127.0.0.1").unwrap();
// 创建会话
let session = cluster.connect().unwrap();
// 执行简单查询
let query = "SELECT keyspace_name FROM system_schema.keyspaces";
let statement = Statement::new(query, 0);
let result = session.execute(&statement).wait().unwrap();
// 处理结果
for row in result.iter() {
let keyspace_name: String = row.get_by_name("keyspace_name").unwrap();
println!("Keyspace: {}", keyspace_name);
}
}
3. 参数化查询示例
use cassandra_cpp_sys::{Cluster, Session, Statement};
fn insert_data(session: &Session, id: i32, name: &str) {
let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
let mut statement = Statement::new(query, 2);
statement.bind_int32(0, id).unwrap();
statement.bind_string(1, name).unwrap();
session.execute(&statement).wait().unwrap();
}
4. 批量操作示例
use cassandra_cpp_sys::{Cluster, Session, Statement, Batch};
fn batch_insert(session: &Session) {
let mut batch = Batch::new();
// 添加多个语句到批处理
for i in 0..10 {
let query = format!("INSERT INTO example.users (id, name) VALUES ({}, 'User{}')", i, i);
let statement = Statement::new(&query, 0);
batch.add_statement(&statement).unwrap();
}
// 执行批处理
session.execute_batch(&batch).wait().unwrap();
}
5. 异步查询示例
use cassandra_cpp_sys::{Cluster, Session, Statement};
use futures::Future;
fn async_query(session: &Session) {
let query = "SELECT * FROM example.users";
let statement = Statement::new(query, 0);
let future = session.execute(&statement)
.map(|result| {
for row in result.iter() {
let id: i32 = row.get_by_name("id").unwrap();
let name: String = row.get_by_name("name").unwrap();
println!("User {}: {}", id, name);
}
})
.map_err(|err| {
eprintln!("Query error: {}", err);
});
// 在实际应用中,可以将future放入运行时执行
futures::executor::block_on(future);
}
高级配置
连接池配置
let cluster = Cluster::new();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_num_threads_io(4).unwrap(); // I/O线程数
cluster.set_core_connections_per_host(2).unwrap(); // 每个主机的核心连接数
cluster.set_max_connections_per_host(8).unwrap(); // 每个主机的最大连接数
SSL配置
use cassandra_cpp_sys::{Cluster, Ssl};
let cluster = Cluster::new();
let mut ssl = Ssl::new().unwrap();
ssl.add_trusted_cert("path/to/cert.pem").unwrap();
cluster.set_ssl(&ssl).unwrap();
性能优化建议
- 使用预编译语句(Prepared Statement)重复执行相同查询
- 合理设置连接池大小
- 批量处理写入操作
- 异步执行长时间运行的查询
- 适当调整一致性级别以满足性能需求
注意事项
- 需要安装Cassandra C++驱动依赖
- 在Linux系统上可能需要安装libuv开发包
- 错误处理非常重要,Cassandra操作可能会因网络问题失败
通过cassandra-cpp-sys
,Rust开发者可以充分利用Cassandra数据库的强大功能,同时保持Rust语言的安全性和性能优势。
完整示例代码
下面是一个完整的CRUD示例,展示了如何使用cassandra-cpp-sys进行常见操作:
use cassandra_cpp_sys::{Cluster, Session, Statement, Batch, Consistency};
use futures::Future;
fn main() {
// 1. 创建集群连接并配置
let cluster = Cluster::new();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_core_connections_per_host(2).unwrap();
cluster.set_max_connections_per_host(4).unwrap();
// 2. 创建会话
let session = match cluster.connect() {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to connect to Cassandra: {}", e);
return;
}
};
// 3. 创建keyspace和表
create_schema(&session);
// 4. 插入数据
insert_user(&session, 1, "Alice");
insert_user(&session, 2, "Bob");
// 5. 批量插入
batch_insert_users(&session);
// 6. 查询数据
query_users(&session);
// 7. 更新数据
update_user(&session, 1, "Alice Smith");
// 8. 删除数据
delete_user(&session, 2);
// 9. 异步查询
async_query_users(&session);
}
fn create_schema(session: &Session) {
let queries = vec![
"CREATE KEYSPACE IF NOT EXISTS example WITH replication = \
{'class': 'SimpleStrategy', 'replication_factor': '1'}",
"CREATE TABLE IF NOT EXISTS example.users (id int PRIMARY KEY, name text)"
];
for query in queries {
let statement = Statement::new(query, 0);
if let Err(e) = session.execute(&statement).wait() {
eprintln!("Failed to execute schema query: {}", e);
}
}
}
fn insert_user(session: &Session, id: i32, name: &str) {
let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
let mut statement = Statement::new(query, 2);
statement.set_consistency(Consistency::ONE).unwrap();
statement.bind_int32(0, id).unwrap();
statement.bind_string(1, name).unwrap();
if let Err(e) = session.execute(&statement).wait() {
eprintln!("Failed to insert user: {}", e);
}
}
fn batch_insert_users(session: &Session) {
let mut batch = Batch::new();
batch.set_consistency(Consistency::ONE).unwrap();
for i in 3..=5 {
let query = "INSERT INTO example.users (id, name) VALUES (?, ?)";
let mut statement = Statement::new(query, 2);
statement.bind_int32(0, i).unwrap();
statement.bind_string(1, &format!("User{}", i)).unwrap();
if let Err(e) = batch.add_statement(&statement) {
eprintln!("Failed to add statement to batch: {}", e);
}
}
if let Err(e) = session.execute_batch(&batch).wait() {
eprintln!("Failed to execute batch: {}", e);
}
}
fn query_users(session: &Session) {
let query = "SELECT id, name FROM example.users";
let mut statement = Statement::new(query, 0);
statement.set_consistency(Consistency::ONE).unwrap();
match session.execute(&statement).wait() {
Ok(result) => {
for row in result.iter() {
let id: i32 = row.get_by_name("id").unwrap();
let name: String = row.get_by_name("name").unwrap();
println!("User {}: {}", id, name);
}
}
Err(e) => eprintln!("Query failed: {}", e),
}
}
fn update_user(session: &Session, id: i32, new_name: &str) {
let query = "UPDATE example.users SET name = ? WHERE id = ?";
let mut statement = Statement::new(query, 2);
statement.bind_string(0, new_name).unwrap();
statement.bind_int32(1, id).unwrap();
if let Err(e) = session.execute(&statement).wait() {
eprintln!("Failed to update user: {}", e);
}
}
fn delete_user(session: &Session, id: i32) {
let query = "DELETE FROM example.users WHERE id = ?";
let mut statement = Statement::new(query, 1);
statement.bind_int32(0, id).unwrap();
if let Err(e) = session.execute(&statement).wait() {
eprintln!("Failed to delete user: {}", e);
}
}
fn async_query_users(session: &Session) {
let query = "SELECT id, name FROM example.users";
let statement = Statement::new(query, 0);
let future = session.execute(&statement)
.map(|result| {
println!("\nAsync query results:");
for row in result.iter() {
let id: i32 = row.get_by_name("id").unwrap();
let name: String = row.get_by_name("name").unwrap();
println!("User {}: {}", id, name);
}
})
.map_err(|err| eprintln!("Async query error: {}", err));
futures::executor::block_on(future);
}
这个完整示例展示了:
- 集群连接配置
- 创建keyspace和表
- 基本的CRUD操作
- 批量操作
- 异步查询
- 错误处理
要运行此示例,请确保:
- 已安装Cassandra C++驱动
- 本地运行着Cassandra服务(默认127.0.0.1:9042)
- 在Cargo.toml中添加了cassandra-cpp-sys依赖