数据工程 data-engineering

数据工程是一门专注于设计、构建和维护数据管道、数据仓库以及数据质量验证的学科。它核心涉及ETL(提取、转换、加载)流程、使用Apache Spark进行大数据处理、实施数据质量检查,以及设计星型模式等数据仓库结构,旨在确保数据的可靠性、可用性和高效分析,适用于数据驱动决策和商业智能应用。关键词:数据工程,ETL管道,数据仓库,Apache Spark,数据质量,数据沿袭,大数据处理。

数据工程 0 次安装 0 次浏览 更新于 3/8/2026

名称: 数据工程 描述: 数据工程模式,涵盖ETL管道、数据仓库、Apache Spark和数据质量验证

数据工程

ETL 管道模式

from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class PipelineResult:
    """Summary of one ETL pipeline run: stage counts, errors, and timing."""
    # Rows fetched from the source database by extract().
    records_extracted: int
    # Rows produced by transform().
    records_transformed: int
    # Rows reported upserted by the warehouse in load().
    records_loaded: int
    # Error messages collected during the run (empty when the run succeeds).
    errors: list[str]
    # Wall-clock duration of the whole run in seconds.
    duration_seconds: float

class OrderPipeline:
    def __init__(self, source_db, warehouse_db):
        self.source = source_db
        self.warehouse = warehouse_db

    def extract(self, since: datetime) -> list[dict]:
        query = """
            SELECT o.*, c.name as customer_name, c.segment
            FROM orders o
            JOIN customers c ON o.customer_id = c.id
            WHERE o.updated_at > %s
        """
        return self.source.fetch_all(query, [since])

    def transform(self, records: list[dict]) -> list[dict]:
        transformed = []
        for record in records:
            transformed.append({
                "order_id": record["id"],
                "customer_name": record["customer_name"],
                "segment": record["segment"].upper(),
                "total_amount": float(record["total"]),
                "order_date": record["created_at"].date(),
                "fiscal_quarter": get_fiscal_quarter(record["created_at"]),
                "is_high_value": float(record["total"]) > 1000,
                "loaded_at": datetime.utcnow(),
            })
        return transformed

    def load(self, records: list[dict]) -> int:
        return self.warehouse.upsert_batch(
            table="fact_orders",
            records=records,
            conflict_keys=["order_id"],
            batch_size=5000,
        )

    def run(self, since: datetime) -> PipelineResult:
        start = datetime.utcnow()
        raw = self.extract(since)
        clean = self.transform(raw)
        loaded = self.load(clean)
        return PipelineResult(
            records_extracted=len(raw),
            records_transformed=len(clean),
            records_loaded=loaded,
            errors=[],
            duration_seconds=(datetime.utcnow() - start).total_seconds(),
        )

