Rust数据处理插件库rgwml的使用:高效机器学习与工作流自动化工具

Rust数据处理插件库rgwml的使用:高效机器学习与工作流自动化工具

概述

RGWML是一个专注于简化数据科学、机器学习和人工智能操作的Rust库。它被设计为优雅、高效且易于使用,主要特点包括:

  • 提供类似Python Pandas的CSV处理功能
  • 支持多种机器学习算法(XGBoost和各种聚类算法)
  • 与多种数据库集成(MySQL、MSSQL、Clickhouse、Google Big Query等)
  • 支持Google Sheets和H5文件处理
  • 提供OpenAI API集成

安装依赖

首先需要安装系统依赖:

sudo apt-get update
sudo apt-get install python3-pip libxgboost-dev libhdf5-dev

然后安装Python依赖:

pip3 install google-cloud-bigquery clickhouse-driver pandas xgboost scikit-learn numpy h5py tables dask dask[dataframe] dask[distributed]

CSV处理示例

基本CSV操作

use rgwml::csv_utils::CsvBuilder;

// 创建新CSV
let builder = CsvBuilder::new()
    .set_header(&["Column1", "Column2", "Column3"])
    .add_rows(&[&["Row1-1", "Row1-2", "Row1-3"], &["Row2-1", "Row2-2", "Row2-3"]])
    .save_as("/path/to/your/file.csv");

// 从现有CSV加载
let builder = CsvBuilder::from_csv("/path/to/existing/file.csv");

// 从Google Sheets加载
use tokio::runtime::Runtime;
let rt = Runtime::new().unwrap();
rt.block_on(async {
    let csv_builder = CsvBuilder::from_publicly_viewable_google_sheet(
        "https://docs.google.com/spreadsheets/d/1U9ozNFwV__c15z4Mp_EWorGwOv6mZPaQ9dmYtjmCPow/edit#gid=272498272"
    ).await;
});

高级CSV操作

use rgwml::csv_utils::{Exp, ExpVal, CsvBuilder};

let _ = CsvBuilder::from_csv("/path/to/your/file.csv")
    .rename_columns vec![("OLD_COLUMN", "NEW_COLUMN")])
    .drop_columns(vec!["UNUSED_COLUMN"])
    .where_(
        vec![
            ("Exp1", Exp {
                column: "customer_type".to_string(),
                operator: "==".to_string(),
                compare_with: ExpVal::STR("REGULAR".to_string()),
                compare_as: "TEXT".to_string()
            }),
            ("Exp2", Exp {
                column: "invoice_data".to_string(),
                operator: ">".to_string(),
                compare_with: ExpVal::STR("2023-12-31 23:59:59".to_string()),
                compare_as: "TEXT".to_string()
            }),
        ],
        "Exp1 && Exp2"
    )
    .save_as("/path/to/modified/file.csv");

机器学习示例

XGBoost模型训练与预测

use rgwml::csv_utils::CsvBuilder;
use rgwml::xgb_utils::XgbConfig;
use tokio::runtime::Runtime;

let rt = Runtime::new().unwrap();
rt.block_on(async {
    let mut builder = CsvBuilder::from_csv("/path/to/your/training_data.csv");
    
    let (builder, report) = builder.create_xgb_model(
        "feature1,feature2",  // 特征列
        "target_column",     // 目标列
        "PREDICTION",       // 预测列名
        "/model/directory", // 模型保存目录
        "my_model",         // 模型名称
        XgbConfig::default() // 使用默认配置
    ).await;
});

聚类分析

use rgwml::csv_utils::CsvBuilder;
use rgwml::clustering_utils::ClusteringConfig;
use tokio::runtime::Runtime;

let rt = Runtime::new().unwrap();
rt.block_on(async {
    let mut builder = CsvBuilder::from_csv("/path/to/your/data.csv");
    
    let clustering_config = ClusteringConfig {
        operation: "KMEANS".to_string(),
        optimal_n_cluster_finding_method: "ELBOW".to_string(),
        ..Default::default()
    };
    
    builder.append_clustering_column(
        "feature1,feature2",  // 特征列
        "CLUSTER",           // 聚类结果列名
        clustering_config
    ).await;
});

数据库操作示例

查询MySQL数据库

use rgwml::csv_utils::CsvBuilder;
use tokio::runtime::Runtime;

let rt = Runtime::new().unwrap();
rt.block_on(async {
    let result = CsvBuilder::from_mysql_query(
        "username", 
        "password", 
        "server", 
        "database", 
        "SELECT * FROM your_table"
    ).await;
});

查询Google BigQuery

use rgwml::csv_utils::CsvBuilder;
use tokio::runtime::Runtime;

let rt = Runtime::new().unwrap();
rt.block_on(async {
    let result = CsvBuilder::from_google_big_query_query(
        "path/to/credentials.json",
        "SELECT * FROM your_table"
    ).await;
});

完整示例:端到端机器学习工作流

