name: async-io-model
description: 对tursodb中常见异步模式的解释。涉及IOResult、状态机、重入陷阱、CompletionGroup。在core中执行任何I/O操作时始终使用这些模式
异步I/O模型指南
Turso使用显式状态机进行协作式让步,而不是Rust的async/await。
核心类型
pub enum IOCompletions {
Single(Completion),
}
#[must_use]
pub enum IOResult<T> {
Done(T), // 操作完成,这是结果
IO(IOCompletions), // 需要I/O,在完成后再调用我
}
返回IOResult的函数必须被重复调用直到Done。
Completion和CompletionGroup
一个Completion跟踪单个I/O操作:
pub struct Completion { /* ... */ }
impl Completion {
pub fn finished(&self) -> bool;
pub fn succeeded(&self) -> bool;
pub fn get_error(&self) -> Option<CompletionError>;
}
要等待多个I/O操作,使用CompletionGroup:
let mut group = CompletionGroup::new(|_| {});
// 添加单个完成
group.add(&completion1);
group.add(&completion2);
// 构建成一个完成,当所有完成时结束
let combined = group.build();
io_yield_one!(combined);
CompletionGroup特性:
- 将多个完成聚合为一个
- 当所有完成时调用回调(或任何错误)
- 可以嵌套组(将一个组的完成添加到另一个组)
- 可通过
group.cancel()取消
辅助宏
return_if_io!
展开IOResult,将IO变体传播到调用栈:
let result = return_if_io!(some_io_operation());
// 仅当操作返回Done时到达这里
io_yield_one!
让步一个完成:
io_yield_one!(completion); // 返回Ok(IOResult::IO(Single(completion)))
状态机模式
可能让步的操作使用显式状态枚举:
enum MyOperationState {
Start,
WaitingForRead { page: PageRef },
Processing { data: Vec<u8> },
Done,
}
函数循环,匹配状态并转换:
fn my_operation(&mut self) -> Result<IOResult<Output>> {
loop {
match &mut self.state {
MyOperationState::Start => {
let (page, completion) = start_read();
self.state = MyOperationState::WaitingForRead { page };
io_yield_one!(completion);
}
MyOperationState::WaitingForRead { page } => {
let data = page.get_contents();
self.state = MyOperationState::Processing { data: data.to_vec() };
// 不让步,继续循环
}
MyOperationState::Processing { data } => {
let result = process(data);
self.state = MyOperationState::Done;
return Ok(IOResult::Done(result));
}
MyOperationState::Done => unreachable!(),
}
}
}
重入:关键陷阱
在让步点之前的状态突变会导致重入时的错误。
错误示例
fn bad_example(&mut self) -> Result<IOResult<()>> {
self.counter += 1; // 突变状态
return_if_io!(something_that_might_yield()); // 如果让步,重入时会再次递增!
Ok(IOResult::Done(()))
}
如果something_that_might_yield()返回IO,调用者等待完成,然后再次调用bad_example()。counter会被递增两次(或更多)。
正确:在让步后突变
fn good_example(&mut self) -> Result<IOResult<()>> {
return_if_io!(something_that_might_yield());
self.counter += 1; // 仅到达一次,在IO完成后
Ok(IOResult::Done(()))
}
正确:使用状态机
enum State { Start, AfterIO }
fn good_example(&mut self) -> Result<IOResult<()>> {
loop {
match self.state {
State::Start => {
// 不要在这里突变共享状态
self.state = State::AfterIO;
return_if_io!(something_that_might_yield());
}
State::AfterIO => {
self.counter += 1; // 安全:仅进入一次
return Ok(IOResult::Done(()));
}
}
}
}
常见重入错误
| 模式 | 问题 |
|---|---|
vec.push(x); return_if_io!(...) |
每次重入时Vec增长 |
idx += 1; return_if_io!(...) |
索引多次前进 |
map.insert(k,v); return_if_io!(...) |
重复插入或覆盖 |
flag = true; return_if_io!(...) |
通常ok,但检查逻辑 |
状态枚举设计
在状态变体中编码进度:
// 好:索引是状态的一部分,在让步中保留
enum ProcessState {
Start,
ProcessingItem { idx: usize, items: Vec<Item> },
Done,
}
// 循环仅在转换状态时前进idx
ProcessingItem { idx, items } => {
return_if_io!(process_item(&items[idx]));
if idx + 1 < items.len() {
self.state = ProcessingItem { idx: idx + 1, items };
} else {
self.state = Done;
}
}
Turso实现
关键文件:
core/types.rs-IOResult,IOCompletions,return_if_io!,return_and_restore_if_io!core/io/completions.rs-Completion,CompletionGroupcore/util.rs-io_yield_one!宏core/state_machine.rs- 通用StateMachine包装器core/storage/btree.rs- 许多状态机示例core/storage/pager.rs-CompletionGroup使用示例
测试异步代码
重入错误通常只在特定I/O时序下显现。使用:
- 确定性模拟(
testing/simulator/) - 多并发DST(
testing/concurrent-simulator/) - 故障注入以强制在不同点让步
参考文献
docs/manual.md关于I/O的部分