Turso异步I/O编程模式Skill async-io-model

这个技能教授如何在Turso框架中使用状态机和CompletionGroup处理异步I/O操作,避免重入错误,提高代码可靠性。适用于Rust系统开发和高性能应用。关键词:异步I/O、状态机、Turso、Rust、编程模式、重入避免、CompletionGroup、IOResult、性能优化、系统编程。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

name: async-io-model description: 对tursodb中常见异步模式的解释。涉及IOResult、状态机、重入陷阱、CompletionGroup。在core中执行任何I/O操作时始终使用这些模式

异步I/O模型指南

Turso使用显式状态机进行协作式让步,而不是Rust的async/await。

核心类型

pub enum IOCompletions {
    Single(Completion),
}

#[must_use]
pub enum IOResult<T> {
    Done(T),      // 操作完成,这是结果
    IO(IOCompletions),  // 需要I/O,在完成后再调用我
}

返回IOResult的函数必须被重复调用直到Done

Completion和CompletionGroup

一个Completion跟踪单个I/O操作:

pub struct Completion { /* ... */ }

impl Completion {
    pub fn finished(&self) -> bool;
    pub fn succeeded(&self) -> bool;
    pub fn get_error(&self) -> Option<CompletionError>;
}

要等待多个I/O操作,使用CompletionGroup

let mut group = CompletionGroup::new(|_| {});

// 添加单个完成
group.add(&completion1);
group.add(&completion2);

// 构建成一个完成,当所有完成时结束
let combined = group.build();
io_yield_one!(combined);

CompletionGroup特性:

  • 将多个完成聚合为一个
  • 当所有完成时调用回调(或任何错误)
  • 可以嵌套组(将一个组的完成添加到另一个组)
  • 可通过group.cancel()取消

辅助宏

return_if_io!

展开IOResult,将IO变体传播到调用栈:

let result = return_if_io!(some_io_operation());
// 仅当操作返回Done时到达这里

io_yield_one!

让步一个完成:

io_yield_one!(completion);  // 返回Ok(IOResult::IO(Single(completion)))

状态机模式

可能让步的操作使用显式状态枚举:

enum MyOperationState {
    Start,
    WaitingForRead { page: PageRef },
    Processing { data: Vec<u8> },
    Done,
}

函数循环,匹配状态并转换:

fn my_operation(&mut self) -> Result<IOResult<Output>> {
    loop {
        match &mut self.state {
            MyOperationState::Start => {
                let (page, completion) = start_read();
                self.state = MyOperationState::WaitingForRead { page };
                io_yield_one!(completion);
            }
            MyOperationState::WaitingForRead { page } => {
                let data = page.get_contents();
                self.state = MyOperationState::Processing { data: data.to_vec() };
                // 不让步,继续循环
            }
            MyOperationState::Processing { data } => {
                let result = process(data);
                self.state = MyOperationState::Done;
                return Ok(IOResult::Done(result));
            }
            MyOperationState::Done => unreachable!(),
        }
    }
}

重入:关键陷阱

在让步点之前的状态突变会导致重入时的错误。

错误示例

fn bad_example(&mut self) -> Result<IOResult<()>> {
    self.counter += 1;  // 突变状态
    return_if_io!(something_that_might_yield());  // 如果让步,重入时会再次递增!
    Ok(IOResult::Done(()))
}

如果something_that_might_yield()返回IO,调用者等待完成,然后再次调用bad_example()counter会被递增两次(或更多)。

正确:在让步后突变

fn good_example(&mut self) -> Result<IOResult<()>> {
    return_if_io!(something_that_might_yield());
    self.counter += 1;  // 仅到达一次,在IO完成后
    Ok(IOResult::Done(()))
}

正确:使用状态机

enum State { Start, AfterIO }

fn good_example(&mut self) -> Result<IOResult<()>> {
    loop {
        match self.state {
            State::Start => {
                // 不要在这里突变共享状态
                self.state = State::AfterIO;
                return_if_io!(something_that_might_yield());
            }
            State::AfterIO => {
                self.counter += 1;  // 安全:仅进入一次
                return Ok(IOResult::Done(()));
            }
        }
    }
}

常见重入错误

模式 问题
vec.push(x); return_if_io!(...) 每次重入时Vec增长
idx += 1; return_if_io!(...) 索引多次前进
map.insert(k,v); return_if_io!(...) 重复插入或覆盖
flag = true; return_if_io!(...) 通常ok,但检查逻辑

状态枚举设计

在状态变体中编码进度:

// 好:索引是状态的一部分,在让步中保留
enum ProcessState {
    Start,
    ProcessingItem { idx: usize, items: Vec<Item> },
    Done,
}

// 循环仅在转换状态时前进idx
ProcessingItem { idx, items } => {
    return_if_io!(process_item(&items[idx]));
    if idx + 1 < items.len() {
        self.state = ProcessingItem { idx: idx + 1, items };
    } else {
        self.state = Done;
    }
}

Turso实现

关键文件:

  • core/types.rs - IOResult, IOCompletions, return_if_io!, return_and_restore_if_io!
  • core/io/completions.rs - Completion, CompletionGroup
  • core/util.rs - io_yield_one!
  • core/state_machine.rs - 通用StateMachine包装器
  • core/storage/btree.rs - 许多状态机示例
  • core/storage/pager.rs - CompletionGroup使用示例

测试异步代码

重入错误通常只在特定I/O时序下显现。使用:

  • 确定性模拟(testing/simulator/
  • 多并发DST(testing/concurrent-simulator/
  • 故障注入以强制在不同点让步

参考文献

  • docs/manual.md 关于I/O的部分