变更数据捕获Skill cdc

变更数据捕获(CDC)是一种数据库架构技术,用于实时追踪数据库表中的INSERT、UPDATE、DELETE操作,并将变更记录写入专用表中,以支持数据同步引擎实现远程数据推送。该技能适用于数据库同步、数据备份、实时分析等场景,提高数据一致性和可追溯性。关键词包括:CDC、变更数据捕获、数据库同步、Turso、SQLite、数据变更追踪、架构设计、后端开发、数据工程。

架构设计 0 次安装 0 次浏览 更新于 3/25/2026

名称: cdc 描述: 变更数据捕获 - 架构、入口点、字节码生成、同步引擎集成、测试

CDC(变更数据捕获)- 内部特性映射

概述

CDC 通过将变更记录写入专用 CDC 表(默认为 turso_cdc)来跟踪数据库表上的 INSERT/UPDATE/DELETE 变更。它是基于每个连接的,通过 PRAGMA 启用,并在字节码生成(翻译)层操作。同步引擎消费 CDC 记录以将本地变更推送到远程。

架构图

用户 SQL(INSERT/UPDATE/DELETE/DDL)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  翻译层(core/translate/)                      │
  │  ┌───────────────────────────────────────────┐  │
  │  │ prepare_cdc_if_necessary()                │  │
  │  │   - 检查 CaptureDataChangesInfo           │  │
  │  │   - 打开 CDC 表游标(OpenWrite)          │  │
  │  │   - 如果目标 == CDC 表本身则跳过          │  │
  │  └───────────────────────────────────────────┘  │
  │  ┌───────────────────────────────────────────┐  │
  │  │ emit_cdc_insns()                          │  │
  │  │   - 将(change_id, change_time,           │  │
  │  │     change_type, table_name, id,          │  │
  │  │     before, after, updates)写入 CDC 表   │  │
  │  └───────────────────────────────────────────┘  │
  │  + emit_cdc_full_record() / emit_cdc_patch_record() │
  └─────────────────────────────────────────────────┘
        |
        v
  CDC 表(turso_cdc 或自定义名称)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  同步引擎(sync/engine/)                        │
  │  DatabaseTape 读取 CDC 表 → DatabaseChange      │
  │  → 应用/回滚 → 推送到远程                        │
  └─────────────────────────────────────────────────┘

核心数据类型

CaptureDataChangesMode + CaptureDataChangesInfocore/lib.rs

CDC 行为由两种类型控制:

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,          // 仅捕获 rowid
    Before,      // 捕获 before-image
    After,       // 捕获 after-image
    Full,        // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,                  // CDC 表名
    version: Option<CdcVersion>,    // 架构版本(V1 或 V2)
}

连接存储 Option<CaptureDataChangesInfo>None 表示 CDC 关闭。

CdcVersion 的关键方法:

  • has_commit_record()self >= V2,控制 COMMIT 记录生成
  • Display/FromStr — 往返转换 "v1"V1"v2"V2

CaptureDataChangesInfo 的关键方法:

  • parse(value: &str, version: Option<CdcVersion>) — 解析 PRAGMA 参数 "<mode>[,<table_name>]",返回 None 表示 “off”
  • cdc_version() — 返回 CdcVersion(如果 version 为 None 则 panic)。替代旧的 is_v1()/is_v2()/version() 方法。
  • has_before() / has_after() / has_updates() — 模式能力检查
  • mode_name() — 返回模式字符串

便利特性 CaptureDataChangesExtOption<CaptureDataChangesInfo> 上提供:

  • has_before() / has_after() / has_updates() — 委托给内部,None 时返回 false
  • table() — 返回 Option<&str>,CDC 关闭时为 None

CDC 表架构 v1

