Rust时序数据库插件库influxdb的使用,高效处理时间序列数据与监控指标存储
Rust时序数据库插件库influxdb的使用,高效处理时间序列数据与监控指标存储
当前支持的功能
- 读写InfluxDB
- 可选的Serde反序列化支持
- 在一个请求中运行多个查询
- 在一个请求中写入单个或多个测量值
- 认证和非认证连接
async
/await
支持#[derive(InfluxDbWriteable)]
派生宏用于结构体的读写GROUP BY
支持- Tokio和async-std支持
- 可交换的HTTP后端
快速开始
在Cargo.toml
中添加以下内容:
influxdb = { version = "0.7.2", features = ["derive"] }
以下是使用示例代码:
use chrono::{DateTime, Utc};
use influxdb::{Client, Error, InfluxDbWriteable, ReadQuery, Timestamp};
#[tokio::main]
// 或者 #[async_std::main] 如果你更喜欢
async fn main() -> Result<(), Error> {
// 连接到本地8086端口上的test数据库
let client = Client::new("http://localhost:8086", "test");
#[derive(InfluxDbWriteable)]
struct WeatherReading {
time: DateTime<Utc>,
humidity: i32,
#[influxdb(tag)]
wind_direction: String,
}
// 将数据写入名为weather的测量值
let weather_readings = vec![
WeatherReading {
time: Timestamp::Hours(1).into(),
humidity: 30,
wind_direction: String::from("north"),
}
.into_query("weather"),
WeatherReading {
time: Timestamp::Hours(2).into(),
humidity: 40,
wind_direction: String::from("west"),
}
.into_query("weather"),
];
client.query(weather_readings).await?;
// 查询我们写入的数据
let read_query = ReadQuery::new("SELECT * FROM weather");
let read_result = client.query(read_query).await?;
println!("{}", read_result);
Ok(())
}
HTTP后端选择
你可以选择不同的HTTP后端来与InfluxDB通信:
- hyper (通过reqwest,默认使用),使用rustls:
influxdb = { version = "0.7.2", features = ["derive"] }
- hyper (通过reqwest),使用原生TLS (OpenSSL):
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "reqwest-client"] }
- hyper (通过surf),需要tokio 0.2兼容:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "hyper-client"] }
- curl,使用libcurl:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "curl-client"] }
- async-h1 使用原生TLS (OpenSSL):
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "h1-client"] }
- async-h1 使用rustls:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "h1-client-rustls"] }
- WebAssembly的
window.fetch
,通过web-sys
和wasm-bindgen:
influxdb = { version = "0.7.2", default-features = false, features = ["derive", "use-serde", "wasm-client"] }
完整示例
以下是一个更完整的示例,展示了如何创建InfluxDB客户端、写入数据和查询数据:
use chrono::{DateTime, Utc};
use influxdb::{Client, Error, InfluxDbWriteable, ReadQuery, Timestamp};
#[tokio::main]
async fn main() -> Result<(), Error> {
// 创建InfluxDB客户端
let client = Client::new("http://localhost:8086", "test_db")
.with_auth("username", "password");
// 定义测量数据结构
#[derive(InfluxDbWriteable)]
struct ServerMetric {
time: DateTime<Utc>,
cpu_usage: f64,
memory_usage: f64,
#[influxdb(tag)]
hostname: String,
#[influxdb(tag)]
region: String,
}
// 创建要写入的数据
let metrics = vec![
ServerMetric {
time: Utc::now(),
cpu_usage: 23.5,
memory_usage: 45.2,
hostname: String::from("server1"),
region: String::from("us-west"),
}
.into_query("server_metrics"),
ServerMetric {
time: Utc::now(),
cpu_usage極 56.7,
memory_usage: 78.1,
hostname: String::from("server2"),
region: String::from("eu-central"),
}
.into_query("server_metrics"),
];
// 写入数据
client.query(metrics).await?;
// 查询数据
let query = ReadQuery::new("SELECT * FROM server_metrics WHERE time > now() - 1h");
let result = client.query(query).await?;
println!("查询结果: {}", result);
// 更复杂的查询示例
let stats_query = ReadQuery::new(
"SELECT MEAN(cpu_usage) AS avg_cpu, MAX(memory_usage) AS max_mem \
FROM server_metrics \
WHERE time > now() - 1h \
GROUP BY region"
);
let stats_result = client.query(stats_query).await?;
println!("统计结果: {}", stats_result);
Ok(())
}
许可证
MIT License
1 回复
Rust时序数据库插件库influxdb的使用:高效处理时间序列数据与监控指标存储
介绍
influxdb
是 Rust 中用于连接和操作 InfluxDB 时序数据库的库。InfluxDB 是专门为处理时间序列数据优化的数据库,非常适合存储监控指标、传感器数据、实时分析等场景。
该库提供了同步和异步两种 API,支持 InfluxDB 1.x 和 2.x 版本,能够高效地读写时间序列数据。
安装
在 Cargo.toml 中添加依赖:
[dependencies]
influxdb = { version = "0.6.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] } # 如果使用异步API
基本使用方法
1. 同步API使用
use influxdb::{Client, Query, Timestamp};
use influxdb::InfluxDbWriteable;
use chrono::{DateTime, Utc};
#[derive(InfluxDbWriteable)]
struct TemperatureReading {
time: DateTime<Utc>,
temperature: f64,
location: String,
#[influxdb(tag)] sensor_id: String,
}
fn main() -> Result<(), influxdb::Error> {
// 创建客户端
let client = Client::new("http://localhost:8086", "mydb");
// 创建数据点
let reading = TemperatureReading {
time: Timestamp::Hours(1).into(),
temperature: 25.3,
location: "room1".to_string(),
sensor_id: "sensor1".to_string(),
};
// 写入数据
client.query(&reading.into_query("temperature"))?;
// 查询数据
let query = Query::raw_read_query("SELECT * FROM temperature WHERE time > now() - 1h");
let result = client.query(&query)?;
println!("Query result: {:?}", result);
Ok(())
}
2. 异步API使用
use influxdb::{Client, Query, Timestamp};
use influxdb::InfluxDbWriteable;
use chrono::{DateTime, Utc};
#[derive(InfluxDbWriteable)]
struct CpuUsage {
time: DateTime<Utc>,
usage: f64,
#[influxdb(tag)] host: String,
}
#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
// 创建异步客户端
let client = Client::new("http://localhost:8086", "monitoring");
// 创建并写入数据
let usage = CpuUsage {
time: Timestamp::Now.into(),
usage: 45.2,
host: "server1".to_string(),
};
client.query(&usage.into_query("cpu_usage").await?;
// 查询数据
let query = Query::raw_read_query(
"SELECT mean(usage) FROM cpu_usage WHERE time > now() - 30m GROUP BY host"
);
let result = client.query(&query).await?;
println!("Average CPU usage: {:?}", result);
Ok(())
}
高级特性
1. 批量写入
use influxdb::{Client, Query, Timestamp};
use influxdb::InfluxDbWriteable;
use chrono::{DateTime, Utc};
#[derive(InfluxDbWriteable)]
struct NetworkTraffic {
time: DateTime<Utc>,
bytes_in: u64,
bytes_out: u64,
#[influxdb(tag)] interface: String,
}
#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
let client = Client::new("http://localhost:8086", "network_stats");
let mut points = Vec::new();
let now = Utc::now();
// 生成多个数据点
for i in 0..10 {
let traffic = NetworkTraffic {
time: now - chrono::Duration::seconds(i * 5),
bytes_in: 1000 * (i + 1) as u64,
bytes_out: 500 * (i + 1) as u64,
interface: "eth0".to_string(),
};
points.push(traffic.into_query("traffic"));
}
// 批量写入
client.query(&Query::write_query(points)).await?;
Ok(())
}
2. 使用InfluxDB 2.x
use influxdb2::Client;
use influxdb2::models::WriteDataPoint;
use influxdb2::models::DataPoint;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建InfluxDB 2.x客户端
let client = Client::new(
"http://localhost:8086",
"my-org",
"my-token",
);
// 创建数据点
let point = DataPoint::builder("memory")
.tag("host", "server1")
.field("used", 85.3)
.field("total", 128.0)
.build()?;
// 写入数据
client.write("my-bucket", point).await?;
// 查询数据
let query = r#"
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
"#;
let result = client.query(query).await?;
println!("Query result: {:?}", result);
Ok(())
}
最佳实践
- 批量写入:尽量批量写入数据点,减少网络请求次数
- 合理使用标签:将高频查询的维度设置为标签(tag),数值数据设置为字段(field)
- 时间戳精度:根据需求选择合适的时间戳精度(纳秒/微秒/毫秒/秒)
- 连接池:在生产环境中使用连接池管理数据库连接
- 错误处理:正确处理网络错误和数据库错误
性能优化
use influxdb::{Client, Query};
use influxdb::InfluxDbWriteable;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(InfluxDbWriteable)]
struct Metric {
time: SystemTime, // 使用SystemTime减少转换开销
value: f64,
#[influxdb(tag)] name: String,
}
fn create_client() -> Client {
// 配置超时和连接池
Client::new("http://localhost:8086", "metrics")
.with_auth("username", "password")
.with_read_timeout(std::time::Duration::from_secs(5))
.with_write_timeout(std::time::Duration::from_secs(5))
}
#[tokio::main]
async fn main() -> Result<(), influxdb::Error> {
let client = create_client();
// 批量写入1000个数据点
let mut points = Vec::with_capacity(1000);
let now = SystemTime::now();
for i in 0..1000 {
let metric = Metric {
time: now - std::time::Duration::from_secs(i as u64),
value: (i as f64).sin(),
name: "sine_wave".to_string(),
};
points.push(metric.into_query("waveform"));
}
client.query(&Query::write_query(points)).await?;
Ok(())
}
完整示例
以下是一个完整的监控系统指标收集和存储的示例,结合了同步API和异步API的最佳实践:
use influxdb::{Client, Query, Timestamp};
use influxdb::InfluxDbWriteable;
use chrono::{DateTime, Utc};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration};
// 定义监控指标数据结构
#[derive(InfluxDbWriteable)]
struct SystemMetric {
time: DateTime<Utc>,
cpu_usage: f64,
memory_usage: f64,
disk_used: f64,
#[influxdb(tag)] hostname: String,
#[influxdb(tag)] service: String,
}
// 模拟获取系统指标的函数
fn get_system_metrics(hostname: &str, service: &str) -> SystemMetric {
SystemMetric {
time: Timestamp::Now.into(),
cpu_usage: rand::random::<f64>() * 100.0,
memory_usage: rand::random::<f64>() * 100.0,
disk_used: rand::random::<f64>() * 100.0,
hostname: hostname.to_string(),
service: service.to_string(),
}
}
// 同步写入函数
fn sync_write_metrics(client: &Client, metrics: Vec<SystemMetric>) -> Result<(), influxdb::Error> {
let mut points = Vec::new();
for metric in metrics {
points.push(metric.into_query("system_metrics"));
}
client.query(&Query::write_query(points))
}
// 异步监控任务
async fn async_monitoring_task(client: Client, interval_secs: u64) {
loop {
let metrics = vec![
get_system_metrics("server1", "web_service"),
get_system_metrics("server1", "database"),
get_system_metrics("server2", "web_service"),
];
if let Err(e) = client.query(&Query::write_query(
metrics.into_iter().map(|m| m.into_query("system_metrics")).collect()
)).await {
eprintln!("Failed to write metrics: {}", e);
}
sleep(Duration::from_secs(interval_secs)).await;
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建同步客户端
let sync_client = Client::new("http://localhost:8086", "monitoring_db")
.with_auth("admin", "password");
// 初始写入一些历史数据
let mut historical_data = Vec::new();
let now = SystemTime::now();
for i in 0..24 {
let metric = SystemMetric {
time: DateTime::from(now - Duration::from_secs(i * 3600)),
cpu_usage: (i as f64).sin().abs() * 100.0,
memory_usage: 30.0 + (i as f64).cos().abs() * 50.0,
disk_used: 40.0 + (i as f64).sin().abs() * 30.0,
hostname: "server1".to_string(),
service: "web_service".to_string(),
};
historical_data.push(metric);
}
sync_write_metrics(&sync_client, historical_data)?;
// 启动异步监控任务
let async_client = Client::new("http://localhost:8086", "monitoring_db")
.with_auth("admin", "password");
tokio::runtime::Runtime::new()?.block_on(async {
tokio::spawn(async_monitoring_task(async_client, 60));
// 示例查询
let query = Query::raw_read_query(
"SELECT mean(cpu_usage) FROM system_metrics WHERE time > now() - 1h GROUP BY hostname,service"
);
loop {
sleep(Duration::from_secs(300)).await;
match sync_client.query(&query) {
Ok(result) => println!("Current metrics: {:?}", result),
Err(e) => eprintln!("Query failed: {}", e),
}
}
});
Ok(())
}
这个完整示例展示了:
- 定义监控指标数据结构
- 同步写入历史数据
- 异步持续收集和写入实时监控数据
- 定期查询聚合数据
- 错误处理和日志记录
通过这个示例,您可以构建一个完整的监控系统,高效地收集、存储和查询时间序列数据。