Using influxdb, the Rust time-series database library: efficiently handling time-series data and storing monitoring metrics

rust-influxdb

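Currently supported features

  • Reading from and writing to InfluxDB
  • Optional Serde support for deserialization
  • Running multiple queries in one request
  • Writing one or several measurements in one request
  • Authenticated and unauthenticated connections
  • async/await support
  • #[derive(InfluxDbWriteable)] derive macro for writing and reading structs
  • GROUP BY support
  • Tokio and async-std support
  • Swappable HTTP backends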

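Quick start

Add the following to Cargo.toml (the example also uses chrono and tokio):

[dependencies]
influxdb = { version = "0.7.2", features = ["derive"] }
chrono = "0.4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Example usage: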

use chrono::{DateTime, Utc};
use influxdb::{Client, Error, InfluxDbWriteable, ReadQuery, Timestamp};

#[tokio::main]
// or #[async_std::main] if you prefer
async fn main() -> Result<(), Error> {
    // Connect to the `test` database on http://localhost:8086
    let client = Client::new("http://localhost:8086", "test");

    #[derive(InfluxDbWriteable)]
    struct WeatherReading {
        time: DateTime<Utc>,
        humidity: i32,
        #[influxdb(tag)]
        wind_direction: String,
    }

    // Write some data into a measurement called `weather`
    let weather_readings = vec![
        WeatherReading {
            time: Timestamp::Hours(1).into(),
            humidity: 30,
            wind_direction: String::from("north"),
        }
        .into_query("weather"),
        WeatherReading {
            time: Timestamp::Hours(2).into(),
            humidity: 40,
            wind_direction: String::from("west"),
        }
        .into_query("weather"),
    ];

    client.query(weather_readings).await?;

    // Check that the data we wrote is there
    let read_query = ReadQuery::new("SELECT * FROM weather");

    let read_result = client.query(read_query).await?;
    println!("{}", read_result);
    Ok(())
}

Choice of HTTP backend

You can choose which HTTP backend is used to communicate with InfluxDB:

  1. hyper (through reqwest, used by default), with rustls:
influxdb = { version = "0.7.2", features = ["derive"] }
  2. hyper (through reqwest), with native TLS (OpenSSL):
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "reqwest-client"] }
  3. hyper (through surf), use this if you need tokio 0.2 compatibility:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "hyper-client"] }
  4. curl, using libcurl:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "curl-client"] }
  5. async-h1 with native TLS (OpenSSL):
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "h1-client"] }
  6. async-h1 with rustls:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "h1-client-rustls"] }
  7. WebAssembly's window.fetch, via web-sys and wasm-bindgen:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "wasm-client"] }

Complete example

A more complete example showing how to create an authenticated InfluxDB client, write data, and query it back:

use chrono::{DateTime, Utc};
use influxdb::{Client, Error, InfluxDbWriteable, ReadQuery, Timestamp};

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Create the InfluxDB client (with authentication)
    let client = Client::new("http://localhost:8086", "test_db")
        .with_auth("username", "password");

    // Define the data structure for the measurement
    #[derive(InfluxDbWriteable)]
    struct ServerMetric {
        time: DateTime<Utc>,
        cpu_usage: f64,
        memory_usage: f64,
        #[influxdb(tag)]
        hostname: String,
        #[influxdb(tag)]
        region: String,
    }

    // Build the points to write
    let metrics = vec![
        ServerMetric {
            time: Utc::now(),
            cpu_usage: 23.5,
            memory_usage: 45.2,
            hostname: String::from("server1"),
            region: String::from("us-west"),
        }
        .into_query("server_metrics"),
        ServerMetric {
            time: Utc::now(),
            cpu_usage: 56.7,
            memory_usage: 78.1,
            hostname: String::from("server2"),
            region: String::from("eu-central"),
        }
        .into_query("server_metrics"),
    ];

    // Write the data
    client.query(metrics).await?;

    // Query the data from the last hour
    let query = ReadQuery::new("SELECT * FROM server_metrics WHERE time > now() - 1h");
    let result = client.query(query).await?;
    
    println!("Query result: {}", result);

    // A more complex query: per-region aggregates over the last hour
    let stats_query = ReadQuery::new(
        "SELECT MEAN(cpu_usage) AS avg_cpu, MAX(memory_usage) AS max_mem \
         FROM server_metrics \
         WHERE time > now() - 1h \
         GROUP BY region"
    );
    
    let stats_result = client.query(stats_query).await?;
    println!("Aggregate result: {}", stats_result);

    Ok(())
}

License

MIT License


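Introduction

influxdb is a Rust library for connecting to and working with the InfluxDB time-series database. InfluxDB is optimized for time-series data, which makes it a good fit for monitoring metrics, sensor data, and real-time analytics.

The library's API is asynchronous only (async/await on Tokio or async-std); there is no separate blocking client. It is built around the InfluxDB 1.x HTTP API and InfluxQL. For native InfluxDB 2.x access with Flux, the separate influxdb2 crate is an option (see the InfluxDB 2.x section below).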

Installation

Add the dependencies to Cargo.toml:

[dependencies]
influxdb = { version = "0.7.2", features = ["derive"] }
chrono = "0.4"
tokio = { version = "1.0", features = ["full"] } # async runtime; the influxdb API is async-only

Basic usage

1. Calling from synchronous code

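use chrono::{DateTime, Utc};
use influxdb::{Client, InfluxDbWriteable, ReadQuery};

// The crate has no blocking client, so synchronous code drives the
// async calls on a Tokio runtime with block_on.
#[derive(InfluxDbWriteable)]
struct TemperatureReading {
    time: DateTime<Utc>,
    temperature: f64,
    location: String, // stored as a string field, not a tag
    #[influxdb(tag)]
    sensor_id: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Runtime used to execute the async client calls
    let rt = tokio::runtime::Runtime::new()?;

    // Create the client
    let client = Client::new("http://localhost:8086", "mydb");

    // Create a data point stamped with the current time,
    // so the "last hour" query below can find it
    let reading = TemperatureReading {
        time: Utc::now(),
        temperature: 25.3,
        location: "room1".to_string(),
        sensor_id: "sensor1".to_string(),
    };

    // Write the data
    rt.block_on(client.query(reading.into_query("temperature")))?;

    // Query the data
    let query = ReadQuery::new("SELECT * FROM temperature WHERE time > now() - 1h");
    let result = rt.block_on(client.query(query))?;

    // The result is the raw JSON response body
    println!("Query result: {}", result);
    Ok(())
}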

2. Using the async API

use chrono::{DateTime, Utc};
use influxdb::{Client, InfluxDbWriteable, ReadQuery};

#[derive(InfluxDbWriteable)]
struct CpuUsage {
    time: DateTime<Utc>,
    usage: f64,
    #[influxdb(tag)]
    host: String,
}

#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
    // Create the client (every request is async)
    let client = Client::new("http://localhost:8086", "monitoring");

    // Create a data point and write it
    let usage = CpuUsage {
        time: Utc::now(),
        usage: 45.2,
        host: "server1".to_string(),
    };

    client.query(usage.into_query("cpu_usage")).await?;

    // Mean usage per host over the last 30 minutes
    let query = ReadQuery::new(
        "SELECT mean(usage) FROM cpu_usage WHERE time > now() - 30m GROUP BY host"
    );
    let result = client.query(query).await?;

    // The result is the raw JSON response body
    println!("Average CPU usage: {}", result);
    Ok(())
}

Advanced features

1. Batch writes

use chrono::{DateTime, Duration, Utc};
use influxdb::{Client, InfluxDbWriteable};

#[derive(InfluxDbWriteable)]
struct NetworkTraffic {
    time: DateTime<Utc>,
    bytes_in: u64,
    bytes_out: u64,
    #[influxdb(tag)]
    interface: String,
}

#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
    let client = Client::new("http://localhost:8086", "network_stats");

    let mut points = Vec::with_capacity(10);
    let now = Utc::now();

    // Generate 10 points, 5 seconds apart
    for i in 0..10i64 {
        let traffic = NetworkTraffic {
            time: now - Duration::seconds(i * 5),
            bytes_in: 1000 * (i as u64 + 1),
            bytes_out: 500 * (i as u64 + 1),
            interface: "eth0".to_string(),
        };
        points.push(traffic.into_query("traffic"));
    }

    // A Vec<WriteQuery> is sent as one batched request
    client.query(points).await?;
    Ok(())
}

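2. Using InfluxDB 2.x

This example uses a different crate, influxdb2, which talks to the InfluxDB 2.x API (organizations, buckets, API tokens, Flux queries). It needs its own entries in Cargo.toml: influxdb2, plus futures for the write stream.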

use futures::stream;
use influxdb2::models::{DataPoint, Query};
use influxdb2::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create an InfluxDB 2.x client: URL, organization, API token
    let client = Client::new("http://localhost:8086", "my-org", "my-token");

    // Create a data point
    let point = DataPoint::builder("memory")
        .tag("host", "server1")
        .field("used", 85.3)
        .field("total", 128.0)
        .build()?;

    // Write the data; write() takes a stream of points
    client.write("my-bucket", stream::iter(vec![point])).await?;

    // Query the data with Flux
    let flux = r#"
        from(bucket: "my-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "memory")
    "#;

    // query_raw returns the untyped Flux records
    let records = client.query_raw(Some(Query::new(flux.to_string()))).await?;
    println!("Query returned {} records", records.len());

    Ok(())
}

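Best practices

  1. Batch writes: group points into as few requests as practical to reduce network round-trips.
  2. Use tags deliberately: store the dimensions you frequently filter or group by as tags (tags are indexed, fields are not) and the measured values as fields.
  3. Timestamp precision: choose the precision you actually need (nanoseconds, microseconds, milliseconds, or seconds).
  4. Reuse connections: create one Client and clone it where needed instead of building a new one per request; the default reqwest backend pools HTTP connections internally.
  5. Error handling: handle network errors and database errors explicitly rather than discarding them.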

Performance tuning

use chrono::{DateTime, Duration, Utc};
use influxdb::{Client, InfluxDbWriteable};

#[derive(InfluxDbWriteable)]
struct Metric {
    time: DateTime<Utc>,
    value: f64,
    #[influxdb(tag)]
    name: String,
}

fn create_client() -> Client {
    // Create one client and clone it where needed. influxdb::Client has no
    // read/write timeout setters; timeouts and connection pooling are
    // properties of the underlying HTTP client.
    Client::new("http://localhost:8086", "metrics")
        .with_auth("username", "password")
}

#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
    let client = create_client();

    // Write 1000 points in a single request
    let mut points = Vec::with_capacity(1000);
    let now = Utc::now();

    for i in 0..1000i64 {
        let metric = Metric {
            time: now - Duration::seconds(i),
            value: (i as f64).sin(),
            name: "sine_wave".to_string(),
        };
        points.push(metric.into_query("waveform"));
    }

    client.query(points).await?;
    Ok(())
}

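Complete example

The following complete example collects and stores monitoring metrics. It first backfills historical data in one batched write. A background task then keeps writing fresh samples, while the main task periodically queries aggregates. Everything runs on one Tokio runtime, because the crate's API is async. The simulated metrics use the rand crate, which must be added as a dependency.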

use chrono::{DateTime, Duration as ChronoDuration, Utc};
use influxdb::{Client, InfluxDbWriteable, ReadQuery, WriteQuery};
use tokio::time::{sleep, Duration};

// Aggregate query run every 5 minutes
const SUMMARY_QUERY: &str =
    "SELECT mean(cpu_usage) FROM system_metrics WHERE time > now() - 1h GROUP BY hostname, service";

// Monitoring metric data structure
#[derive(InfluxDbWriteable)]
struct SystemMetric {
    time: DateTime<Utc>,
    cpu_usage: f64,
    memory_usage: f64,
    disk_used: f64,
    #[influxdb(tag)]
    hostname: String,
    #[influxdb(tag)]
    service: String,
}

// Simulated metric source (random values from the rand crate)
fn get_system_metrics(hostname: &str, service: &str) -> SystemMetric {
    SystemMetric {
        time: Utc::now(),
        cpu_usage: rand::random::<f64>() * 100.0,
        memory_usage: rand::random::<f64>() * 100.0,
        disk_used: rand::random::<f64>() * 100.0,
        hostname: hostname.to_string(),
        service: service.to_string(),
    }
}

// Convert metrics into write queries and send them in one request
async fn write_metrics(client: &Client, metrics: Vec<SystemMetric>) -> Result<(), influxdb::Error> {
    let points: Vec<WriteQuery> = metrics
        .into_iter()
        .map(|m| m.into_query("system_metrics"))
        .collect();
    client.query(points).await?;
    Ok(())
}

// Background task: collect and write fresh metrics every `interval_secs`
async fn monitoring_task(client: Client, interval_secs: u64) {
    loop {
        let metrics = vec![
            get_system_metrics("server1", "web_service"),
            get_system_metrics("server1", "database"),
            get_system_metrics("server2", "web_service"),
        ];

        if let Err(e) = write_metrics(&client, metrics).await {
            eprintln!("Failed to write metrics: {}", e);
        }

        sleep(Duration::from_secs(interval_secs)).await;
    }
}

#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
    // One client for everything; clones share the same configuration
    let client = Client::new("http://localhost:8086", "monitoring_db")
        .with_auth("admin", "password");

    // Backfill 24 hours of hourly historical data
    let now = Utc::now();
    let historical_data: Vec<SystemMetric> = (0..24i64)
        .map(|i| SystemMetric {
            time: now - ChronoDuration::hours(i),
            cpu_usage: (i as f64).sin().abs() * 100.0,
            memory_usage: 30.0 + (i as f64).cos().abs() * 50.0,
            disk_used: 40.0 + (i as f64).sin().abs() * 30.0,
            hostname: "server1".to_string(),
            service: "web_service".to_string(),
        })
        .collect();

    write_metrics(&client, historical_data).await?;

    // Start continuous collection in the background (every 60 s)
    tokio::spawn(monitoring_task(client.clone(), 60));

    // Every 5 minutes, query mean CPU usage per host and service
    loop {
        sleep(Duration::from_secs(300)).await;

        match client.query(ReadQuery::new(SUMMARY_QUERY)).await {
            Ok(result) => println!("Current metrics: {}", result),
            Err(e) => eprintln!("Query failed: {}", e),
        }
    }
}

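This complete example shows:

  1. Defining a data structure for monitoring metrics
  2. Backfilling historical data with a single batched write
  3. Continuously collecting and writing real-time metrics in a background task
  4. Periodically querying aggregated data
  5. Error handling and logging

Starting from this example, you can build a complete monitoring pipeline that collects, stores, and queries time-series data efficiently.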