默认表名:turso_cdc(常量 TURSO_CDC_DEFAULT_TABLE_NAME

CREATE TABLE turso_cdc (
    change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,        -- unixepoch()
    change_type INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name  TEXT,
    id          <untyped>,      -- 变更行的 rowid
    before      BLOB,           -- 二进制记录(before-image)
    after       BLOB,           -- 二进制记录(after-image)
    updates     BLOB            -- 每列变更的二进制记录
);

CDC 表架构 v2(当前)

CREATE TABLE turso_cdc (
    change_id    INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time  INTEGER,        -- unixepoch()
    change_type  INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name   TEXT,
    id           <untyped>,      -- 变更行的 rowid
    before       BLOB,           -- 二进制记录(before-image)
    after        BLOB,           -- 二进制记录(after-image)
    updates      BLOB,           -- 每列变更的二进制记录
    change_txn_id INTEGER        -- 事务 ID(将行分组到事务中)
);

v2 新增:

  • change_txn_id 列 — 按事务分组 CDC 行。通过 conn_txn_id(candidate) 操作码分配,获取或设置每个连接的事务 ID。
  • change_type=2(COMMIT)记录 — 标记事务边界。在自动提交模式下每个语句生成一次,或显式 COMMIT 时生成。

CDC 表在运行时由 InitCdcVersion 操作码通过 CREATE TABLE IF NOT EXISTS 创建。

CDC 版本表

当 CDC 首次启用时,创建版本跟踪表:

CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version TEXT NOT NULL
);

当前版本:CDC_VERSION_CURRENT = CdcVersion::V2(定义在 core/lib.rs,从 core/translate/pragma.rs 重新导出)

InitCdcVersion 中的版本检测

InitCdcVersion 操作码在创建 CDC 表前检查表是否存在来检测 v1 与 v2:

  • 如果 CDC 表已存在但没有版本行 → v1(来自版本跟踪之前的现有表)
  • 如果 CDC 表不存在 → 创建当前版本(v2)
  • 如果版本行已存在 → 使用该版本

DatabaseChangesync/engine/src/types.rs:229-249

同步引擎中 CDC 行的 Rust 表示。有 into_apply()into_revert() 方法用于前向/后向重播。

OperationModecore/translate/emitter.rs

emit_cdc_insns() 使用以确定 change_type 值:

  • INSERT → 1
  • UPDATE / SELECT → 0
  • DELETE → -1
  • COMMIT → 2(仅 v2,由 emit_cdc_commit_insns 生成)

入口点

1. PRAGMA — 启用/禁用 CDC

设置: core/translate/pragma.rs

  • 检查 MVCC 未启用(CDC 和 MVCC 互斥)
  • 通过 CaptureDataChangesInfo::parse() 解析模式字符串,使用 CDC_VERSION_CURRENT
  • 生成单个 InitCdcVersion 操作码 — 所有 CDC 设置(表创建、版本跟踪、状态变更)在执行时发生

获取(读取当前模式): core/translate/pragma.rs

  • 返回 3 列:modetableversion
  • 关闭时:返回 ("off", NULL, NULL)
  • 活动时:返回 (mode_name, table, version)

Pragma 注册: core/pragma.rsUnstableCaptureDataChangesConn 带有列 ["mode", "table", "version"]

2. 连接状态

字段: core/connection.rscapture_data_changes: RwLock<Option<CaptureDataChangesInfo>> 获取器: get_capture_data_changes_info() — 返回读锁 设置器: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>) 默认: 初始化为 None(CDC 关闭)

3. ProgramBuilder 集成

字段: core/vdbe/builder.rscapture_data_changes_info: Option<CaptureDataChangesInfo> 访问器: capture_data_changes_info() — 返回 &Option<CaptureDataChangesInfo> 从传递: core/translate/mod.rs — 从连接创建 builder 时读取

4. PrepareContext

字段: core/vdbe/mod.rscapture_data_changes: Option<CaptureDataChangesInfo> 设置自: PrepareContext::from_connection() — 从 connection.get_capture_data_changes_info() 克隆

5. InitCdcVersion 操作码 — core/vdbe/execute.rs

