Rust时序数据库客户端库influx_db_client的使用:高效连接InfluxDB进行数据写入与查询

以下是根据您提供的内容整理的Rust influx_db_client库使用指南,包含完整的示例代码:

Rust时序数据库客户端库influx_db_client的使用:高效连接InfluxDB进行数据写入与查询

概述

这是一个用于Rust的InfluxDB驱动库,支持InfluxDB 1.0及以上版本。

使用

在Cargo.toml中添加依赖:

[dependencies]
influx_db_client = "^0.6.1"
tokio = { version = "1.0", features = ["full"] }  # 添加tokio依赖

HTTP示例

use influx_db_client::{
    Client, Point, Points, Value, Precision, point, points
};
use tokio;

fn main() {
    // 默认连接"http://127.0.0.1:8086",使用"test"数据库
    let client = Client::default().set_authentication("root", "root");

    let point = point!("test1")
        .add_field("foo", "bar")
        .add_field("integer", 11)
        .add_field("float", 22.3)
        .add_field("'boolean'", false);

    let point1 = Point::new("test1")
        .add_tag("tags", "\\\"fda")
        .add_tag("number", 12)
        .add_tag("float", 12.6)
        .add_field("fd", "'3'")
        .add_field("quto", "\\\"fda")
        .add_field("quto1", "\"fda");

    let points = points!(point1, point);

    tokio::runtime::Runtime::new().unwrap().block_on(async move {
        // 如果Precision为None,默认是秒级
        // 批量写入
        client.write_points(points, Some(Precision::Seconds), None).await.unwrap();

        // 查询,返回类型是Option<Vec<Node>>
        let res = client.query("select * from test1", None).await.unwrap();
        println!("{:?}", res.unwrap()[0].series)
    });
}

UDP示例

use influx_db_client::{UdpClient, Point, Value, point};

fn main() {
    let mut udp = UdpClient::new("127.0.0.1:8089");
    udp.add_host("127.0.0.1:8090");

    let point = point!("test").add_field("foo", Value::String(String::from("bar")));

    udp.write_point(point).unwrap();
}

完整示例代码

use influx_db_client::{Client, Point, Points, Precision, point, points};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建客户端并设置认证
    let client = Client::new("http://localhost:8086", "test_db")
        .set_authentication("admin", "password");

    // 创建数据点1
    let point1 = point!("measurement1")
        .add_tag("location", "server_room")
        .add_tag("device", "sensor1")
        .add_field("temperature", 23.5)
        .add_field("humidity", 45.2)
        .add_field("status", "normal");

    // 创建数据点2
    let point2 = point!("measurement2")
        .add_tag("location", "outdoor")
        .add_tag("device", "sensor2")
        .add_field("temperature", 18.7)
        .add_field("humidity", 67.8)
        .add_field("status", "warning");

    // 将多个数据点组合成批处理
    let points = points!(point1, point2);

    // 写入数据到InfluxDB,使用纳秒级精度
    client.write_points(points, Some(Precision::Nanoseconds), None)
        .await?;

    // 查询数据
    let query_result = client.query("SELECT * FROM measurement1", None).await?;
    
    if let Some(nodes) = query_result {
        for node in nodes {
            println!("Measurement: {}", node.name);
            for series in node.series {
                println!("Tags: {:?}", series.tags);
                println!("Columns: {:?}", series.columns);
                println!("Values:");
                for value_set in series.values {
                    println!("  {:?}", value_set);
                }
            }
        }
    }

    Ok(())
}

兼容性

该库已测试过InfluxDB 1.0.2/1.3.5/1.5版本。


1 回复

Rust时序数据库客户端库influx_db_client的使用指南

简介

influx_db_client是一个Rust语言的InfluxDB客户端库,用于高效连接InfluxDB时序数据库,执行数据写入和查询操作。它提供了简洁的API来与InfluxDB交互,支持InfluxDB 1.x版本。

