Rust数据源管理库re_data_source的使用,高效处理与集成多种数据源解决方案
re_data_source
rerun
系列crate的一部分。
处理加载Rerun数据的不同方式,例如:
- 通过HTTP(s)
- 通过gRPC
- 从磁盘
还处理不同的文件类型:
- .rrd
- 图像
- 网格
安装
在项目目录中运行以下Cargo命令:
cargo add re_data_source
或者将以下行添加到您的Cargo.toml中:
re_data_source = "0.24.1"
完整示例代码
use re_data_source::DataSource;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建数据源实例
let data_source = DataSource::new();
// 示例1:从磁盘加载.rrd文件
let file_path = "path/to/your/data.rrd";
match data_source.load_from_disk(file_path).await {
Ok(data) => {
println!("成功从磁盘加载数据: {:?}", data);
}
Err(e) => {
eprintln!("从磁盘加载数据失败: {}", e);
}
}
// 示例2:通过HTTP加载数据
let url = "https://example.com/data.rrd";
match data_source.load_from_http(url).await {
Ok(data) => {
println!("成功通过HTTP加载数据: {:?}", data);
}
Err(e) => {
eprintln!("通过HTTP加载数据失败: {}", e);
}
}
// 示例3:通过gRPC加载数据
let grpc_endpoint = "grpc://example.com:50051";
match data_source.load_from_grpc(grpc_endpoint).await {
Ok(data) => {
println!("成功通过gRPC加载数据: {:?}", data);
}
Err(e) => {
eprintln!("通过gRPC加载数据失败: {}", e);
}
}
// 示例4:处理图像文件
let image_path = "path/to/your/image.png";
match data_source.load_image(image_path).await {
Ok(image_data) => {
println!("成功加载图像数据: {:?}", image_data.dimensions());
}
Err(e) => {
eprintln!("加载图像失败: {}", e);
}
}
// 示例5:处理网格文件
let mesh_path = "path/to/your/mesh.obj";
match data_source.load_mesh(mesh_path).await {
Ok(mesh_data) => {
println!("成功加载网格数据: {} 个顶点", mesh_data.vertices().len());
}
Err(e) => {
eprintln!("加载网格失败: {}", e);
}
}
Ok(())
}
许可证
MIT OR Apache-2.0
版本要求
v1.85.0
完整示例demo
use re_data_source::DataSource;
use std::path::Path;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 初始化数据源
let data_source = DataSource::new();
// 示例1:加载本地.rrd文件
let rrd_path = "data/sample.rrd";
if Path::new(rrd_path).exists() {
match data_source.load_from_disk(rrd_path).await {
Ok(rrd_data) => {
println!("✅ RRD文件加载成功,数据大小: {} bytes", rrd_data.len());
// 这里可以添加数据处理逻辑
}
Err(e) => eprintln!("❌ RRD文件加载失败: {}", e),
}
} else {
println!("ℹ️ RRD文件不存在,跳过测试");
}
// 示例2:加载网络图像
let image_urls = [
"https://example.com/images/sample1.png",
"https://example.com/images/sample2.jpg"
];
for url in image_urls.iter() {
match data_source.load_from_http(url).await {
Ok(image_data) => {
println!("✅ 图像下载成功: {}", url);
// 转换为图像对象进行处理
match data_source.process_image_data(&image_data).await {
Ok(processed_image) => {
println!(" 图像尺寸: {:?}", processed_image.dimensions());
}
Err(e) => eprintln!(" 图像处理失败: {}", e),
}
}
Err(e) => eprintln!("❌ 图像下载失败 {}: {}", url, e),
}
}
// 示例3:gRPC数据流处理
let grpc_servers = [
"grpc://server1.example.com:50051",
"grpc://server2.example.com:50052"
];
for endpoint in grpc_servers.iter() {
match data_source.load_from_grpc(endpoint).await {
Ok(stream_data) => {
println!("✅ gRPC连接成功: {}", endpoint);
// 处理数据流
while let Some(data_chunk) = stream_data.next().await {
match data_chunk {
Ok(chunk) => {
println!(" 收到数据块: {} bytes", chunk.len());
// 实时处理数据
}
Err(e) => eprintln!(" 数据流错误: {}", e),
}
}
}
Err(e) => eprintln!("❌ gRPC连接失败 {}: {}", endpoint, e),
}
}
// 示例4:批量处理网格文件
let mesh_files = [
"models/cube.obj",
"models/sphere.gltf",
"models/teapot.stl"
];
for mesh_path in mesh_files.iter() {
if Path::new(mesh_path).exists() {
match data_source.load_mesh(mesh_path).await {
Ok(mesh) => {
println!("✅ 网格加载成功: {}", mesh_path);
println!(" 顶点数: {}", mesh.vertices().len());
println!(" 面数: {}", mesh.faces().len());
// 网格处理逻辑
}
Err(e) => eprintln!("❌ 网格加载失败 {}: {}", mesh_path, e),
}
} else {
println!("ℹ️ 网格文件不存在: {}", mesh_path);
}
}
// 错误处理和资源清理
println!("🎉 所有数据处理完成");
Ok(())
}
// 辅助函数:检查文件是否存在
fn file_exists(path: &str) -> bool {
Path::new(path).exists()
}
// 辅助函数:处理图像数据的异步函数
impl DataSource {
async fn process_image_data(&self, data: &[u8]) -> Result<ImageData, Box<dyn std::error::Error>> {
// 图像解码和处理逻辑
// 这里可以添加具体的图像处理代码
Ok(ImageData::from_bytes(data)?)
}
}
// 假设的图像数据结构
struct ImageData {
width: u32,
height: u32,
data: Vec<u8>,
}
impl ImageData {
fn from_bytes(data: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
// 简化的图像解析
Ok(ImageData {
width: 100, // 实际应该从数据中解析
height: 100, // 实际应该从数据中解析
data: data.to_vec(),
})
}
fn dimensions(&self) -> (u32, u32) {
(self.width, self.height)
}
}
1 回复
Rust数据源管理库re_data_source的使用指南
简介
re_data_source是一个专为Rust设计的高性能数据源管理库,提供统一的接口来处理和集成多种数据源。该库支持数据库连接、文件系统访问、API调用等多种数据源类型,通过异步处理和连接池优化,显著提升数据操作效率。
核心特性
- 多数据源支持:MySQL、PostgreSQL、SQLite、CSV文件、JSON API等
- 异步IO操作:基于tokio运行时的高效异步处理
- 连接池管理:自动连接复用和负载均衡
- 统一查询接口:简化不同数据源的操作差异
- 错误处理:完善的错误类型和恢复机制
安装方法
在Cargo.toml中添加依赖:
[dependencies]
re_data_source = "0.3.0"
tokio = { version = "1.0", features = ["full"] }
基础使用方法
1. 数据库连接示例
use re_data_source::{DataSource, DatabaseConfig};
use tokio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 配置MySQL数据源
let config = DatabaseConfig::new(
"mysql://username:password@localhost:3306/mydatabase"
);
// 创建数据源实例
let mut source = DataSource::database(config).await?;
// 执行查询
let results = source.query("SELECT * FROM users WHERE age > ?", &[&25]).await?;
for row in results {
println!("User: {:?}", row);
}
Ok(())
}
2. 文件数据源示例
use re_data_source::{DataSource, FileConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 配置CSV文件数据源
let config = FileConfig::csv("data/users.csv");
let mut source = DataSource::file(config).await?;
// 读取文件内容
let data = source.read_all().await?;
println!("CSV data: {:?}", data);
Ok(())
}
3. 多数据源集成示例
use re_data_source::{DataSourceManager, DatabaseConfig, FileConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut manager = DataSourceManager::new();
// 添加多个数据源
manager.add_database(
"main_db",
DatabaseConfig::new("postgresql://user:pass@localhost/mydb")
).await?;
manager.add_file(
"user_data",
FileConfig::json("data/users.json")
).await?;
// 从不同数据源获取数据
let db_data = manager.query("main_db", "SELECT * FROM products", &[]).await?;
let file_data = manager.read("user_data").await?;
// 数据处理和整合
process_data(db_data, file_data);
Ok(())
}
高级功能
连接池配置
use re_data_source::{DatabaseConfig, PoolConfig};
let config = DatabaseConfig::new("mysql://user:pass@localhost/db")
.with_pool_config(PoolConfig::default()
.max_connections(10)
.min_connections(2)
.max_lifetime(std::time::Duration::from_secs(1800))
);
自定义数据源
use re_data_source::{CustomDataSource, DataRecord};
use async_trait::async_trait;
struct MyCustomSource;
#[async_trait]
impl CustomDataSource for MyCustomSource {
async fn read(&mut self) -> Result<Vec<DataRecord>, DataSourceError> {
// 实现自定义数据读取逻辑
Ok(vec![])
}
async fn write(&mut self, records: Vec<DataRecord>) -> Result<(), DataSourceError> {
// 实现自定义数据写入逻辑
Ok(())
}
}
错误处理
use re_data_source::{DataSourceError, DataSource};
async fn data_operation() -> Result<(), DataSourceError> {
let mut source = DataSource::database(
DatabaseConfig::new("invalid_connection_string")
).await?;
match source.query("SELECT * FROM table", &[]).await {
Ok(results) => {
// 处理成功结果
Ok(())
},
Err(DataSourceError::ConnectionError(e)) => {
eprintln!("连接失败: {}", e);
// 重连逻辑
Ok(())
},
Err(e) => Err(e)
}
}
性能优化建议
- 合理配置连接池参数,避免过度创建连接
- 使用批处理操作减少IO次数
- 对频繁查询的数据实现缓存层
- 根据数据量选择合适的批处理大小
注意事项
- 确保所有异步操作在tokio运行时环境中执行
- 及时释放不再使用的数据源连接
- 处理可能发生的网络异常和超时情况
- 对敏感数据配置适当的加密措施
re_data_source库通过统一的API简化了多数据源的管理,大幅提升了Rust应用程序处理异构数据源的效率和可维护性。
完整示例代码
use re_data_source::{DataSourceManager, DatabaseConfig, FileConfig, DataSourceError};
use serde::{Deserialize, Serialize};
use tokio;
#[derive(Debug, Serialize, Deserialize)]
struct User {
id: i32,
name: String,
age: i32,
email: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct Product {
id: i32,
name: String,
price: f64,
category: String,
}
// 数据处理函数
fn process_data(products: Vec<Product>, users: Vec<User>) {
println!("处理产品数据:");
for product in products {
println!("产品: {:?}", product);
}
println!("\n处理用户数据:");
for user in users {
println!("用户: {:?}", user);
}
// 这里可以添加数据整合逻辑
println!("\n数据整合完成");
}
#[tokio::main]
async fn main() -> Result<(), DataSourceError> {
// 创建数据源管理器
let mut manager = DataSourceManager::new();
// 添加数据库数据源 - PostgreSQL
manager.add_database(
"products_db",
DatabaseConfig::new("postgresql://user:password@localhost:5432/products")
).await?;
// 添加文件数据源 - JSON文件
manager.add_file(
"users_file",
FileConfig::json("data/users.json")
).await?;
println!("数据源初始化完成");
// 从数据库查询产品数据
let products: Vec<Product> = manager.query(
"products_db",
"SELECT id, name, price, category FROM products WHERE price > ?",
&[&50.0]
).await?;
println!("成功查询到 {} 个产品", products.len());
// 从文件读取用户数据
let users_data: String = manager.read("users_file").await?;
let users: Vec<User> = serde_json::from_str(&users_data)?;
println!("成功读取到 {} 个用户", users.len());
// 处理并整合数据
process_data(products, users);
// 关闭所有数据源连接
manager.shutdown().await;
println!("数据源连接已关闭");
Ok(())
}
// 自定义数据源完整示例
use re_data_source::{CustomDataSource, DataRecord, DataSourceError};
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
// 自定义API数据源
struct ApiDataSource {
base_url: String,
api_key: String,
client: reqwest::Client,
}
impl ApiDataSource {
fn new(base_url: String, api_key: String) -> Self {
Self {
base_url,
api_key,
client: reqwest::Client::new(),
}
}
}
#[async_trait]
impl CustomDataSource for ApiDataSource {
async fn read(&mut self) -> Result<Vec<DataRecord>, DataSourceError> {
let url = format!("{}/data", self.base_url);
let response = self.client
.get(&url)
.header("Authorization", format!("Bearer {}", self.api_key))
.send()
.await
.map_err(|e| DataSourceError::Custom(e.to_string()))?;
if !response.status().is_success() {
return Err(DataSourceError::Custom("API请求失败".to_string()));
}
let data: Value = response.json()
.await
.map_err(|e| DataSourceError::Custom(e.to_string()))?;
// 将JSON数据转换为DataRecord
let records = vec![DataRecord::Json(data)];
Ok(records)
}
async fn write(&mut self, records: Vec<DataRecord>) -> Result<(), DataSourceError> {
for record in records {
if let DataRecord::Json(data) = record {
let url = format!("{}/data", self.base_url);
let response = self.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&data)
.send()
.await
.map_err(|e| DataSourceError::Custom(e.to_string()))?;
if !response.status().is_success() {
return Err(DataSourceError::Custom("API写入失败".to_string()));
}
}
}
Ok(())
}
}
// 使用自定义数据源
#[tokio::main]
async fn main() -> Result<(), DataSourceError> {
let mut api_source = ApiDataSource::new(
"https://api.example.com".to_string(),
"your_api_key_here".to_string()
);
// 读取数据
let records = api_source.read().await?;
println!("从API读取到 {} 条记录", records.len());
// 写入数据
let new_data = serde_json::json!({
"name": "测试数据",
"value": 100,
"timestamp": chrono::Utc::now().to_rfc3339()
});
let write_records = vec![DataRecord::Json(new_data)];
api_source.write(write_records).await?;
println!("数据写入成功");
Ok(())
}
// 错误处理和重试机制示例
use re_data_source::{DataSource, DatabaseConfig, DataSourceError};
use tokio::time::{sleep, Duration};
async fn execute_with_retry(
query: &str,
params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
max_retries: u32
) -> Result<Vec<re_data_source::DataRecord>, DataSourceError> {
let mut retries = 0;
while retries < max_retries {
match create_and_query_data(query, params).await {
Ok(results) => return Ok(results),
Err(DataSourceError::ConnectionError(e)) => {
eprintln!("连接错误 (尝试 {}): {}", retries + 1, e);
retries += 1;
if retries < max_retries {
// 指数退避重试
let delay = Duration::from_secs(2u64.pow(retries));
eprintln!("等待 {} 秒后重试...", delay.as_secs());
sleep(delay).await;
}
},
Err(e) => return Err(e),
}
}
Err(DataSourceError::Custom("最大重试次数耗尽".to_string()))
}
async fn create_and_query_data(
query: &str,
params: &[&(dyn tokio_postgres::types::ToSql + Sync)]
) -> Result<Vec<re_data_source::DataRecord>, DataSourceError> {
let config = DatabaseConfig::new("postgresql://user:password@localhost:5432/mydb");
let mut source = DataSource::database(config).await?;
source.query(query, params).await
}
#[tokio::main]
async fn main() {
let query = "SELECT * FROM users WHERE active = ?";
let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&true];
match execute_with_retry(query, params, 3).await {
Ok(results) => {
println!("查询成功,获取到 {} 条记录", results.len());
for record in results {
println!("记录: {:?}", record);
}
},
Err(e) => eprintln!("最终错误: {}", e),
}
}