弱内存模型验证器Skill weak-memory-model-verifier

该技能用于在弱内存模型(如x86、ARM、C/C++11)下验证并发程序的正确性,检测数据竞争并合成内存屏障。适用于并发系统验证、低层系统编程和多处理器正确性检查。关键词:弱内存模型、并发验证、数据竞争、内存屏障、C++11、x86、ARM。

测试 0 次安装 0 次浏览 更新于 3/13/2026

name: weak-memory-model-verifier description: ‘在弱内存模型下验证程序。使用场景:(1) 并发验证,(2) 低层系统,(3) 多处理器正确性。’ version: 1.0.0 tags:

  • 验证
  • 内存模型
  • 并发
  • 宽松 difficulty: 高级 languages:
  • python
  • coq dependencies:
  • 模型检查器

弱内存模型验证器

在弱内存模型下验证程序(x86、ARM、POWER、C/C++11)。

使用时机

  • 并发系统验证
  • 低层系统编程
  • 多处理器正确性
  • 编译器优化

该技能的作用

  1. 模型内存顺序 - 从顺序一致性到宽松
  2. 验证正确性 - 在不同模型下
  3. 检测竞争 - 弱模型下的数据竞争
  4. 合成屏障 - 添加必要的内存屏障

内存模型

强度顺序:
  
  SC(顺序一致性)
    ↓
  TSO(总存储顺序)- x86
    ↓
  PSO(部分存储顺序)- SPARC
    ↓
  RMO(宽松内存顺序)- ARM/Power
    ↓
  C/C++11(释放/获取)

实现

from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Tuple
from enum import Enum
import itertools

class MemoryOrder(Enum):
    """C11/C++11内存顺序"""
    RELAXED = "relaxed"
    CONSUME = "consume"
    ACQUIRE = "acquire"
    RELEASE = "release"
    ACQ_REL = "acq_rel"
    SEQ_CST = "seq_cst"

@dataclass
class Event:
    """内存事件"""
    id: int
    thread: str
    kind: str  # read, write, fence, lock, unlock
    addr: str
    value: Optional[int] = None
    order: MemoryOrder = MemoryOrder.SEQ_CST
    po_before: List = field(default_factory=list)  # 程序顺序
    rf_before: List = field(default_factory=list)  # 读取来源
    mo_before: List = field(default_factory=list)  # 修改顺序

@dataclass
class Execution:
    """程序执行"""
    events: List[Event]
    threads: Set[str]
    synchronization: List[Event] = field(default_factory=list)

class WeakMemoryVerifier:
    """在弱内存模型下验证程序"""
    
    def __init__(self, model: str = "c11"):
        self.model = model
        self.relations = {}
    
    def verify(self, execution: Execution) -> bool:
        """
        验证执行是否在内存模型下有效
        
        如果执行一致,则返回True
        """
        
        # 计算所需关系
        self.compute_relations(execution)
        
        # 检查循环
        return not self.has_cycle()
    
    def compute_relations(self, execution: Execution):
        """计算内存模型关系"""
        
        # 程序顺序 (po)
        self.relations['po'] = self.compute_program_order(execution)
        
        # 同步关系 (sw)
        self.relations['sw'] = self.comput_synchronizes_with(execution)
        
        # 发生前关系 (hb)
        self.relations['hb'] = self.compute_happens_before(execution)
        
        # 读取来源 (rf)
        self.relations['rf'] = self.compute_reads_from(execution)
        
        # 修改顺序 (mo)
        self.relations['mo'] = self.compute_modification_order(execution)
    
    def compute_program_order(self, execution: Execution) -> Dict:
        """计算程序顺序关系"""
        
        po = {}
        
        for thread in execution.threads:
            events = [e for e in execution.events if e.thread == thread]
            
            for i, e1 in enumerate(events):
                for e2 in events[i+1:]:
                    po[(e1.id, e2.id)] = True
        
        return po
    
    def comput_synchronizes_with(self, execution: Execution) -> Dict:
        """
        计算同步关系
        
        释放 (A) → 获取 (B) 如果:
        1. A写入x,B从x读取
        2. B读取由A写入的值
        3. 内存顺序确保可见性
        """
        
        sw = {}
        
        for rel in execution.synchronization:
            # 锁/解锁对
            if rel.kind == 'unlock':
                for other in execution.events:
                    if (other.kind == 'lock' and 
                        other.addr == rel.addr and
                        self.happens_before(rel, other)):
                        sw[(rel.id, other.id)] = True
            
            # 释放/获取
            if rel.kind == 'write' and rel.order == MemoryOrder.RELEASE:
                for read in execution.events:
                    if (read.kind == 'read' and
                        read.addr == rel.addr and
                        read.value == rel.value and
                        read.order in (MemoryOrder.ACQUIRE, 
                                      MemoryOrder.SEQ_CST)):
                        # 检查读取来源
                        if (rel.id, read.id) in self.relations.get('rf', {}):
                            sw[(rel.id, read.id)] = True
        
        return sw
    
    def compute_happens_before(self, execution: Execution) -> Dict:
        """计算发生前关系 (po ∪ sw)"""
        
        hb = {}
        
        # po和sw的并集
        for pair in self.relations.get('po', {}):
            hb[pair] = True
        for pair in self.relations.get('sw', {}):
            hb[pair] = True
        
        # 传递闭包
        changed = True
        while changed:
            changed = False
            for (a, b) in list(hb.keys()):
                for (c, d) in list(hb.keys()):
                    if b == c and (a, d) not in hb:
                        hb[(a, d)] = True
                        changed = True
        
        return hb
    
    def compute_reads_from(self, execution: Execution) -> Dict:
        """计算读取来源关系"""
        
        rf = {}
        
        for read in execution.events:
            if read.kind == 'read':
                # 找到此读取的写入
                for write in execution.events:
                    if (write.kind == 'write' and
                        write.addr == read.addr and
                        write.value == read.value):
                        rf[(write.id, read.id)] = True
        
        return rf
    
    def compute_modification_order(self, execution: Execution) -> Dict:
        """计算每个地址的修改顺序"""
        
        mo = {}
        
        # 对于每个地址,写入的总顺序
        writes_by_addr: Dict[str, List[Event]] = {}
        
        for e in execution.events:
            if e.kind == 'write':
                if e.addr not in writes_by_addr:
                    writes_by_addr[e.addr] = []
                writes_by_addr[e.addr].append(e)
        
        for addr, writes in writes_by_addr.items():
            for i, w1 in enumerate(writes):
                for w2 in writes[i+1:]:
                    mo[(w1.id, w2.id)] = True
        
        return mo
    
    def has_cycle(self) -> bool:
        """检查hb ∪ rf ∪ mo中的循环"""
        
        # 构建图
        edges = set()
        
        # 添加hb边
        edges.update(self.relations.get('hb', {}).keys())
        
        # 添加rf边
        edges.update(self.relations.get('rf', {}).keys())
        
        # 添加mo边  
        edges.update(self.relations.get('mo', {}).keys())
        
        # 使用DFS检查循环
        graph = {}
        for (src, dst) in edges:
            if src not in graph:
                graph[src] = []
            graph[src].append(dst)
        
        visited = set()
        rec_stack = set()
        
        def dfs(node):
            visited.add(node)
            rec_stack.add(node)
            
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:
                    return True
            
            rec_stack.remove(node)
            return False
        
        for node in graph:
            if node not in visited:
                if dfs(node):
                    return True
        
        return False

