Rust分布式追踪库tracing-tunnel的使用:实现跨进程日志与链路数据的高效隧道传输

Rust分布式追踪库tracing-tunnel的使用:实现跨进程日志与链路数据的高效隧道传输

简介

这个crate提供了tracing基础设施帮助工具,允许跨API边界传输追踪事件:

  • TracingEventSender是一个tracing Subscriber,它将tracing事件转换为可(反)序列化的表示形式,可以使用可定制的钩子发送到其他地方。
  • TracingEventReceiver消费由TracingEventSender产生的事件,并将它们中继到tracing基础设施。假设事件源可能比特定TracingEventReceiver实例的寿命更长,甚至比封装接收器的程序寿命更长。为了处理这种情况,接收器提供了持久化/恢复其状态的方法。

这解决了tracing spans/events具有动态调用站点的问题,即在编译时未知的调用站点。这可能发生在调用站点定义在动态加载模块中时,这些模块的执行被嵌入到程序中,例如WASM模块。

使用

Crate.toml中添加:

[dependencies]
tracing-tunnel = "0.1.0"

注意,上述两种功能都是通过可选功能启用的;详情请参阅crate文档。

发送tracing事件

use std::sync::mpsc;
use tracing_tunnel::{TracingEvent, TracingEventSender, TracingEventReceiver};

// 使用MPSC通道收集tracing事件
let (events_sx, events_rx) = mpsc::sync_channel(10);
let subscriber = TracingEventSender::new(move |event| {
    events_sx.send(event).ok();
});

tracing::subscriber::with_default(subscriber, || {
    tracing::info_span!("test", num = 42_i64).in_scope(|| {
        tracing::warn!("I feel disturbance in the Force...");
    });
});

let events: Vec<TracingEvent> = events_rx.iter().collect();
println!("{events:?}");
// 对事件进行处理...

接收tracing事件

use std::sync::mpsc;
use tracing_tunnel::{
    LocalSpans, PersistedMetadata, PersistedSpans, TracingEvent, TracingEventReceiver,
};

tracing_subscriber::fmt().pretty().init();

fn replay_events(events: &[TracingEvent]) {
    let mut spans = PersistedSpans::default();
    let mut local_spans = LocalSpans::default();
    let mut receiver = TracingEventReceiver::default();
    for event in events {
        if let Err(err) = receiver.try_receive(event.clone()) {
            tracing::warn!(%err, "received invalid tracing event");
        }
    }

    // 持久化接收器的最终状态。状态有两部分:
    // 元数据和存活spans。spans进一步分为持久化和本地部分。
    let metadata = receiver.persist_metadata();
    let (spans, local_spans) = receiver.persist();
    // 存储`metadata`和`spans`,例如在数据库中,而`local_spans`
    // 存储在本地数据结构中,如以可执行ID为键的`HashMap`。
}

完整示例

下面是一个完整的跨进程使用tracing-tunnel的示例:

use std::sync::mpsc;
use tracing_tunnel::{TracingEvent, TracingEventSender, TracingEventReceiver};
use tracing::{info_span, warn};

// 发送端进程
fn sender_process() -> Vec<TracingEvent> {
    let (events_sx, events_rx) = mpsc::sync_channel(10);
    let subscriber = TracingEventSender::new(move |event| {
        events_sx.send(event).ok();
    });

    tracing::subscriber::with_default(subscriber, || {
        info_span!("cross_process_span", pid = std::process::id()).in_scope(|| {
            warn!("This is a warning from sender process");
        });
    });

    events_rx.iter().collect()
}

// 接收端进程
fn receiver_process(events: Vec<TracingEvent>) {
    tracing_subscriber::fmt().pretty().init();
    
    let mut receiver = TracingEventReceiver::default();
    for event in events {
        if let Err(err) = receiver.try_receive(event) {
            tracing::warn!(%err, "failed to receive tracing event");
        }
    }
}

fn main() {
    // 模拟跨进程通信
    let events = sender_process();
    receiver_process(events);
}

许可证

根据Apache License, Version 2.0或MIT license许可。


1 回复

Rust分布式追踪库tracing-tunnel使用指南

概述

tracing-tunnel是一个Rust库,用于实现跨进程的分布式追踪和日志数据的高效传输。它建立在tracing生态系统之上,允许将追踪数据通过隧道方式在不同进程间传递,特别适合微服务架构下的分布式系统。

主要特性

  • 跨进程的追踪上下文传播
  • 高效的日志和跨度数据传输
  • 低开销的序列化机制
  • 支持多种传输协议
  • 与现有tracing生态系统兼容

基本使用方法

添加依赖

[dependencies]
tracing = "0.1"
tracing-tunnel = "0.3"
tracing-subscriber = { version = "0.3", features = ["fmt"] }

基本示例

use tracing_tunnel::{TracedValue, TracingEventSender, TracingEventReceiver};
use tracing::{info_span, info};
use std::thread;

// 创建进程内通信通道
let (sender, receiver) = crossbeam_channel::unbounded();

// 发送方配置tracing订阅器
let sender_layer = tracing_tunnel::TracingSenderLayer::new(sender.clone());
tracing_subscriber::registry()
    .with(sender_layer)
    .init();

