Rust数据源管理库re_data_source的使用,高效处理与集成多种数据源解决方案

re_data_source

rerun系列crate的一部分。

处理加载Rerun数据的不同方式,例如:

  • 通过HTTP(s)
  • 通过gRPC
  • 从磁盘

还处理不同的文件类型:

  • .rrd
  • 图像
  • 网格

安装

在项目目录中运行以下Cargo命令:

cargo add re_data_source

或者将以下行添加到您的Cargo.toml中:

re_data_source = "0.24.1"

完整示例代码

use re_data_source::DataSource;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建数据源实例
    let data_source = DataSource::new();
    
    // 示例1:从磁盘加载.rrd文件
    let file_path = "path/to/your/data.rrd";
    match data_source.load_from_disk(file_path).await {
        Ok(data) => {
            println!("成功从磁盘加载数据: {:?}", data);
        }
        Err(e) => {
            eprintln!("从磁盘加载数据失败: {}", e);
        }
    }
    
    // 示例2:通过HTTP加载数据
    let url = "https://example.com/data.rrd";
    match data_source.load_from_http(url).await {
        Ok(data) => {
            println!("成功通过HTTP加载数据: {:?}", data);
        }
        Err(e) => {
            eprintln!("通过HTTP加载数据失败: {}", e);
        }
    }
    
    // 示例3:通过gRPC加载数据
    let grpc_endpoint = "grpc://example.com:50051";
    match data_source.load_from_grpc(grpc_endpoint).await {
        Ok(data) => {
            println!("成功通过gRPC加载数据: {:?}", data);
        }
        Err(e) => {
            eprintln!("通过gRPC加载数据失败: {}", e);
        }
    }
    
    // 示例4:处理图像文件
    let image_path = "path/to/your/image.png";
    match data_source.load_image(image_path).await {
        Ok(image_data) => {
            println!("成功加载图像数据: {:?}", image_data.dimensions());
        }
        Err(e) => {
            eprintln!("加载图像失败: {}", e);
        }
    }
    
    // 示例5:处理网格文件
    let mesh_path = "path/to/your/mesh.obj";
    match data_source.load_mesh(mesh_path).await {
        Ok(mesh_data) => {
            println!("成功加载网格数据: {} 个顶点", mesh_data.vertices().len());
        }
        Err(e) => {
            eprintln!("加载网格失败: {}", e);
        }
    }
    
    Ok(())
}

许可证

MIT OR Apache-2.0

版本要求

v1.85.0

完整示例demo

use re_data_source::DataSource;
use std::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化数据源
    let data_source = DataSource::new();
    
    // 示例1:加载本地.rrd文件
    let rrd_path = "data/sample.rrd";
    if Path::new(rrd_path).exists() {
        match data_source.load_from_disk(rrd_path).await {
            Ok(rrd_data) => {
                println!("✅ RRD文件加载成功,数据大小: {} bytes", rrd_data.len());
                // 这里可以添加数据处理逻辑
            }
            Err(e) => eprintln!("❌ RRD文件加载失败: {}", e),
        }
    } else {
        println!("ℹ️  RRD文件不存在,跳过测试");
    }
    
    // 示例2:加载网络图像
    let image_urls = [
        "https://example.com/images/sample1.png",
        "https://example.com/images/sample2.jpg"
    ];
    
    for url in image_urls.iter() {
        match data_source.load_from_http(url).await {
            Ok(image_data) => {
                println!("✅ 图像下载成功: {}", url);
                // 转换为图像对象进行处理
                match data_source.process_image_data(&image_data).await {
                    Ok(processed_image) => {
                        println!("   图像尺寸: {:?}", processed_image.dimensions());
                    }
                    Err(e) => eprintln!("   图像处理失败: {}", e),
                }
            }
            Err(e) => eprintln!("❌ 图像下载失败 {}: {}", url, e),
        }
    }
    
    // 示例3:gRPC数据流处理
    let grpc_servers = [
        "grpc://server1.example.com:50051",
        "grpc://server2.example.com:50052"
    ];
    
    for endpoint in grpc_servers.iter() {
        match data_source.load_from_grpc(endpoint).await {
            Ok(stream_data) => {
                println!("✅ gRPC连接成功: {}", endpoint);
                // 处理数据流
                while let Some(data_chunk) = stream_data.next().await {
                    match data_chunk {
                        Ok(chunk) => {
                            println!("   收到数据块: {} bytes", chunk.len());
                            // 实时处理数据
                        }
                        Err(e) => eprintln!("   数据流错误: {}", e),
                    }
                }
            }
            Err(e) => eprintln!("❌ gRPC连接失败 {}: {}", endpoint, e),
        }
    }
    
    // 示例4:批量处理网格文件
    let mesh_files = [
        "models/cube.obj",
        "models/sphere.gltf",
        "models/teapot.stl"
    ];
    
    for mesh_path in mesh_files.iter() {
        if Path::new(mesh_path).exists() {
            match data_source.load_mesh(mesh_path).await {
                Ok(mesh) => {
                    println!("✅ 网格加载成功: {}", mesh_path);
                    println!("   顶点数: {}", mesh.vertices().len());
                    println!("   面数: {}", mesh.faces().len());
                    // 网格处理逻辑
                }
                Err(e) => eprintln!("❌ 网格加载失败 {}: {}", mesh_path, e),
            }
        } else {
            println!("ℹ️  网格文件不存在: {}", mesh_path);
        }
    }
    
    // 错误处理和资源清理
    println!("🎉 所有数据处理完成");
    Ok(())
}