始终由 PRAGMA SET 生成。在执行时处理所有 CDC 设置:

  1. 对于 “off”:存储 Nonestate.pending_cdc_info,提前返回
  2. 检查 CDC 表是否已存在(用于 v1 向后兼容)
  3. 创建 CDC 表(CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) — v2 架构带 change_txn_id
  4. 创建版本表(CREATE TABLE IF NOT EXISTS turso_cdc_version ...
  5. 插入版本行:如果 CDC 表预先存在 → “v1”,否则 → 当前版本(“v2”)。使用 INSERT OR IGNORE 保留现有版本行。
  6. 从表读取实际版本
  7. 存储计算后的 CaptureDataChangesInfostate.pending_cdc_info

连接的 CDC 状态 不在操作码中应用。相反,pending_cdc_info 只在事务成功提交后在 halt() 中应用。这确保原子性:如果任何步骤失败且事务回滚,连接的 CDC 状态保持不变。

所有表创建通过嵌套 conn.prepare()/run_ignore_rows() 调用完成,而非字节码生成,因为 PRAGMA 计划不能包含对尚不存在于架构中的表的 DML。

字节码生成(core/translate/emitter.rs)

这些是核心 CDC 代码生成函数:

函数 目的
prepare_cdc_if_necessary() 如果 CDC 活动且目标 != CDC 表,打开 CDC 表游标
emit_cdc_full_record() 从游标读取所有列到 MakeRecord(用于 before/after image)
emit_cdc_patch_record() 从运行中寄存器值构建记录(用于 INSERT/UPDATE 的 after-image)
emit_cdc_insns() 为每个变更行写入单个 CDC 行(INSERT/UPDATE/DELETE)。在 DML 循环内每行调用。
emit_cdc_commit_insns() 将 COMMIT 记录(change_type=2)写入 CDC 表(仅 v2)。原始生成,无自动提交检查。
emit_cdc_autocommit_commit() 语句结束时的 COMMIT 生成。运行时检查 is_autocommit() — 仅自动提交模式下生成 COMMIT。仅 v2。

COMMIT 生成策略(v2)

每行调用点使用 emit_cdc_insns()(无 COMMIT)。语句结束点调用 emit_cdc_autocommit_commit(),运行时检查 is_autocommit()

  • 自动提交模式: 语句完成后生成一个 COMMIT 记录
  • 显式事务(BEGIN...COMMIT): 跳过每语句 COMMIT;显式 COMMIT 语句通过 emit_cdc_commit_insns() 生成 COMMIT 记录

这确保多行语句如 INSERT INTO t VALUES (1),(2),(3) 在结束时生成一个 COMMIT,而非每行一个。

集成点 — CDC 记录生成处

INSERT — core/translate/insert.rs

  • 每行: 插入后,REPLACE/冲突删除前,emit_cdc_insns()
  • 语句结束: 插入循环后,在 emit_epilogue()emit_cdc_autocommit_commit()

UPDATE — core/translate/emitter.rs

  • 每行: 捕获 before-image,通过 patch 记录 after-image,生成 emit_cdc_insns()
  • 语句结束: 更新循环后 emit_cdc_autocommit_commit()

DELETE — core/translate/emitter.rs

  • 每行: 捕获 before-image 并生成 emit_cdc_insns()
  • 语句结束: 删除循环后 emit_cdc_autocommit_commit()

UPSERT(ON CONFLICT DO UPDATE) — core/translate/upsert.rs

  • 每行: 为所有三种情况生成 emit_cdc_insns():纯插入、冲突后更新、替换
  • 无语句结束 COMMIT — upsert 共享 INSERT 的结语

架构变更(DDL) — core/translate/schema.rs

  • CREATE TABLE: emit_cdc_insns()(插入到 sqlite_schema)+ emit_cdc_autocommit_commit()
  • DROP TABLE: 元数据循环中每行 emit_cdc_insns() + 循环后 emit_cdc_autocommit_commit()
  • CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit()core/translate/schema.rs
  • DROP INDEX: 每行 emit_cdc_insns() + 循环后 emit_cdc_autocommit_commit()core/translate/index.rs

显式事务中的 DDL(BEGIN; CREATE TABLE t(x); COMMIT)不生成每语句 COMMIT — 自动提交检查阻止。

ALTER TABLE — core/translate/update.rs

  • 当 CDC 有更新模式时,在更新计划上设置 cdc_update_alter_statement

视图/触发器 — 明确排除

  • core/translate/view.rs — 传递 None 给 CDC 游标
  • core/translate/trigger.rs — 传递 None 给 CDC 游标

子查询 — 无 CDC

  • core/translate/subquery.rscdc_cursor_id: None

辅助函数(用于读取 CDC 数据)

table_columns_json_array(table_name)core/function.rscore/vdbe/execute.rs

返回表的列名 JSON 数组。用于解释二进制记录。

bin_record_json_object(columns_json, blob)core/function.rscore/vdbe/execute.rs

使用列名将二进制记录(来自 before/after/updates 列)解码为 JSON 对象。

同步引擎集成

同步引擎是 CDC 数据的主要消费者。

DatabaseTape — sync/engine/src/database_tape.rs

  • CDC 配置: DEFAULT_CDC_TABLE_NAME = "turso_cdc"DEFAULT_CDC_MODE = "full"
  • PRAGMA 名: CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"
  • 初始化: connect() 设置 CDC pragma 并缓存 cdc_version 来自 turso_cdc_version 表。必须在 iterate_changes() 前调用。
  • 版本缓存: cdc_version: RwLock<Option<CdcVersion>> — 由 connect() 设置,由 iterate_changes() 读取。如果未设置则 panic。
  • 迭代器: DatabaseChangesIterator 分批读取 CDC 表,发出 DatabaseTapeOperation。对于 v2,从表发出真实 COMMIT 记录。对于 v1,在批次结束时附加合成 Commit。ignore_schema_changes: true(默认)过滤掉 sqlite_schema 行变更但不包括 COMMIT 记录。

同步操作 — sync/engine/src/database_sync_operations.rs

  • 变更计数: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?

同步引擎 — sync/engine/src/database_sync_engine.rs

  • 初始化: open_db() 调用 main_tape.connect(coro) 确保在 iterate_changes() 调用前设置 CDC 并缓存版本。
  • apply_changes 期间,检查 CDC 表是否已存在,同步后重新创建

重播生成器 — sync/engine/src/database_replay_generator.rs

  • 需要 updates 列填充(完整模式)

绑定 CDC 表面

所有绑定将 cdc_operations 暴露为同步统计的一部分:

绑定 文件
Python bindings/python/src/turso_sync.rs
JavaScript bindings/javascript/sync/src/lib.rs
JS(生成器) bindings/javascript/sync/src/generator.rs
Go bindings/go/bindings_sync.go
React Native bindings/react-native/src/types.ts
SDK Kit(C 头文件) sync/sdk-kit/turso_sync.h
SDK Kit(Rust) sync/sdk-kit/src/bindings.rs

测试

  • 集成测试: tests/integration/functions/test_cdc.rs — 覆盖所有模式、CRUD、事务、架构变更、版本表、向后兼容性。注册在 tests/integration/functions/mod.rs
  • 同步引擎测试: sync/engine/src/database_tape.rs — CDC 表读取、磁带迭代、架构变更重播。
  • JS 绑定测试: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts

运行:cargo test -- test_cdc(集成)或 cargo test -p turso_sync_engine -- database_tape(同步引擎)。

面向用户的文档

  • CLI 手册页: cli/manuals/cdc.md — 可通过 REPL 中的 .manual cdc 访问
  • 数据库手册: docs/manual.md — CDC 部分在 TOC 中链接

关键设计决策

  1. 基于每个连接,而非每个数据库。 每个连接有自己的 CDC 模式并可针对不同表。
  2. 字节码级实现。 CDC 指令在翻译期间与实际 DML 字节码一起生成 — 无运行时钩子或触发器。
  3. 自排除。 CDC 表和 turso_cdc_version 表的变更从不捕获(在 prepare_cdc_if_necessary 中检查)。
  4. 架构变更跟踪。 DDL 操作记录为 sqlite_schema 表变更。
  5. 二进制记录格式。 Before/after/updates 列使用 SQLite 的 MakeRecord 格式(与 B 树有效载荷相同)。
  6. 事务感知。 CDC 写入与 DML 在同一事务内发生,因此回滚自然丢弃 CDC 条目。
  7. 版本跟踪。 CDC 架构版本记录在 turso_cdc_version 表中,并在 CaptureDataChangesInfo.version 中携带以便未来架构演进。
  8. 原子 PRAGMA。 连接 CDC 状态通过 ProgramState 中的 pending_cdc_info 延迟应用,仅在 Halt 时应用。如果 PRAGMA 的磁盘写入失败且事务回滚,连接状态保持不变。
  9. 每语句 COMMIT(v2)。 COMMIT 记录每个语句生成一次(而非每行),使用 emit_cdc_autocommit_commit(),运行时检查 is_autocommit()。在显式事务中,仅最终 COMMIT 生成 COMMIT CDC 记录。
  10. 向后兼容的版本检测。 预先存在的 v1 CDC 表(无 turso_cdc_version)通过创建前检查表存在来检测。现有表在版本表中插入 CdcVersion::V1
  11. 类型化版本枚举。 CdcVersion 枚举带 #[repr(u8)]Ord/PartialOrd 启用通过整数比较功能门控(has_commit_record() = self >= V2)。Display/FromStr 处理数据库往返。
  12. CDC 和 MVCC 互斥。 启用 CDC 当 MVCC 活动时(或反之)返回错误。在 PRAGMA 设置时和日志模式切换时检查。