packet-captureSkill packet-capture

这是一个专门用于网络数据包捕获和分析的技能,提供专家级的libpcap、tcpdump、tshark和Wireshark能力,用于深入检查网络流量。

渗透测试 3 次安装 10 次浏览 更新于 2/25/2026

以下是对“packet-capture”技能的中文翻译:


name: packet-capture description: 专家级技能,用于使用libpcap/Wireshark进行数据包捕获和分析。执行tcpdump/tshark命令,编写BPF过滤器表达式,分析pcap文件,解码协议层,计算统计数据,并生成Wireshark解析器。 allowed-tools: Bash(*) 读写编辑Glob Grep WebFetch metadata: author: babysitter-sdk version: “1.0.0” category: network-analysis backlog-id: SK-003

packet-capture

你是 packet-capture - 一个专门用于网络数据包捕获和分析的专家级技能,提供与libpcap、tcpdump、tshark和Wireshark相关的专业能力,用于深入检查网络流量。

概览

这项技能使AI能够执行数据包捕获和分析,包括:

  • 执行tcpdump/tshark命令并解释输出
  • 编写和验证BPF过滤器表达式
  • 分析pcap/pcapng文件
  • 解码协议层(以太网、IP、TCP、UDP、应用程序)
  • 计算数据包统计和流量分析
  • 生成Wireshark解析器
  • 创建自定义捕获过滤器

先决条件

  • 安装 tcpdumptshark
  • 需要root/admin权限进行实时捕获
  • 可选:Wireshark用于GUI分析
  • 可选:Python与scapy用于程序化分析

能力

1. 实时数据包捕获

使用tcpdump和tshark捕获网络流量:

# 基本捕获接口
tcpdump -i eth0 -nn

# 带时间戳精度的捕获
tcpdump -i eth0 -nn -tttt

# 捕获到文件
tcpdump -i eth0 -w capture.pcap

# 带旋转的捕获(100MB文件,保留10个)
tcpdump -i eth0 -w capture_%Y%m%d_%H%M%S.pcap -C 100 -W 10

# 捕获特定流量
tcpdump -i eth0 -nn 'port 80 or port 443'

# tshark捕获带显示过滤器
tshark -i eth0 -Y 'http.request.method == "GET"'

# tshark捕获特定字段
tshark -i eth0 -T fields \
  -e frame.time \
  -e ip.src \
  -e ip.dst \
  -e tcp.port \
  -e http.host

2. BPF过滤器表达式

编写高效的Berkeley Packet Filter表达式:

# 主机过滤器
tcpdump host 192.168.1.100
tcpdump src host 192.168.1.100
tcpdump dst host 192.168.1.100

# 网络过滤器
tcpdump net 192.168.1.0/24
tcpdump src net 10.0.0.0/8

# 端口过滤器
tcpdump port 80
tcpdump src port 443
tcpdump portrange 8000-8100

# 协议过滤器
tcpdump tcp
tcpdump udp
tcpdump icmp
tcpdump 'ip proto 47'  # GRE

# 组合过滤器
tcpdump 'host 192.168.1.100 and port 80'
tcpdump 'src host 192.168.1.100 or dst host 192.168.1.100'
tcpdump 'tcp and (port 80 or port 443)'
tcpdump 'not port 22'  # 排除SSH

# TCP标志过滤器
tcpdump 'tcp[tcpflags] & (tcp-syn|tcp-fin) != 0'
tcpdump 'tcp[tcpflags] & tcp-syn != 0'  # SYN包
tcpdump 'tcp[tcpflags] & tcp-rst != 0'  # RST包

# 负载过滤器
tcpdump 'tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x47455420'  # HTTP GET
tcpdump 'tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x504f5354'  # HTTP POST

# VLAN过滤器
tcpdump 'vlan 100'
tcpdump 'vlan and host 192.168.1.100'

3. PCAP文件分析

分析捕获的数据包文件:

# 读取pcap文件
tcpdump -r capture.pcap -nn

# 计算数据包数量
tcpdump -r capture.pcap | wc -l

# 使用tshark提取特定字段
tshark -r capture.pcap -T fields \
  -e frame.number \
  -e frame.time_relative \
  -e ip.src \
  -e ip.dst \
  -e tcp.stream \
  -e http.request.uri

# 协议层级统计
tshark -r capture.pcap -q -z io,phs

# 对话统计
tshark -r capture.pcap -q -z conv,tcp
tshark -r capture.pcap -q -z conv,ip

# 端点统计
tshark -r capture.pcap -q -z endpoints,tcp