// 发送进程记录日志和跨度
let span = info_span!("parent_span");
let _guard = span.enter();
info!("This is a log message from the sender");

// 接收方线程处理接收到的数据
thread::spawn(move || {
    let receiver = TracingEventReceiver::new(receiver);
    for event in receiver {
        println!("Received event: {:?}", event);
    }
});

完整示例Demo

下面是一个完整的跨进程分布式追踪示例,包含服务端和客户端实现:

服务端代码

use tracing_tunnel::{TracingEventSender, RemoteSpan};
use tracing_subscriber::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;

fn main() {
    // 初始化默认日志订阅器
    let fmt_layer = tracing_subscriber::fmt::layer();
    tracing_subscriber::registry()
        .with(fmt_layer)
        .init();

    // 启动TCP服务器
    let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
    println!("Server listening on 127.0.0.1:8080");

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        
        // 为每个连接创建新线程
        thread::spawn(move || {
            println!("New client connected");
            
            // 创建tracing-tunnel发送器
            let sender = TracingEventSender::new(stream);
            let layer = tracing_tunnel::TracingSenderLayer::new(sender);
            
            // 配置当前线程的订阅器
            tracing_subscriber::registry()
                .with(layer)
                .init();
            
            // 模拟处理业务逻辑
            let root_span = tracing::info_span!("server_request");
            let _guard = root_span.enter();
            
            tracing::info!("Processing request...");
            thread::sleep(std::time::Duration::from_secs(1));
            
            tracing::info!("Request processed successfully");
        });
    }
}

客户端代码

use tracing_tunnel::{TracingEventReceiver, RemoteSpan};
use std::net::TcpStream;
use tracing::{info, info_span};
use std::thread;

fn main() {
    // 连接到服务端
    let stream = TcpStream::connect("127.0.0.1:8080").unwrap();
    println!("Connected to server");

    // 创建接收器线程
    let receiver_thread = thread::spawn(move || {
        let receiver = TracingEventReceiver::new(stream);
        for event in receiver {
            match event {
                tracing_tunnel::TracingEvent::Span(span) => {
                    println!("[RECEIVED SPAN] {:?}", span);
                }
                tracing_tunnel::TracingEvent::Event(event) => {
                    println!("[RECEIVED EVENT] {:?}", event);
                }
            }
        }
    });

    // 初始化客户端tracing
    let fmt_layer = tracing_subscriber::fmt::layer();
    tracing_subscriber::registry()
        .with(fmt_layer)
        .init();

    // 模拟客户端业务逻辑
    let client_span = info_span!("client_operation");
    let _guard = client_span.enter();
    
    info!("Starting client operation");
    thread::sleep(std::time::Duration::from_millis(500));
    
    info!("Operation completed");
    
    // 等待接收线程结束
    receiver_thread.join().unwrap();
}

高级用法示例

自定义序列化协议

use tracing_tunnel::{TracingEventSender, TracingEventReceiver};
use bincode::{Serializer, Deserializer};
use std::sync::mpsc::channel;

// 创建二进制传输通道
let (tx, rx) = channel();

// 发送方配置
let serializer = Serializer::new(tx);
let sender = TracingEventSender::new(serializer);

// 接收方配置
let deserializer = Deserializer::from_reader(rx);
let receiver = TracingEventReceiver::new(deserializer);

// 使用示例
let sender_layer = tracing_tunnel::TracingSenderLayer::new(sender);
tracing_subscriber::registry()
    .with(sender_layer)
    .init();

tracing::info!("This message will be serialized with bincode");

上下文传播示例

use tracing_tunnel::RemoteSpanContext;
use tracing::{info, info_span};

// 父进程/服务
let parent_span = info_span!("parent_process");
let _guard = parent_span.enter();
info!("Parent process started");

// 提取当前上下文
let remote_ctx = RemoteSpanContext::current();

// 子进程/客户端
std::thread::spawn(move || {
    // 注入父进程上下文
    let _guard = remote_ctx.attach();
    
    let child_span = info_span!("child_process");
    let _guard = child_span.enter();
    info!("Child process with parent context");
});

性能优化建议

  1. 对于高吞吐场景使用bincode等二进制序列化格式
  2. 根据负载调整通道缓冲区大小
  3. 使用tracing的过滤功能减少不必要的事件
  4. 考虑实现批处理发送机制减少IO操作

错误处理最佳实践

use tracing_tunnel::{TracingError, TracingEventSender};
use std::net::TcpStream;

fn setup_sender() -> Result<(), TracingError> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    let sender = TracingEventSender::new(stream)?;
    
    let layer = tracing_tunnel::TracingSenderLayer::new(sender);
    tracing_subscriber::registry()
        .with(layer)
        .init();
    
    Ok(())
}

fn main() {
    if let Err(e) = setup_sender() {
        match e {
            TracingError::Serialization(e) => {
                eprintln!("序列化失败: {}", e);
            }
            TracingError::Transport(e) => {
                eprintln!("传输错误: {}", e);
            }
            _ => {
                eprintln!("未知错误: {}", e);
            }
        }
        std::process::exit(1);
    }
    
    // 正常业务逻辑
    tracing::info!("Application started");
}
回到顶部