以下是对“packet-capture”技能的中文翻译:
name: packet-capture description: 专家级技能,用于使用libpcap/Wireshark进行数据包捕获和分析。执行tcpdump/tshark命令,编写BPF过滤器表达式,分析pcap文件,解码协议层,计算统计数据,并生成Wireshark解析器。 allowed-tools: Bash(*) 读写编辑Glob Grep WebFetch metadata: author: babysitter-sdk version: “1.0.0” category: network-analysis backlog-id: SK-003
packet-capture
你是 packet-capture - 一个专门用于网络数据包捕获和分析的专家级技能,提供与libpcap、tcpdump、tshark和Wireshark相关的专业能力,用于深入检查网络流量。
概览
这项技能使AI能够执行数据包捕获和分析,包括:
- 执行tcpdump/tshark命令并解释输出
- 编写和验证BPF过滤器表达式
- 分析pcap/pcapng文件
- 解码协议层(以太网、IP、TCP、UDP、应用程序)
- 计算数据包统计和流量分析
- 生成Wireshark解析器
- 创建自定义捕获过滤器
先决条件
- 安装
tcpdump或tshark - 需要root/admin权限进行实时捕获
- 可选:Wireshark用于GUI分析
- 可选:Python与scapy用于程序化分析
能力
1. 实时数据包捕获
使用tcpdump和tshark捕获网络流量:
# 基本捕获接口
tcpdump -i eth0 -nn
# 带时间戳精度的捕获
tcpdump -i eth0 -nn -tttt
# 捕获到文件
tcpdump -i eth0 -w capture.pcap
# 带旋转的捕获(100MB文件,保留10个)
tcpdump -i eth0 -w capture_%Y%m%d_%H%M%S.pcap -C 100 -W 10
# 捕获特定流量
tcpdump -i eth0 -nn 'port 80 or port 443'
# tshark捕获带显示过滤器
tshark -i eth0 -Y 'http.request.method == "GET"'
# tshark捕获特定字段
tshark -i eth0 -T fields \
-e frame.time \
-e ip.src \
-e ip.dst \
-e tcp.port \
-e http.host
2. BPF过滤器表达式
编写高效的Berkeley Packet Filter表达式:
# 主机过滤器
tcpdump host 192.168.1.100
tcpdump src host 192.168.1.100
tcpdump dst host 192.168.1.100
# 网络过滤器
tcpdump net 192.168.1.0/24
tcpdump src net 10.0.0.0/8
# 端口过滤器
tcpdump port 80
tcpdump src port 443
tcpdump portrange 8000-8100
# 协议过滤器
tcpdump tcp
tcpdump udp
tcpdump icmp
tcpdump 'ip proto 47' # GRE
# 组合过滤器
tcpdump 'host 192.168.1.100 and port 80'
tcpdump 'src host 192.168.1.100 or dst host 192.168.1.100'
tcpdump 'tcp and (port 80 or port 443)'
tcpdump 'not port 22' # 排除SSH
# TCP标志过滤器
tcpdump 'tcp[tcpflags] & (tcp-syn|tcp-fin) != 0'
tcpdump 'tcp[tcpflags] & tcp-syn != 0' # SYN包
tcpdump 'tcp[tcpflags] & tcp-rst != 0' # RST包
# 负载过滤器
tcpdump 'tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x47455420' # HTTP GET
tcpdump 'tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x504f5354' # HTTP POST
# VLAN过滤器
tcpdump 'vlan 100'
tcpdump 'vlan and host 192.168.1.100'
3. PCAP文件分析
分析捕获的数据包文件:
# 读取pcap文件
tcpdump -r capture.pcap -nn
# 计算数据包数量
tcpdump -r capture.pcap | wc -l
# 使用tshark提取特定字段
tshark -r capture.pcap -T fields \
-e frame.number \
-e frame.time_relative \
-e ip.src \
-e ip.dst \
-e tcp.stream \
-e http.request.uri
# 协议层级统计
tshark -r capture.pcap -q -z io,phs
# 对话统计
tshark -r capture.pcap -q -z conv,tcp
tshark -r capture.pcap -q -z conv,ip
# 端点统计
tshark -r capture.pcap -q -z endpoints,tcp
# HTTP统计
tshark -r capture.pcap -q -z http,tree
tshark -r capture.pcap -q -z http_req,tree
# 跟踪TCP流
tshark -r capture.pcap -q -z follow,tcp,ascii,0
# 导出对象(HTTP、SMB等)
tshark -r capture.pcap --export-objects http,./http_exports/
# 基于时间的统计
tshark -r capture.pcap -q -z io,stat,1
4. 协议层解码
解码和分析协议层:
from scapy.all import *
def analyze_packet(packet):
"""分析数据包层。"""
analysis = {}
# 以太网层
if Ether in packet:
eth = packet[Ether]
analysis['ethernet'] = {
'src': eth.src,
'dst': eth.dst,
'type': hex(eth.type)
}
# IP层
if IP in packet:
ip = packet[IP]
analysis['ip'] = {
'version': ip.version,
'ihl': ip.ihl,
'tos': ip.tos,
'len': ip.len,
'id': ip.id,
'flags': str(ip.flags),
'frag': ip.frag,
'ttl': ip.ttl,
'proto': ip.proto,
'src': ip.src,
'dst': ip.dst
}
# TCP层
if TCP in packet:
tcp = packet[TCP]
analysis['tcp'] = {
'sport': tcp.sport,
'dport': tcp.dport,
'seq': tcp.seq,
'ack': tcp.ack,
'flags': str(tcp.flags),
'window': tcp.window,
'options': [(name, val) for name, val in tcp.options]
}
# UDP层
if UDP in packet:
udp = packet[UDP]
analysis['udp'] = {
'sport': udp.sport,
'dport': udp.dport,
'len': udp.len
}
# HTTP层(原始)
if Raw in packet:
payload = packet[Raw].load
if payload.startswith(b'GET ') or payload.startswith(b'POST ') or \
payload.startswith(b'HTTP/'):
analysis['http'] = {
'payload': payload[:500].decode('utf-8', errors='replace')
}
return analysis
def analyze_pcap(filename):
"""分析pcap文件中的所有数据包。"""
packets = rdpcap(filename)
results = []
for i, pkt in enumerate(packets):
result = {
'number': i + 1,
'time': float(pkt.time),
'length': len(pkt),
'layers': analyze_packet(pkt)
}
results.append(result)
return results
5. 流量分析
分析网络流量和连接:
from collections import defaultdict
from scapy.all import *
def extract_flows(pcap_file):
"""从pcap文件中提取TCP/UDP流量。"""
packets = rdpcap(pcap_file)
flows = defaultdict(lambda: {
'packets': [],
'bytes': 0,
'start_time': None,
'end_time': None
})
for pkt in packets:
if IP not in pkt:
continue
src_ip = pkt[IP].src
dst_ip = pkt[IP].dst
if TCP in pkt:
src_port = pkt[TCP].sport
dst_port = pkt[TCP].dport
proto = 'tcp'
elif UDP in pkt:
src_port = pkt[UDP].sport
dst_port = pkt[UDP].dport
proto = 'udp'
else:
continue
# 创建双向流键
if (src_ip, src_port) < (dst_ip, dst_port):
flow_key = (src_ip, src_port, dst_ip, dst_port, proto)
else:
flow_key = (dst_ip, dst_port, src_ip, src_port, proto)
flow = flows[flow_key]
flow['packets'].append(pkt)
flow['bytes'] += len(pkt)
pkt_time = float(pkt.time)
if flow['start_time'] is None or pkt_time < flow['start_time']:
flow['start_time'] = pkt_time
if flow['end_time'] is None or pkt_time > flow['end_time']:
flow['end_time'] = pkt_time
return flows
def flow_statistics(flows):
"""计算流量统计数据。"""
stats = []
for key, flow in flows.items():
src_ip, src_port, dst_ip, dst_port, proto = key
duration = flow['end_time'] - flow['start_time'] if flow['end_time'] != flow['start_time'] else 0.001
stats.append({
'src': f"{src_ip}:{src_port}",
'dst': f"{dst_ip}:{dst_port}",
'protocol': proto,
'packets': len(flow['packets']),
'bytes': flow['bytes'],
'duration': duration,
'bps': flow['bytes'] * 8 / duration,
'pps': len(flow['packets']) / duration
})
return sorted(stats, key=lambda x: x['bytes'], reverse=True)
6. Wireshark解析器生成
生成自定义Wireshark解析器:
-- Lua自定义协议解析器
-- 保存为Wireshark插件目录中的myprotocol.lua
local myproto = Proto("myproto", "我的自定义协议")
-- 定义字段
local f_magic = ProtoField.uint8("myproto.magic", "Magic", base.HEX)
local f_version = ProtoField.uint8("myproto.version", "版本", base.DEC)
local f_type = ProtoField.uint8("myproto.type", "消息类型", base.DEC)
local f_flags = ProtoField.uint8("myproto.flags", "标志", base.HEX)
local f_length = ProtoField.uint32("myproto.length", "负载长度", base.DEC)
local f_payload = ProtoField.bytes("myproto.payload", "负载")
myproto.fields = { f_magic, f_version, f_type, f_flags, f_length, f_payload }
-- 消息类型名称
local type_names = {
[0x01] = "HANDSHAKE",
[0x02] = "DATA",
[0x03] = "ACK",
[0x04] = "ERROR",
[0x05] = "CLOSE"
}
-- 解析函数
function myproto.dissector(buffer, pinfo, tree)
local length = buffer:len()
if length < 8 then return end -- 最小头部大小
pinfo.cols.protocol = myproto.name
local subtree = tree:add(myproto, buffer(), "我的协议数据")
-- 解析头部
local magic = buffer(0, 1):uint()
if magic ~= 0xAB then return 0 end -- 不是我们的协议
subtree:add(f_magic, buffer(0, 1))
subtree:add(f_version, buffer(1, 1))
local msg_type = buffer(2, 1):uint()
local type_item = subtree:add(f_type, buffer(2, 1))
type_item:append_text(" (" .. (type_names[msg_type] or "UNKNOWN") .. ")")
subtree:add(f_flags, buffer(3, 1))
local payload_len = buffer(4, 4):uint()
subtree:add(f_length, buffer(4, 4))
-- 如果存在负载则解析
if payload_len > 0 and length >= 8 + payload_len then
subtree:add(f_payload, buffer(8, payload_len))
end
-- 更新信息列
pinfo.cols.info = type_names[msg_type] or "未知"
return 8 + payload_len
end
-- 注册在TCP端口
local tcp_port = DissectorTable.get("tcp.port")
tcp_port:add(12345, myproto)
MCP服务器集成
这项技能可以利用以下MCP服务器增强能力:
| 服务器 | 描述 | 集成 |
|---|---|---|
| Wireshark MCP (sarthaksiddha) | 实时捕获和分析 | AI驱动的数据包检查 |
| WireMCP | 实时流量分析 | 威胁检测 |
| mcp-wireshark | Wireshark/tshark集成 | IDE集成 |
| Network Monitor MCP | 安全分析 | 实时监控 |
Wireshark MCP服务器
# 从npm安装
npm install -g @sarthaksiddha/wireshark-mcp
# 添加到Claude
claude mcp add wireshark -- npx @sarthaksiddha/wireshark-mcp
能力:
- 实时流量捕获
- PCAP文件分析
- 协议统计
- TCP流跟踪
- JSON导出
最佳实践
- 使用捕获过滤器 - 在内核级别减少捕获开销
- 旋转捕获文件 - 防止磁盘耗尽
- 设置快照长度 - 使用
-s仅捕获所需字节 - 使用环形缓冲区 - 使用
-W进行连续捕获 - 尽早过滤 - BPF过滤器比显示过滤器更高效
- 匿名化数据 - 在共享之前删除敏感信息
流程集成
这项技能与以下流程集成:
packet-capture-analysis.js- 数据包捕获和分析protocol-dissector.js- 协议解析network-traffic-analyzer.js- 流量模式分析
输出格式
执行操作时,提供结构化输出:
{
"operation": "analyze",
"file": "capture.pcap",
"status": "success",
"summary": {
"packets": 10000,
"bytes": 5242880,
"duration": 60.5,
"avgPacketSize": 524
},
"protocols": {
"tcp": 8500,
"udp": 1200,
"icmp": 300
},
"topTalkers": [
{"ip": "192.168.1.100", "packets": 3000, "bytes": 1500000}
],
"flows": 150,
"artifacts": ["analysis.json", "flows.csv"]
}
约束
- 需要适当的权限进行实时捕获
- 尊重隐私和法律要求
- 限制捕获时长和大小
- 过滤敏感协议(凭证、PII)
- 安全存储捕获