Rust时序数据库客户端库influx_db_client的使用:高效连接InfluxDB进行数据写入与查询
以下是根据您提供的内容整理的Rust influx_db_client库使用指南,包含完整的示例代码:
Rust时序数据库客户端库influx_db_client的使用:高效连接InfluxDB进行数据写入与查询
概述
这是一个用于Rust的InfluxDB驱动库,支持InfluxDB 1.0及以上版本。
使用
在Cargo.toml中添加依赖:
[dependencies]
influx_db_client = "^0.6.1"
tokio = { version = "1.0", features = ["full"] } # 添加tokio依赖
HTTP示例
use influx_db_client::{
Client, Point, Points, Value, Precision, point, points
};
use tokio;
fn main() {
// 默认连接"http://127.0.0.1:8086",使用"test"数据库
let client = Client::default().set_authentication("root", "root");
let point = point!("test1")
.add_field("foo", "bar")
.add_field("integer", 11)
.add_field("float", 22.3)
.add_field("'boolean'", false);
let point1 = Point::new("test1")
.add_tag("tags", "\\\"fda")
.add_tag("number", 12)
.add_tag("float", 12.6)
.add_field("fd", "'3'")
.add_field("quto", "\\\"fda")
.add_field("quto1", "\"fda");
let points = points!(point1, point);
tokio::runtime::Runtime::new().unwrap().block_on(async move {
// 如果Precision为None,默认是秒级
// 批量写入
client.write_points(points, Some(Precision::Seconds), None).await.unwrap();
// 查询,返回类型是Option<Vec<Node>>
let res = client.query("select * from test1", None).await.unwrap();
println!("{:?}", res.unwrap()[0].series)
});
}
UDP示例
use influx_db_client::{UdpClient, Point, Value, point};
fn main() {
let mut udp = UdpClient::new("127.0.0.1:8089");
udp.add_host("127.0.0.1:8090");
let point = point!("test").add_field("foo", Value::String(String::from("bar")));
udp.write_point(point).unwrap();
}
完整示例代码
use influx_db_client::{Client, Point, Points, Precision, point, points};
use tokio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建客户端并设置认证
let client = Client::new("http://localhost:8086", "test_db")
.set_authentication("admin", "password");
// 创建数据点1
let point1 = point!("measurement1")
.add_tag("location", "server_room")
.add_tag("device", "sensor1")
.add_field("temperature", 23.5)
.add_field("humidity", 45.2)
.add_field("status", "normal");
// 创建数据点2
let point2 = point!("measurement2")
.add_tag("location", "outdoor")
.add_tag("device", "sensor2")
.add_field("temperature", 18.7)
.add_field("humidity", 67.8)
.add_field("status", "warning");
// 将多个数据点组合成批处理
let points = points!(point1, point2);
// 写入数据到InfluxDB,使用纳秒级精度
client.write_points(points, Some(Precision::Nanoseconds), None)
.await?;
// 查询数据
let query_result = client.query("SELECT * FROM measurement1", None).await?;
if let Some(nodes) = query_result {
for node in nodes {
println!("Measurement: {}", node.name);
for series in node.series {
println!("Tags: {:?}", series.tags);
println!("Columns: {:?}", series.columns);
println!("Values:");
for value_set in series.values {
println!(" {:?}", value_set);
}
}
}
}
Ok(())
}
兼容性
该库已测试过InfluxDB 1.0.2/1.3.5/1.5版本。
1 回复
Rust时序数据库客户端库influx_db_client的使用指南
简介
influx_db_client
是一个Rust语言的InfluxDB客户端库,用于高效连接InfluxDB时序数据库,执行数据写入和查询操作。它提供了简洁的API来与InfluxDB交互,支持InfluxDB 1.x版本。
安装
在Cargo.toml中添加依赖:
[dependencies]
influx_db_client = "0.6.0"
基本使用方法
1. 创建客户端连接
use influx_db_client::{Client, Point, Precision, Value};
// 创建客户端
let client = Client::new("http://localhost:8086", "test_db")
.set_authentication("username", "password");
2. 写入数据
单个数据点写入:
let point = Point::new("cpu_load")
.add_tag("host", "server01")
.add_tag("region", "us-west")
.add_field("value", Value::Float(0.64))
.add_field("active", Value::Boolean(true))
.to_owned();
client.write_point(point, Precision::Seconds, None).unwrap();
批量写入多个数据点:
let mut points = vec![];
for i in 0..5 {
let point = Point::new("memory_usage")
.add_tag("host", format!("server{}", i))
.add_field("used", Value::Integer(i * 10))
.add_field("free", Value::Integer(100 - i * 10))
.to_owned();
points.push(point);
}
client.write_points(points, Precision::Seconds, None).unwrap();
3. 查询数据
简单查询:
let query = "SELECT * FROM cpu_load WHERE host = 'server01'";
let results = client.query(query, None).unwrap();
for series in results {
println!("Measurement: {}", series.name);
for row in series.values {
println!("Time: {}, Value: {}", row[0], row[1]);
}
}
带参数的查询:
let query = "SELECT * FROM memory_usage WHERE host = $host";
let params = vec![("host", "server1")];
let results = client.query_with_params(query, params, None).unwrap();
高级功能
1. 创建/删除数据库
// 创建数据库
client.create_database("new_db").unwrap();
// 删除数据库
client.drop_database("old_db").unwrap();
2. 设置保留策略
client.create_retention_policy(
"one_week_only",
"test_db",
"7d",
1,
false
).unwrap();
3. 分块查询(处理大量数据)
let query = "SELECT * FROM large_measurement";
let chunk_size = 1000;
let mut results = client.query_chunked(query, chunk_size, None);
while let Some(chunk) = results.next().unwrap() {
// 处理每个数据块
for series in chunk {
println!("Got {} points in series {}", series.values.len(), series.name);
}
}
错误处理
match client.query("SELECT * FROM non_existent", None) {
Ok(results) => {
// 处理结果
},
Err(e) => {
eprintln!("查询出错: {}", e);
// 可能的错误处理逻辑
}
}
性能优化建议
- 对于批量写入,尽量一次性发送多个数据点而不是单点写入
- 考虑使用连接池管理客户端连接
- 对于高频写入场景,可以实现缓冲机制定期批量写入
完整示例
use influx_db_client::{Client, Point, Precision, Value};
fn main() {
// 1. 创建客户端
let client = Client::new("http://localhost:8086", "metrics")
.set_authentication("admin", "password123");
// 2. 写入CPU指标数据
let cpu_point = Point::new("cpu_usage")
.add_tag("host", "web-server-1")
.add_tag("region", "eu-central")
.add_field("user", Value::Float(23.4))
.add_field("system", Value::Float(12.1))
.add_field("idle", Value::Float(64.5))
.to_owned();
client.write_point(cpu_point, Precision::Milliseconds, None).unwrap();
// 3. 查询最近5分钟的CPU数据
let query = "SELECT mean(\"user\") FROM cpu_usage WHERE time > now() - 5m GROUP BY time(1m)";
let results = client.query(query, None).unwrap();
for series in results {
println!("--- {} ---", series.name);
for row in series.values {
println!("Time: {}, Avg CPU Usage: {}%", row[0], row[1]);
}
}
}
这个库为Rust开发者提供了与InfluxDB交互的便捷方式,特别适合需要高效处理时序数据的应用场景。