use rgwml::csv_utils::{CsvBuilder, Exp, ExpVal};
use rgwml::xgb_utils::XgbConfig;
use rgwml::clustering_utils::ClusteringConfig;
use tokio::runtime::Runtime;

#[tokio::main]
async fn main() {
    // 1. 从数据库加载数据
    let mut builder = CsvBuilder::from_mysql_query(
        "username",
        "password",
        "server",
        "database",
        "SELECT * FROM customer_data"
    ).await.expect("Failed to load data from MySQL");
    
    // 2. 数据清洗和预处理
    builder
        .drop_columns(vec!["id", "unused_column"])
        .replace_all(vec!["*"], vec![("NA", "0"), ("null", "0")])
        .clean_or_test_clean_by_eliminating_rows_subject_to_column_parse_rules(
            "age:IS_NUMERICAL_VALUE;income:IS_POSITIVE_NUMERICAL_VALUE"
        );
    
    // 3. 特征工程
    builder
        .append_derived_category_column(
            "INCOME_GROUP",
            vec![
                ("LOW", vec![
                    ("Exp1", Exp {
                        column: "income".to_string(),
                        operator: "<".to_string(),
                        compare_with: ExpVal::STR("30000".to_string()),
                        compare_as: "NUMBERS".to_string()
                    }),
                ], "Exp1"),
                ("MEDIUM", vec![
                    ("Exp1", Exp {
                        column: "income".to_string(),
                        operator: ">=".to_string(),
                        compare_with: ExpVal::STR("30000".to_string()),
                        compare_as: "NUMBERS".to_string()
                    }),
                    ("Exp2", Exp {
                        column: "income".to_string(),
                        operator: "<".to_string(),
                        compare_with: ExpVal::STR("80000".to_string()),
                        compare_as: "NUMBERS".to_string()
                    }),
                ], "Exp1 && Exp2"),
                ("HIGH", vec![
                    ("Exp1", Exp {
                        column: "income".to_string(),
                        operator: ">=".to_string(),
                        compare_with: ExpVal::STR("80000".to_string()),
                        compare_as: "NUMBERS".to_string()
                    }),
                ], "Exp1")
            ]
        );
    
    // 4. 训练XGBoost模型
    let (builder, _) = builder.create_xgb_model(
        "age,income,spending_score",
        "churn",
        "CHURN_PREDICTION",
        "/models",
        "churn_model",
        XgbConfig::default()
    ).await;
    
    // 5. 聚类分析
    builder.append_clustering_column(
        "age,income,spending_score",
        "CLUSTER",
        ClusteringConfig {
            operation: "KMEANS".to_string(),
            optimal_n_cluster_finding_method: "ELBOW".to_string(),
            ..Default::default()
        }
    ).await;
    
    // 6. 保存结果
    builder.save_as("/output/final_results.csv");
    
    println!("Analysis completed successfully!");
}

总结

RGWML提供了一套全面的工具,用于:

  1. 数据获取和预处理(从CSV、数据库、Google Sheets等)
  2. 数据清洗和转换
  3. 特征工程
  4. 机器学习模型训练和预测
  5. 聚类分析
  6. 结果保存和可视化

通过链式调用和方法组合,可以构建复杂的数据处理流水线,同时保持代码的可读性和可维护性。


1 回复

Rust数据处理插件库rgwml的使用:高效机器学习与工作流自动化工具

简介

rgwml (Rust Generic Workflow Machine Learning) 是一个专注于数据处理、工作流自动化和机器学习的Rust库。它提供了一系列高效的工具和插件,帮助开发者快速构建数据处理管道和机器学习工作流。

主要特性

  • 高性能数据处理能力
  • 简洁的工作流定义方式
  • 内置常用机器学习算法
  • 可扩展的插件系统
  • 与Rust生态系统良好集成

安装方法

在Cargo.toml中添加依赖:

[dependencies]
rgwml = "0.1.0"  # 请使用最新版本

基本使用方法

1. 数据加载与处理

use rgwml::data_utils::{DataFrame, CsvLoader};

fn main() {
    // 从CSV加载数据
    let loader = CsvLoader::new("data.csv");
    let df: DataFrame = loader.load().expect("Failed to load data");
    
    // 查看数据
    println!("Loaded data with {} rows", df.row_count());
    
    // 数据预处理
    let processed_df = df.normalize().fill_na(0.0);
}

2. 构建机器学习工作流

use rgwml::ml_workflow::{Workflow, LinearRegression, TrainTestSplit};

fn main() {
    // 创建数据集
    let features = vec![vec![1.0, 2.0], vec![2.0, 3.0], vec![3.0, 4.0]];
    let targets = vec![3.0, 5.0, 7.0];
    
    // 分割训练集和测试集
    let (train_x, test_x, train_y, test_y) = TrainTestSplit::new(0.8).split(features, targets);
    
    // 创建工作流
    let mut workflow = Workflow::new();
    
    // 添加线性回归模型
    workflow.add_model("lr", LinearRegression::new());
    
    // 训练模型
    workflow.train("lr", &train_x, &train_y).unwrap();
    
    // 预测
    let predictions = workflow.predict("lr", &test_x).unwrap();
    println!("Predictions: {:?}", predictions);
}

