WAP数据安全摄取技能 wap-ingestion

WAP数据安全摄取技能是一种基于写-审计-发布模式的数据工程解决方案,专门用于从AWS S3云存储安全导入数据到bauplan数据平台。该技能通过临时分支隔离、数据质量检查、原子性合并等机制,确保数据加载过程的安全可靠,防止脏数据污染生产环境。适用于金融风控、企业数据仓库、ETL流程、数据湖管理等场景,支持Parquet、CSV、JSONL等多种数据格式的安全摄取。

数据工程 0 次安装 0 次浏览 更新于 3/2/2026

名称: wap数据摄取 描述: “使用写-审计-发布模式将S3数据安全加载到bauplan中。适用于从S3加载新数据、执行安全数据摄取、用户提及WAP、数据摄取、导入parquet/csv/jsonl文件或需要执行质量检查的安全数据加载场景。” 允许工具:

  • 读取
  • 写入
  • 全局搜索
  • 文本搜索
  • Bash
  • 网络获取(域名:docs.bauplanlabs.com)

写-审计-发布(WAP)模式

使用bauplan SDK实现WAP模式。请勿使用CLI命令。

三个步骤: 写入(摄取到临时分支) → 审计(质量检查) → 发布(合并到主分支)

分支安全性: 所有操作都在临时分支上进行,绝不在main分支上操作。默认情况下,无论成功或失败,分支都会保持开放以供检查。

原子性多表操作: merge_branch是原子操作。您可以在分支上创建或修改多个表,合并时,要么所有更改都应用到主分支,要么都不应用。这实现了安全的多表摄取工作流。

必需用户输入

在编写WAP脚本之前,您必须向用户询问以下参数:

  1. S3路径 (必需): 源数据的S3 URI模式(例如: s3://bucket/path/*.parquet)
  2. 表名 (必需): 目标表的名称
  3. 成功时行为 (可选):
    • inspect (默认): 在合并前保持分支开放供用户检查
    • merge: 自动合并到主分支并删除分支
  4. 失败时行为 (可选):
    • keep (默认): 保持分支开放以供检查/调试
    • delete: 删除失败的分支

WAP脚本模板

完整模板请参考wap_template.py。最小化使用示例:

from wap_template import wap_ingest

branch, success = wap_ingest(
    table_name="orders",
    s3_path="s3://my-bucket/data/*.parquet",
    namespace="bauplan",
    on_success="inspect",  # 或 "merge"
    on_failure="keep"      # 或 "delete"
)

关键SDK方法

方法 描述
bauplan.Client() 初始化bauplan客户端
client.info() 获取客户端信息;通过.user.username访问用户名
client.create_branch(name, from_ref="main") 从指定引用创建新分支
client.has_branch(name) 检查分支是否存在
client.delete_branch(name) 删除分支
client.create_table(table, search_uri, ...) 从S3推断模式创建表
client.import_data(table, search_uri, ...) 从S3导入数据到表
client.query(query, ref) 运行SQL查询,返回PyArrow表
client.merge_branch(source_ref, into_branch) 将分支合并到目标分支
client.has_table(table, ref, namespace) 检查分支上表是否存在

SDK参考: 详细方法签名请查看 https://docs.bauplanlabs.com/reference/bauplan

工作流检查清单

复制并跟踪进度:

WAP进度:
- [ ] 询问用户: S3路径、表名、成功时行为、失败时行为
- [ ] 使用wap_template.py编写脚本
- [ ] 运行脚本: python wap_script.py
- [ ] 验证输出显示行数 > 0
- [ ] 如果on_success="inspect": 确认分支已准备好供审查
- [ ] 如果on_success="merge": 确认合并到主分支成功

示例输出

成功运行 (on_success=“inspect”):

$ python wap_script.py
已导入15234行
WAP成功完成。分支'alice.wap_orders_1704067200'已准备好供检查。
手动合并: client.merge_branch(source_ref='alice.wap_orders_1704067200', into_branch='main')

成功运行 (on_success=“merge”):

$ python wap_script.py
已导入15234行
成功将orders发布到主分支
已清理分支: alice.wap_orders_1704067200

失败运行 (on_failure=“keep”):

$ python wap_script.py
WAP失败: 未导入任何数据
分支'alice.wap_orders_1704067200'已保留以供检查/调试。

现有表的WAP操作

要向现有表追加数据,跳过create_table,仅调用import_data:

# 表已存在于主分支 - 仅导入新数据
client.import_data(
    table=table_name,
    search_uri=s3_path,
    namespace=namespace,
    branch=branch_name
)

这将向现有表模式追加行。审计和发布阶段保持不变: 新行在合并前自动隔离在分支上。

检查后的CLI合并

on_success="inspect"(默认)时,分支保持开放供用户审查。如果用户在检查数据后要求合并,请使用CLI:

# 1. 首先切换到主分支(合并前必需)
bauplan checkout main

# 2. 将WAP分支合并到主分支
bauplan branch merge <用户名>.wap_<表名>_<时间戳>

# 3. 成功合并后可选择删除分支
bauplan branch rm <用户名>.wap_<表名>_<时间戳>

注意: 运行bauplan branch merge时必须在main分支上。分支名称在WAP脚本完成时打印。