name: weak-memory-model-verifier
description: ‘在弱内存模型下验证程序。使用场景:(1) 并发验证,(2) 低层系统,(3) 多处理器正确性。’
version: 1.0.0
tags:
- 验证
- 内存模型
- 并发
- 宽松
difficulty: 高级
languages:
- python
- coq
dependencies:
- 模型检查器
弱内存模型验证器
在弱内存模型下验证程序(x86、ARM、POWER、C/C++11)。
使用时机
- 并发系统验证
- 低层系统编程
- 多处理器正确性
- 编译器优化
该技能的作用
- 模型内存顺序 - 从顺序一致性到宽松
- 验证正确性 - 在不同模型下
- 检测竞争 - 弱模型下的数据竞争
- 合成屏障 - 添加必要的内存屏障
内存模型
强度顺序:
SC(顺序一致性)
↓
TSO(总存储顺序)- x86
↓
PSO(部分存储顺序)- SPARC
↓
RMO(宽松内存顺序)- ARM/Power
↓
C/C++11(释放/获取)
实现
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Tuple
from enum import Enum
import itertools
class MemoryOrder(Enum):
"""C11/C++11内存顺序"""
RELAXED = "relaxed"
CONSUME = "consume"
ACQUIRE = "acquire"
RELEASE = "release"
ACQ_REL = "acq_rel"
SEQ_CST = "seq_cst"
@dataclass
class Event:
"""内存事件"""
id: int
thread: str
kind: str # read, write, fence, lock, unlock
addr: str
value: Optional[int] = None
order: MemoryOrder = MemoryOrder.SEQ_CST
po_before: List = field(default_factory=list) # 程序顺序
rf_before: List = field(default_factory=list) # 读取来源
mo_before: List = field(default_factory=list) # 修改顺序
@dataclass
class Execution:
"""程序执行"""
events: List[Event]
threads: Set[str]
synchronization: List[Event] = field(default_factory=list)
class WeakMemoryVerifier:
"""在弱内存模型下验证程序"""
def __init__(self, model: str = "c11"):
self.model = model
self.relations = {}
def verify(self, execution: Execution) -> bool:
"""
验证执行是否在内存模型下有效
如果执行一致,则返回True
"""
# 计算所需关系
self.compute_relations(execution)
# 检查循环
return not self.has_cycle()
def compute_relations(self, execution: Execution):
"""计算内存模型关系"""
# 程序顺序 (po)
self.relations['po'] = self.compute_program_order(execution)
# 同步关系 (sw)
self.relations['sw'] = self.comput_synchronizes_with(execution)
# 发生前关系 (hb)
self.relations['hb'] = self.compute_happens_before(execution)
# 读取来源 (rf)
self.relations['rf'] = self.compute_reads_from(execution)
# 修改顺序 (mo)
self.relations['mo'] = self.compute_modification_order(execution)
def compute_program_order(self, execution: Execution) -> Dict:
"""计算程序顺序关系"""
po = {}
for thread in execution.threads:
events = [e for e in execution.events if e.thread == thread]
for i, e1 in enumerate(events):
for e2 in events[i+1:]:
po[(e1.id, e2.id)] = True
return po
def comput_synchronizes_with(self, execution: Execution) -> Dict:
"""
计算同步关系
释放 (A) → 获取 (B) 如果:
1. A写入x,B从x读取
2. B读取由A写入的值
3. 内存顺序确保可见性
"""
sw = {}
for rel in execution.synchronization:
# 锁/解锁对
if rel.kind == 'unlock':
for other in execution.events:
if (other.kind == 'lock' and
other.addr == rel.addr and
self.happens_before(rel, other)):
sw[(rel.id, other.id)] = True
# 释放/获取
if rel.kind == 'write' and rel.order == MemoryOrder.RELEASE:
for read in execution.events:
if (read.kind == 'read' and
read.addr == rel.addr and
read.value == rel.value and
read.order in (MemoryOrder.ACQUIRE,
MemoryOrder.SEQ_CST)):
# 检查读取来源
if (rel.id, read.id) in self.relations.get('rf', {}):
sw[(rel.id, read.id)] = True
return sw
def compute_happens_before(self, execution: Execution) -> Dict:
"""计算发生前关系 (po ∪ sw)"""
hb = {}
# po和sw的并集
for pair in self.relations.get('po', {}):
hb[pair] = True
for pair in self.relations.get('sw', {}):
hb[pair] = True
# 传递闭包
changed = True
while changed:
changed = False
for (a, b) in list(hb.keys()):
for (c, d) in list(hb.keys()):
if b == c and (a, d) not in hb:
hb[(a, d)] = True
changed = True
return hb
def compute_reads_from(self, execution: Execution) -> Dict:
"""计算读取来源关系"""
rf = {}
for read in execution.events:
if read.kind == 'read':
# 找到此读取的写入
for write in execution.events:
if (write.kind == 'write' and
write.addr == read.addr and
write.value == read.value):
rf[(write.id, read.id)] = True
return rf
def compute_modification_order(self, execution: Execution) -> Dict:
"""计算每个地址的修改顺序"""
mo = {}
# 对于每个地址,写入的总顺序
writes_by_addr: Dict[str, List[Event]] = {}
for e in execution.events:
if e.kind == 'write':
if e.addr not in writes_by_addr:
writes_by_addr[e.addr] = []
writes_by_addr[e.addr].append(e)
for addr, writes in writes_by_addr.items():
for i, w1 in enumerate(writes):
for w2 in writes[i+1:]:
mo[(w1.id, w2.id)] = True
return mo
def has_cycle(self) -> bool:
"""检查hb ∪ rf ∪ mo中的循环"""
# 构建图
edges = set()
# 添加hb边
edges.update(self.relations.get('hb', {}).keys())
# 添加rf边
edges.update(self.relations.get('rf', {}).keys())
# 添加mo边
edges.update(self.relations.get('mo', {}).keys())
# 使用DFS检查循环
graph = {}
for (src, dst) in edges:
if src not in graph:
graph[src] = []
graph[src].append(dst)
visited = set()
rec_stack = set()
def dfs(node):
visited.add(node)
rec_stack.add(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in rec_stack:
return True
rec_stack.remove(node)
return False
for node in graph:
if node not in visited:
if dfs(node):
return True
return False
数据竞争检测
class DataRaceDetector:
"""在弱内存模型下检测数据竞争"""
def __init__(self, model: str = "c11"):
self.model = model
self.verifier = WeakMemoryVerifier(model)
def detect_races(self, execution: Execution) -> List[Tuple[Event, Event]]:
"""
检测数据竞争
数据竞争发生在:
1. 两个事件访问相同内存位置
2. 至少一个是写入
3. 它们没有通过发生前关系排序
"""
races = []
# 按地址分组
by_addr: Dict[str, List[Event]] = {}
for e in execution.events:
if e.kind in ('read', 'write'):
if e.addr not in by_addr:
by_addr[e.addr] = []
by_addr[e.addr].append(e)
# 检查每个地址
for addr, events in by_addr.items():
for e1, e2 in itertools.combinations(events, 2):
# 必须有一个写入
if e1.kind == 'read' and e2.kind == 'read':
continue
# 检查是否有序
self.verifier.compute_relations(execution)
hb = self.verifier.relations.get('hb', {})
if not hb.get((e1.id, e2.id)) and not hb.get((e2.id, e1.id)):
races.append((e1, e2))
return races
屏障合成
class FenceSynthesizer:
"""合成内存屏障以确保正确性"""
def synthesize(self, program: 'Program') -> List[str]:
"""
向程序添加必要的屏障
返回屏障位置的列表
"""
executions = self.generate_executions(program)
fence_points = set()
for execution in executions:
# 查找违规
verifier = WeakMemoryVerifier(self.model)
if not verifier.verify(execution):
# 查找缺失的屏障
violations = self.find_violations(execution, verifier)
for v in violations:
fence_points.add(v.location)
return list(fence_points)
def find_violations(self, execution: Execution,
verifier: WeakMemoryVerifier) -> List:
"""查找需要屏障的地方"""
violations = []
# 检查关键对
for read in execution.events:
if read.kind != 'read':
continue
for write in execution.events:
if write.kind != 'write':
continue
if read.addr != write.addr:
continue
# 检查是否有竞争
hb = verifier.relations.get('hb', {})
if not hb.get((write.id, read.id)):
violations.append(Violation(
location=read,
reason="写入和读取之间缺少屏障"
))
return violations
关键概念
| 概念 |
描述 |
| 顺序一致性 |
所有操作的总顺序 |
| TSO |
存储缓冲区,写入在读取后重排 |
| 释放/获取 |
部分顺序 |
| 发生前 |
部分顺序 |
| 同步关系 |
跨线程顺序 |
x86 vs ARM
| 特性 |
x86 (TSO) |
ARM (RMO) |
| 存储缓冲区 |
是 |
是 |
| 加载重排 |
否 |
是 |
| 存储重排 |
否 |
是 |
| 需要屏障 |
很少 |
常见 |
提示
- 使用验证工具(CBMC、SPIN)
- 模型检查竞争条件
- 添加最少屏障
- 在弱模型上测试
相关技能
race-detection-tool - 标准竞争检测
software-transactional-memory - STM
weak-memory-model-verifier - 验证并发
研究工具与工件
内存模型工具:
| 工具 |
学习内容 |
| CBMC |
C/C++的有界模型检查 |
| CDSChecker |
C/C++11内存模型检查器 |
| herd |
内存模型模拟器 |
| Alglave’s tools |
内存模型测试 |
研究前沿
1. 形式验证
- 目标: 证明内存模型正确性
- 关键工作: Alglave等人关于内存模型语义的研究
实现陷阱
| 陷阱 |
实际后果 |
解决方案 |
| 竞争 |
未定义行为 |
添加屏障 |