Python中如何分析网络设备上的转发包情况并进行入出接口报文对比?

比如对一个路由器,我通过 paramiko 的 invoke_shell 获取了路由器入出两个接口的报文,接下来想分析这两份报文的对应情况,比如是否有丢包、报文转发的延迟等等。
报文是双向的,请求包先从 1 口到设备,回应包会从 2 口先到设备;而且经由设备转发以后,出接口转发的报文的顺序可能会变。
对于以上需求我想了很久,把抓取的报文写入队列或者 list 再去做对比总会有些问题,麻烦各位给点好的思路。
谢谢。
Python中如何分析网络设备上的转发包情况并进行入出接口报文对比?

4 回复

这种需求肯定会有线程的,看看 wireshark 的 merge 能满足不? https://www.wireshark.org/docs/wsug_html_chunked/ChStatCompareCaptureFiles.html


要分析网络设备上的转发包情况并进行入出接口报文对比,核心是抓取并解析数据包。在Python里,scapy是干这活儿最趁手的库。它能让你轻松地嗅探、构造、发送和解析网络包。

基本思路是:在设备的两个接口上同时抓包,然后通过关键字段(比如五元组、序列号)匹配出对应的入站和出站报文,最后对比分析。下面是一个可以直接跑起来的示例代码:

from scapy.all import sniff, IP, TCP, UDP
from collections import defaultdict
import threading
import time

# 存储捕获的包,按流分类
captured_packets = defaultdict(list)

def packet_callback(pkt, interface_name):
    """捕获数据包的回调函数,按流标识存储"""
    if IP in pkt:
        # 提取五元组作为流的标识(这里简化处理,实际可能需考虑更多协议)
        proto = pkt[IP].proto
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst
        
        if TCP in pkt:
            src_port = pkt[TCP].sport
            dst_port = pkt[TCP].dport
        elif UDP in pkt:
            src_port = pkt[UDP].sport
            dst_port = pkt[UDP].dport
        else:
            src_port = None
            dst_port = None
        
        # 创建流的键,确保双向流量能匹配
        flow_key = tuple(sorted([(src_ip, src_port), (dst_ip, dst_port)])) if src_port and dst_port else None
        
        if flow_key:
            # 存储包信息,附带时间戳和接口名
            packet_info = {
                'timestamp': time.time(),
                'interface': interface_name,
                'packet': pkt
            }
            captured_packets[flow_key].append(packet_info)

def start_sniffing(interface, duration=10):
    """在指定接口上抓包一段时间"""
    print(f"开始在接口 {interface} 上抓包,持续 {duration} 秒...")
    sniff(iface=interface, prn=lambda pkt: packet_callback(pkt, interface), timeout=duration, store=0)

def analyze_flows():
    """分析捕获的流,尝试匹配入站和出站报文"""
    print("\n--- 开始分析入出接口报文对比 ---")
    
    for flow_key, packets in captured_packets.items():
        if len(packets) < 2:
            continue  # 单方向流量,跳过
        
        # 按接口分组
        by_interface = {}
        for pkt_info in packets:
            intf = pkt_info['interface']
            by_interface.setdefault(intf, []).append(pkt_info)
        
        # 如果有至少两个接口的流量,进行对比
        if len(by_interface) >= 2:
            print(f"\n流 {flow_key}:")
            for intf, intf_packets in by_interface.items():
                print(f"  接口 {intf}: 捕获 {len(intf_packets)} 个包")
                # 这里可以进一步对比包的详细信息,比如TTL变化、长度等
                # 示例:打印第一个包的摘要
                if intf_packets:
                    first_pkt = intf_packets[0]['packet']
                    print(f"    示例包: {first_pkt.summary()}")

if __name__ == "__main__":
    # 配置要监听的接口(根据你的实际环境修改)
    interfaces = ["eth0", "eth1"]  # 示例接口名
    duration = 15  # 每个接口抓包秒数
    
    # 启动多个线程同时抓包
    threads = []
    for iface in interfaces:
        t = threading.Thread(target=start_sniffing, args=(iface, duration))
        t.start()
        threads.append(t)
    
    # 等待所有抓包完成
    for t in threads:
        t.join()
    
    # 分析结果
    analyze_flows()

这段代码做了几件事:

  1. sniff在指定网卡上抓包,通过回调函数把包存下来,并按五元组(源IP、源端口、目的IP、目的端口、协议)归类到不同的“流”里。
  2. 每个包都记录了时间戳和被抓到的接口名,方便后面对比。
  3. 分析时,找出那些在多个接口上都出现的流,这很可能就是经过设备转发的流量。
  4. 最后打印出每个流在不同接口上的包数量和一个示例包,你可以根据需要扩展对比逻辑,比如检查IP头里的TTL是否减了、负载是否一致等。

要跑起来,你得先装scapy(pip install scapy),并且用管理员/root权限运行(因为抓包需要特权)。记得把代码里的interfaces列表改成你设备上实际的接口名。

这只是一个起点,真实环境里你可能需要处理更多协议、考虑分片、处理异步时序,或者把结果输出得更详细。但核心抓包、匹配、对比的思路就是这个。

总结:用scapy双线抓包,按流匹配后对比。

  • 现成

谢谢你,这个不行,因为网络设备会有 nat 改包,地址端口都有可能会变,我通过其他途径获取了 nat 后的转换关系,得通过这个转换关系来对比报文。

回到顶部