Apache Spark 处理

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Spark session for the batch job. Adaptive Query Execution lets Spark
# re-optimize shuffle partitioning and join plans at runtime; 200 is the
# pre-AQE starting shuffle partition count.
spark = SparkSession.builder \
    .appName("sales-analytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Source tables from the data lake (Parquet).
orders = spark.read.parquet("s3://data-lake/orders/")
customers = spark.read.parquet("s3://data-lake/customers/")
# NOTE(review): `customers` is read but never referenced below in this
# snippet — confirm whether a join was intended or the read can be dropped.

# Daily revenue per product category over completed orders, with a trailing
# average column computed via a window function.
daily_revenue = (
    orders
    .filter(F.col("status") == "completed")
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    .withColumn(
        "revenue_7d_avg",
        # Trailing window: current row plus the 6 preceding rows within the
        # category, ordered by date.
        # NOTE(review): rowsBetween counts ROWS, not days — this is a true
        # 7-day average only when every category has exactly one row per
        # calendar day; with date gaps, rangeBetween over an epoch-day
        # column would be needed. Confirm the data guarantees this.
        F.avg("revenue").over(
            Window.partitionBy("product_category")
            .orderBy("order_date")
            .rowsBetween(-6, 0)
        )
    )
)

# Persist partitioned by day so downstream readers can prune partitions.
# NOTE(review): mode("overwrite") with Spark's default static
# partitionOverwriteMode replaces the ENTIRE output path, not just the
# partitions being written — verify spark.sql.sources.partitionOverwriteMode
# if incremental daily runs are expected.
daily_revenue.write \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/daily_revenue/")

数据质量检查

from dataclasses import dataclass

@dataclass
class QualityCheck:
    """One declarative data-quality rule evaluated against the warehouse."""
    # Human-readable identifier reported in results and failure messages.
    name: str
    # SQL that must return a single scalar metric value.
    query: str
    # The check passes when the scalar is <= this value.
    threshold: float
    # "critical" aborts the run on failure; "warning" is only reported.
    severity: str

# Data-quality rules run against fact_orders; each query yields one scalar
# and passes when the scalar is at or below `threshold` (see
# run_quality_checks). Critical failures abort the pipeline.
CHECKS = [
    # Completeness: every fact row must reference a customer.
    QualityCheck(
        name="null_customer_ids",
        query="SELECT COUNT(*) FROM fact_orders WHERE customer_id IS NULL",
        threshold=0,
        severity="critical",
    ),
    # Validity: order amounts must be non-negative.
    QualityCheck(
        name="negative_amounts",
        query="SELECT COUNT(*) FROM fact_orders WHERE total_amount < 0",
        threshold=0,
        severity="critical",
    ),
    # Uniqueness: total rows minus distinct order_ids = duplicate count.
    QualityCheck(
        name="duplicate_orders",
        query="SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM fact_orders",
        threshold=0,
        severity="warning",
    ),
    # Freshness: hours since the most recent load; warn past 2 hours.
    QualityCheck(
        name="freshness",
        query="SELECT EXTRACT(EPOCH FROM NOW() - MAX(loaded_at))/3600 FROM fact_orders",
        threshold=2.0,
        severity="warning",
    ),
]

def run_quality_checks(db, checks: list[QualityCheck]) -> list[dict]:
    results = []
    for check in checks:
        value = db.fetch_scalar(check.query)
        passed = value <= check.threshold
        results.append({
            "name": check.name,
            "value": value,
            "threshold": check.threshold,
            "passed": passed,
            "severity": check.severity,
        })
        if not passed and check.severity == "critical":
            raise DataQualityError(f"Critical check failed: {check.name} = {value}")
    return results

数据仓库模式(星型模式)

-- Customer dimension. valid_from / valid_to / is_current track row history
-- (slowly-changing-dimension Type 2 style): a NULL valid_to with
-- is_current = TRUE marks the active version of a customer.
-- customer_key is the warehouse surrogate key; customer_id is the source
-- system's natural key (non-unique here because each version gets a row).
CREATE TABLE dim_customers (
    customer_key    BIGINT PRIMARY KEY,
    customer_id     VARCHAR(50) NOT NULL,
    name            VARCHAR(200),
    segment         VARCHAR(50),
    country         VARCHAR(100),
    valid_from      TIMESTAMP NOT NULL,
    valid_to        TIMESTAMP,
    is_current      BOOLEAN DEFAULT TRUE
);

-- Product dimension: surrogate key plus the source natural key and the
-- category hierarchy used for grouping.
CREATE TABLE dim_products (
    product_key     BIGINT PRIMARY KEY,
    product_id      VARCHAR(50) NOT NULL,
    name            VARCHAR(200),
    category        VARCHAR(100),
    subcategory     VARCHAR(100)
);

-- Order fact table: one row per order line, referencing the dimensions by
-- surrogate key. order_id is UNIQUE so loads can upsert idempotently;
-- order_date_key joins to a date dimension (not shown in this snippet).
-- NOTE(review): the Python OrderPipeline in this document loads columns
-- (customer_name, segment, fiscal_quarter, ...) that do not exist in this
-- DDL — confirm which fact_orders schema is authoritative.
CREATE TABLE fact_orders (
    order_key       BIGINT PRIMARY KEY,
    order_id        VARCHAR(50) UNIQUE NOT NULL,
    customer_key    BIGINT REFERENCES dim_customers(customer_key),
    product_key     BIGINT REFERENCES dim_products(product_key),
    order_date_key  INT,
    quantity        INT,
    unit_price      DECIMAL(10,2),
    total_amount    DECIMAL(12,2),
    loaded_at       TIMESTAMP DEFAULT NOW()
);

反模式

  • 逐行处理数据,而不是批量或集合处理
  • 未按日期或类别分区大表
  • 管道阶段之间缺少数据质量检查
  • 将原始数据直接加载到仓库而不进行转换
  • 当增量加载足够时使用全表扫描
  • 未跟踪数据沿袭(数据来源、处理时间)

检查清单

  • [ ] 管道遵循提取-转换-加载,具有清晰的阶段分离
  • [ ] 基于水印或变更数据捕获的增量处理
  • [ ] 在每个管道阶段后运行数据质量检查
  • [ ] 仓库使用星型或雪花模式,包含维度和事实表
  • [ ] Spark作业使用自适应查询执行和适当的分区
  • [ ] 幂等加载(重新运行产生相同结果)
  • [ ] 通过自动警报监控数据新鲜度
  • [ ] 优雅处理模式演变(优先使用加法更改)