Rust分布式追踪库tracing-tunnel的使用:实现跨进程日志与链路数据的高效隧道传输
Rust分布式追踪库tracing-tunnel的使用:实现跨进程日志与链路数据的高效隧道传输
简介
这个crate提供了tracing基础设施帮助工具,允许跨API边界传输追踪事件:
TracingEventSender
是一个tracingSubscriber
,它将tracing事件转换为可(反)序列化的表示形式,可以使用可定制的钩子发送到其他地方。TracingEventReceiver
消费由TracingEventSender
产生的事件,并将它们中继到tracing基础设施。假设事件源可能比特定TracingEventReceiver
实例的寿命更长,甚至比封装接收器的程序寿命更长。为了处理这种情况,接收器提供了持久化/恢复其状态的方法。
这解决了tracing spans/events具有动态调用站点的问题,即在编译时未知的调用站点。这可能发生在调用站点定义在动态加载模块中时,这些模块的执行被嵌入到程序中,例如WASM模块。
使用
在Crate.toml
中添加:
[dependencies]
tracing-tunnel = "0.1.0"
注意,上述两种功能都是通过可选功能启用的;详情请参阅crate文档。
发送tracing事件
use std::sync::mpsc;
use tracing_tunnel::{TracingEvent, TracingEventSender, TracingEventReceiver};
// 使用MPSC通道收集tracing事件
let (events_sx, events_rx) = mpsc::sync_channel(10);
let subscriber = TracingEventSender::new(move |event| {
events_sx.send(event).ok();
});
tracing::subscriber::with_default(subscriber, || {
tracing::info_span!("test", num = 42_i64).in_scope(|| {
tracing::warn!("I feel disturbance in the Force...");
});
});
let events: Vec<TracingEvent> = events_rx.iter().collect();
println!("{events:?}");
// 对事件进行处理...
接收tracing事件
use std::sync::mpsc;
use tracing_tunnel::{
LocalSpans, PersistedMetadata, PersistedSpans, TracingEvent, TracingEventReceiver,
};
tracing_subscriber::fmt().pretty().init();
fn replay_events(events: &[TracingEvent]) {
let mut spans = PersistedSpans::default();
let mut local_spans = LocalSpans::default();
let mut receiver = TracingEventReceiver::default();
for event in events {
if let Err(err) = receiver.try_receive(event.clone()) {
tracing::warn!(%err, "received invalid tracing event");
}
}
// 持久化接收器的最终状态。状态有两部分:
// 元数据和存活spans。spans进一步分为持久化和本地部分。
let metadata = receiver.persist_metadata();
let (spans, local_spans) = receiver.persist();
// 存储`metadata`和`spans`,例如在数据库中,而`local_spans`
// 存储在本地数据结构中,如以可执行ID为键的`HashMap`。
}
完整示例
下面是一个完整的跨进程使用tracing-tunnel的示例:
use std::sync::mpsc;
use tracing_tunnel::{TracingEvent, TracingEventSender, TracingEventReceiver};
use tracing::{info_span, warn};
// 发送端进程
fn sender_process() -> Vec<TracingEvent> {
let (events_sx, events_rx) = mpsc::sync_channel(10);
let subscriber = TracingEventSender::new(move |event| {
events_sx.send(event).ok();
});
tracing::subscriber::with_default(subscriber, || {
info_span!("cross_process_span", pid = std::process::id()).in_scope(|| {
warn!("This is a warning from sender process");
});
});
events_rx.iter().collect()
}
// 接收端进程
fn receiver_process(events: Vec<TracingEvent>) {
tracing_subscriber::fmt().pretty().init();
let mut receiver = TracingEventReceiver::default();
for event in events {
if let Err(err) = receiver.try_receive(event) {
tracing::warn!(%err, "failed to receive tracing event");
}
}
}
fn main() {
// 模拟跨进程通信
let events = sender_process();
receiver_process(events);
}
许可证
根据Apache License, Version 2.0或MIT license许可。
1 回复
Rust分布式追踪库tracing-tunnel使用指南
概述
tracing-tunnel是一个Rust库,用于实现跨进程的分布式追踪和日志数据的高效传输。它建立在tracing
生态系统之上,允许将追踪数据通过隧道方式在不同进程间传递,特别适合微服务架构下的分布式系统。
主要特性
- 跨进程的追踪上下文传播
- 高效的日志和跨度数据传输
- 低开销的序列化机制
- 支持多种传输协议
- 与现有tracing生态系统兼容
基本使用方法
添加依赖
[dependencies]
tracing = "0.1"
tracing-tunnel = "0.3"
tracing-subscriber = { version = "0.3", features = ["fmt"] }
基本示例
use tracing_tunnel::{TracedValue, TracingEventSender, TracingEventReceiver};
use tracing::{info_span, info};
use std::thread;
// 创建进程内通信通道
let (sender, receiver) = crossbeam_channel::unbounded();
// 发送方配置tracing订阅器
let sender_layer = tracing_tunnel::TracingSenderLayer::new(sender.clone());
tracing_subscriber::registry()
.with(sender_layer)
.init();
// 发送进程记录日志和跨度
let span = info_span!("parent_span");
let _guard = span.enter();
info!("This is a log message from the sender");
// 接收方线程处理接收到的数据
thread::spawn(move || {
let receiver = TracingEventReceiver::new(receiver);
for event in receiver {
println!("Received event: {:?}", event);
}
});
完整示例Demo
下面是一个完整的跨进程分布式追踪示例,包含服务端和客户端实现:
服务端代码
use tracing_tunnel::{TracingEventSender, RemoteSpan};
use tracing_subscriber::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;
fn main() {
// 初始化默认日志订阅器
let fmt_layer = tracing_subscriber::fmt::layer();
tracing_subscriber::registry()
.with(fmt_layer)
.init();
// 启动TCP服务器
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
println!("Server listening on 127.0.0.1:8080");
for stream in listener.incoming() {
let stream = stream.unwrap();
// 为每个连接创建新线程
thread::spawn(move || {
println!("New client connected");
// 创建tracing-tunnel发送器
let sender = TracingEventSender::new(stream);
let layer = tracing_tunnel::TracingSenderLayer::new(sender);
// 配置当前线程的订阅器
tracing_subscriber::registry()
.with(layer)
.init();
// 模拟处理业务逻辑
let root_span = tracing::info_span!("server_request");
let _guard = root_span.enter();
tracing::info!("Processing request...");
thread::sleep(std::time::Duration::from_secs(1));
tracing::info!("Request processed successfully");
});
}
}
客户端代码
use tracing_tunnel::{TracingEventReceiver, RemoteSpan};
use std::net::TcpStream;
use tracing::{info, info_span};
use std::thread;
fn main() {
// 连接到服务端
let stream = TcpStream::connect("127.0.0.1:8080").unwrap();
println!("Connected to server");
// 创建接收器线程
let receiver_thread = thread::spawn(move || {
let receiver = TracingEventReceiver::new(stream);
for event in receiver {
match event {
tracing_tunnel::TracingEvent::Span(span) => {
println!("[RECEIVED SPAN] {:?}", span);
}
tracing_tunnel::TracingEvent::Event(event) => {
println!("[RECEIVED EVENT] {:?}", event);
}
}
}
});
// 初始化客户端tracing
let fmt_layer = tracing_subscriber::fmt::layer();
tracing_subscriber::registry()
.with(fmt_layer)
.init();
// 模拟客户端业务逻辑
let client_span = info_span!("client_operation");
let _guard = client_span.enter();
info!("Starting client operation");
thread::sleep(std::time::Duration::from_millis(500));
info!("Operation completed");
// 等待接收线程结束
receiver_thread.join().unwrap();
}
高级用法示例
自定义序列化协议
use tracing_tunnel::{TracingEventSender, TracingEventReceiver};
use bincode::{Serializer, Deserializer};
use std::sync::mpsc::channel;
// 创建二进制传输通道
let (tx, rx) = channel();
// 发送方配置
let serializer = Serializer::new(tx);
let sender = TracingEventSender::new(serializer);
// 接收方配置
let deserializer = Deserializer::from_reader(rx);
let receiver = TracingEventReceiver::new(deserializer);
// 使用示例
let sender_layer = tracing_tunnel::TracingSenderLayer::new(sender);
tracing_subscriber::registry()
.with(sender_layer)
.init();
tracing::info!("This message will be serialized with bincode");
上下文传播示例
use tracing_tunnel::RemoteSpanContext;
use tracing::{info, info_span};
// 父进程/服务
let parent_span = info_span!("parent_process");
let _guard = parent_span.enter();
info!("Parent process started");
// 提取当前上下文
let remote_ctx = RemoteSpanContext::current();
// 子进程/客户端
std::thread::spawn(move || {
// 注入父进程上下文
let _guard = remote_ctx.attach();
let child_span = info_span!("child_process");
let _guard = child_span.enter();
info!("Child process with parent context");
});
性能优化建议
- 对于高吞吐场景使用bincode等二进制序列化格式
- 根据负载调整通道缓冲区大小
- 使用
tracing
的过滤功能减少不必要的事件 - 考虑实现批处理发送机制减少IO操作
错误处理最佳实践
use tracing_tunnel::{TracingError, TracingEventSender};
use std::net::TcpStream;
fn setup_sender() -> Result<(), TracingError> {
let stream = TcpStream::connect("127.0.0.1:8080")?;
let sender = TracingEventSender::new(stream)?;
let layer = tracing_tunnel::TracingSenderLayer::new(sender);
tracing_subscriber::registry()
.with(layer)
.init();
Ok(())
}
fn main() {
if let Err(e) = setup_sender() {
match e {
TracingError::Serialization(e) => {
eprintln!("序列化失败: {}", e);
}
TracingError::Transport(e) => {
eprintln!("传输错误: {}", e);
}
_ => {
eprintln!("未知错误: {}", e);
}
}
std::process::exit(1);
}
// 正常业务逻辑
tracing::info!("Application started");
}