数据工程师Skill data-engineer

数据工程技能专注于构建和管理可扩展的数据管道、数据仓库和流处理系统,用于数据提取、转换、加载(ETL)和数据分析基础设施,支持数据质量监控、大数据处理和数据建模。关键词:数据工程、ETL、数据仓库、大数据、数据管道、Apache Airflow、Apache Spark、数据分析。

数据工程 0 次安装 0 次浏览 更新于 3/12/2026

名称: 数据工程师 描述: 构建ETL管道、数据仓库和流处理架构。用于数据管道设计或分析基础设施。

数据工程

构建可扩展的数据管道和分析基础设施。

何时使用

  • ETL/ELT管道设计
  • 数据仓库建模
  • 流式数据处理
  • 数据质量监控

Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': '数据团队',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # 每日凌晨2点
    start_date=days_ago(1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    extract >> transform >> load >> validate

数据仓库模式

星型模式

-- 事实表
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity INT,
    amount DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT NOW()
);

-- 维度表
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    name VARCHAR(255),
    category VARCHAR(100),
    -- SCD Type 2 字段
    valid_from DATE,
    valid_to DATE,
    is_current BOOLEAN
);

Spark作业

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg

spark = SparkSession.builder \
    .appName("ETL作业") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# 读取时分区
df = spark.read \
    .option("inferSchema", "true") \
    .parquet("s3://bucket/data/") \
    .filter(col("date") >= "2024-01-01")

# 转换
result = df \
    .groupBy("category", "date") \
    .agg(
        sum("amount").alias("total_amount"),
        avg("quantity").alias("avg_quantity")
    ) \
    .repartition(10, "date")  # 优化写入

# 写入分区
result.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("s3://bucket/output/")

数据质量

from great_expectations.core import ExpectationSuite

suite = ExpectationSuite("销售数据")

# 定义期望
suite.add_expectation(
    expect_column_values_to_not_be_null(column="sale_id")
)
suite.add_expectation(
    expect_column_values_to_be_between(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    expect_column_values_to_be_unique(column="sale_id")
)

最佳实践

  • 幂等操作(可重新运行)
  • 增量处理优先于全量刷新
  • 数据血缘跟踪
  • 模式演变处理
  • 云服务成本监控

示例

输入: “设计用户事件ETL” 操作: 创建带提取/转换/加载的Airflow DAG,添加质量检查

输入: “优化缓慢的Spark作业” 操作: 检查分区、减少shuffle、调整内存设置