Rust数据压缩库tsz的使用:高效时间序列压缩与存储解决方案

Rust数据压缩库tsz的使用:高效时间序列压缩与存储解决方案

TSZ是一个基于Facebook白皮书《Gorilla: A Fast, Scalable, In-Memory Time Series Database》的时间序列压缩库。它提供了将DataPoint流(由时间和值组成)压缩为字节,以及将字节流解压缩为DataPoint的功能。

示例

以下是使用tsz编码和解码DataPoint的简单示例:

extern crate tsz;

use std::vec::Vec;
use tsz::{DataPoint, Encode, Decode, StdEncoder, StdDecoder};
use tsz::stream::{BufferedReader, BufferedWriter};
use tsz::decode::Error;

const DATA: &'static str = "1482892270,1.76
1482892280,7.78
1482892288,7.95
1482892292,5.53
1482892310,4.41
1482892323,5.30
1482892334,5.30
1482892341,2.92
1482892350,0.73
1482892360,-1.33
1482892370,-1.78
1482892390,-12.45
1482892401,-34.76
1482892490,78.9
1482892500,335.67
1482892800,12908.12
";

fn main() {
    let w = BufferedWriter::new();

    // 1482892260是流的Unix时间戳起始时间
    let mut encoder = StdEncoder::new(1482892260, w);

    let mut actual_datapoints = Vec::new();

    for line in DATA.lines() {
        let substrings: Vec<&str> = line.split(",").collect();
        let t = substrings[0].parse::<u64>().unwrap();
        let v = substrings[1].parse::<f64>().unwrap();
        let dp = DataPoint::new(t, v);
        actual_datapoints.push(dp);
    }

    for dp in &actual_datapoints {
        encoder.encode(*dp);
    }

    let bytes = encoder.close();
    let r = BufferedReader::new(bytes);
    let mut decoder = StdDecoder::new(r);

    let mut expected_datapoints = Vec::new();

    let mut done = false;
    loop {
        if done {
            break;
        }

        match decoder.next() {
            Ok(dp) => expected_datapoints.push(dp),
            Err(err) => {
                if err == Error::EndOfStream {
                    done = true;
                } else {
                    panic!("Received an error from decoder: {:?}", err);
                }
            }
        };
    }

    println!("actual datapoints: {:?}", actual_datapoints);
    println!("expected datapoints: {:?}", expected_datapoints);
}

完整示例代码

// 添加依赖到Cargo.toml
// tsz = "0.1.4"

use tsz::{DataPoint, Encode, Decode, StdEncoder, StdDecoder};
use tsz::stream::{BufferedReader, BufferedWriter};
use tsz::decode::Error;
use std::time::{SystemTime, UNIX_EPOCH};

fn main() {
    // 创建缓冲写入器
    let w = BufferedWriter::new();
    
    // 获取当前时间作为起始时间戳
    let start_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    
    // 创建编码器
    let mut encoder = StdEncoder::new(start_time, w);
    
    // 模拟一些数据点
    let mut datapoints = vec![];
    for i in 0..10 {
        let timestamp = start_time + i * 60; // 每分钟一个点
        let value = (i as f64).sin() * 100.0; // 正弦波值
        datapoints.push(DataPoint::new(timestamp, value));
    }
    
    // 编码所有数据点
    for dp in &datapoints {
        encoder.encode(*dp).unwrap();
    }
    
    // 获取压缩后的字节
    let compressed_bytes = encoder.close();
    println!("压缩后大小: {} bytes", compressed_bytes.len());
    
    // 创建解码器
    let r = BufferedReader::new(compressed_bytes);
    let mut decoder = StdDecoder::new(r);
    
    // 解码所有数据点
    let mut decoded_points = vec![];
    loop {
        match decoder.next() {
            Ok(dp) => decoded_points.push(dp),
            Err(Error::EndOfStream) => break,
            Err(e) => panic!("解码错误: {:?}", e),
        }
    }
    
    // 验证解码结果
    assert_eq!(datapoints.len(), decoded_points.len());
    for (original, decoded) in datapoints.iter().zip(decoded_points.iter()) {
        assert_eq!(original.timestamp(), decoded.timestamp());
        assert!((original.value() - decoded.value()).abs() < f64::EPSILON);
    }
    
    println!("压缩和解码成功!");
}

