创建Bauplan数据管道 creating-bauplan-pipelines

本技能是Bauplan数据工程平台的详细指南,用于创建和管理数据管道项目。它涵盖了从项目初始化、DAG(有向无环图)设计、SQL/Python模型编写、到严格模式验证和物化策略的全流程。核心功能包括:数据管道开发、ETL/ELT流程构建、数据转换模型(SQL/Python)、数据质量检查、I/O下推优化、分支安全管理以及自动化工作流。适用于数据工程师、数据分析师和开发人员构建可维护、高性能的数据处理流水线。

数据工程 0 次安装 0 次浏览 更新于 3/2/2026

name: creating-bauplan-pipelines description: “创建具有SQL和Python模型的bauplan数据管道项目。在启动新管道、定义DAG转换、编写模型或从头设置bauplan项目结构时使用。” allowed-tools:

  • Bash(bauplan:*)
  • Read
  • Write
  • Glob
  • Grep
  • WebFetch(domain:docs.bauplanlabs.com)

创建新的Bauplan数据管道

本技能指导您从头开始创建新的bauplan数据管道项目,包括项目配置和SQL/Python转换模型。

关键:分支安全

切勿在main分支上运行管道。 始终使用开发分支。

分支命名约定:<用户名>.<分支名>(例如,john.feature-pipeline)。使用bauplan info获取您的用户名。请参阅工作流清单获取确切命令。

先决条件

