name: creating-bauplan-pipelines description: “创建具有SQL和Python模型的bauplan数据管道项目。在启动新管道、定义DAG转换、编写模型或从头设置bauplan项目结构时使用。” allowed-tools:
- Bash(bauplan:*)
- Read
- Write
- Glob
- Grep
- WebFetch(domain:docs.bauplanlabs.com)
创建新的Bauplan数据管道
本技能指导您从头开始创建新的bauplan数据管道项目,包括项目配置和SQL/Python转换模型。
关键:分支安全
切勿在
main分支上运行管道。 始终使用开发分支。
分支命名约定:<用户名>.<分支名>(例如,john.feature-pipeline)。使用bauplan info获取您的用户名。请参阅工作流清单获取确切命令。
先决条件
在创建管道之前,请验证:
- 您有一个开发分支(不是
main) - 源表存在于bauplan数据湖仓中(默认命名空间是
bauplan) - 您了解源表的模式
管道作为DAG
Bauplan管道是一个由函数(模型)组成的有向无环图(DAG)。关键规则:
- 模型:转换数据的SQL或Python函数
- 源表:现有的数据湖仓表 - DAG的入口点
- 输入:每个模型可以通过
bauplan.Model()引用多个表 - 输出:每个模型产生恰好一个表:
- SQL:输出名称 = 文件名(
trips.sql→trips) - Python:输出名称 = 函数名(
def clean_trips()→clean_trips)
- SQL:输出名称 = 文件名(
- 拓扑结构:由输入引用隐式定义 - bauplan确定执行顺序
期望:数据质量函数,将表作为输入并返回布尔值。
示例DAG
[数据湖仓: taxi_fhvhv] ──→ [trips.sql] ──→ [clean_trips] ──→ [daily_summary]
↑
[数据湖仓: taxi_zones] ────────────────────────┘
在此示例中:
taxi_fhvhv和taxi_zones是源表(已在数据湖仓中)trips.sql从taxi_fhvhv读取(SQL模型,第一个节点)clean_trips将trips和taxi_zones作为输入(Python模型,多个输入)daily_summary将clean_trips作为输入(Python模型,单个输入)
所需的用户输入
在编写管道之前,您必须从用户那里收集以下信息:
- 管道目的(必需):DAG应执行哪些转换?业务逻辑或目标是什么?
- 源表(必需):应使用数据湖仓中的哪些表作为输入?使用
bauplan table get验证它们是否存在 - 输出表(必需):管道结束时应物化哪些表?这些是下游消费者可见的最终输出
- 物化策略(可选):输出表应使用
REPLACE(默认)还是APPEND? - 严格模式(可选):管道是否应在严格模式下运行?如果是,所有CLI命令都将使用
--strict标志,该标志会在干运行时因输出列不匹配等问题而失败,允许立即检测和纠正错误。
如果用户尚未提供此信息,请在继续实施之前询问。
严格模式(–strict 标志)
启用严格模式时,在所有bauplan run命令后附加--strict:
# 无严格模式(默认)
bauplan run --dry-run
bauplan run
# 启用严格模式
bauplan run --dry-run --strict
bauplan run --strict
严格模式的好处:
- 在输出列不匹配时立即失败
- 在期望失败时立即失败
- 允许您在管道完成前纠正声明错误
- 在迭代管道开发时推荐使用
项目结构
Bauplan项目是一个包含以下内容的文件夹:
my-project/
bauplan_project.yml # 必需:项目配置
model.sql # 可选:单个SQL模型,每个文件一个
models.py # 可选:Python模型(一个文件可以有>1个模型,或拆分为多个文件)
expectations.py # 可选:数据质量测试(如果有)
bauplan_project.yml
每个项目都是一个单独的文件夹,需要此配置文件:
project:
id: <唯一-uuid> # 生成唯一的UUID
name: <项目名称> # 项目的描述性名称
何时使用SQL与Python模型
重要:SQL模型应仅限于管道图中的第一个节点。
- SQL模型:仅用于直接从数据湖仓中的源表读取的节点(管道图外部的表)
- Python模型:管道中所有其他转换的首选
这确保了一致性,并允许更好地控制转换、输出模式验证和文档。
SQL模型(仅限第一个节点)
SQL模型是.sql文件,其中:
- 文件名成为输出表名
- FROM子句定义输入表
- 可选:添加物化策略作为注释
仅在从现有数据湖仓表读取时使用SQL模型:
-- trips.sql
-- 第一个节点:从数据湖仓中的taxi_fhvhv表读取
SELECT
pickup_datetime,
PULocationID,
trip_miles
FROM taxi_fhvhv
WHERE pickup_datetime >= '2022-12-01'
输出表:trips(来自文件名)
输入表:taxi_fhvhv(来自FROM子句,存在于数据湖仓中)
Python模型(首选)
Python模型使用装饰器定义转换。它们应用于除从数据湖仓读取的第一个节点之外的所有管道节点。
关键装饰器
@bauplan.model()- 将函数注册为模型@bauplan.model(columns=[...])- 指定预期的输出列以进行验证(可选但推荐)@bauplan.model(materialization_strategy='REPLACE')- 将输出持久化到数据湖仓@bauplan.python('3.11', pip={'pandas': '1.5.3'})- 指定Python版本和包
最佳实践:输出列验证
重要:尽可能在
@bauplan.model()中指定columns参数以定义预期的输出模式。这可以自动验证模型的输出。
首先,检查源表的模式以了解输入列。然后根据您的转换指定输出列:
# 如果输入有列:[id, name, age, city]
# 并且转换删除了'city'列
# 那么输出列应为:[id, name, age]
@bauplan.model(columns=['id', 'name', 'age'])
最佳实践:带有输出模式的文档字符串
重要:每个Python模型都应有一个文档字符串,描述转换并以ASCII表格形式显示输出表结构(如果表太宽,仅显示关键列;如果值太大,则在单元格中截断它们)。
@bauplan.model(columns=['id', 'name', 'age'])
@bauplan.python('3.11')
def clean_users(data=bauplan.Model('raw_users')):
"""
通过移除无效条目和删除城市列来清理用户数据。
| id | name | age |
|-----|---------|-----|
| 1 | Alice | 30 |
| 2 | Bob | 25 |
"""
# 转换逻辑
return data.drop_columns(['city'])
使用columns和filter进行I/O下推
重要:尽可能在
bauplan.Model()中使用columns和filter参数来限制读取的数据。这实现了I/O下推,显著减少了传输的数据量并提高了性能。不要读取不需要的列。
bauplan.Model(
'table_name',
columns=['col1', 'col2', 'col3'], # 仅读取这些列
filter="date >= '2022-01-01'" # 在存储级别预过滤
)
尽可能指定:
columns:仅列出模型实际需要的列filter:SQL类过滤表达式,以在存储级别限制行(如果适用)
基本Python模型
import bauplan
@bauplan.model(
columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
materialization_strategy='REPLACE'
)
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def clean_trips(
# 使用columns和filter进行I/O下推
data=bauplan.Model(
'trips',
columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
filter="trip_miles > 0"
)
):
"""
过滤行程,仅包含里程为正的行程。
| pickup_datetime | PULocationID | trip_miles |
|---------------------|--------------|------------|
| 2022-12-01 08:00:00 | 123 | 5.2 |
"""
import polars as pl
df = pl.from_arrow(data)
df = df.filter(pl.col('trip_miles') > 0.0)
return df.to_arrow()
具有多个输入的Python模型
模型可以将多个表作为输入 - 只需添加更多bauplan.Model()参数:
def model_with_joins(
table_a=bauplan.Model('source_a', columns=['id', 'value']),
table_b=bauplan.Model('source_b', columns=['id', 'name'])
):
# 连接、转换、返回Arrow表
return table_a.join(table_b, 'id', 'id')
有关使用Polars的完整多输入示例,请参阅examples.md。
工作流清单
复制此清单并跟踪您的进度:
管道创建进度:
- [ ] 步骤1:获取用户名 → bauplan info
- [ ] 步骤2:检出main → bauplan branch checkout main
- [ ] 步骤3:创建开发分支 → bauplan branch create <用户名>.<分支名>
- [ ] 步骤4:检出开发分支 → bauplan branch checkout <用户名>.<分支名>
- [ ] 步骤5:验证源表 → bauplan table get <命名空间>.<表名>,可选数据预览:bauplan query "SELECT * FROM <命名空间>.<表名> LIMIT 3"
- [ ] 步骤6:创建包含bauplan_project.yml的项目文件夹
- [ ] 步骤7:编写SQL模型/ Python模型以进行转换,遵循指南
- [ ] 步骤8:验证物化装饰器(见下面的物化清单)
- [ ] 步骤9:干运行 → bauplan run --dry-run [--strict 如果严格模式]
- [ ] 步骤10:运行管道 → bauplan run [--strict 如果严格模式]
关键:切勿在
main分支上运行。步骤2-4确保您在开发分支上。
物化清单
编写模型后,根据用户要求验证每个模型是否具有正确的materialization_strategy:
| 模型类型 | 无物化(中间) | 物化输出 |
|---|---|---|
| Python | @bauplan.model()(无策略) |
@bauplan.model(materialization_strategy='REPLACE') 或 'APPEND' |
| SQL | 无需注释 | 添加注释:-- bauplan: materialization_strategy=REPLACE 或 APPEND |
为每个模型验证:
- [ ] 中间表(非最终输出):未指定
materialization_strategy - [ ] 用户请求的最终输出表:
materialization_strategy='REPLACE'(默认)或'APPEND' - [ ] 如果用户为任何表指定了APPEND:确认设置了
materialization_strategy='APPEND'
物化输出的Python装饰器示例:
@bauplan.model(materialization_strategy='REPLACE', columns=['col1', 'col2'])
物化输出的SQL注释示例:
-- bauplan: materialization_strategy=REPLACE
SELECT * FROM source_table
高级示例
有关以下内容,请参阅examples.md:
- APPEND物化策略
- Python模型中的DuckDB查询
- 数据质量期望
- 多阶段管道