名称: 数据转换 描述: 使用ETL/ELT模式、SQL(dbt)、Python(pandas/polars/PySpark)和编排工具(Airflow)将原始数据转换为分析资产。适用于构建数据管道、实现增量模型、从pandas迁移到polars,或编排带有测试和质量检查的多步骤转换。
数据转换
使用现代转换模式、框架和编排工具将原始数据转换为分析资产。
目的
在现代数据栈中选择和实施数据转换模式。使用SQL(dbt)、Python DataFrame(pandas、polars、PySpark)和管道编排(Airflow、Dagster、Prefect)将原始数据转换为干净、经过测试和记录的分析数据集。
何时使用
在以下情况下调用此技能:
- 选择ETL和ELT转换模式之间
- 构建dbt模型(暂存层、中间层、集市层)
- 实现增量数据加载和合并策略
- 为性能改进将pandas代码迁移到polars
- 编排带有依赖关系和重试的数据管道
- 添加数据质量测试和验证
- 使用PySpark处理大型数据集
- 创建生产就绪的转换工作流
快速开始:常见模式
dbt增量模型
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3
{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}
polars高性能转换
import polars as pl
result = (
pl.scan_csv('large_dataset.csv')
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg(pl.col('revenue').sum())
.collect() # 执行延迟查询
)
Airflow数据管道
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
dag_id='daily_sales_pipeline',
schedule_interval='0 2 * * *',
default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
extract >> transform
决策框架
ETL与ELT选择
使用ELT(提取、加载、转换) 当:
- 使用现代云数据仓库(Snowflake、BigQuery、Databricks)
- 转换逻辑频繁更改
- 团队包括SQL分析师
- 数据量10GB-1TB+(利用仓库并行性)
工具:dbt、Dataform、Snowflake任务、BigQuery预定查询
使用ETL(提取、转换、加载) 当:
- 法规合规要求加载前数据脱敏(PII/PHI)
- 目标系统缺乏计算能力
- 实时流处理需要即时转换
- 无云仓库的遗留系统
工具:AWS Glue、Azure Data Factory、自定义Python脚本
使用混合模式 当结合敏感数据清洗(ETL)与分析转换(ELT)时。
默认推荐:除非特定合规或性能限制要求ETL,否则使用ELT与dbt。
详细模式见references/etl-vs-elt-patterns.md。
DataFrame库选择
选择pandas 当:
- 数据大小 < 500MB
- 原型设计或探索性分析
- 需要与仅pandas兼容的库
选择polars 当:
- 数据大小500MB-100GB
- 性能关键(比pandas快10-100倍)
- 生产管道有内存限制
- 希望延迟评估与查询优化
选择PySpark 当:
- 数据大小 > 100GB
- 需要跨集群分布式处理
- 现有Spark基础设施(EMR、Databricks)
迁移路径:pandas → polars(更易,API类似)或pandas → PySpark(需要集群)
比较和迁移指南见references/dataframe-comparison.md。
编排工具选择
选择Airflow 当:
- 企业生产(规模验证)
- 需要5,000+集成
- 有托管服务可用(AWS MWAA、GCP Cloud Composer)
选择Dagster 当:
- 大量dbt使用(原生
dbt_assets集成) - 数据血缘和基于资产的流程优先
- ML管道需要可测试性
选择Prefect 当:
- 动态工作流(运行时任务生成)
- 偏好云原生架构
- Pythonic API与装饰器
安全默认:Airflow(久经考验),除非Dagster/Prefect有特定需求。
详细模式见references/orchestration-patterns.md。
SQL转换与dbt
模型层结构
-
暂存层 (
models/staging/)- 1:1对应源表
- 最小转换(重命名、类型转换、基本过滤)
- 物化为视图或临时表
-
中间层 (
models/intermediate/)- 业务逻辑和复杂连接
- 不暴露给最终用户
- 通常临时表(仅CTE)
-
集市层 (
models/marts/)- 报告用的最终模型
- 事实表(事件、交易)
- 维度表(客户、产品)
- 物化为表或增量
dbt物化类型
视图:每次引用模型时重新运行查询。用于快速查询、暂存层。
表:每次运行完全刷新。用于频繁查询的模型、昂贵计算。
增量:仅处理新增/更改记录。用于大型事实表、事件日志。
临时表:仅CTE,不持久化。用于中间计算。
dbt测试
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- relationships:
to: ref('dim_customers')
field: customer_id
- name: total_revenue
tests:
- dbt_utils.accepted_range:
min_value: 0
全面的dbt模式见:
references/dbt-best-practices.mdreferences/incremental-strategies.md
Python DataFrame转换
pandas转换
import pandas as pd
df = pd.read_csv('sales.csv')
result = (
df
.query('year == 2024')
.assign(revenue=lambda x: x['quantity'] * x['price'])
.groupby('region')
.agg({'revenue': ['sum', 'mean']})
)
polars转换(10-100倍更快)
import polars as pl
result = (
pl.scan_csv('sales.csv') # 延迟评估
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg([
pl.col('revenue').sum().alias('revenue_sum'),
pl.col('revenue').mean().alias('revenue_mean')
])
.collect() # 执行延迟查询
)
关键差异:
- polars使用
scan_csv()(延迟)vs pandasread_csv()(急切) - polars使用
with_columns()vs pandasassign() - polars使用
pl.col()表达式 vs pandas字符串引用 - polars需要
collect()执行延迟查询
PySpark分布式处理
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)
result = (
df
.filter(F.col('year') == 2024)
.withColumn('revenue', F.col('quantity') * F.col('price'))
.groupBy('region')
.agg(F.sum('revenue').alias('total_revenue'))
)
迁移指南见references/dataframe-comparison.md。
管道编排
Airflow DAG结构
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='data_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # 每天凌晨2点
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
task1 >> task2 # 定义依赖
任务依赖模式
线性:A >> B >> C(顺序)
扇出:A >> [B, C, D](A后并行)
扇入:[A, B, C] >> D(D等待所有)
Airflow、Dagster和Prefect模式见references/orchestration-patterns.md。
数据质量和测试
dbt测试
通用测试(可重用):unique、not_null、accepted_values、relationships
单一测试(自定义SQL):
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
Great Expectations
import great_expectations as gx
context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_revenue", min_value=0
)
)
全面测试模式见references/data-quality-testing.md。
高级SQL模式
用于分析的窗口函数:
select
order_date,
daily_revenue,
avg(daily_revenue) over (
partition by region
order by order_date
rows between 6 preceding and current row
) as revenue_7d_ma,
sum(daily_revenue) over (
partition by region
order by order_date
) as cumulative_revenue
from daily_sales
高级窗口函数见references/window-functions-guide.md。
生产最佳实践
幂等性
确保转换多次运行产生相同结果:
- 在增量模型中使用
merge语句 - 实现去重逻辑
- 在dbt增量模型中使用
unique_key
增量加载
{% if is_incremental() %}
where created_at > (select max(created_at) from {{ this }})
{% endif %}
错误处理
try:
result = perform_transformation()
validate_result(result)
except ValidationError as e:
log_error(e)
raise
监控
- 设置Airflow电子邮件/Slack警报任务失败时
- 监控dbt测试失败
- 跟踪数据新鲜度(SLA)
- 记录行数和数据质量指标
工具推荐
SQL转换:dbt Core(行业标准,多仓库,丰富生态系统)
pip install dbt-core dbt-snowflake
Python DataFrame:polars(比pandas快10-100倍,多线程,延迟评估)
pip install polars
编排:Apache Airflow(规模验证,5,000+集成)
pip install apache-airflow
示例
工作示例在:
examples/python/pandas-basics.py- pandas转换examples/python/polars-migration.py- pandas到polars迁移examples/python/pyspark-transformations.py- PySpark操作examples/python/airflow-data-pipeline.py- 完整Airflow DAGexamples/sql/dbt-staging-model.sql- dbt暂存层examples/sql/dbt-intermediate-model.sql- dbt中间层examples/sql/dbt-incremental-model.sql- 增量模式examples/sql/window-functions.sql- 高级SQL
脚本
scripts/generate_dbt_models.py- 生成dbt模型样板scripts/benchmark_dataframes.py- 比较pandas与polars性能
相关技能
数据摄取模式见ingesting-data。
数据可视化见visualizing-data。
数据库设计见databases-*技能。
实时流处理见streaming-data。
数据平台架构见ai-data-engineering。
监控管道见observability。