name: taint-analysis
description: '实现用于安全的污点分析。使用时机:(1) 检测安全漏洞,(2) 输入验证,(3) 信息流。'
version: 1.0.0
tags:
- static-analysis
- taint-analysis
- security
- information-flow
difficulty: intermediate
languages:
- python
dependencies:
- dataflow-analysis-framework
污点分析
实现用于跟踪不可信数据流的污点分析。
何时使用
- 安全漏洞检测
- SQL注入检测
- XSS检测
- 输入验证
这个技能做什么
- 跟踪来源 - 不可信输入
- 传播污点 - 通过计算
- 检测汇点 - 危险操作
- 处理净化器 - 可信转换
实现
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Callable
from enum import Enum
class Taint(Enum):
    """Taint status attached to a variable or expression."""

    TAINTED = "tainted"
    UNTAINTED = "untainted"
    UNKNOWN = "unknown"
@dataclass
class TaintLattice:
    """Three-point taint lattice ordered UNTAINTED < UNKNOWN < TAINTED.

    All operations are static; the class carries no state.
    """

    @staticmethod
    def meet(a: Taint, b: Taint) -> Taint:
        """Return the greatest lower bound of ``a`` and ``b``."""
        if a == b:
            return a
        # UNTAINTED is the bottom element, so it wins any meet.
        if a == Taint.UNTAINTED or b == Taint.UNTAINTED:
            return Taint.UNTAINTED
        # Only the (UNKNOWN, TAINTED) pair remains.
        return Taint.UNKNOWN

    @staticmethod
    def join(a: Taint, b: Taint) -> Taint:
        """Return the least upper bound of ``a`` and ``b``."""
        if a == b:
            return a
        # TAINTED is the top element, so it wins any join.
        if a == Taint.TAINTED or b == Taint.TAINTED:
            return Taint.TAINTED
        # Only the (UNTAINTED, UNKNOWN) pair remains.
        return Taint.UNKNOWN

    @staticmethod
    def less_or_equal(a: Taint, b: Taint) -> bool:
        """Return True when ``a`` is at or below ``b`` in the lattice order."""
        order = [Taint.UNTAINTED, Taint.UNKNOWN, Taint.TAINTED]
        return order.index(a) <= order.index(b)
# 污点环境
class TaintEnvironment(Dict[str, Taint]):
    """Mapping from variable name to taint status.

    A variable that is absent from the mapping is treated as
    ``Taint.UNTAINTED`` by every operation defined here.
    """

    def copy(self) -> 'TaintEnvironment':
        """Return a shallow copy that is still a ``TaintEnvironment``.

        The inherited ``dict.copy`` returns a plain ``dict``, which has no
        ``meet``/``join`` methods.
        """
        return TaintEnvironment(self)

    def _combine(self, other: 'TaintEnvironment',
                 op: Callable[[Taint, Taint], Taint]) -> 'TaintEnvironment':
        """Apply ``op`` pointwise over the union of both key sets."""
        result = TaintEnvironment()
        for var in set(self) | set(other):
            result[var] = op(self.get(var, Taint.UNTAINTED),
                             other.get(var, Taint.UNTAINTED))
        return result

    def meet(self, other: 'TaintEnvironment') -> 'TaintEnvironment':
        """Return the pointwise lattice meet of the two environments."""
        return self._combine(other, TaintLattice.meet)

    def join(self, other: 'TaintEnvironment') -> 'TaintEnvironment':
        """Return the pointwise lattice join of the two environments."""
        return self._combine(other, TaintLattice.join)

    def less_or_equal(self, other: 'TaintEnvironment') -> bool:
        """Return True when every variable here is at or below ``other``.

        Only this environment's keys need checking: a variable missing here
        counts as UNTAINTED, the bottom element, which is below everything.
        """
        return all(
            TaintLattice.less_or_equal(taint, other.get(var, Taint.UNTAINTED))
            for var, taint in self.items()
        )
