污点分析Skill taint-analysis

污点分析是一种安全分析技术,用于跟踪不可信数据在程序中的传播,以检测安全漏洞如SQL注入和跨站脚本(XSS),并支持输入验证和信息流控制。关键词:污点分析,安全漏洞检测,输入验证,信息流,静态分析,漏洞挖掘,安全审计。

漏洞挖掘 0 次安装 0 次浏览 更新于 3/13/2026

name: taint-analysis description: ‘实现用于安全的污点分析。使用时机:(1) 检测安全漏洞,(2) 输入验证,(3) 信息流。’ version: 1.0.0 tags:

  • static-analysis
  • taint-analysis
  • security
  • information-flow difficulty: intermediate languages:
  • python dependencies:
  • dataflow-analysis-framework

污点分析

实现用于跟踪不可信数据流的污点分析。

何时使用

  • 安全漏洞检测
  • SQL注入检测
  • XSS检测
  • 输入验证

这个技能做什么

  1. 跟踪来源 - 不可信输入
  2. 传播污点 - 通过计算
  3. 检测汇点 - 危险操作
  4. 处理净化器 - 可信转换

实现

from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Callable
from enum import Enum

class Taint(Enum):
    """污点状态"""
    TAINTED = "tainted"
    UNTAINTED = "untainted"
    UNKNOWN = "unknown"

@dataclass
class TaintLattice:
    """污点格:UNTAINTED < UNKNOWN < TAINTED"""
    
    @staticmethod
    def meet(a: Taint, b: Taint) -> Taint:
        """交(更精确)"""
        if a == b:
            return a
        
        if a == Taint.UNTAINTED or b == Taint.UNTAINTED:
            return Taint.UNTAINTED
        
        return Taint.UNKNOWN
    
    @staticmethod
    def join(a: Taint, b: Taint) -> Taint:
        """并(较不精确)"""
        if a == b:
            return a
        
        if a == Taint.TAINTED or b == Taint.TAINTED:
            return Taint.TAINTED
        
        return Taint.UNKNOWN
    
    @staticmethod
    def less_or_equal(a: Taint, b: Taint) -> bool:
        """偏序"""
        order = [Taint.UNTAINTED, Taint.UNKNOWN, Taint.TAINTED]
        return order.index(a) <= order.index(b)

# 污点环境
class TaintEnvironment(Dict[str, Taint]):
    """映射变量到污点状态"""
    
    def meet(self, other: 'TaintEnvironment') -> 'TaintEnvironment':
        """交两个环境"""
        
        result = TaintEnvironment()
        all_vars = set(self.keys()) | set(other.keys())
        
        for var in all_vars:
            v1 = self.get(var, Taint.UNTAINTED)
            v2 = other.get(var, Taint.UNTAINTED)
            result[var] = TaintLattice.meet(v1, v2)
        
        return result
    
    def join(self, other: 'TaintEnvironment') -> 'TaintEnvironment':
        """并两个环境"""
        
        result = TaintEnvironment()
        all_vars = set(self.keys()) | set(other.keys())
        
        for var in all_vars:
            v1 = self.get(var, Taint.UNTAINTED)
            v2 = other.get(var, Taint.UNTAINTED)
            result[var] = TaintLattice.join(v1, v2)
        
        return result

# 来源和汇点
@dataclass
class Source:
    """污点来源"""
    name: str
    function: str
    return_taint: bool = True

@dataclass
class Sink:
    """污点汇点(危险)"""
    name: str
    function: str
    taint_args: List[int]  # 应检查污点的参数索引

@dataclass
class Sanitizer:
    """污点净化器"""
    name: str
    function: str

