数据转换Skill transforming-data

数据转换技能涉及使用ETL/ELT模式、SQL(如dbt)、Python库(如pandas、polars、PySpark)和编排工具(如Airflow)将原始数据处理成可用于分析的数据集,适用于数据管道构建、增量加载、性能优化和数据质量保障。关键词:数据转换、ETL开发、数据工程、数据管道、dbt、Airflow、SQL、Python、数据仓库。

数据工程 0 次安装 0 次浏览 更新于 3/23/2026

名称: 数据转换 描述: 使用ETL/ELT模式、SQL(dbt)、Python(pandas/polars/PySpark)和编排工具(Airflow)将原始数据转换为分析资产。适用于构建数据管道、实现增量模型、从pandas迁移到polars,或编排带有测试和质量检查的多步骤转换。

数据转换

使用现代转换模式、框架和编排工具将原始数据转换为分析资产。

目的

在现代数据栈中选择和实施数据转换模式。使用SQL(dbt)、Python DataFrame(pandas、polars、PySpark)和管道编排(Airflow、Dagster、Prefect)将原始数据转换为干净、经过测试和记录的分析数据集。

何时使用

在以下情况下调用此技能:

  • 选择ETL和ELT转换模式之间
  • 构建dbt模型(暂存层、中间层、集市层)
  • 实现增量数据加载和合并策略
  • 为性能改进将pandas代码迁移到polars
  • 编排带有依赖关系和重试的数据管道
  • 添加数据质量测试和验证
  • 使用PySpark处理大型数据集
  • 创建生产就绪的转换工作流

快速开始:常见模式

dbt增量模型

{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3

{% if is_incremental() %}
    where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}

polars高性能转换

import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # 执行延迟查询
)

Airflow数据管道

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform

决策框架

ETL与ELT选择

使用ELT(提取、加载、转换) 当:

  • 使用现代云数据仓库(Snowflake、BigQuery、Databricks)
  • 转换逻辑频繁更改
  • 团队包括SQL分析师
  • 数据量10GB-1TB+(利用仓库并行性)

工具:dbt、Dataform、Snowflake任务、BigQuery预定查询

使用ETL(提取、转换、加载) 当:

  • 法规合规要求加载前数据脱敏(PII/PHI)
  • 目标系统缺乏计算能力
  • 实时流处理需要即时转换
  • 无云仓库的遗留系统

工具:AWS Glue、Azure Data Factory、自定义Python脚本

使用混合模式 当结合敏感数据清洗(ETL)与分析转换(ELT)时。

默认推荐:除非特定合规或性能限制要求ETL,否则使用ELT与dbt。

详细模式见references/etl-vs-elt-patterns.md

DataFrame库选择

选择pandas 当:

  • 数据大小 < 500MB
  • 原型设计或探索性分析
  • 需要与仅pandas兼容的库

选择polars 当:

  • 数据大小500MB-100GB
  • 性能关键(比pandas快10-100倍)
  • 生产管道有内存限制
  • 希望延迟评估与查询优化

选择PySpark 当:

  • 数据大小 > 100GB
  • 需要跨集群分布式处理
  • 现有Spark基础设施(EMR、Databricks)

迁移路径:pandas → polars(更易,API类似)或pandas → PySpark(需要集群)

比较和迁移指南见references/dataframe-comparison.md

编排工具选择

选择Airflow 当:

  • 企业生产(规模验证)
  • 需要5,000+集成
  • 有托管服务可用(AWS MWAA、GCP Cloud Composer)

选择Dagster 当:

  • 大量dbt使用(原生dbt_assets集成)
  • 数据血缘和基于资产的流程优先
  • ML管道需要可测试性

选择Prefect 当:

  • 动态工作流(运行时任务生成)
  • 偏好云原生架构
  • Pythonic API与装饰器

安全默认:Airflow(久经考验),除非Dagster/Prefect有特定需求。

详细模式见references/orchestration-patterns.md

SQL转换与dbt