# 来源和汇点
@dataclass
class Source:
    """A taint source, identified by the function that produces the data."""

    name: str  # label for the source
    function: str  # function name matched against call sites
    return_taint: bool = True
@dataclass
class Sink:
    """A taint sink: a dangerous operation that must not receive taint."""

    name: str  # label for the sink
    function: str  # function name matched against call sites
    taint_args: List[int]  # indices of the arguments to check for taint
@dataclass
class Sanitizer:
    """A taint sanitizer, identified by the function that cleans the data."""

    name: str  # label for the sanitizer
    function: str  # function name matched against call sites
# 污点分析
class TaintAnalysis:
"""污点分析"""
def __init__(self):
self.sources: List[Source] = []
self.sinks: List[Sink] = []
self.sanitizers: List[Sanitizer] = []
self.cfg: Dict[str, 'Stmt'] = {}
# 结果
self.vulnerabilities: List['Vulnerability'] = []
def add_source(self, name: str, function: str):
"""添加污点来源"""
self.sources.append(Source(name, function))
def add_sink(self, name: str, function: str, taint_args: List[int]):
"""添加污点汇点"""
self.sinks.append(Sink(name, function, taint_args))
def add_sanitizer(self, name: str, function: str):
"""添加污点净化器"""
self.sanitizers.append(Sanitizer(name, function))
def analyze(self, program: 'Program') -> List['Vulnerability']:
"""
执行污点分析
返回:漏洞列表
"""
# 前向分析
self.forward_analyze(program)
# 检查汇点污点
self.check_sinks(program)
return self.vulnerabilities
def forward_analyze(self, program: 'Program'):
"""前向污点传播"""
# 工作列表
worklist = list(program.cfg.keys())
in_env: Dict[str, TaintEnvironment] = {}
out_env: Dict[str, TaintEnvironment] = {}
# 初始化
for node in program.cfg:
in_env[node] = TaintEnvironment()
out_env[node] = TaintEnvironment()
# 初始:来源被污染
entry = program.entry
for source in self.sources:
in_env[entry][source.function] = Taint.TAINTED
while worklist:
node = worklist.pop(0)
# 计算IN:并所有前驱
preds = program.cfg.predecessors(node)
if preds:
in_val = out_env[preds[0]].copy()
for pred in preds[1:]:
in_val = in_val.join(out_env[pred])
else:
in_val = TaintEnvironment()
if in_env[node].less_or_equal(in_val):
# 已改变 - 更新并添加后继
in_env[node] = in_val
# 转换
out_val = self.transfer(node, program.cfg[node], in_env)
if out_env[node] != out_val:
out_env[node] = out_val
for succ in program.cfg.successors(node):
if succ not in worklist:
worklist.append(succ)
def transfer(self, node: str, stmt: 'Stmt',
env: TaintEnvironment) -> TaintEnvironment:
"""应用污点转换"""
result = TaintEnvironment(env)
match stmt:
case Assign(x, expr):
# 传播污点
result[x] = self.eval_taint(expr, env)
case Call(func, args):
# 检查是否来源
for source in self.sources:
if func == source.function:
result[func] = Taint.TAINTED
# 检查是否净化器
for sanitizer in self.sanitizers:
if func == sanitizer.function:
# 从参数移除污点
for arg in args:
result[arg] = Taint.UNTAINTED
case If(cond, then, else_):
# 两个分支
pass # 简化
return result
def eval_taint(self, expr: 'Expr', env: TaintEnvironment) -> Taint:
"""评估表达式污点"""
match expr:
case Const(_):
return Taint.UNTAINTED
case Var(x):
return env.get(x, Taint.UNTAINTED)
case BinOp(_, e1, e2):
t1 = self.eval_taint(e1, env)
t2 = self.eval_taint(e2, env)
# 如果任一被污染,结果被污染
return TaintLattice.join(t1, t2)
case Call(func, args):
# 检查函数
for source in self.sources:
if func == source.function:
return Taint.TAINTED
for sanitizer in self.sanitizers:
if func == sanitizer.function:
return Taint.UNTAINTED
return Taint.UNKNOWN
return Taint.UNKNOWN
def check_sinks(self, program: 'Program'):
"""在汇点检查污点违规"""
for node, stmt in program.cfg.items():
match stmt:
case Call(func, args):
for sink in self.sinks:
if func == sink.function:
# 检查是否有被污染参数到达汇点
for arg_idx in sink.taint_args:
if arg_idx < len(args):
arg = args[arg_idx]
# 从环境获取污点
taint = self.get_variable_taint(arg, program, node)
if taint == Taint.TAINTED:
self.vulnerabilities.append(Vulnerability(
severity="HIGH",
type=sink.name,
location=node,
description=f"被污染数据到达{sink.name}"
))
def get_variable_taint(self, var: str, program: 'Program',
node: str) -> Taint:
"""在节点获取变量污点"""
# 简化:回溯到定义
# 真实实现:使用数据流分析
return Taint.UNKNOWN
@dataclass
class Vulnerability:
    """A finding: tainted data reaching a sink."""

    severity: str  # severity label, e.g. "HIGH"
    type: str  # name of the sink that was reached
    location: str  # CFG node id of the offending call
    description: str  # human-readable message