// 辅助函数:检查文件是否存在
fn file_exists(path: &str) -> bool {
    Path::new(path).exists()
}

// 辅助函数:处理图像数据的异步函数
impl DataSource {
    async fn process_image_data(&self, data: &[u8]) -> Result<ImageData, Box<dyn std::error::Error>> {
        // 图像解码和处理逻辑
        // 这里可以添加具体的图像处理代码
        Ok(ImageData::from_bytes(data)?)
    }
}

// 假设的图像数据结构
struct ImageData {
    width: u32,
    height: u32,
    data: Vec<u8>,
}

impl ImageData {
    fn from_bytes(data: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
        // 简化的图像解析
        Ok(ImageData {
            width: 100,  // 实际应该从数据中解析
            height: 100, // 实际应该从数据中解析
            data: data.to_vec(),
        })
    }
    
    fn dimensions(&self) -> (u32, u32) {
        (self.width, self.height)
    }
}

1 回复

Rust数据源管理库re_data_source的使用指南

简介

re_data_source是一个专为Rust设计的高性能数据源管理库,提供统一的接口来处理和集成多种数据源。该库支持数据库连接、文件系统访问、API调用等多种数据源类型,通过异步处理和连接池优化,显著提升数据操作效率。

核心特性

  • 多数据源支持:MySQL、PostgreSQL、SQLite、CSV文件、JSON API等
  • 异步IO操作:基于tokio运行时的高效异步处理
  • 连接池管理:自动连接复用和负载均衡
  • 统一查询接口:简化不同数据源的操作差异
  • 错误处理:完善的错误类型和恢复机制

安装方法

在Cargo.toml中添加依赖:

[dependencies]
re_data_source = "0.3.0"
tokio = { version = "1.0", features = ["full"] }

基础使用方法

1. 数据库连接示例

use re_data_source::{DataSource, DatabaseConfig};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 配置MySQL数据源
    let config = DatabaseConfig::new(
        "mysql://username:password@localhost:3306/mydatabase"
    );
    
    // 创建数据源实例
    let mut source = DataSource::database(config).await?;
    
    // 执行查询
    let results = source.query("SELECT * FROM users WHERE age > ?", &[&25]).await?;
    
    for row in results {
        println!("User: {:?}", row);
    }
    
    Ok(())
}

2. 文件数据源示例

use re_data_source::{DataSource, FileConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 配置CSV文件数据源
    let config = FileConfig::csv("data/users.csv");
    
    let mut source = DataSource::file(config).await?;
    
    // 读取文件内容
    let data = source.read_all().await?;
    println!("CSV data: {:?}", data);
    
    Ok(())
}

3. 多数据源集成示例

use re_data_source::{DataSourceManager, DatabaseConfig, FileConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut manager = DataSourceManager::new();
    
    // 添加多个数据源
    manager.add_database(
        "main_db", 
        DatabaseConfig::new("postgresql://user:pass@localhost/mydb")
    ).await?;
    
    manager.add_file(
        "user_data",
        FileConfig::json("data/users.json")
    ).await?;
    
    // 从不同数据源获取数据
    let db_data = manager.query("main_db", "SELECT * FROM products", &[]).await?;
    let file_data = manager.read("user_data").await?;
    
    // 数据处理和整合
    process_data(db_data, file_data);
    
    Ok(())
}