安装

将以下内容添加到你的Cargo.toml文件中:

[dependencies]
tsz = "0.1.4"

或者运行以下命令:

cargo add tsz

TSZ库采用MIT许可证,适用于需要高效存储和传输时间序列数据的应用场景,如监控系统、IoT设备数据收集等。


1 回复

Rust数据压缩库tsz的使用:高效时间序列压缩与存储解决方案

介绍

tsz是一个专门为时间序列数据设计的Rust压缩库,它实现了高效的压缩算法来减少时间序列数据的存储空间和传输带宽。该库特别适合物联网(IoT)、监控系统和金融数据等需要处理大量时间戳数据的应用场景。

tsz的核心特点:

  • 针对时间序列数据优化,比通用压缩算法(如gzip)更高效
  • 支持有损和无损压缩模式
  • 压缩率高,通常能达到90%以上的压缩率
  • 解压速度快,适合实时数据处理
  • 纯Rust实现,无外部依赖

安装方法

在Cargo.toml中添加依赖:

[dependencies]
tsz = "0.3"  # 请检查最新版本

基本使用方法

1. 创建压缩器并添加数据

use tsz::{Compressor, Decompressor};
use std::time::{SystemTime, UNIX_EPOCH};

fn main() {
    // 创建压缩器
    let mut compressor = Compressor::new();
    
    // 获取当前时间戳
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs() as u32;
    
    // 添加数据点 (时间戳, 值)
    compressor.compress(now, 23.5);
    compressor.compress(now + 1, 24.1);
    compressor.compress(now + 2, 25.3);
    
    // 获取压缩后的字节
    let compressed_bytes = compressor.finish();
    println!("压缩后大小: {} 字节", compressed_bytes.len());
}

2. 解压数据

fn decompress_data(compressed_bytes: Vec<u8>) {
    let mut decompressor = Decompressor::new(compressed_bytes);
    
    while let Some((timestamp, value)) = decompressor.next() {
        println!("时间戳: {}, 值: {}", timestamp, value);
    }
}

高级用法

有损压缩模式

use tsz::{Compressor, Precision};

fn lossy_compression() {
    // 创建有损压缩器,指定精度为0.01(小数点后2位)
    let mut compressor = Compressor::with_precision(Precision::Digits(2));
    
    compressor.compress(1625097600, 123.4567);  // 将被存储为123.46
    compressor.compress(1625097601, 789.0123);  // 将被存储为789.01
    
    let compressed = compressor.finish();
    // ... 解压时值将被近似为设置的精度
}

批量压缩数据

fn batch_compression() {
    let timestamps = vec![1625097600, 1625097601, 1625097602];
    let values = vec![10.5, 11.2, 12.8];
    
    let mut compressor = Compressor::new();
    
    for (ts, val) in timestamps.iter().zip(values.iter()) {
        compressor.compress(*ts, *val);
    }
    
    let compressed = compressor.finish();
    // ... 存储或传输压缩数据
}

流式处理

use tsz::stream::{Compressor as StreamCompressor, Decompressor as StreamDecompressor};

fn stream_processing() {
    // 流式压缩
    let mut stream_compressor = StreamCompressor::new();
    let chunk1 = stream_compressor.compress(1625097600, 15.3);
    let chunk2 = stream_compressor.compress(1625097601, 16.1);
    let final_chunk = stream_compressor.finish();
    
    // 流式解压
    let mut stream_decompressor = StreamDecompressor::new();
    stream_decompressor.feed(&chunk1);
    stream_decompressor.feed(&chunk2);
    stream_decompressor.feed(&final_chunk);
    
    while let Some((ts, val)) stream_decompressor.next() {
        println!("流数据 - 时间戳: {}, 值: {}", ts, val);
    }
}

