名称: wap数据摄取 描述: “使用写-审计-发布模式将S3数据安全加载到bauplan中。适用于从S3加载新数据、执行安全数据摄取、用户提及WAP、数据摄取、导入parquet/csv/jsonl文件或需要执行质量检查的安全数据加载场景。” 允许工具:
- 读取
- 写入
- 全局搜索
- 文本搜索
- Bash
- 网络获取(域名:docs.bauplanlabs.com)
写-审计-发布(WAP)模式
使用bauplan SDK实现WAP模式。请勿使用CLI命令。
三个步骤: 写入(摄取到临时分支) → 审计(质量检查) → 发布(合并到主分支)
分支安全性: 所有操作都在临时分支上进行,绝不在main分支上操作。默认情况下,无论成功或失败,分支都会保持开放以供检查。
原子性多表操作: merge_branch是原子操作。您可以在分支上创建或修改多个表,合并时,要么所有更改都应用到主分支,要么都不应用。这实现了安全的多表摄取工作流。
必需用户输入
在编写WAP脚本之前,您必须向用户询问以下参数:
- S3路径 (必需): 源数据的S3 URI模式(例如:
s3://bucket/path/*.parquet) - 表名 (必需): 目标表的名称
- 成功时行为 (可选):
inspect(默认): 在合并前保持分支开放供用户检查merge: 自动合并到主分支并删除分支
- 失败时行为 (可选):
keep(默认): 保持分支开放以供检查/调试delete: 删除失败的分支
WAP脚本模板
完整模板请参考wap_template.py。最小化使用示例:
from wap_template import wap_ingest
branch, success = wap_ingest(
table_name="orders",
s3_path="s3://my-bucket/data/*.parquet",
namespace="bauplan",
on_success="inspect", # 或 "merge"
on_failure="keep" # 或 "delete"
)
关键SDK方法
| 方法 | 描述 |
|---|---|
bauplan.Client() |
初始化bauplan客户端 |
client.info() |
获取客户端信息;通过.user.username访问用户名 |
client.create_branch(name, from_ref="main") |
从指定引用创建新分支 |
client.has_branch(name) |
检查分支是否存在 |
client.delete_branch(name) |
删除分支 |
client.create_table(table, search_uri, ...) |
从S3推断模式创建表 |
client.import_data(table, search_uri, ...) |
从S3导入数据到表 |
client.query(query, ref) |
运行SQL查询,返回PyArrow表 |
client.merge_branch(source_ref, into_branch) |
将分支合并到目标分支 |
client.has_table(table, ref, namespace) |
检查分支上表是否存在 |
SDK参考: 详细方法签名请查看 https://docs.bauplanlabs.com/reference/bauplan
工作流检查清单
复制并跟踪进度:
WAP进度:
- [ ] 询问用户: S3路径、表名、成功时行为、失败时行为
- [ ] 使用wap_template.py编写脚本
- [ ] 运行脚本: python wap_script.py
- [ ] 验证输出显示行数 > 0
- [ ] 如果on_success="inspect": 确认分支已准备好供审查
- [ ] 如果on_success="merge": 确认合并到主分支成功
示例输出
成功运行 (on_success=“inspect”):
$ python wap_script.py
已导入15234行
WAP成功完成。分支'alice.wap_orders_1704067200'已准备好供检查。
手动合并: client.merge_branch(source_ref='alice.wap_orders_1704067200', into_branch='main')
成功运行 (on_success=“merge”):
$ python wap_script.py
已导入15234行
成功将orders发布到主分支
已清理分支: alice.wap_orders_1704067200
失败运行 (on_failure=“keep”):
$ python wap_script.py
WAP失败: 未导入任何数据
分支'alice.wap_orders_1704067200'已保留以供检查/调试。
现有表的WAP操作
要向现有表追加数据,跳过create_table,仅调用import_data:
# 表已存在于主分支 - 仅导入新数据
client.import_data(
table=table_name,
search_uri=s3_path,
namespace=namespace,
branch=branch_name
)
这将向现有表模式追加行。审计和发布阶段保持不变: 新行在合并前自动隔离在分支上。
检查后的CLI合并
当on_success="inspect"(默认)时,分支保持开放供用户审查。如果用户在检查数据后要求合并,请使用CLI:
# 1. 首先切换到主分支(合并前必需)
bauplan checkout main
# 2. 将WAP分支合并到主分支
bauplan branch merge <用户名>.wap_<表名>_<时间戳>
# 3. 成功合并后可选择删除分支
bauplan branch rm <用户名>.wap_<表名>_<时间戳>
注意: 运行
bauplan branch merge时必须在main分支上。分支名称在WAP脚本完成时打印。