name: splunk-analysis description: 使用 SPL(搜索处理语言)进行 Splunk 日志分析。当通过 Splunk 日志、保存的搜索或警报调查问题时使用。 allowed-tools: Bash(python *)
Splunk 日志分析
认证
重要:凭证由代理层自动注入。请勿检查环境变量中的 SPLUNK_HOST、SPLUNK_TOKEN 或其他凭证 - 它们对您不可见。直接运行脚本即可;认证是透明处理的。
强制性:统计优先调查
切勿转储原始日志。 始终遵循此模式:
统计 → 样本 → 模式 → 关联
- 统计优先 - 在采样前了解数量、错误率和顶部模式
- 策略性采样 - 基于统计选择合适的策略
- 模式提取 - 聚类相似错误以找到根本原因
- 上下文关联 - 在异常时间戳周围进行调查
可用脚本
所有脚本都在 .claude/skills/observability-splunk/scripts/
主要调查脚本
get_statistics.py - 始终从这里开始
全面的统计与模式提取。
python .claude/skills/observability-splunk/scripts/get_statistics.py [--index INDEX] [--sourcetype SOURCETYPE] [--time-range MINUTES]
# 示例:
python .claude/skills/observability-splunk/scripts/get_statistics.py --time-range 60
python .claude/skills/observability-splunk/scripts/get_statistics.py --index main
python .claude/skills/observability-splunk/scripts/get_statistics.py --sourcetype access_combined
输出包括:
- 总数量、错误数量、错误率百分比
- 状态分布(信息、警告、错误)
- 按日志量排名的顶部源类型和主机
- 顶部错误模式(快速分类的关键)
- 可操作建议
sample_logs.py - 策略性采样
基于统计选择合适的采样策略。
python .claude/skills/observability-splunk/scripts/sample_logs.py --strategy STRATEGY [--index INDEX] [--sourcetype SOURCETYPE] [--limit N]
# 策略:
# errors_only - 仅错误日志(默认用于事件)
# warnings_up - 警告和错误日志
# around_time - 围绕特定时间戳的日志
# all - 所有日志级别
# 示例:
python .claude/skills/observability-splunk/scripts/sample_logs.py --strategy errors_only --index main
python .claude/skills/observability-splunk/scripts/sample_logs.py --strategy around_time --timestamp "2026-01-27T05:00:00" --window 5
python .claude/skills/observability-splunk/scripts/sample_logs.py --strategy all --sourcetype access_combined --limit 20
SPL(搜索处理语言)
基本搜索
# 简单关键词搜索
error
# 索引特定搜索(始终指定索引以提高性能)
index=main error
# 多个关键词(隐式 AND)
index=main error connection
# 精确短语
index=main "connection refused"
字段搜索
# 精确字段匹配
index=main host=web-01
# 通配符
index=main host=web-*
# 数值比较
index=main status>=400
# NOT 运算符
index=main NOT status=200
# OR 运算符
index=main (status=500 OR status=503)
时间范围
# 相对时间(在工具调用中)
earliest=-15m latest=now
# 绝对时间
earliest="01/15/2024:10:00:00" latest="01/15/2024:11:00:00"
# 自然时间修饰符
earliest=-1h@h # 1小时前,四舍五入到小时
earliest=-1d@d # 1天前,四舍五入到天
调查工作流
标准事件调查
┌─────────────────────────────────────────────────────────────┐
│ 1. 统计优先(强制性) │
│ python get_statistics.py --index <index> │
│ → 了解数量、错误率、顶部模式 │
└─────────────────────────────────────────────────────────────┘
│
▼
高错误率?
┌─────────────┴─────────────┐
│ │
是 (>5%) 否
│ │
▼ ▼
┌─────────────────────────────┐ ┌───────────────────────────────────────────┐
│ 2. 快速路径 │ │ 2. 针对性调查 │
│ 直接采样错误 │ │ 按特定标准筛选 │
│ python sample_logs.py │ │ python sample_logs.py --strategy all │
│ --strategy errors_only │ │ → 寻找异常 │
└─────────────────────────────┘ └───────────────────────────────────────────┘
快速命令参考
| 目标 | 命令 |
|---|---|
| 开始调查 | get_statistics.py --index X |
| 仅采样错误 | sample_logs.py --strategy errors_only --index X |
| 调查峰值 | sample_logs.py --strategy around_time --timestamp T |
| 所有日志 | sample_logs.py --strategy all --index X --limit 20 |
SPL 命令参考
过滤命令
| 命令 | 目的 | 示例 |
|---|---|---|
search |
过滤事件 | search error |
where |
使用表达式过滤 | where status > 400 |
dedup |
移除重复项 | dedup host |
head |
前 N 个结果 | head 10 |
tail |
后 N 个结果 | tail 10 |
转换命令
| 命令 | 目的 | 示例 |
|---|---|---|
stats |
聚合统计 | stats count by host |
timechart |
基于时间的聚合 | timechart span=5m count |
chart |
数据透视表 | chart count by status, host |
top |
顶部值 | top 10 host |
rare |
稀有值 | rare message |
table |
选择字段 | table _time, host, message |
字段操作
| 命令 | 目的 | 示例 |
|---|---|---|
eval |
计算字段 | eval duration_sec=duration/1000 |
rex |
正则表达式提取 | rex field=message "error: (?<error_type>\\w+)" |
rename |
重命名字段 | rename src_ip as source_ip |
fields |
包含/排除字段 | fields host, message |
常见查询模式
错误率分析
# 每5分钟错误计数
index=main | timechart span=5m count(eval(level="ERROR")) as errors, count as total
# 随时间错误百分比
index=main
| timechart span=5m count(eval(level="ERROR")) as errors, count as total
| eval error_rate=errors/total*100
按服务顶部错误
index=main level=ERROR
| stats count by service, message
| sort -count
| head 20
响应时间分析
index=main sourcetype=access_combined
| stats avg(response_time) as avg_rt,
p95(response_time) as p95_rt,
max(response_time) as max_rt
by uri_path
| sort -avg_rt
异常检测
# 突然峰值检测
index=main
| timechart span=5m count as events
| eventstats avg(events) as avg_events, stdev(events) as stdev_events
| eval anomaly=if(events > avg_events + 2*stdev_events, 1, 0)
| where anomaly=1
应避免的反模式
- ❌ 切勿跳过统计 -
get_statistics.py是强制性第一步 - ❌ 未指定索引 - 始终使用
index=X以提高性能 - ❌ 无限制时间范围 - 始终指定时间范围
- ❌ 获取所有日志 - 使用采样策略,而非无限制搜索
- ❌ 忽略错误率 - 高错误率意味着立即调查
- ❌ 在所有事件上使用复杂 rex - 先过滤,后提取