高级功能

连接池配置

use re_data_source::{DatabaseConfig, PoolConfig};

let config = DatabaseConfig::new("mysql://user:pass@localhost/db")
    .with_pool_config(PoolConfig::default()
        .max_connections(10)
        .min_connections(2)
        .max_lifetime(std::time::Duration::from_secs(1800))
    );

自定义数据源

use re_data_source::{CustomDataSource, DataRecord};
use async_trait::async_trait;

struct MyCustomSource;

#[async_trait]
impl CustomDataSource for MyCustomSource {
    async fn read(&mut self) -> Result<Vec<DataRecord>, DataSourceError> {
        // 实现自定义数据读取逻辑
        Ok(vec![])
    }
    
    async fn write(&mut self, records: Vec<DataRecord>) -> Result<(), DataSourceError> {
        // 实现自定义数据写入逻辑
        Ok(())
    }
}

错误处理

use re_data_source::{DataSourceError, DataSource};

async fn data_operation() -> Result<(), DataSourceError> {
    let mut source = DataSource::database(
        DatabaseConfig::new("invalid_connection_string")
    ).await?;
    
    match source.query("SELECT * FROM table", &[]).await {
        Ok(results) => {
            // 处理成功结果
            Ok(())
        },
        Err(DataSourceError::ConnectionError(e)) => {
            eprintln!("连接失败: {}", e);
            // 重连逻辑
            Ok(())
        },
        Err(e) => Err(e)
    }
}

性能优化建议

  1. 合理配置连接池参数,避免过度创建连接
  2. 使用批处理操作减少IO次数
  3. 对频繁查询的数据实现缓存层
  4. 根据数据量选择合适的批处理大小

注意事项

  • 确保所有异步操作在tokio运行时环境中执行
  • 及时释放不再使用的数据源连接
  • 处理可能发生的网络异常和超时情况
  • 对敏感数据配置适当的加密措施

re_data_source库通过统一的API简化了多数据源的管理,大幅提升了Rust应用程序处理异构数据源的效率和可维护性。

完整示例代码

use re_data_source::{DataSourceManager, DatabaseConfig, FileConfig, DataSourceError};
use serde::{Deserialize, Serialize};
use tokio;

#[derive(Debug, Serialize, Deserialize)]
struct User {
    id: i32,
    name: String,
    age: i32,
    email: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct Product {
    id: i32,
    name: String,
    price: f64,
    category: String,
}

// 数据处理函数
fn process_data(products: Vec<Product>, users: Vec<User>) {
    println!("处理产品数据:");
    for product in products {
        println!("产品: {:?}", product);
    }
    
    println!("\n处理用户数据:");
    for user in users {
        println!("用户: {:?}", user);
    }
    
    // 这里可以添加数据整合逻辑
    println!("\n数据整合完成");
}

#[tokio::main]
async fn main() -> Result<(), DataSourceError> {
    // 创建数据源管理器
    let mut manager = DataSourceManager::new();
    
    // 添加数据库数据源 - PostgreSQL
    manager.add_database(
        "products_db", 
        DatabaseConfig::new("postgresql://user:password@localhost:5432/products")
    ).await?;
    
    // 添加文件数据源 - JSON文件
    manager.add_file(
        "users_file",
        FileConfig::json("data/users.json")
    ).await?;
    
    println!("数据源初始化完成");
    
    // 从数据库查询产品数据
    let products: Vec<Product> = manager.query(
        "products_db", 
        "SELECT id, name, price, category FROM products WHERE price > ?", 
        &[&50.0]
    ).await?;
    
    println!("成功查询到 {} 个产品", products.len());
    
    // 从文件读取用户数据
    let users_data: String = manager.read("users_file").await?;
    let users: Vec<User> = serde_json::from_str(&users_data)?;
    
    println!("成功读取到 {} 个用户", users.len());
    
    // 处理并整合数据
    process_data(products, users);
    
    // 关闭所有数据源连接
    manager.shutdown().await;
    println!("数据源连接已关闭");
    
    Ok(())
}
// 自定义数据源完整示例
use re_data_source::{CustomDataSource, DataRecord, DataSourceError};
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;

// 自定义API数据源
struct ApiDataSource {
    base_url: String,
    api_key: String,
    client: reqwest::Client,
}

impl ApiDataSource {
    fn new(base_url: String, api_key: String) -> Self {
        Self {
            base_url,
            api_key,
            client: reqwest::Client::new(),
        }
    }
}

#[async_trait]
impl CustomDataSource for ApiDataSource {
    async fn read(&mut self) -> Result<Vec<DataRecord>, DataSourceError> {
        let url = format!("{}/data", self.base_url);
        
        let response = self.client
            .get(&url)
            .header("Authorization", format!("Bearer {}", self.api_key))
            .send()
            .await
            .map_err(|e| DataSourceError::Custom(e.to_string()))?;
        
        if !response.status().is_success() {
            return Err(DataSourceError::Custom("API请求失败".to_string()));
        }
        
        let data: Value = response.json()
            .await
            .map_err(|e| DataSourceError::Custom(e.to_string()))?;
        
        // 将JSON数据转换为DataRecord
        let records = vec![DataRecord::Json(data)];
        
        Ok(records)
    }
    