性能建议

  1. 对于高频数据采集,建议批量压缩(如每100-1000个点压缩一次)而不是单个点压缩
  2. 根据应用场景选择适当的精度 - 更高的精度意味着更低的压缩率
  3. 考虑使用有损压缩模式,如果应用可以容忍小的精度损失
  4. 对于长期存储,可以结合通用压缩算法(如zstd)进一步压缩tsz的输出

完整示例代码

下面是一个完整的物联网传感器数据收集和压缩存储的示例:

use tsz::{Compressor, Decompressor};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, BufReader, Read, Write};
use std::time::{SystemTime, UNIX_EPOCH};

// 传感器数据结构
#[derive(Debug)]
struct SensorReading {
    timestamp: u32,
    temperature: f64,
    pressure: f64,
}

impl SensorReading {
    fn new(temperature: f64, pressure: f64) -> Self {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as u32;
        SensorReading {
            timestamp,
            temperature,
            pressure,
        }
    }
}

// 压缩并保存传感器数据
fn save_sensor_data(readings: Vec<SensorReading>, file_path: &str) -> std::io::Result<()> {
    // 创建压缩器
    let mut temp_compressor = Compressor::with_precision(Precision::Digits(2)); // 温度保留2位小数
    let mut press_compressor = Compressor::new(); // 压力使用无损压缩
    
    // 压缩数据
    for reading in readings {
        temp_compressor.compress(reading.timestamp, reading.temperature);
        press_compressor.compress(reading.timestamp, reading.pressure);
    }
    
    // 创建文件并写入压缩数据
    let file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(file_path)?;
    
    let mut writer = BufWriter::new(file);
    
    // 写入温度数据
    writer.write_all(&temp_compressor.finish())?;
    // 写入分隔符
    writer.write_all(b"---TSZ-SEPARATOR---")?;
    // 写入压力数据
    writer.write_all(&press_compressor.finish())?;
    
    Ok(())
}

// 从文件加载并解压传感器数据
fn load_sensor_data(file_path: &str) -> std::io::Result<()> {
    let mut file = File::open(file_path)?;
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer)?;
    
    // 分割温度数据和压力数据
    let parts: Vec<&[u8]> = buffer.split(|&b| b == b'-').collect();
    let temp_data = parts[0];
    let press_data = parts[3];
    
    // 解压温度数据
    println!("温度数据:");
    let mut temp_decompressor = Decompressor::new(temp_data.to_vec());
    while let Some((ts, temp)) = temp_decompressor.next() {
        println!("时间戳: {}, 温度: {:.2}°C", ts, temp);
    }
    
    // 解压压力数据
    println!("\n压力数据:");
    let mut press_decompressor = Decompressor::new(press_data.to_vec());
    while let Some((ts, press)) = press_decompressor.next() {
        println!("时间戳: {}, 压力: {:.4}hPa", ts, press);
    }
    
    Ok(())
}

fn main() -> std::io::Result<()> {
    // 模拟传感器数据
    let readings = vec![
        SensorReading::new(23.456, 1013.2543),
        SensorReading::new(23.512, 1013.2101),
        SensorReading::new(23.487, 1013.1987),
    ];
    
    // 保存数据
    save_sensor_data(readings, "sensor_data.tsz")?;
    
    // 加载并显示数据
    load_sensor_data("sensor_data.tsz")?;
    
    Ok(())
}

这个完整示例展示了:

  1. 定义传感器数据结构
  2. 使用有损压缩存储温度数据(保留2位小数)
  3. 使用无损压缩存储压力数据
  4. 将压缩数据写入文件
  5. 从文件读取并解压数据
  6. 处理数据分隔问题

tsz库为Rust开发者提供了高效处理时间序列数据的工具,特别适合资源受限的环境或需要处理大量时间序列数据的应用场景。

回到顶部