数据竞争检测

class DataRaceDetector:
    """在弱内存模型下检测数据竞争"""
    
    def __init__(self, model: str = "c11"):
        self.model = model
        self.verifier = WeakMemoryVerifier(model)
    
    def detect_races(self, execution: Execution) -> List[Tuple[Event, Event]]:
        """
        检测数据竞争
        
        数据竞争发生在:
        1. 两个事件访问相同内存位置
        2. 至少一个是写入
        3. 它们没有通过发生前关系排序
        """
        
        races = []
        
        # 按地址分组
        by_addr: Dict[str, List[Event]] = {}
        for e in execution.events:
            if e.kind in ('read', 'write'):
                if e.addr not in by_addr:
                    by_addr[e.addr] = []
                by_addr[e.addr].append(e)
        
        # 检查每个地址
        for addr, events in by_addr.items():
            for e1, e2 in itertools.combinations(events, 2):
                # 必须有一个写入
                if e1.kind == 'read' and e2.kind == 'read':
                    continue
                
                # 检查是否有序
                self.verifier.compute_relations(execution)
                hb = self.verifier.relations.get('hb', {})
                
                if not hb.get((e1.id, e2.id)) and not hb.get((e2.id, e1.id)):
                    races.append((e1, e2))
        
        return races

屏障合成

class FenceSynthesizer:
    """合成内存屏障以确保正确性"""
    
    def synthesize(self, program: 'Program') -> List[str]:
        """
        向程序添加必要的屏障
        
        返回屏障位置的列表
        """
        
        executions = self.generate_executions(program)
        fence_points = set()
        
        for execution in executions:
            # 查找违规
            verifier = WeakMemoryVerifier(self.model)
            
            if not verifier.verify(execution):
                # 查找缺失的屏障
                violations = self.find_violations(execution, verifier)
                
                for v in violations:
                    fence_points.add(v.location)
        
        return list(fence_points)
    
    def find_violations(self, execution: Execution, 
                       verifier: WeakMemoryVerifier) -> List:
        """查找需要屏障的地方"""
        
        violations = []
        
        # 检查关键对
        for read in execution.events:
            if read.kind != 'read':
                continue
            
            for write in execution.events:
                if write.kind != 'write':
                    continue
                
                if read.addr != write.addr:
                    continue
                
                # 检查是否有竞争
                hb = verifier.relations.get('hb', {})
                
                if not hb.get((write.id, read.id)):
                    violations.append(Violation(
                        location=read,
                        reason="写入和读取之间缺少屏障"
                    ))
        
        return violations

关键概念

概念 描述
顺序一致性 所有操作的总顺序
TSO 存储缓冲区,写入在读取后重排
释放/获取 部分顺序
发生前 部分顺序
同步关系 跨线程顺序

x86 vs ARM

特性 x86 (TSO) ARM (RMO)
存储缓冲区
加载重排
存储重排
需要屏障 很少 常见

提示

  • 使用验证工具(CBMC、SPIN)
  • 模型检查竞争条件
  • 添加最少屏障
  • 在弱模型上测试

相关技能

  • race-detection-tool - 标准竞争检测
  • software-transactional-memory - STM
  • weak-memory-model-verifier - 验证并发

研究工具与工件

内存模型工具:

工具 学习内容
CBMC C/C++的有界模型检查
CDSChecker C/C++11内存模型检查器
herd 内存模型模拟器
Alglave’s tools 内存模型测试

研究前沿

1. 形式验证

  • 目标: 证明内存模型正确性
  • 关键工作: Alglave等人关于内存模型语义的研究

实现陷阱

陷阱 实际后果 解决方案
竞争 未定义行为 添加屏障