# 污点分析
class TaintAnalysis:
    """污点分析"""
    
    def __init__(self):
        self.sources: List[Source] = []
        self.sinks: List[Sink] = []
        self.sanitizers: List[Sanitizer] = []
        self.cfg: Dict[str, 'Stmt'] = {}
        
        # 结果
        self.vulnerabilities: List['Vulnerability'] = []
    
    def add_source(self, name: str, function: str):
        """添加污点来源"""
        
        self.sources.append(Source(name, function))
    
    def add_sink(self, name: str, function: str, taint_args: List[int]):
        """添加污点汇点"""
        
        self.sinks.append(Sink(name, function, taint_args))
    
    def add_sanitizer(self, name: str, function: str):
        """添加污点净化器"""
        
        self.sanitizers.append(Sanitizer(name, function))
    
    def analyze(self, program: 'Program') -> List['Vulnerability']:
        """
        执行污点分析
        
        返回:漏洞列表
        """
        
        # 前向分析
        self.forward_analyze(program)
        
        # 检查汇点污点
        self.check_sinks(program)
        
        return self.vulnerabilities
    
    def forward_analyze(self, program: 'Program'):
        """前向污点传播"""
        
        # 工作列表
        worklist = list(program.cfg.keys())
        in_env: Dict[str, TaintEnvironment] = {}
        out_env: Dict[str, TaintEnvironment] = {}
        
        # 初始化
        for node in program.cfg:
            in_env[node] = TaintEnvironment()
            out_env[node] = TaintEnvironment()
        
        # 初始:来源被污染
        entry = program.entry
        for source in self.sources:
            in_env[entry][source.function] = Taint.TAINTED
        
        while worklist:
            node = worklist.pop(0)
            
            # 计算IN:并所有前驱
            preds = program.cfg.predecessors(node)
            if preds:
                in_val = out_env[preds[0]].copy()
                for pred in preds[1:]:
                    in_val = in_val.join(out_env[pred])
            else:
                in_val = TaintEnvironment()
            
            if in_env[node].less_or_equal(in_val):
                # 已改变 - 更新并添加后继
                in_env[node] = in_val
                
                # 转换
                out_val = self.transfer(node, program.cfg[node], in_env)
                
                if out_env[node] != out_val:
                    out_env[node] = out_val
                    
                    for succ in program.cfg.successors(node):
                        if succ not in worklist:
                            worklist.append(succ)
    
    def transfer(self, node: str, stmt: 'Stmt', 
                env: TaintEnvironment) -> TaintEnvironment:
        """应用污点转换"""
        
        result = TaintEnvironment(env)
        
        match stmt:
            case Assign(x, expr):
                # 传播污点
                result[x] = self.eval_taint(expr, env)
            
            case Call(func, args):
                # 检查是否来源
                for source in self.sources:
                    if func == source.function:
                        result[func] = Taint.TAINTED
                
                # 检查是否净化器
                for sanitizer in self.sanitizers:
                    if func == sanitizer.function:
                        # 从参数移除污点
                        for arg in args:
                            result[arg] = Taint.UNTAINTED
            
            case If(cond, then, else_):
                # 两个分支
                pass  # 简化
        
        return result
    
    def eval_taint(self, expr: 'Expr', env: TaintEnvironment) -> Taint:
        """评估表达式污点"""
        
        match expr:
            case Const(_):
                return Taint.UNTAINTED
            
            case Var(x):
                return env.get(x, Taint.UNTAINTED)
            
            case BinOp(_, e1, e2):
                t1 = self.eval_taint(e1, env)
                t2 = self.eval_taint(e2, env)
                # 如果任一被污染,结果被污染
                return TaintLattice.join(t1, t2)
            
            case Call(func, args):
                # 检查函数
                for source in self.sources:
                    if func == source.function:
                        return Taint.TAINTED
                
                for sanitizer in self.sanitizers:
                    if func == sanitizer.function:
                        return Taint.UNTAINTED
                
                return Taint.UNKNOWN
        
        return Taint.UNKNOWN
    
    def check_sinks(self, program: 'Program'):
        """在汇点检查污点违规"""
        
        for node, stmt in program.cfg.items():
            match stmt:
                case Call(func, args):
                    for sink in self.sinks:
                        if func == sink.function:
                            # 检查是否有被污染参数到达汇点
                            for arg_idx in sink.taint_args:
                                if arg_idx < len(args):
                                    arg = args[arg_idx]
                                    
                                    # 从环境获取污点
                                    taint = self.get_variable_taint(arg, program, node)
                                    
                                    if taint == Taint.TAINTED:
                                        self.vulnerabilities.append(Vulnerability(
                                            severity="HIGH",
                                            type=sink.name,
                                            location=node,
                                            description=f"被污染数据到达{sink.name}"
                                        ))
    
    def get_variable_taint(self, var: str, program: 'Program', 
                         node: str) -> Taint:
        """在节点获取变量污点"""
        
        # 简化:回溯到定义
        # 真实实现:使用数据流分析
        
        return Taint.UNKNOWN

@dataclass
class Vulnerability:
    """安全漏洞"""
    severity: str
    type: str
    location: str
    description: str

污点域

精确度 用例
被污染/未被污染 简单跟踪
被污染/已净化/未知 SQL注入
多来源 不同敏感度

关键概念

概念 描述
来源 污点起源之处
汇点 污点重要之处
净化器 移除污点
传播 污点如何传播

应用

  • SQL注入
  • XSS(跨站脚本)
  • 命令注入
  • 路径遍历

提示

  • 仔细定义来源
  • 正确处理净化器
  • 考虑隐式流
  • 使用类型限定符

相关技能

  • dataflow-analysis-framework - 数据流
  • information-flow-analyzer - 信息流
  • alias-and-points-to-analysis - 指针分析

权威参考

参考 重要性
Denning, “A Lattice Model of Secure Information Flow” (CACM 1976) 基础信息流论文
Schwartz et al., “All You Ever Wanted to Know About Dynamic Taint Analysis” (2007) 全面调查
Newsome & Song, “Dynamic Taint Analysis for Automatic Detection” (NDSS 2005) 动态污点分析用于漏洞检测
Haldar et al., “Dynamic Taint Propagation for Java” (ACSAC 2005) Java污点分析

权衡与限制

污点分析方法权衡

方法 优点 缺点
静态 无运行时开销 可能过度近似
动态 精确 遗漏路径
混合 最佳两者 复杂

何时不使用污点分析

  • 对于简单输入:手动审查可能更快
  • 对于加密数据:污点不跟踪加密
  • 对于时序通道:污点无法检测

复杂性考虑

  • 数据流分析:程序大小的O(n)
  • 过程间:无摘要时指数级
  • 上下文敏感:增加复杂性

限制

  • 隐式流:污点可通过控制流泄露
  • 欠近似:动态污点遗漏错误
  • 过近似:静态污点产生误报
  • 净化器处理:复杂难以正确实现
  • 别名:通过指针难以跟踪
  • 完整性:实践中难以实现
  • 完整性与完备性:需要权衡

研究工具与工件

污点分析工具:

工具 应用 学习内容
Infer Facebook 静态污点
CodeQL GitHub 基于查询
FlowDroid Android 静态污点

研究前沿

1. 动态污点分析

  • 方法:运行时跟踪
  • 工具:TaintDroid

实现陷阱

陷阱 实际后果 解决方案
隐式流 遗漏泄露 控制流跟踪