    async fn write(&mut self, records: Vec<DataRecord>) -> Result<(), DataSourceError> {
        for record in records {
            if let DataRecord::Json(data) = record {
                let url = format!("{}/data", self.base_url);
                
                let response = self.client
                    .post(&url)
                    .header("Authorization", format!("Bearer {}", self.api_key))
                    .json(&data)
                    .send()
                    .await
                    .map_err(|e| DataSourceError::Custom(e.to_string()))?;
                
                if !response.status().is_success() {
                    return Err(DataSourceError::Custom("API写入失败".to_string()));
                }
            }
        }
        
        Ok(())
    }
}

// 使用自定义数据源
#[tokio::main]
async fn main() -> Result<(), DataSourceError> {
    let mut api_source = ApiDataSource::new(
        "https://api.example.com".to_string(),
        "your_api_key_here".to_string()
    );
    
    // 读取数据
    let records = api_source.read().await?;
    println!("从API读取到 {} 条记录", records.len());
    
    // 写入数据
    let new_data = serde_json::json!({
        "name": "测试数据",
        "value": 100,
        "timestamp": chrono::Utc::now().to_rfc3339()
    });
    
    let write_records = vec![DataRecord::Json(new_data)];
    api_source.write(write_records).await?;
    println!("数据写入成功");
    
    Ok(())
}
// 错误处理和重试机制示例
use re_data_source::{DataSource, DatabaseConfig, DataSourceError};
use tokio::time::{sleep, Duration};

async fn execute_with_retry(
    query: &str, 
    params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
    max_retries: u32
) -> Result<Vec<re_data_source::DataRecord>, DataSourceError> {
    let mut retries = 0;
    
    while retries < max_retries {
        match create_and_query_data(query, params).await {
            Ok(results) => return Ok(results),
            Err(DataSourceError::ConnectionError(e)) => {
                eprintln!("连接错误 (尝试 {}): {}", retries + 1, e);
                retries += 1;
                
                if retries < max_retries {
                    // 指数退避重试
                    let delay = Duration::from_secs(2u64.pow(retries));
                    eprintln!("等待 {} 秒后重试...", delay.as_secs());
                    sleep(delay).await;
                }
            },
            Err(e) => return Err(e),
        }
    }
    
    Err(DataSourceError::Custom("最大重试次数耗尽".to_string()))
}

async fn create_and_query_data(
    query: &str, 
    params: &[&(dyn tokio_postgres::types::ToSql + Sync)]
) -> Result<Vec<re_data_source::DataRecord>, DataSourceError> {
    let config = DatabaseConfig::new("postgresql://user:password@localhost:5432/mydb");
    let mut source = DataSource::database(config).await?;
    
    source.query(query, params).await
}

#[tokio::main]
async fn main() {
    let query = "SELECT * FROM users WHERE active = ?";
    let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&true];
    
    match execute_with_retry(query, params, 3).await {
        Ok(results) => {
            println!("查询成功,获取到 {} 条记录", results.len());
            for record in results {
                println!("记录: {:?}", record);
            }
        },
        Err(e) => eprintln!("最终错误: {}", e),
    }
}
回到顶部