安装

在Cargo.toml中添加依赖:

[dependencies]
influx_db_client = "0.6.0"

基本使用方法

1. 创建客户端连接

use influx_db_client::{Client, Point, Precision, Value};

// 创建客户端
let client = Client::new("http://localhost:8086", "test_db")
    .set_authentication("username", "password");

2. 写入数据

单个数据点写入:

let point = Point::new("cpu_load")
    .add_tag("host", "server01")
    .add_tag("region", "us-west")
    .add_field("value", Value::Float(0.64))
    .add_field("active", Value::Boolean(true))
    .to_owned();

client.write_point(point, Precision::Seconds, None).unwrap();

批量写入多个数据点:

let mut points = vec![];

for i in 0..5 {
    let point = Point::new("memory_usage")
        .add_tag("host", format!("server{}", i))
        .add_field("used", Value::Integer(i * 10))
        .add_field("free", Value::Integer(100 - i * 10))
        .to_owned();
    points.push(point);
}

client.write_points(points, Precision::Seconds, None).unwrap();

3. 查询数据

简单查询:

let query = "SELECT * FROM cpu_load WHERE host = 'server01'";
let results = client.query(query, None).unwrap();

for series in results {
    println!("Measurement: {}", series.name);
    for row in series.values {
        println!("Time: {}, Value: {}", row[0], row[1]);
    }
}

带参数的查询:

let query = "SELECT * FROM memory_usage WHERE host = $host";
let params = vec![("host", "server1")];
let results = client.query_with_params(query, params, None).unwrap();

高级功能

1. 创建/删除数据库

// 创建数据库
client.create_database("new_db").unwrap();

// 删除数据库
client.drop_database("old_db").unwrap();

2. 设置保留策略

client.create_retention_policy(
    "one_week_only", 
    "test_db", 
    "7d", 
    1, 
    false
).unwrap();

3. 分块查询(处理大量数据)

let query = "SELECT * FROM large_measurement";
let chunk_size = 1000;

let mut results = client.query_chunked(query, chunk_size, None);

while let Some(chunk) = results.next().unwrap() {
    // 处理每个数据块
    for series in chunk {
        println!("Got {} points in series {}", series.values.len(), series.name);
    }
}

错误处理

match client.query("SELECT * FROM non_existent", None) {
    Ok(results) => {
        // 处理结果
    },
    Err(e) => {
        eprintln!("查询出错: {}", e);
        // 可能的错误处理逻辑
    }
}

性能优化建议

  1. 对于批量写入,尽量一次性发送多个数据点而不是单点写入
  2. 考虑使用连接池管理客户端连接
  3. 对于高频写入场景,可以实现缓冲机制定期批量写入

完整示例

use influx_db_client::{Client, Point, Precision, Value};

fn main() {
    // 1. 创建客户端
    let client = Client::new("http://localhost:8086", "metrics")
        .set_authentication("admin", "password123");
    
    // 2. 写入CPU指标数据
    let cpu_point = Point::new("cpu_usage")
        .add_tag("host", "web-server-1")
        .add_tag("region", "eu-central")
        .add_field("user", Value::Float(23.4))
        .add_field("system", Value::Float(12.1))
        .add_field("idle", Value::Float(64.5))
        .to_owned();
    
    client.write_point(cpu_point, Precision::Milliseconds, None).unwrap();
    
    // 3. 查询最近5分钟的CPU数据
    let query = "SELECT mean(\"user\") FROM cpu_usage WHERE time > now() - 5m GROUP BY time(1m)";
    let results = client.query(query, None).unwrap();
    
    for series in results {
        println!("--- {} ---", series.name);
        for row in series.values {
            println!("Time: {}, Avg CPU Usage: {}%", row[0], row[1]);
        }
    }
}

这个库为Rust开发者提供了与InfluxDB交互的便捷方式,特别适合需要高效处理时序数据的应用场景。

回到顶部