3. 自动化工作流

use rgwml::workflow_automation::{AutomatedWorkflow, Task};

fn main() {
    // 创建自动化工作流
    let mut auto_workflow = AutomatedWorkflow::new("data_processing");
    
    // 添加任务
    auto_workflow.add_task(Task::new(
        "load_data",
        Box::new(|input| {
            // 模拟数据加载
            println!("Loading data...");
            Ok(vec![1, 2, 3, 4, 5])
        })
    ));
    
    auto_workflow.add_task(Task::new(
        "process_data",
        Box::new(|input| {
            // 模拟数据处理
            println!("Processing data...");
            let data: Vec<i32> = input.downcast_ref::<Vec<i32>>().unwrap().iter().map(|x| x * 2).collect();
            Ok(data)
        })
    ));
    
    // 执行工作流
    let result = auto_workflow.run();
    println!("Final result: {:?}", result);
}

高级功能

自定义插件

use rgwml::plugins::{Plugin, PluginResult};
use std::any::Any;

struct MyCustomPlugin;

impl Plugin for MyCustomPlugin {
    fn execute(&self, input: &dyn Any) -> PluginResult {
        // 处理输入数据
        if let Some(numbers) = input.downcast_ref::<Vec<f64>>() {
            let sum: f64 = numbers.iter().sum();
            Ok(Box::new(sum))
        } else {
            Err("Invalid input type".into())
        }
    }
}

fn main() {
    let plugin = MyCustomPlugin;
    let data = vec![1.0, 2.0, 3.0, 4.0];
    
    match plugin.execute(&data) {
        Ok(result) => {
            let sum = result.downcast_ref::<f64>().unwrap();
            println!("Sum: {}", sum);
        }
        Err(e) => println!("Error: {}", e),
    }
}

并行数据处理

use rgwml::parallel_processing::ParallelProcessor;

fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    let processor = ParallelProcessor::new(4); // 使用4个线程
    
    let results = processor.process(data, |x| {
        // 模拟耗时操作
        std::thread::sleep(std::time::Duration::from_millis(100));
        x * 2
    });
    
    println!("Processed data: {:?}", results);
}

完整示例:端到端机器学习工作流

use rgwml::{
    data_utils::{DataFrame, CsvLoader},
    ml_workflow::{Workflow, LinearRegression, RandomForest, TrainTestSplit},
    feature_selection::VarianceThreshold,
};

fn main() {
    // 1. 数据加载
    let loader = CsvLoader::new("housing_data.csv");
    let mut df = loader.load().expect("Failed to load data");
    println!("原始数据: {}行 x {}列", df.row_count(), df.col_count());

    // 2. 数据预处理
    let processed_df = df
        .drop_na()                // 删除缺失值
        .normalize()              // 标准化数据
        .one_hot_encode(&["zipcode"])  // 对分类变量进行独热编码
        .shuffle();               // 打乱数据顺序

    // 3. 特征选择
    let selector = VarianceThreshold::new(0.1);  // 移除低方差特征
    let (selected_features, selected_targets) = selector.fit_transform(
        processed_df.features(),
        processed_df.target_column("price")
    );

    // 4. 划分训练集和测试集
    let (train_x, test_x, train_y, test_y) = TrainTestSplit::new(0.75).split(
        selected_features,
        selected_targets
    );

    // 5. 创建工作流并添加模型
    let mut workflow = Workflow::new();
    workflow
        .add_model("linear_reg", LinearRegression::new())
        .add_model("random_forest", RandomForest::new().with_n_estimators(100));

    // 6. 训练模型
    workflow.train("linear_reg", &train_x, &train_y).unwrap();
    workflow.train("random_forest", &train_x, &train_y).unwrap();

    // 7. 评估模型
    let lr_predictions = workflow.predict("linear_reg", &test_x).unwrap();
    let rf_predictions = workflow.predict("random_forest", &test_x).unwrap();

    // 8. 输出结果
    println!("线性回归预测: {:?}", &lr_predictions[..5]);
    println!("随机森林预测: {:?}", &rf_predictions[..5]);
    
    // 9. 保存工作流
    workflow.save("housing_model.workflow").unwrap();
}

性能提示

  1. 对于大型数据集,使用LazyDataFrame进行惰性求值
  2. 利用parallel_processing模块进行并行计算
  3. 使用feature_selection模块减少不必要的数据处理
  4. 对于重复性工作流,考虑序列化保存模型和工作流状态

总结

rgwml为Rust开发者提供了一个强大而灵活的工具集,用于构建高效的数据处理和机器学习工作流。通过其模块化设计和插件系统,可以轻松扩展功能以满足特定需求。

回到顶部