# HTTP统计
tshark -r capture.pcap -q -z http,tree
tshark -r capture.pcap -q -z http_req,tree

# 跟踪TCP流
tshark -r capture.pcap -q -z follow,tcp,ascii,0

# 导出对象(HTTP、SMB等)
tshark -r capture.pcap --export-objects http,./http_exports/

# 基于时间的统计
tshark -r capture.pcap -q -z io,stat,1

4. 协议层解码

解码和分析协议层:

from scapy.all import *

def analyze_packet(packet):
    """分析数据包层。"""
    analysis = {}

    # 以太网层
    if Ether in packet:
        eth = packet[Ether]
        analysis['ethernet'] = {
            'src': eth.src,
            'dst': eth.dst,
            'type': hex(eth.type)
        }

    # IP层
    if IP in packet:
        ip = packet[IP]
        analysis['ip'] = {
            'version': ip.version,
            'ihl': ip.ihl,
            'tos': ip.tos,
            'len': ip.len,
            'id': ip.id,
            'flags': str(ip.flags),
            'frag': ip.frag,
            'ttl': ip.ttl,
            'proto': ip.proto,
            'src': ip.src,
            'dst': ip.dst
        }

    # TCP层
    if TCP in packet:
        tcp = packet[TCP]
        analysis['tcp'] = {
            'sport': tcp.sport,
            'dport': tcp.dport,
            'seq': tcp.seq,
            'ack': tcp.ack,
            'flags': str(tcp.flags),
            'window': tcp.window,
            'options': [(name, val) for name, val in tcp.options]
        }

    # UDP层
    if UDP in packet:
        udp = packet[UDP]
        analysis['udp'] = {
            'sport': udp.sport,
            'dport': udp.dport,
            'len': udp.len
        }

    # HTTP层(原始)
    if Raw in packet:
        payload = packet[Raw].load
        if payload.startswith(b'GET ') or payload.startswith(b'POST ') or \
           payload.startswith(b'HTTP/'):
            analysis['http'] = {
                'payload': payload[:500].decode('utf-8', errors='replace')
            }

    return analysis

def analyze_pcap(filename):
    """分析pcap文件中的所有数据包。"""
    packets = rdpcap(filename)
    results = []

    for i, pkt in enumerate(packets):
        result = {
            'number': i + 1,
            'time': float(pkt.time),
            'length': len(pkt),
            'layers': analyze_packet(pkt)
        }
        results.append(result)

    return results

5. 流量分析

分析网络流量和连接:

from collections import defaultdict
from scapy.all import *

def extract_flows(pcap_file):
    """从pcap文件中提取TCP/UDP流量。"""
    packets = rdpcap(pcap_file)
    flows = defaultdict(lambda: {
        'packets': [],
        'bytes': 0,
        'start_time': None,
        'end_time': None
    })

    for pkt in packets:
        if IP not in pkt:
            continue

        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst

        if TCP in pkt:
            src_port = pkt[TCP].sport
            dst_port = pkt[TCP].dport
            proto = 'tcp'
        elif UDP in pkt:
            src_port = pkt[UDP].sport
            dst_port = pkt[UDP].dport
            proto = 'udp'
        else:
            continue

        # 创建双向流键
        if (src_ip, src_port) < (dst_ip, dst_port):
            flow_key = (src_ip, src_port, dst_ip, dst_port, proto)
        else:
            flow_key = (dst_ip, dst_port, src_ip, src_port, proto)

        flow = flows[flow_key]
        flow['packets'].append(pkt)
        flow['bytes'] += len(pkt)

        pkt_time = float(pkt.time)
        if flow['start_time'] is None or pkt_time < flow['start_time']:
            flow['start_time'] = pkt_time
        if flow['end_time'] is None or pkt_time > flow['end_time']:
            flow['end_time'] = pkt_time

    return flows

def flow_statistics(flows):
    """计算流量统计数据。"""
    stats = []
    for key, flow in flows.items():
        src_ip, src_port, dst_ip, dst_port, proto = key
        duration = flow['end_time'] - flow['start_time'] if flow['end_time'] != flow['start_time'] else 0.001

        stats.append({
            'src': f"{src_ip}:{src_port}",
            'dst': f"{dst_ip}:{dst_port}",
            'protocol': proto,
            'packets': len(flow['packets']),
            'bytes': flow['bytes'],
            'duration': duration,
            'bps': flow['bytes'] * 8 / duration,
            'pps': len(flow['packets']) / duration
        })

    return sorted(stats, key=lambda x: x['bytes'], reverse=True)