在创建管道之前,请验证:

  1. 您有一个开发分支(不是main
  2. 源表存在于bauplan数据湖仓中(默认命名空间是bauplan
  3. 您了解源表的模式

管道作为DAG

Bauplan管道是一个由函数(模型)组成的有向无环图(DAG)。关键规则:

  1. 模型:转换数据的SQL或Python函数
  2. 源表:现有的数据湖仓表 - DAG的入口点
  3. 输入:每个模型可以通过bauplan.Model()引用多个表
  4. 输出:每个模型产生恰好一个表
    • SQL:输出名称 = 文件名(trips.sqltrips
    • Python:输出名称 = 函数名(def clean_trips()clean_trips
  5. 拓扑结构:由输入引用隐式定义 - bauplan确定执行顺序

期望:数据质量函数,将表作为输入并返回布尔值

示例DAG

[数据湖仓: taxi_fhvhv] ──→ [trips.sql] ──→ [clean_trips] ──→ [daily_summary]
                                                ↑
[数据湖仓: taxi_zones] ────────────────────────┘

在此示例中:

  • taxi_fhvhvtaxi_zones是源表(已在数据湖仓中)
  • trips.sqltaxi_fhvhv读取(SQL模型,第一个节点)
  • clean_tripstripstaxi_zones作为输入(Python模型,多个输入)
  • daily_summaryclean_trips作为输入(Python模型,单个输入)

所需的用户输入

在编写管道之前,您必须从用户那里收集以下信息:

  1. 管道目的(必需):DAG应执行哪些转换?业务逻辑或目标是什么?
  2. 源表(必需):应使用数据湖仓中的哪些表作为输入?使用bauplan table get验证它们是否存在
  3. 输出表(必需):管道结束时应物化哪些表?这些是下游消费者可见的最终输出
  4. 物化策略(可选):输出表应使用REPLACE(默认)还是APPEND
  5. 严格模式(可选):管道是否应在严格模式下运行?如果是,所有CLI命令都将使用--strict标志,该标志会在干运行时因输出列不匹配等问题而失败,允许立即检测和纠正错误。

如果用户尚未提供此信息,请在继续实施之前询问。

严格模式(–strict 标志)

启用严格模式时,在所有bauplan run命令后附加--strict

# 无严格模式(默认)
bauplan run --dry-run
bauplan run

# 启用严格模式
bauplan run --dry-run --strict
bauplan run --strict

严格模式的好处:

  • 在输出列不匹配时立即失败
  • 在期望失败时立即失败
  • 允许您在管道完成前纠正声明错误
  • 在迭代管道开发时推荐使用

项目结构

Bauplan项目是一个包含以下内容的文件夹:

my-project/
  bauplan_project.yml    # 必需:项目配置
  model.sql              # 可选:单个SQL模型,每个文件一个
  models.py              # 可选:Python模型(一个文件可以有>1个模型,或拆分为多个文件)
  expectations.py        # 可选:数据质量测试(如果有)

bauplan_project.yml

每个项目都是一个单独的文件夹,需要此配置文件:

project:
  id: <唯一-uuid>       # 生成唯一的UUID
  name: <项目名称>    # 项目的描述性名称

何时使用SQL与Python模型

重要:SQL模型应仅限于管道图中的第一个节点

  • SQL模型:仅用于直接从数据湖仓中的源表读取的节点(管道图外部的表)
  • Python模型:管道中所有其他转换的首选

这确保了一致性,并允许更好地控制转换、输出模式验证和文档。

SQL模型(仅限第一个节点)

SQL模型是.sql文件,其中:

  • 文件名成为输出表名
  • FROM子句定义输入表
  • 可选:添加物化策略作为注释

仅在从现有数据湖仓表读取时使用SQL模型:

-- trips.sql
-- 第一个节点:从数据湖仓中的taxi_fhvhv表读取
SELECT
    pickup_datetime,
    PULocationID,
    trip_miles
FROM taxi_fhvhv
WHERE pickup_datetime >= '2022-12-01'

输出表:trips(来自文件名) 输入表:taxi_fhvhv(来自FROM子句,存在于数据湖仓中)

Python模型(首选)

Python模型使用装饰器定义转换。它们应用于除从数据湖仓读取的第一个节点之外的所有管道节点。

关键装饰器

  • @bauplan.model() - 将函数注册为模型
  • @bauplan.model(columns=[...]) - 指定预期的输出列以进行验证(可选但推荐)
  • @bauplan.model(materialization_strategy='REPLACE') - 将输出持久化到数据湖仓
  • @bauplan.python('3.11', pip={'pandas': '1.5.3'}) - 指定Python版本和包

最佳实践:输出列验证

重要:尽可能在@bauplan.model()中指定columns参数以定义预期的输出模式。这可以自动验证模型的输出。

首先,检查源表的模式以了解输入列。然后根据您的转换指定输出列:

# 如果输入有列:[id, name, age, city]
# 并且转换删除了'city'列
# 那么输出列应为:[id, name, age]

@bauplan.model(columns=['id', 'name', 'age'])

最佳实践:带有输出模式的文档字符串

重要:每个Python模型都应有一个文档字符串,描述转换并以ASCII表格形式显示输出表结构(如果表太宽,仅显示关键列;如果值太大,则在单元格中截断它们)。

@bauplan.model(columns=['id', 'name', 'age'])
@bauplan.python('3.11')
def clean_users(data=bauplan.Model('raw_users')):
    """
    通过移除无效条目和删除城市列来清理用户数据。

    | id  | name    | age |
    |-----|---------|-----|
    | 1   | Alice   | 30  |
    | 2   | Bob     | 25  |
    """
    # 转换逻辑
    return data.drop_columns(['city'])

使用columnsfilter进行I/O下推

重要:尽可能在bauplan.Model()中使用columnsfilter参数来限制读取的数据。这实现了I/O下推,显著减少了传输的数据量并提高了性能。不要读取不需要的列。

bauplan.Model(
    'table_name',
    columns=['col1', 'col2', 'col3'],   # 仅读取这些列
    filter="date >= '2022-01-01'"       # 在存储级别预过滤
)

尽可能指定:

  • columns:仅列出模型实际需要的列
  • filter:SQL类过滤表达式,以在存储级别限制行(如果适用)

基本Python模型

import bauplan

@bauplan.model(
    columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
    materialization_strategy='REPLACE'
)
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def clean_trips(
    # 使用columns和filter进行I/O下推
    data=bauplan.Model(
        'trips',
        columns=['pickup_datetime', 'PULocationID', 'trip_miles'],
        filter="trip_miles > 0"
    )
):
    """
    过滤行程,仅包含里程为正的行程。

    | pickup_datetime     | PULocationID | trip_miles |
    |---------------------|--------------|------------|
    | 2022-12-01 08:00:00 | 123          | 5.2        |
    """
    import polars as pl

    df = pl.from_arrow(data)
    df = df.filter(pl.col('trip_miles') > 0.0)

    return df.to_arrow()

具有多个输入的Python模型

模型可以将多个表作为输入 - 只需添加更多bauplan.Model()参数:

def model_with_joins(
    table_a=bauplan.Model('source_a', columns=['id', 'value']),
    table_b=bauplan.Model('source_b', columns=['id', 'name'])
):
    # 连接、转换、返回Arrow表
    return table_a.join(table_b, 'id', 'id')

有关使用Polars的完整多输入示例,请参阅examples.md

工作流清单

复制此清单并跟踪您的进度:

管道创建进度:
- [ ] 步骤1:获取用户名 → bauplan info
- [ ] 步骤2:检出main → bauplan branch checkout main
- [ ] 步骤3:创建开发分支 → bauplan branch create <用户名>.<分支名>
- [ ] 步骤4:检出开发分支 → bauplan branch checkout <用户名>.<分支名>
- [ ] 步骤5:验证源表 → bauplan table get <命名空间>.<表名>,可选数据预览:bauplan query "SELECT * FROM <命名空间>.<表名> LIMIT 3"
- [ ] 步骤6:创建包含bauplan_project.yml的项目文件夹
- [ ] 步骤7:编写SQL模型/ Python模型以进行转换,遵循指南
- [ ] 步骤8:验证物化装饰器(见下面的物化清单)
- [ ] 步骤9:干运行 → bauplan run --dry-run [--strict 如果严格模式]
- [ ] 步骤10:运行管道 → bauplan run [--strict 如果严格模式]

关键:切勿在main分支上运行。步骤2-4确保您在开发分支上。

物化清单

编写模型后,根据用户要求验证每个模型是否具有正确的materialization_strategy

模型类型 无物化(中间) 物化输出
Python @bauplan.model()(无策略) @bauplan.model(materialization_strategy='REPLACE')'APPEND'
SQL 无需注释 添加注释:-- bauplan: materialization_strategy=REPLACEAPPEND

为每个模型验证:

  • [ ] 中间表(非最终输出):未指定materialization_strategy
  • [ ] 用户请求的最终输出表:materialization_strategy='REPLACE'(默认)或'APPEND'
  • [ ] 如果用户为任何表指定了APPEND:确认设置了materialization_strategy='APPEND'

物化输出的Python装饰器示例:

@bauplan.model(materialization_strategy='REPLACE', columns=['col1', 'col2'])

物化输出的SQL注释示例:

-- bauplan: materialization_strategy=REPLACE
SELECT * FROM source_table

高级示例

有关以下内容,请参阅examples.md

  • APPEND物化策略
  • Python模型中的DuckDB查询
  • 数据质量期望
  • 多阶段管道