污点域
| 域 | 精确度 | 用例 |
| --- | --- | --- |
| 被污染/未被污染 | 低 | 简单跟踪 |
| 被污染/已净化/未知 | 中 | SQL注入 |
| 多来源 | 高 | 不同敏感度 |
关键概念
| 概念 | 描述 |
| --- | --- |
| 来源 | 污点起源之处 |
| 汇点 | 污点重要之处 |
| 净化器 | 移除污点 |
| 传播 | 污点如何传播 |
应用
- SQL注入
- XSS(跨站脚本)
- 命令注入
- 路径遍历
提示
- 仔细定义来源
- 正确处理净化器
- 考虑隐式流
- 使用类型限定符
相关技能
dataflow-analysis-framework - 数据流
information-flow-analyzer - 信息流
alias-and-points-to-analysis - 指针分析
权威参考
| 参考 | 重要性 |
| --- | --- |
| Denning, “A Lattice Model of Secure Information Flow” (CACM 1976) | 基础信息流论文 |
| Schwartz et al., “All You Ever Wanted to Know About Dynamic Taint Analysis” (IEEE S&P 2010) | 全面综述 |
| Newsome & Song, “Dynamic Taint Analysis for Automatic Detection” (NDSS 2005) | 动态污点分析用于漏洞检测 |
| Haldar et al., “Dynamic Taint Propagation for Java” (ACSAC 2005) | Java污点分析 |
权衡与限制
污点分析方法权衡
| 方法 | 优点 | 缺点 |
| --- | --- | --- |
| 静态 | 无运行时开销 | 可能过度近似 |
| 动态 | 精确 | 遗漏路径 |
| 混合 | 兼具两者优点 | 复杂 |
何时不使用污点分析
- 对于简单输入:手动审查可能更快
- 对于加密数据:污点不跟踪加密
- 对于时序通道:污点无法检测
复杂性考虑
- 数据流分析:程序大小的O(n)
- 过程间:无摘要时指数级
- 上下文敏感:增加复杂性
限制
- 隐式流:污点可通过控制流泄露
- 欠近似:动态污点遗漏错误
- 过近似:静态污点产生误报
- 净化器处理:复杂难以正确实现
- 别名:通过指针难以跟踪
- 完备性:实践中难以实现
- 可靠性与完备性:需要权衡
研究工具与工件
污点分析工具:
| 工具 | 应用 | 学习内容 |
| --- | --- | --- |
| Infer | Facebook | 静态污点 |
| CodeQL | GitHub | 基于查询 |
| FlowDroid | Android | 静态污点 |
研究前沿
1. 动态污点分析
实现陷阱
| 陷阱 | 实际后果 | 解决方案 |
| --- | --- | --- |
| 隐式流 | 遗漏泄露 | 控制流跟踪 |