6. Wireshark解析器生成

生成自定义Wireshark解析器:

-- Lua自定义协议解析器
-- 保存为Wireshark插件目录中的myprotocol.lua

local myproto = Proto("myproto", "我的自定义协议")

-- 定义字段
local f_magic = ProtoField.uint8("myproto.magic", "Magic", base.HEX)
local f_version = ProtoField.uint8("myproto.version", "版本", base.DEC)
local f_type = ProtoField.uint8("myproto.type", "消息类型", base.DEC)
local f_flags = ProtoField.uint8("myproto.flags", "标志", base.HEX)
local f_length = ProtoField.uint32("myproto.length", "负载长度", base.DEC)
local f_payload = ProtoField.bytes("myproto.payload", "负载")

myproto.fields = { f_magic, f_version, f_type, f_flags, f_length, f_payload }

-- 消息类型名称
local type_names = {
    [0x01] = "HANDSHAKE",
    [0x02] = "DATA",
    [0x03] = "ACK",
    [0x04] = "ERROR",
    [0x05] = "CLOSE"
}

-- 解析函数
function myproto.dissector(buffer, pinfo, tree)
    local length = buffer:len()
    if length < 8 then return end  -- 最小头部大小

    pinfo.cols.protocol = myproto.name

    local subtree = tree:add(myproto, buffer(), "我的协议数据")

    -- 解析头部
    local magic = buffer(0, 1):uint()
    if magic ~= 0xAB then return 0 end  -- 不是我们的协议

    subtree:add(f_magic, buffer(0, 1))
    subtree:add(f_version, buffer(1, 1))

    local msg_type = buffer(2, 1):uint()
    local type_item = subtree:add(f_type, buffer(2, 1))
    type_item:append_text(" (" .. (type_names[msg_type] or "UNKNOWN") .. ")")

    subtree:add(f_flags, buffer(3, 1))

    local payload_len = buffer(4, 4):uint()
    subtree:add(f_length, buffer(4, 4))

    -- 如果存在负载则解析
    if payload_len > 0 and length >= 8 + payload_len then
        subtree:add(f_payload, buffer(8, payload_len))
    end

    -- 更新信息列
    pinfo.cols.info = type_names[msg_type] or "未知"

    return 8 + payload_len
end

-- 注册在TCP端口
local tcp_port = DissectorTable.get("tcp.port")
tcp_port:add(12345, myproto)

MCP服务器集成

这项技能可以利用以下MCP服务器增强能力:

服务器 描述 集成
Wireshark MCP (sarthaksiddha) 实时捕获和分析 AI驱动的数据包检查
WireMCP 实时流量分析 威胁检测
mcp-wireshark Wireshark/tshark集成 IDE集成
Network Monitor MCP 安全分析 实时监控

Wireshark MCP服务器

# 从npm安装
npm install -g @sarthaksiddha/wireshark-mcp

# 添加到Claude
claude mcp add wireshark -- npx @sarthaksiddha/wireshark-mcp

能力:

  • 实时流量捕获
  • PCAP文件分析
  • 协议统计
  • TCP流跟踪
  • JSON导出

最佳实践

  1. 使用捕获过滤器 - 在内核级别减少捕获开销
  2. 旋转捕获文件 - 防止磁盘耗尽
  3. 设置快照长度 - 使用 -s 仅捕获所需字节
  4. 使用环形缓冲区 - 使用 -W 进行连续捕获
  5. 尽早过滤 - BPF过滤器比显示过滤器更高效
  6. 匿名化数据 - 在共享之前删除敏感信息

流程集成

这项技能与以下流程集成:

  • packet-capture-analysis.js - 数据包捕获和分析
  • protocol-dissector.js - 协议解析
  • network-traffic-analyzer.js - 流量模式分析

输出格式

执行操作时,提供结构化输出:

{
  "operation": "analyze",
  "file": "capture.pcap",
  "status": "success",
  "summary": {
    "packets": 10000,
    "bytes": 5242880,
    "duration": 60.5,
    "avgPacketSize": 524
  },
  "protocols": {
    "tcp": 8500,
    "udp": 1200,
    "icmp": 300
  },
  "topTalkers": [
    {"ip": "192.168.1.100", "packets": 3000, "bytes": 1500000}
  ],
  "flows": 150,
  "artifacts": ["analysis.json", "flows.csv"]
}

约束

  • 需要适当的权限进行实时捕获
  • 尊重隐私和法律要求
  • 限制捕获时长和大小
  • 过滤敏感协议(凭证、PII)
  • 安全存储捕获