模型层结构

  1. 暂存层 (models/staging/)

    • 1:1对应源表
    • 最小转换(重命名、类型转换、基本过滤)
    • 物化为视图或临时表
  2. 中间层 (models/intermediate/)

    • 业务逻辑和复杂连接
    • 不暴露给最终用户
    • 通常临时表(仅CTE)
  3. 集市层 (models/marts/)

    • 报告用的最终模型
    • 事实表(事件、交易)
    • 维度表(客户、产品)
    • 物化为表或增量

dbt物化类型

视图:每次引用模型时重新运行查询。用于快速查询、暂存层。

:每次运行完全刷新。用于频繁查询的模型、昂贵计算。

增量:仅处理新增/更改记录。用于大型事实表、事件日志。

临时表:仅CTE,不持久化。用于中间计算。

dbt测试

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0

全面的dbt模式见:

  • references/dbt-best-practices.md
  • references/incremental-strategies.md

Python DataFrame转换

pandas转换

import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)

polars转换(10-100倍更快)

import polars as pl

result = (
    pl.scan_csv('sales.csv')  # 延迟评估
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # 执行延迟查询
)

关键差异

  • polars使用scan_csv()(延迟)vs pandas read_csv()(急切)
  • polars使用with_columns() vs pandas assign()
  • polars使用pl.col()表达式 vs pandas字符串引用
  • polars需要collect()执行延迟查询

PySpark分布式处理

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)

result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)

迁移指南见references/dataframe-comparison.md

管道编排

Airflow DAG结构

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # 每天凌晨2点
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # 定义依赖

任务依赖模式

线性A >> B >> C(顺序) 扇出A >> [B, C, D](A后并行) 扇入[A, B, C] >> D(D等待所有)

Airflow、Dagster和Prefect模式见references/orchestration-patterns.md

数据质量和测试

dbt测试

通用测试(可重用):unique、not_null、accepted_values、relationships

单一测试(自定义SQL):

-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0

Great Expectations

import great_expectations as gx

context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)

全面测试模式见references/data-quality-testing.md

高级SQL模式

用于分析的窗口函数:

select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales

高级窗口函数见references/window-functions-guide.md

生产最佳实践

幂等性

确保转换多次运行产生相同结果:

  • 在增量模型中使用merge语句
  • 实现去重逻辑
  • 在dbt增量模型中使用unique_key

增量加载

{% if is_incremental() %}
    where created_at > (select max(created_at) from {{ this }})
{% endif %}

错误处理

try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise

监控

  • 设置Airflow电子邮件/Slack警报任务失败时
  • 监控dbt测试失败
  • 跟踪数据新鲜度(SLA)
  • 记录行数和数据质量指标

工具推荐

SQL转换:dbt Core(行业标准,多仓库,丰富生态系统)

pip install dbt-core dbt-snowflake

Python DataFrame:polars(比pandas快10-100倍,多线程,延迟评估)

pip install polars

编排:Apache Airflow(规模验证,5,000+集成)

pip install apache-airflow

示例

工作示例在:

  • examples/python/pandas-basics.py - pandas转换
  • examples/python/polars-migration.py - pandas到polars迁移
  • examples/python/pyspark-transformations.py - PySpark操作
  • examples/python/airflow-data-pipeline.py - 完整Airflow DAG
  • examples/sql/dbt-staging-model.sql - dbt暂存层
  • examples/sql/dbt-intermediate-model.sql - dbt中间层
  • examples/sql/dbt-incremental-model.sql - 增量模式
  • examples/sql/window-functions.sql - 高级SQL

脚本

  • scripts/generate_dbt_models.py - 生成dbt模型样板
  • scripts/benchmark_dataframes.py - 比较pandas与polars性能

相关技能

数据摄取模式见ingesting-data。 数据可视化见visualizing-data。 数据库设计见databases-*技能。 实时流处理见streaming-data。 数据平台架构见ai-data-engineering。 监控管道见observability