Data Engineering Command Center
Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies — pure agent skill.
Phase 1: Data Architecture Assessment
Before building anything, understand the landscape.
Architecture Brief
project_name: ""
business_context: ""
data_consumers:
  - team: ""
    use_case: "" # analytics | ML | operational | reporting | reverse-ETL
    latency_requirement: "" # real-time (<1s) | near-real-time (<5min) | batch (hourly+)
    query_pattern: "" # ad-hoc | scheduled | API | dashboard
current_state:
  sources: [] # list every system producing data
  storage: [] # where data lives today
  pain_points: [] # what's broken, slow, unreliable
data_volume:
  current_gb_per_day: 0
  growth_rate_percent: 0
  retention_months: 0
constraints:
  budget_monthly_usd: 0
  team_size: 0
  skill_level: "" # junior | mid | senior | mixed
  compliance: [] # GDPR, HIPAA, SOX, PCI, none
  cloud_provider: "" # AWS | GCP | Azure | multi | on-prem
Architecture Pattern Decision Matrix
| Signal | Pattern | When to Use |
|---|---|---|
| All consumers need data hourly+ | Batch ETL | Reporting, warehousing, most analytics |
| Some need <5 min latency | Micro-batch | Dashboard freshness, near-real-time analytics |
| Events need <1s processing | Streaming | Fraud detection, real-time pricing, alerts |
| Need both batch + streaming | Lambda | When batch accuracy + real-time speed both matter |
| Want to simplify Lambda | Kappa | When you can reprocess from stream replay |
| Data lake + warehouse combined | Lakehouse | When you need both cheap storage + fast SQL |
| Sources change independently | Data Mesh | Large orgs, domain-owned data, >5 teams |
| ML is primary consumer | Feature Store | ML-heavy orgs with feature reuse needs |
Technology Selection Guide
Orchestration
| Tool | Best For | Avoid When |
|---|---|---|
| Airflow | Complex DAGs, Python-native teams, mature ecosystem | Simple pipelines (<5 tasks) |
| Dagster | Software-defined assets, strong typing, dev experience | Legacy team resistant to new paradigms |
| Prefect | Dynamic workflows, cloud-native, Python-first | Need on-prem with no cloud dependency |
| dbt | SQL transformations, ELT, analytics engineering | Non-SQL transforms, streaming |
| Temporal | Long-running workflows, retry-heavy, microservices | Simple ETL, small teams |
| Cron + scripts | <3 pipelines, solo engineer, simple schedules | Anything with dependencies or retries |
Processing
| Tool | Best For | Avoid When |
|---|---|---|
| Spark | >100GB, complex transforms, ML pipelines | <10GB (overkill), real-time streaming |
| DuckDB | Local analytics, <100GB, SQL on files | Distributed processing, production streaming |
| Polars | Single-node, Rust-speed, <50GB, DataFrames | Distributed, need Spark ecosystem |
| Pandas | <1GB, quick analysis, prototyping | Production pipelines, anything >5GB |
| Flink | True streaming, event-time processing | Batch-only, small team (steep learning curve) |
| SQL (warehouse) | ELT in Snowflake/BigQuery/Redshift | Complex ML transforms, binary data |
Storage
| Tool | Best For | Avoid When |
|---|---|---|
| Snowflake | Analytics, separation of compute/storage, multi-cloud | Tight budget, real-time OLTP |
| BigQuery | GCP-native, serverless, large-scale analytics | Multi-cloud, need fine-grained cost control |
| Redshift | AWS-native, existing AWS ecosystem | Elastic scaling needs, multi-cloud |
| Databricks | ML + analytics unified, Spark-native, lakehouse | Pure SQL analytics, small data |
| PostgreSQL | OLTP + light analytics, <500GB, budget-conscious | >1TB analytics, real-time dashboards on large data |
| S3/GCS/ADLS | Raw data lake, cheap storage, any format | Direct SQL queries (need compute layer) |
| Delta Lake/Iceberg | Table format on data lake, ACID on files | Simple file storage, no lakehouse need |
Phase 2: Data Modeling
Modeling Methodology Decision
| Approach | Best For | Key Concept |
|---|---|---|
| Kimball (Dimensional) | BI/reporting, star schemas | Facts + Dimensions, business-process-centric |
| Inmon (3NF) | Enterprise data warehouse, single source of truth | Normalized, subject-area-centric |
| Data Vault 2.0 | Agile warehousing, auditability, multiple sources | Hubs + Links + Satellites, insert-only |
| One Big Table (OBT) | Simple analytics, few joins, dashboard performance | Pre-joined, denormalized, fast queries |
| Activity Schema | Event analytics, product analytics | Entity + Activity + Feature columns |
Dimensional Model Template
fact_table:
  name: "fct_[business_process]"
  grain: "" # one row = one [what]?
  grain_statement: "One row per [transaction/event/snapshot] at [time grain]"
  measures:
    - name: ""
      type: "" # additive | semi-additive | non-additive
      aggregation: "" # SUM | AVG | COUNT | MIN | MAX | COUNT DISTINCT
      business_definition: ""
  degenerate_dimensions: [] # IDs stored in fact (order_number, invoice_id)
  foreign_keys: [] # links to dimension tables
dimensions:
  - name: "dim_[entity]"
    type: "" # Type 1 (overwrite) | Type 2 (history) | Type 3 (previous value)
    natural_key: "" # business key from source
    surrogate_key: "" # warehouse-generated key
    attributes:
      - name: ""
        source: ""
        scd_type: "" # 1 | 2 | 3
    hierarchy: [] # e.g., [country, region, city, store]
SCD Type Decision Guide
| Scenario | SCD Type | Implementation |
|---|---|---|
| Don’t care about history | Type 1 | UPDATE in place |
| Need full history | Type 2 | New row + valid_from/valid_to + is_current flag |
| Only need previous value | Type 3 | Add previous_[column] |
| Rapidly changing attributes would bloat a Type 2 dimension | Type 4 | Move them to a mini-dimension (some teams use a separate history table) |
| Need full history plus the current value on every historical row | Type 6 | Combine 1+2+3 in one table |
Default recommendation: Type 2 for anything business-critical (customer status, product price, employee department). Type 1 for everything else.
Naming Conventions
| Object | Convention | Example |
|---|---|---|
| Raw/staging tables | raw_[source]_[table] | raw_stripe_payments |
| Staging models | stg_[source]__[entity] | stg_stripe__payments |
| Intermediate models | int_[entity]_[verb] | int_orders_pivoted |
| Mart/fact tables | fct_[business_process] | fct_orders |
| Dimension tables | dim_[entity] | dim_customers |
| Metrics/aggregates | mrt_[domain]_[metric] | mrt_sales_daily |
| Snapshots | snp_[entity]_[grain] | snp_inventory_daily |
| Columns: boolean | is_[state] or has_[thing] | is_active, has_subscription |
| Columns: timestamp | [event]_at | created_at, shipped_at |
| Columns: date | [event]_date | order_date |
| Columns: ID | [entity]_id | customer_id |
| Columns: amount | [thing]_amount | order_amount |
| Columns: count | [thing]_count | line_item_count |
Phase 3: Pipeline Design Patterns
Universal Pipeline Template
pipeline:
  name: ""
  owner: ""
  schedule: "" # cron expression
  sla_minutes: 0 # max acceptable runtime
  tier: "" # 1 (critical) | 2 (important) | 3 (nice-to-have)
  extract:
    source_system: ""
    connection: ""
    strategy: "" # full | incremental | CDC | log-based
    incremental_key: "" # column for incremental (e.g., updated_at)
    watermark_storage: "" # where to persist last-extracted position
  transform:
    engine: "" # SQL | Spark | Python | dbt
    stages:
      - name: "clean"
        operations: [] # dedupe, null handling, type casting, trimming
      - name: "conform"
        operations: [] # standardize codes, currencies, timezones
      - name: "enrich"
        operations: [] # lookups, calculations, derived fields
      - name: "aggregate"
        operations: [] # rollups, pivots, window functions
  load:
    target_system: ""
    strategy: "" # append | upsert | merge | truncate-reload | partition-swap
    merge_keys: []
    partition_key: ""
    clustering_keys: []
  quality_gates:
    pre_load: [] # checks before writing
    post_load: [] # checks after writing
  error_handling:
    strategy: "" # fail-fast | dead-letter | retry | skip-and-alert
    max_retries: 3
    retry_delay_seconds: 300
    alert_channels: []
Extraction Strategy Decision Tree
Is the source a database?
├── Yes → Does it support CDC?
│ ├── Yes → Use CDC (Debezium, AWS DMS, Fivetran)
│ │ Best for: high-volume, low-latency, minimal source impact
│ └── No → Does it have a reliable updated_at column?
│ ├── Yes → Incremental extraction on updated_at
│ │ ⚠️ Won't catch hard deletes — add periodic full reconciliation
│ └── No → Full extraction
│ Only viable for small tables (<1M rows)
├── Is it an API?
│ ├── Supports webhooks? → Event-driven ingestion
│ ├── Has cursor/pagination? → Incremental with cursor bookmark
│ └── No pagination? → Full pull with rate-limit handling
├── Is it files (S3, SFTP, email)?
│ └── Event-triggered (S3 notification, file watcher)
│ Validate: schema, completeness, filename pattern
└── Is it streaming (Kafka, Kinesis, Pub/Sub)?
└── Consumer group with offset management
Key decisions: at-least-once vs exactly-once, consumer lag alerting
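The incremental branch of the tree can be sketched in Python. This is a minimal sketch, assuming a SQLite source with a hypothetical `orders` table whose `updated_at` column holds ISO-8601 strings, and a plain dict standing in for external watermark storage (in practice a control table or a file in object storage). Like any `updated_at` extraction, it will not see hard deletes.

```python
import sqlite3

def extract_incremental(conn, watermarks, table, key="updated_at"):
    """Read rows changed since the stored watermark, then advance it.

    `watermarks` stands in for external watermark storage (hypothetical).
    Table and column names are interpolated, so they must come from
    trusted config, never user input.
    """
    last = watermarks.get(table, "")  # "" sorts before any ISO-8601 string
    # >= re-reads rows sharing the boundary timestamp; a downstream
    # MERGE/UPSERT on the natural key makes those re-reads harmless.
    rows = conn.execute(
        f"SELECT * FROM {table} WHERE {key} >= ? ORDER BY {key}", (last,)
    ).fetchall()
    if rows:
        # Real pipelines persist this only after the load has committed.
        watermarks[table] = rows[-1][key]
    return rows

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can be indexed by column name
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, updated_at TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(1, 10.0, "2024-01-01T00:00:00"), (2, 20.0, "2024-01-02T00:00:00")])
watermarks = {}
first = extract_incremental(conn, watermarks, "orders")
conn.execute("INSERT INTO orders VALUES (3, 30.0, '2024-01-03T00:00:00')")
second = extract_incremental(conn, watermarks, "orders")  # boundary row 2 + new row 3
```

Using `>=` instead of `>` trades a few duplicate reads for never skipping rows that were written with the same timestamp as the previous watermark.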
Load Strategy Decision
| Strategy | When | Trade-off |
|---|---|---|
| Append | Event/log data, immutable facts | Simple, but grows forever: partition and set retention |
| Upsert/Merge | Dimension updates, SCD Type 1 | Handles updates but slower on large tables |
| Truncate-Reload | Small tables (<1M), reference data | Simple, but readers see missing data between truncate and reload unless the swap is atomic |
| Partition Swap | Large fact tables, daily loads | Atomic, fast, but needs partition alignment |
| Soft Delete | Need audit trail of deletions | Adds complexity to every downstream query |
Idempotency Rules (NON-NEGOTIABLE)
Every pipeline MUST be re-runnable without side effects:
- Use MERGE/UPSERT, never blind INSERT for mutable data
- Partition-swap for immutable data — drop partition + reload
- Store watermarks externally — not in the pipeline code
- Dedup at ingestion — use source natural keys
- Test by running twice — output must be identical both times
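The "run it twice" test can be sketched with Python's built-in `sqlite3`. This assumes a hypothetical `fct_orders` table keyed on `order_id` and uses SQLite's `INSERT ... ON CONFLICT DO UPDATE` (available since SQLite 3.24) as the UPSERT; warehouse engines would use `MERGE` instead.

```python
import sqlite3

def load_orders(conn, batch):
    """Idempotent load: UPSERT on the natural key instead of a blind INSERT."""
    conn.executemany(
        """
        INSERT INTO fct_orders (order_id, order_amount, order_status)
        VALUES (:order_id, :order_amount, :order_status)
        ON CONFLICT(order_id) DO UPDATE SET
            order_amount = excluded.order_amount,
            order_status = excluded.order_status
        """,
        batch,
    )
    conn.commit()

def snapshot(conn):
    return conn.execute("SELECT * FROM fct_orders ORDER BY order_id").fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fct_orders "
             "(order_id TEXT PRIMARY KEY, order_amount REAL, order_status TEXT)")
batch = [
    {"order_id": "A1", "order_amount": 50.0, "order_status": "pending"},
    {"order_id": "A2", "order_amount": 75.0, "order_status": "shipped"},
]
load_orders(conn, batch)
after_first_run = snapshot(conn)
load_orders(conn, batch)  # re-run the same batch, as a retry would
after_second_run = snapshot(conn)
```

A blind `INSERT` here would either fail on the primary key or, without one, double every row on the second run.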
Phase 4: Data Quality Framework
Quality Dimensions
| Dimension | Definition | Example Check |
|---|---|---|
| Completeness | No missing values where required | NOT NULL on required fields, row count within range |
| Uniqueness | No unexpected duplicates | Primary key uniqueness, natural key uniqueness |
| Validity | Values within expected domain | Enum checks, range checks, regex patterns |
| Accuracy | Data matches real-world truth | Cross-system reconciliation, manual spot checks |
| Freshness | Data arrives on time | MAX(loaded_at) > NOW() - INTERVAL '2 hours' |
| Consistency | Same data agrees across systems | Sum reconciliation between source and target |
Quality Check Templates
-- Completeness: Required fields not null
SELECT COUNT(*) AS null_violations
FROM {table}
WHERE {required_column} IS NULL;
-- Threshold: 0
-- Uniqueness: No duplicate primary keys
SELECT {pk_column}, COUNT(*) AS dupe_count
FROM {table}
GROUP BY {pk_column}
HAVING COUNT(*) > 1;
-- Threshold: 0
-- Freshness: Data arrived within SLA
SELECT CASE
WHEN MAX({timestamp_col}) > CURRENT_TIMESTAMP - INTERVAL '{sla_hours} hours'
THEN 'PASS' ELSE 'FAIL'
END AS freshness_check
FROM {table};
-- Volume: Row count within expected range
SELECT CASE
WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected}
THEN 'PASS' ELSE 'FAIL'
END AS volume_check
FROM {table}
WHERE {partition_col} = '{run_date}';
-- Referential: FK integrity
SELECT COUNT(*) AS orphan_count
FROM {fact_table} f
LEFT JOIN {dim_table} d ON f.{fk} = d.{pk}
WHERE d.{pk} IS NULL;
-- Threshold: 0
-- Distribution: No unexpected skew
SELECT {column}, COUNT(*) AS cnt,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
FROM {table}
GROUP BY {column}
ORDER BY cnt DESC;
-- Alert if any single value > {max_pct}%
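-- Cross-system reconciliation
-- Column aliases can't be reused in the same SELECT list, so compute the totals in a derived table first
SELECT source_total, target_total,
  ABS(source_total - target_total) AS variance
FROM (
  SELECT
    (SELECT SUM(amount) FROM source_system.orders WHERE date = '{date}') AS source_total,
    (SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}') AS target_total
) AS totals;
-- Threshold: variance < 0.01 * source_total (1%)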
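A small runner can fill these templates and grade them. This is a minimal sketch against SQLite, assuming a hypothetical `orders` table and a subset of the checks above rewritten so each query returns a violation count (threshold 0); the `CHECKS` dict and `run_checks` helper are illustrative names, not part of any tool.

```python
import sqlite3

# Each query returns a violation count; threshold 0 means any violation fails.
CHECKS = {
    "completeness": "SELECT COUNT(*) FROM {table} WHERE {required_column} IS NULL",
    "uniqueness": ("SELECT COUNT(*) FROM (SELECT {pk_column} FROM {table} "
                   "GROUP BY {pk_column} HAVING COUNT(*) > 1)"),
    "volume": ("SELECT CASE WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected} "
               "THEN 0 ELSE 1 END FROM {table}"),
}

def run_checks(conn, params):
    """Fill each template from trusted config (not user input) and grade it."""
    results = {}
    for name, template in CHECKS.items():
        violations = conn.execute(template.format(**params)).fetchone()[0]
        results[name] = "PASS" if violations == 0 else "FAIL"
    return results

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, customer_id TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)",
                 [("o1", "c1"), ("o2", None), ("o2", "c3")])
results = run_checks(conn, {"table": "orders", "required_column": "customer_id",
                            "pk_column": "order_id", "min_expected": 1,
                            "max_expected": 10})
# results == {"completeness": "FAIL", "uniqueness": "FAIL", "volume": "PASS"}
```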
Data Contract Template
contract:
  name: ""
  version: ""
  owner: "" # team responsible for producing this data
  consumers: [] # teams consuming this data
  sla:
    freshness_hours: 0
    availability_percent: 99.9
    support_hours: "" # business-hours | 24x7
  schema:
    - column: ""
      type: ""
      nullable: false
      description: ""
      business_definition: ""
      pii: false
      checks:
        - type: "" # not_null | unique | range | enum | regex | custom
          params: {}
  breaking_change_policy: "" # notify-30-days | version-bump | never-break
  notification_channel: ""
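Enforcing the `schema` part of a contract can be sketched in plain Python. This is a sketch under stated assumptions: column types are expressed as Python types, and the `Column` dataclass and `validate` function are hypothetical; it covers only type, nullability, and undeclared columns, not the `checks` list.

```python
from dataclasses import dataclass

@dataclass
class Column:
    name: str
    type: type
    nullable: bool = False

def validate(records, schema):
    """Return human-readable violations of the contract's schema section."""
    declared = {c.name for c in schema}
    errors = []
    for i, rec in enumerate(records):
        for col in schema:
            value = rec.get(col.name)  # a missing column counts as null
            if value is None:
                if not col.nullable:
                    errors.append(f"row {i}: {col.name} is null")
            elif not isinstance(value, col.type):
                errors.append(f"row {i}: {col.name} expected {col.type.__name__}")
        extra = set(rec) - declared
        if extra:
            errors.append(f"row {i}: undeclared columns {sorted(extra)}")
    return errors

schema = [Column("order_id", str), Column("order_amount", float),
          Column("coupon_code", str, nullable=True)]
errors = validate(
    [{"order_id": "A1", "order_amount": 9.5, "coupon_code": None},
     {"order_id": "A2", "order_amount": "9.5", "discount": 1}],
    schema,
)
```

Reporting undeclared columns, rather than silently dropping them, is what lets a producer's additive change be caught before it becomes an unannounced breaking one.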
Quality Severity Levels
| Level | Definition | Response |
|---|---|---|
| P0 (Critical) | Data corruption, wrong numbers in production dashboards, compliance data wrong | Stop pipeline, alert immediately, rollback if possible |
| P1 (High) | Missing data for key reports, SLA breach, >5% of records affected | Alert team, fix within 4 hours, post-mortem required |
| P2 (Medium) | Non-critical field quality, <1% records affected, no downstream impact | Fix in next sprint, add monitoring to prevent recurrence |
| P3 (Low) | Cosmetic issues, edge cases, non-critical data | Backlog, fix when convenient |
Phase 5: Performance Optimization
SQL Optimization Checklist
| Problem | Fix | Impact |
|---|---|---|
| Full table scan | Add/use partition pruning | 10-100x faster |
| Large joins | Pre-aggregate before joining | 5-50x faster |
| SELECT * | Select only needed columns | 2-10x faster (columnar stores) |
| Correlated subquery | Rewrite as JOIN or window function | 10-100x faster |
| DISTINCT on large result | Fix upstream duplication instead | 2-5x faster |
| ORDER BY without LIMIT | Add LIMIT or remove if not needed | Prevents memory spills |
| String operations in WHERE | Pre-compute, use lookup table | Enables index usage |
| Multiple passes over same data | Combine with CASE WHEN + GROUP BY | 2-5x faster |
| NOT IN with NULLs | Use NOT EXISTS or LEFT JOIN IS NULL | Correctness + performance |
Spark Optimization Guide
| Problem | Solution |
|---|---|
| Shuffle-heavy joins | Broadcast small table (broadcast(df)) if <100MB |
| Data skew | Salt the skewed key: add random prefix, join on salted key, aggregate |
| Small files | Coalesce output: .coalesce(target_files) or use adaptive query execution |
| Too many partitions | spark.sql.shuffle.partitions = 2-3x cluster cores |
| OOM errors | Increase spark.executor.memory, reduce partition size, spill to disk |
| Slow writes | Use Parquet with snappy, partition by date, avoid small writes |
| Repeated computation | .cache() or .persist() DataFrames used >1 time |
| Complex transformations | Push down predicates, filter early, select early |
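The salting recipe for skew can be modeled in plain Python to show why it works. This is an illustration of the technique, not Spark API: `salt_buckets` and both helpers are hypothetical, and each dict key stands in for a shuffle partition. The small side is replicated once per salt value, the big side gets a random salt, and the final aggregate drops the salt.

```python
import random
from collections import defaultdict

def salted_join(big, small, salt_buckets, seed=0):
    """Inner-join big (key, value) rows to small (key, attr) rows on a salted key."""
    rng = random.Random(seed)
    # Replicate each small-side row once per salt value.
    small_salted = {(k, s): attr for k, attr in small for s in range(salt_buckets)}
    # A random salt spreads one hot key over `salt_buckets` distinct join keys
    # (in Spark these would land in different shuffle partitions).
    groups = defaultdict(list)
    for k, v in big:
        salted_key = (k, rng.randrange(salt_buckets))
        attr = small_salted.get(salted_key)
        if attr is not None:  # inner join: drop unmatched rows
            groups[salted_key].append((k, v, attr))
    return groups

def aggregate(groups):
    """Final step: drop the salt and aggregate on the original key."""
    totals = defaultdict(float)
    for rows in groups.values():
        for k, v, _ in rows:
            totals[k] += v
    return dict(totals)

big = [("hot", 1.0)] * 1000 + [("cold", 2.0)]
small = [("hot", "A"), ("cold", "B")]
groups = salted_join(big, small, salt_buckets=8)
totals = aggregate(groups)  # {"hot": 1000.0, "cold": 2.0}
largest_group = max(len(rows) for rows in groups.values())  # roughly 1000 / 8
```

The cost is that the small side grows by a factor of `salt_buckets`, so salt only the keys that are actually skewed when the small side is not tiny.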
Partitioning Strategy
| Data Type | Partition Key | Why |
|---|---|---|
| Transactional/event | Date (daily or monthly) | Most queries filter by time range |
| Multi-tenant | Tenant ID + date | Isolate tenant queries, time-range pruning |
| Geospatial | Region + date | Regional queries are common |
| Log data | Date + hour | High volume needs finer partitions |
| Reference/dimension | Don’t partition | Too small, full scan is fine |
Rules:
- Target 100MB-1GB per partition (compressed)
- <10,000 total partitions per table
- Never partition on high-cardinality columns (user_id)
- Always include partition key in WHERE clauses
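The size and count rules can be turned into arithmetic for picking a time grain. A sketch assuming roughly constant daily volume and 30-day months (720 hours); `plan_partitions` is a hypothetical helper that picks the coarsest grain whose partitions stay under 1GB, then reports whether the result also meets the 100MB floor and the 10,000-partition cap.

```python
import math

# (grain, hours per partition), coarsest first
GRAINS = [("monthly", 720), ("daily", 24), ("hourly", 1)]

def plan_partitions(daily_compressed_mb, retention_days,
                    min_mb=100, max_mb=1024, max_partitions=10_000):
    """Choose the coarsest time grain whose partitions stay under max_mb."""
    for grain, hours in GRAINS:
        size_mb = daily_compressed_mb * hours / 24
        if size_mb <= max_mb or grain == "hourly":  # hourly is the finest option here
            count = math.ceil(retention_days * 24 / hours)
            return {
                "grain": grain,
                "partition_mb": size_mb,
                "partitions": count,
                "in_size_target": min_mb <= size_mb <= max_mb,
                "under_partition_cap": count <= max_partitions,
            }

small_table = plan_partitions(5, retention_days=730)     # monthly, 25 partitions of ~150MB
busy_table = plan_partitions(48_000, retention_days=730)  # hourly, but 17,520 partitions
```

When no grain satisfies both rules, as with `busy_table`, that is the signal to shorten retention, compact older partitions to a coarser grain, or rely on clustering instead of finer partitioning.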
Phase 6: Data Governance & Cataloging
Data Classification
| Level | Examples | Controls |
|---|---|---|
| Public | Product catalog, published stats | No restrictions |
| Internal | Aggregated metrics, non-PII analytics | Auth required, audit logging |
| Confidential | Customer PII, financial records, HR data | Encryption, column-level access, masking |
| Restricted | SSN, payment cards, health records, passwords | Encryption at rest + transit, tokenization, audit every access, retention limits |
PII Handling Rules
- Identify: Scan all sources for PII columns (name, email, phone, SSN, DOB, address, IP)
- Classify: Tag each with sensitivity level
- Minimize: Only ingest PII you actually need
- Protect:
- Hash or tokenize in staging (SHA-256 with salt for pseudonymization)
- Dynamic masking for non-privileged users
- Column-level encryption for restricted data
- Retain: Auto-delete after retention period
- Audit: Log every query touching PII columns
- Right to delete: Build a deletion pipeline that propagates across all derived tables
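A keyed hash is one common way to realize "SHA-256 with salt" for pseudonymization. This sketch uses Python's standard `hmac` and `hashlib` modules; the key handling and the trim-and-lowercase normalization are assumptions to adapt per column type. The same input always yields the same token, so pseudonymized columns still join across tables.

```python
import hashlib
import hmac

def pseudonymize(value: str, key: bytes) -> str:
    """Deterministic HMAC-SHA-256 token for a PII value.

    The secret key plays the role of the salt: without it, tokens cannot be
    reversed by hashing a dictionary of known emails or phone numbers.
    """
    normalized = value.strip().lower()  # assumption: case-insensitive values like emails
    return hmac.new(key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()

key = b"load-me-from-a-secrets-manager"  # placeholder; never hard-code in production
t1 = pseudonymize("Jane.Doe@example.com ", key)
t2 = pseudonymize("jane.doe@example.com", key)
t3 = pseudonymize("jane.doe@example.com", b"a-different-key")
```

Rotating or destroying the key breaks the link between tokens and raw values, which can also serve as a crypto-shredding step in a deletion pipeline.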
Data Catalog Entry Template
dataset:
  name: ""
  description: ""
  owner_team: ""
  steward: "" # person responsible for quality
  domain: "" # sales | marketing | finance | product | engineering
  tier: "" # gold (trusted) | silver (cleaned) | bronze (raw)
  lineage:
    sources: [] # upstream datasets/systems
    transformations: "" # brief description of key transforms
    downstream: [] # who consumes this
  refresh:
    schedule: ""
    sla_hours: 0
    last_successful_run: ""
  quality:
    tests: [] # list of quality checks
    last_score: 0 # 0-100
    known_issues: []
  access:
    classification: "" # public | internal | confidential | restricted
    pii_columns: []
    access_request_process: "" # how to get access
  usage:
    avg_daily_queries: 0
    top_consumers: []
    cost_monthly_usd: 0
Phase 7: Pipeline Monitoring & Alerting
Pipeline Health Dashboard
dashboard:
  pipeline_metrics:
    - metric: "Pipeline Success Rate"
      formula: "successful_runs / total_runs * 100"
      target: ">99%"
      alert_threshold: "<95%"
    - metric: "Average Runtime"
      formula: "avg(end_time - start_time) over 7 days"
      target: "<SLA"
      alert_threshold: ">80% of SLA"
    - metric: "Data Freshness"
      formula: "NOW() - MAX(loaded_at)"
      target: "<SLA hours"
      alert_threshold: ">SLA"
    - metric: "Data Volume Variance"
      formula: "abs(today_rows - avg_7d_rows) / avg_7d_rows * 100"
      target: "<20%"
      alert_threshold: ">50%"
    - metric: "Quality Check Pass Rate"
      formula: "passed_checks / total_checks * 100"
      target: "100%"
      alert_threshold: "<95%"
    - metric: "Failed Pipeline Count"
      formula: "count where status = failed in last 24h"
      target: "0"
      alert_threshold: ">0"
    - metric: "Backfill Queue"
      formula: "count of pending backfill requests"
      target: "0"
      alert_threshold: ">5"
    - metric: "Infrastructure Cost"
      formula: "compute + storage + egress"
      target: "<budget"
      alert_threshold: ">110% budget"
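Three of these formulas can be computed from run records in a few lines. A minimal sketch: the input shapes (a list of run dicts with a `status` key, row counts, a timezone-aware `max_loaded_at`) and the 4-hour freshness SLA are assumptions, and `runs` is assumed non-empty.

```python
from datetime import datetime, timedelta, timezone

def health_metrics(runs, today_rows, last_7d_rows, max_loaded_at, now):
    """Compute three of the dashboard metrics from their formulas."""
    ok = sum(1 for r in runs if r["status"] == "success")
    avg_7d = sum(last_7d_rows) / len(last_7d_rows)
    return {
        "success_rate_pct": 100.0 * ok / len(runs),
        "volume_variance_pct": abs(today_rows - avg_7d) / avg_7d * 100,
        "freshness_hours": (now - max_loaded_at).total_seconds() / 3600,
    }

now = datetime(2024, 5, 2, 12, 0, tzinfo=timezone.utc)
m = health_metrics(
    runs=[{"status": "success"}] * 19 + [{"status": "failed"}],
    today_rows=1_600,
    last_7d_rows=[1_000] * 7,
    max_loaded_at=now - timedelta(hours=3),
    now=now,
)
# Alert thresholds from the dashboard spec; the freshness SLA is an assumed 4 hours.
alerts = [name for name, breached in [
    ("success_rate", m["success_rate_pct"] < 95),
    ("volume_variance", m["volume_variance_pct"] > 50),
    ("freshness", m["freshness_hours"] > 4),
] if breached]
```

Here a 95% success rate sits exactly on the threshold and does not fire, while a 60% jump in volume does.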
Alert Severity
| Severity | Condition | Response Time | Example |
|---|---|---|---|
| P0 | Revenue/compliance impacting | 15 min | Payment pipeline down, regulatory report delayed |
| P1 | Business-critical dashboard stale | 1 hour | Executive dashboard >4h stale |
| P2 | Non-critical pipeline failed | 4 hours | Marketing attribution delayed |
| P3 | Warning/degradation | Next business day | Pipeline 80% of SLA, minor quality drift |
Structured Logging Standard
Every pipeline run MUST log:
{
"pipeline_name": "",
"run_id": "",
"started_at": "",
"completed_at": "",
"status": "success|failed|partial",
"stage": "",
"rows_extracted": 0,
"rows_transformed": 0,
"rows_loaded": 0,
"rows_rejected": 0,
"quality_checks_passed": 0,
"quality_checks_failed": 0,
"duration_seconds": 0,
"error_message": "",
"watermark_before": "",
"watermark_after": ""
}
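One way to guarantee the log line is written even when a run fails is to emit it from a `finally` block. A sketch using only the standard library; `run_with_logging` is hypothetical, the `step` callable is assumed to return the remaining standard fields (row counts, check counts, watermarks), and `sink` defaults to `print` but would normally be your log shipper.

```python
import json
import time
import uuid
from datetime import datetime, timezone

def run_with_logging(pipeline_name, step, sink=print):
    """Run `step` and emit exactly one JSON log line, even when the step fails."""
    t0 = time.monotonic()
    record = {
        "pipeline_name": pipeline_name,
        "run_id": str(uuid.uuid4()),
        "started_at": datetime.now(timezone.utc).isoformat(),
        "status": "success",
        "error_message": "",
    }
    try:
        record.update(step())  # row counts, quality-check counts, watermarks
        return record
    except Exception as exc:
        record.update(status="failed", error_message=str(exc))
        raise  # the orchestrator must still see the failure
    finally:
        record["completed_at"] = datetime.now(timezone.utc).isoformat()
        record["duration_seconds"] = round(time.monotonic() - t0, 3)
        sink(json.dumps(record))

lines = []
run_with_logging("orders_daily",
                 lambda: {"rows_extracted": 10, "rows_loaded": 10, "rows_rejected": 0},
                 sink=lines.append)

def failing_step():
    raise ValueError("source connection timeout")

try:
    run_with_logging("orders_daily", failing_step, sink=lines.append)
except ValueError:
    pass  # already logged; re-raised so callers can react
```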
Phase 8: Testing Strategy
Pipeline Test Pyramid
| Layer | What to Test | How | When |
|---|---|---|---|
| Unit | Individual transforms, business logic | pytest with fixtures, dbt unit tests | Every PR |
| Integration | Source connectivity, schema compatibility | Test against staging/dev environment | Daily + PR |
| Contract | Schema hasn’t changed, data types stable | Schema registry, contract tests | Every pipeline run |
| Data Quality | Completeness, uniqueness, freshness, validity | Quality framework checks | Every pipeline run |
| E2E | Full pipeline produces correct output | Golden dataset comparison | Weekly + release |
| Performance | Runtime within SLA, no regression | Benchmark against historical runs | Weekly |
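A unit-layer test in the pytest style looks like this. The transform under test, `dedupe_latest`, is a hypothetical example of business logic (keep the newest version of each order); pytest would collect the `test_*` functions automatically, and they also run as plain function calls since they use bare `assert`.

```python
def dedupe_latest(rows):
    """Transform under test: keep the latest version of each order_id."""
    latest = {}
    for row in rows:
        current = latest.get(row["order_id"])
        if current is None or row["updated_at"] > current["updated_at"]:
            latest[row["order_id"]] = row
    return sorted(latest.values(), key=lambda r: r["order_id"])

def test_dedupe_latest_keeps_newest_version():
    rows = [
        {"order_id": "A1", "status": "pending", "updated_at": "2024-01-01"},
        {"order_id": "A1", "status": "shipped", "updated_at": "2024-01-03"},
        {"order_id": "B2", "status": "pending", "updated_at": "2024-01-02"},
    ]
    assert [r["status"] for r in dedupe_latest(rows)] == ["shipped", "pending"]

def test_dedupe_latest_is_idempotent():
    rows = [{"order_id": "A1", "status": "pending", "updated_at": "2024-01-01"}]
    assert dedupe_latest(dedupe_latest(rows)) == dedupe_latest(rows)
```

Small, hand-written fixtures like these pin down edge cases (ties, out-of-order arrivals) that a golden-dataset E2E test tends to miss.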
dbt Testing Checklist
# For every model, define at minimum:
models:
  - name: fct_orders
    tests:
      # dbt_utils.recency is a model-level test, so it sits here, not under a column
      - dbt_utils.recency:
          datepart: day
          field: ordered_at
          interval: 2
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
      - name: ordered_at
        tests:
          - not_null
Backfill Protocol
When you need to reprocess historical data:
- Scope: Define exact date range and affected tables
- Impact assessment: What downstream models/dashboards will be affected?
- Communication: Notify consumers of temporary data inconsistency
- Isolation: Run backfill in separate compute to avoid impacting current pipelines
- Validation: Compare row counts and key metrics pre/post backfill
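- Execution: Process in reverse-chronological order (most recent first) so current data is usable soonest; process oldest first when a model depends on earlier periods (cumulative totals, SCD Type 2 history)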
- Monitoring: Watch for resource spikes, duplicate creation
- Verification: Reconcile against source after completion
- Documentation: Log what was backfilled, why, and any anomalies found
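Scoping and ordering the backfill can be sketched as a chunk planner. A minimal sketch: `backfill_chunks` and the 7-day `chunk_days` default are hypothetical; chunks are inclusive date windows, newest first by default, with `newest_first=False` for models whose output depends on earlier periods.

```python
from datetime import date, timedelta

def backfill_chunks(start, end, chunk_days=7, newest_first=True):
    """Split an inclusive date range into (chunk_start, chunk_end) windows."""
    chunks = []
    cursor = start
    while cursor <= end:
        chunk_end = min(cursor + timedelta(days=chunk_days - 1), end)
        chunks.append((cursor, chunk_end))
        cursor = chunk_end + timedelta(days=1)
    return chunks[::-1] if newest_first else chunks

plan = backfill_chunks(date(2024, 1, 1), date(2024, 1, 20))
# [(Jan 15, Jan 20), (Jan 8, Jan 14), (Jan 1, Jan 7)]
```

Bounded chunks keep each run small enough to retry cheaply and give natural checkpoints for the pre/post row-count comparison.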
Phase 9: Cost Optimization
Cloud Cost Reduction Strategies
| Strategy | Savings | Effort |
|---|---|---|
| Right-size compute (auto-scaling) | 20-40% | Low |
| Use spot/preemptible instances for batch | 60-80% | Medium |
| Compress data (Parquet + Snappy/Zstd) | 50-80% storage | Low |
| Lifecycle policies (hot → warm → cold → archive) | 40-70% storage | Low |
| Eliminate unused tables/pipelines | 10-30% | Low |
| Optimize query patterns (partition pruning) | 30-60% compute | Medium |
| Reserved capacity for steady-state | 30-50% | Medium |
| Cache expensive queries | 20-50% compute | Medium |
Cost Allocation Template
cost_tracking:
  by_pipeline:
    - pipeline: ""
      compute_monthly_usd: 0
      storage_monthly_usd: 0
      egress_monthly_usd: 0
      total: 0
      cost_per_row: 0 # total / rows_processed
      business_value: "" # what revenue/decision does this enable?
      roi_justified: true # is the cost worth it?
  optimization_opportunities:
    - description: ""
      estimated_savings_usd: 0
      effort: "" # low | medium | high
      priority: 0 # 1 = do now
Cost Red Flags
- Single pipeline >30% of total spend
- Cost per row increasing month-over-month
- Tables with 0 queries in 30 days
- Dev/staging environments running 24/7
- Full table scans on >1TB tables
- Uncompressed data in cloud storage
- Cross-region data transfer
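The first two red flags can be checked automatically from the cost-tracking data. A sketch assuming a hypothetical input shape (pipeline name mapped to monthly `(total_cost_usd, rows_processed)` tuples, oldest first); "increasing month-over-month" is read strictly here as rising in every month.

```python
def cost_red_flags(pipelines, share_limit=0.30):
    """Check spend concentration and rising cost per row."""
    latest_total = sum(months[-1][0] for months in pipelines.values())
    flags = []
    for name, months in pipelines.items():
        if latest_total and months[-1][0] / latest_total > share_limit:
            flags.append((name, f"over {share_limit:.0%} of total spend"))
        cost_per_row = [cost / rows for cost, rows in months if rows]
        rising = all(b > a for a, b in zip(cost_per_row, cost_per_row[1:]))
        if len(cost_per_row) >= 2 and rising:
            flags.append((name, "cost per row up every month"))
    return flags

flags = cost_red_flags({
    "orders": [(100, 1_000_000), (120, 1_000_000), (150, 1_000_000)],
    "events": [(200, 10_000_000), (200, 10_000_000), (200, 10_000_000)],
    "crm": [(50, 100_000), (40, 100_000), (30, 100_000)],
})
```

In this example `events` dominates spend but is stable per row, while `orders` is both large and getting less efficient, which makes it the better optimization target.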
Phase 10: Operational Runbooks
Pipeline Failure Triage
Pipeline failed →
1. Check error message in logs
├── Connection timeout → Check source availability, network, credentials
├── Schema mismatch → Source schema changed → update extract + notify
├── Data quality check failed → Investigate source data, check for anomalies
├── Out of memory → Increase resources or optimize query
├── Permission denied → Check IAM roles, token expiry
├── Duplicate key violation → Check idempotency, investigate source dupes
└── Timeout (SLA breach) → Check data volume spike, query plan, cluster health
2. Determine impact
├── What dashboards/reports are affected?
├── What's the data freshness SLA?
└── Who needs to be notified?
3. Fix
├── Transient (network, timeout) → Retry
├── Data issue → Fix source data, re-run with quality gate override if safe
├── Schema change → Update pipeline, backfill if needed
└── Infrastructure → Scale up, file ticket with cloud provider
4. Post-fix
├── Verify data correctness
├── Update runbook with new failure mode
└── Add monitoring/alerting to catch earlier next time
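Step 1 of the triage tree can be partially automated by matching error text to a first action. A sketch only: the substrings in `TRIAGE_RULES` are illustrative and must be tuned to the messages your engines actually produce; rules are tried in order, so more specific patterns come first.

```python
# (substrings to look for, first thing to check), tried in order.
TRIAGE_RULES = [
    (("could not connect", "connection refused", "connection timed out"),
     "check source availability, network, credentials"),
    (("permission denied", "access denied", "token expired"),
     "check IAM roles, token expiry"),
    (("duplicate key", "unique constraint"),
     "check idempotency, investigate source dupes"),
    (("out of memory", "outofmemoryerror"),
     "increase resources or optimize query"),
    (("quality check",),
     "investigate source data, check for anomalies"),
    (("column", "schema"),
     "source schema changed: update extract + notify"),
]

def triage(error_message):
    msg = error_message.lower()
    for patterns, action in TRIAGE_RULES:
        if any(p in msg for p in patterns):
            return action
    return "unclassified: read full logs, then add a rule for this failure mode"
```

The unclassified fallback mirrors the post-fix step: every new failure mode should end up as a new rule.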
Schema Change Management
When a source system changes schema:
- Detect: Schema comparison check in extraction pipeline (hash schema, compare to registered)
- Classify:
- Additive (new column): Usually safe — add to pipeline, backfill if needed
- Rename: Map old → new in transform, update downstream
- Type change: Assess compatibility, may need cast or historical rebuild
- Column removed: Critical — breaks queries, need immediate attention
- Test: Run pipeline in dry-run mode with new schema
- Deploy: Update transforms, quality checks, documentation
- Communicate: Notify downstream consumers via data contract channel
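The detect and classify steps can be sketched with a schema hash plus a column-level diff. A minimal sketch assuming schemas are represented as `{column: type}` dicts; a rename cannot be told apart from a drop plus an add without extra hints, so it shows up as one `removed` and one `additive` change.

```python
import hashlib
import json

def schema_hash(schema):
    """Stable fingerprint of a {column: type} mapping for cheap change detection."""
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()

def classify_changes(registered, observed):
    """Diff the registered schema against the observed one."""
    changes = []
    for col in observed.keys() - registered.keys():
        changes.append(("additive", col))
    for col in registered.keys() - observed.keys():
        changes.append(("removed", col))  # may be one half of a rename
    for col in registered.keys() & observed.keys():
        if registered[col] != observed[col]:
            changes.append(("type_change", col))
    return sorted(changes)

registered = {"order_id": "varchar", "amount": "numeric", "status": "varchar"}
observed = {"order_id": "varchar", "amount": "float", "order_status": "varchar"}
changes = classify_changes(registered, observed)
# [("additive", "order_status"), ("removed", "status"), ("type_change", "amount")]
```

Comparing hashes first keeps the common no-change path to a single string comparison per run.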
Disaster Recovery
| Scenario | RPO | RTO | Recovery Steps |
|---|---|---|---|
| Pipeline code lost | 0 (git) | 1h | Redeploy from git, restore orchestrator state |
| Warehouse data corrupted | Varies | 4h | Restore from Time Travel/snapshot, re-run affected pipelines |
| Source system down | N/A | Wait | Queue extractions, catch up with incremental once restored |
| Cloud region outage | 24h | 8h | Failover to DR region if configured, else wait |
| Credential compromise | 0 | 2h | Rotate all credentials, audit access logs, re-run affected pipelines |
Phase 11: Advanced Patterns
Slowly Changing Dimension Type 2 (SQL Template)
-- SCD Type 2 in two steps; run both in one transaction.
-- If your engine evaluates CURRENT_TIMESTAMP per statement, bind one batch timestamp instead.
-- Step 1: close the current row of every customer whose tracked attributes changed
MERGE INTO dim_customer AS target
USING (
  SELECT * FROM stg_customers
  WHERE updated_at > (SELECT MAX(valid_from) FROM dim_customer)
) AS source
ON target.customer_natural_key = source.customer_id
  AND target.is_current = TRUE
WHEN MATCHED AND (
  -- IS DISTINCT FROM is null-safe; != misses changes to or from NULL
  target.customer_name IS DISTINCT FROM source.name OR
  target.customer_status IS DISTINCT FROM source.status
  -- list all Type 2 tracked columns
) THEN UPDATE SET
  is_current = FALSE,
  valid_to = CURRENT_TIMESTAMP;
-- Step 2: insert a current row for every staged customer that has none.
-- This covers brand-new customers and the versions closed in step 1.
-- Assumes stg_customers holds one row per customer_id.
INSERT INTO dim_customer (
  customer_natural_key, customer_name, customer_status,
  valid_from, valid_to, is_current
)
SELECT s.customer_id, s.name, s.status,
  CURRENT_TIMESTAMP, '9999-12-31', TRUE
FROM stg_customers s
WHERE NOT EXISTS (
  SELECT 1 FROM dim_customer d
  WHERE d.customer_natural_key = s.customer_id
    AND d.is_current = TRUE
);
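The two-step SCD Type 2 logic (close changed current rows, then insert a current row for each staged key that lacks one) can be exercised locally with SQLite. A sketch under assumptions: SQLite has no `MERGE`, so step 1 is an `UPDATE` with a correlated `EXISTS`, SQLite's `IS NOT` stands in for `IS DISTINCT FROM`, and one `batch_ts` string is bound for both steps. Table and column names follow the template; `apply_scd2` is hypothetical.

```python
import sqlite3

CLOSE_CHANGED = """
UPDATE dim_customer
SET is_current = 0, valid_to = :batch_ts
WHERE is_current = 1
  AND EXISTS (
    SELECT 1 FROM stg_customers s
    WHERE s.customer_id = dim_customer.customer_natural_key
      -- IS NOT is SQLite's null-safe "is distinct from"
      AND (s.name IS NOT dim_customer.customer_name
           OR s.status IS NOT dim_customer.customer_status))
"""

INSERT_CURRENT = """
INSERT INTO dim_customer (customer_natural_key, customer_name, customer_status,
                          valid_from, valid_to, is_current)
SELECT s.customer_id, s.name, s.status, :batch_ts, '9999-12-31', 1
FROM stg_customers s
WHERE NOT EXISTS (
    SELECT 1 FROM dim_customer d
    WHERE d.customer_natural_key = s.customer_id AND d.is_current = 1)
"""

def apply_scd2(conn, staged_rows, batch_ts):
    with conn:  # one transaction: both steps commit or neither does
        conn.execute("DELETE FROM stg_customers")
        conn.executemany("INSERT INTO stg_customers VALUES (?, ?, ?)", staged_rows)
        conn.execute(CLOSE_CHANGED, {"batch_ts": batch_ts})
        conn.execute(INSERT_CURRENT, {"batch_ts": batch_ts})

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_customers (customer_id TEXT, name TEXT, status TEXT)")
conn.execute("""CREATE TABLE dim_customer (
    customer_sk INTEGER PRIMARY KEY, customer_natural_key TEXT, customer_name TEXT,
    customer_status TEXT, valid_from TEXT, valid_to TEXT, is_current INTEGER)""")
apply_scd2(conn, [("c1", "Ada", "trial")], "2024-01-01")
apply_scd2(conn, [("c1", "Ada", "paid"), ("c2", "Bo", "trial")], "2024-02-01")
apply_scd2(conn, [("c1", "Ada", "paid"), ("c2", "Bo", "trial")], "2024-03-01")  # no changes
history = conn.execute(
    "SELECT customer_natural_key, customer_status, valid_from, valid_to, is_current "
    "FROM dim_customer ORDER BY customer_natural_key, valid_from").fetchall()
```

The third batch changes nothing, which doubles as an idempotency check: re-running an unchanged batch adds no rows.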
CDC with Debezium (Architecture Pattern)
Source DB → Debezium Connector → Kafka Topic →
├── Stream processor (Flink/Spark Streaming) → Target DB
├── S3 sink connector → Data Lake (raw)
└── Elasticsearch sink → Search index
Key decisions:
- Topic per table or single topic: Per table (easier routing, independent scaling)
- Schema registry: Always use (Confluent Schema Registry or AWS Glue Schema Registry)
- Serialization: Avro (compact + schema evolution) or Protobuf (strict + fast)
- Offset management: Connector manages; monitor consumer lag
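A sink that applies Debezium change events idempotently can be sketched in a few lines. Debezium events carry `op` (`c` create, `u` update, `d` delete, `r` snapshot read) plus `before`/`after` row images, and by default a delete is followed by a tombstone (null value) for log compaction. This sketch assumes an already-unwrapped payload with an `id` primary key and a dict as the target table; `source` and `ts_ms` metadata are omitted.

```python
def apply_change(table, event, key="id"):
    """Apply one Debezium-style change event to a table keyed by primary key.

    Upsert for c/r/u and delete-by-key for d make redelivered events
    harmless, which is what at-least-once delivery from Kafka requires.
    """
    if event is None:  # tombstone after a delete: nothing to apply
        return
    op = event["op"]
    if op in ("c", "r", "u"):
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unknown op {op!r}")

events = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "new"}},
    {"op": "u", "before": {"id": 1, "status": "new"}, "after": {"id": 1, "status": "paid"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None},
    None,  # tombstone
]
target = {}
for e in events + events:  # replay everything to simulate redelivery
    apply_change(target, e)
```

Replays are only safe when events for a key arrive in order, which is why topics are partitioned by primary key.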
Feature Store Pattern
feature_store:
  entity: "customer"
  entity_key: "customer_id"
  features:
    - name: "total_orders_30d"
      description: "Total orders in last 30 days"
      type: "INT"
      source: "fct_orders"
      computation: "batch" # batch | streaming | on-demand
      freshness: "daily"
      ttl_hours: 48
    - name: "avg_order_value_90d"
      description: "Average order value last 90 days"
      type: "FLOAT"
      source: "fct_orders"
      computation: "batch"
      freshness: "daily"
      ttl_hours: 48
    - name: "last_login_minutes_ago"
      description: "Minutes since last login event"
      type: "INT"
      source: "events_stream"
      computation: "streaming"
      freshness: "real-time"
      ttl_hours: 1
  serving:
    online: true # low-latency feature serving (Redis/DynamoDB)
    offline: true # batch feature retrieval for training
    point_in_time_correct: true # prevent feature leakage in ML training
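Point-in-time correctness amounts to an as-of join: each training label gets the latest feature value computed at or before the label's timestamp, and nothing later. A sketch using the standard `bisect` module; timestamps are plain numbers (hours) for brevity, the `ttl` check mirrors `ttl_hours`, and `point_in_time_join` is hypothetical.

```python
from bisect import bisect_right

def point_in_time_join(labels, feature_history, ttl=None):
    """For each (entity_id, label_ts), take the latest feature value with
    feature_ts <= label_ts, so training never sees values from the future.

    feature_history maps entity_id -> [(feature_ts, value), ...] sorted by ts.
    """
    joined = []
    for entity_id, label_ts in labels:
        history = feature_history.get(entity_id, [])
        i = bisect_right([ts for ts, _ in history], label_ts)
        value = None
        if i:
            feature_ts, candidate = history[i - 1]
            if ttl is None or label_ts - feature_ts <= ttl:
                value = candidate  # otherwise too stale: treat as missing
        joined.append((entity_id, label_ts, value))
    return joined

history = {"c1": [(0, 3), (24, 5), (48, 9)]}  # hour -> total_orders_30d
rows = point_in_time_join([("c1", 30), ("c1", 10), ("c1", 100), ("c2", 30)],
                          history, ttl=48)
# [("c1", 30, 5), ("c1", 10, 3), ("c1", 100, None), ("c2", 30, None)]
```

A naive join on the latest value would give every label the value 9, leaking information from after the label time into training.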
Data Mesh Principles
If operating at scale (>5 data teams):
- Domain ownership: Each business domain owns its data products (not central data team)
- Data as a product: Treat datasets like products — SLAs, documentation, discoverability
- Self-serve platform: Central team builds the platform, domains build on top
- Federated governance: Standards and interoperability maintained centrally, implementation decentralized
When NOT to use Data Mesh:
- <5 data producers/consumers
- Small team (<20 engineers total)
- Single business domain
- Early-stage company (over-engineering)
Quality Scoring Rubric (0-100)
| Dimension | Weight | Scoring |
|---|---|---|
| Pipeline Reliability | 20 | 0=frequent failures, 10=some failures with manual recovery, 20=99.5%+ success rate with auto-retry |
| Data Quality | 20 | 0=no checks, 10=basic null/unique checks, 20=comprehensive quality framework with contracts |
| Performance | 15 | 0=regularly breaches SLA, 8=meets SLA, 15=well under SLA with optimization |
| Documentation | 10 | 0=none, 5=basic README, 10=full catalog entries with lineage and business definitions |
| Monitoring | 15 | 0=no alerts, 8=failure alerts only, 15=proactive monitoring with dashboards and anomaly detection |
| Testing | 10 | 0=no tests, 5=basic smoke tests, 10=full test pyramid (unit+integration+contract+E2E) |
| Cost Efficiency | 10 | 0=no cost tracking, 5=tracked, 10=optimized with ROI justification per pipeline |
Scoring guide:
- 0-40: Critical gaps — prioritize pipeline reliability and data quality
- 41-60: Functional but fragile — add monitoring, testing, documentation
- 61-80: Solid — optimize performance, cost, governance
- 81-100: Excellent — maintain, innovate, mentor
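The rubric and bands translate directly into a scoring helper. A minimal sketch: the dimension keys are hypothetical snake_case versions of the rubric names, missing dimensions score 0, and out-of-range points are rejected rather than capped.

```python
WEIGHTS = {"pipeline_reliability": 20, "data_quality": 20, "performance": 15,
           "documentation": 10, "monitoring": 15, "testing": 10,
           "cost_efficiency": 10}

def maturity_score(points):
    """Sum rubric points and map the total to the scoring-guide band."""
    for dim, value in points.items():
        if not 0 <= value <= WEIGHTS[dim]:
            raise ValueError(f"{dim} must be between 0 and {WEIGHTS[dim]}")
    total = sum(points.get(dim, 0) for dim in WEIGHTS)
    if total <= 40:
        band = "Critical gaps"
    elif total <= 60:
        band = "Functional but fragile"
    elif total <= 80:
        band = "Solid"
    else:
        band = "Excellent"
    return total, band

score = maturity_score({"pipeline_reliability": 10, "data_quality": 10,
                        "performance": 8, "documentation": 5, "monitoring": 8,
                        "testing": 5, "cost_efficiency": 5})
# (51, "Functional but fragile")
```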
Edge Cases & Gotchas
Timezone Traps
- Store everything in UTC. Convert only at presentation layer
- Event timestamps: use event time, not processing time
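- Daylight saving: use TIMESTAMP WITH TIME ZONE (or UTC-normalized timestamps), never WITHOUT; local wall-clock times repeat or skip an hour at DST transitions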
- Late-arriving data: watermark strategy + allowed lateness window
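Normalizing to UTC at ingestion can be enforced with a small guard that rejects naive timestamps. A sketch with the standard `datetime` module; fixed offsets keep the example self-contained, but production code would attach named zones (for example `zoneinfo.ZoneInfo("America/New_York")`, Python 3.9+) so DST offsets are resolved correctly.

```python
from datetime import datetime, timedelta, timezone

def to_utc(ts: datetime) -> datetime:
    """Normalize an aware timestamp to UTC; refuse naive ones."""
    if ts.tzinfo is None or ts.utcoffset() is None:
        raise ValueError("naive timestamp: attach the source's time zone first")
    return ts.astimezone(timezone.utc)

# The same instant, recorded by two sources with different UTC offsets.
new_york_winter = datetime(2024, 1, 15, 7, 0, tzinfo=timezone(timedelta(hours=-5)))
berlin_winter = datetime(2024, 1, 15, 13, 0, tzinfo=timezone(timedelta(hours=1)))
```

Failing loudly on naive input is deliberate: guessing the zone of a naive value is exactly how hour-shifted data slips into the warehouse.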
Late-Arriving Data
- Define maximum acceptable lateness per source
- Reprocess affected partitions when late data arrives
- Track late arrival rate as a quality metric
- Consider separate “late data” pipeline that patches in
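These rules can be combined into one routing step for each newly arrived batch. A sketch assuming daily partitions, UTC event times, and a `processed_through` date marking the newest partition already built; `route_late_events` is hypothetical.

```python
from datetime import date, datetime, timedelta, timezone

def route_late_events(event_times, processed_through, max_lateness, now):
    """Split arrivals into partitions to rebuild vs. events past the lateness window."""
    reprocess, too_late = set(), []
    late_count = 0
    for ts in event_times:
        if ts.date() > processed_through:
            continue  # partition not built yet: normal on-time data
        late_count += 1
        if now - ts > max_lateness:
            too_late.append(ts)  # hand to a separate late-data patch pipeline
        else:
            reprocess.add(ts.date())
    late_rate = late_count / len(event_times) if event_times else 0.0
    return sorted(reprocess), too_late, late_rate  # late_rate: track as a quality metric

utc = timezone.utc
now = datetime(2024, 3, 10, 6, 0, tzinfo=utc)
events = [
    datetime(2024, 3, 10, 1, 0, tzinfo=utc),   # on time
    datetime(2024, 3, 9, 23, 0, tzinfo=utc),   # late, within window
    datetime(2024, 3, 8, 12, 0, tzinfo=utc),   # late, within window
    datetime(2024, 3, 1, 0, 0, tzinfo=utc),    # beyond the 3-day window
]
reprocess, too_late, late_rate = route_late_events(
    events, processed_through=date(2024, 3, 9),
    max_lateness=timedelta(days=3), now=now)
```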
Exactly-Once Processing
- True exactly-once is expensive. Most systems need at-least-once + idempotent writes
- Use transaction IDs or natural keys for deduplication
- Kafka: use the idempotent, transactional producer, with consumers reading at isolation.level=read_committed
- Database: MERGE/UPSERT on natural key
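Increments are not naturally idempotent the way upserts are, so at-least-once delivery needs deduplication by transaction ID. A minimal sketch with in-memory state; in a real sink the set of applied IDs must be committed in the same transaction as the write, otherwise a crash between the two reintroduces duplicates.

```python
def apply_once(balances, seen_txn_ids, messages):
    """Effectively-once apply of increments under at-least-once delivery."""
    for msg in messages:
        if msg["txn_id"] in seen_txn_ids:
            continue  # redelivered duplicate
        balances[msg["account"]] = balances.get(msg["account"], 0) + msg["amount"]
        # Real sinks record the ID in the same transaction as the write.
        seen_txn_ids.add(msg["txn_id"])

balances, seen = {}, set()
batch = [
    {"txn_id": "t1", "account": "a", "amount": 10},
    {"txn_id": "t2", "account": "a", "amount": 5},
    {"txn_id": "t1", "account": "a", "amount": 10},  # duplicate delivery
]
apply_once(balances, seen, batch)
apply_once(balances, seen, batch)  # whole batch redelivered after a crash
```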
Schema Evolution
- Backward compatible: New code reads old data (safe to deploy new readers first)
- Forward compatible: Old code reads new data (safe to deploy new writers first)
- Full compatible: Both directions (safest, most restrictive)
- Use Avro or Protobuf with schema registry for streaming data
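The compatibility modes can be checked mechanically with a simplified Avro-style rule, where backward compatible means a new reader can decode old data and forward compatible means an old reader can decode new data. A sketch under stated simplifications: schemas are dicts of field to `(type, has_default)`, types must match exactly (real Avro also allows promotions such as int to long), and writer-only fields are ignored.

```python
def can_read(reader, writer):
    """Can data written with `writer` be decoded by code using `reader`?

    Fields missing from the writer need a default in the reader; shared
    fields must keep their type; writer-only fields are ignored.
    """
    for name, (field_type, has_default) in reader.items():
        if name in writer:
            if writer[name][0] != field_type:
                return False
        elif not has_default:
            return False
    return True

def compatibility(old, new):
    backward = can_read(reader=new, writer=old)  # new code reads old data
    forward = can_read(reader=old, writer=new)   # old code reads new data
    if backward and forward:
        return "full"
    return "backward" if backward else "forward" if forward else "none"

v1 = {"id": ("long", False), "email": ("string", False)}
add_with_default = {**v1, "tier": ("string", True)}
add_required = {**v1, "tier": ("string", False)}
drop_email = {"id": ("long", False)}
```

Adding fields with defaults is the change that keeps both directions working, which is why it is the safest default for evolving event schemas.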
Multi-Tenant Data
- Tenant ID in every table, every query, every log
- Row-level security in warehouse
- Separate compute per tenant (or at least isolation)
- Never join across tenants without explicit business reason
- Tenant-aware backfill (don’t rebuild all tenants for one tenant’s issue)
Data Lake Anti-Patterns
- “Data Swamp”: ingesting everything with no organization or catalog → only ingest what has a known consumer
- Small files: thousands of <1MB files → compact regularly (target 100MB-1GB)
- No table format: raw Parquet/CSV without Delta/Iceberg → loses ACID, schema evolution, time travel
- No access controls: single bucket, everyone admin → implement IAM per domain/team
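Regular compaction starts with grouping small files into rewrite batches near the target size. A sketch using first-fit-decreasing bin packing over hypothetical file sizes in MB with a 512MB target (inside the 100MB-1GB range); with Delta Lake or Iceberg, the table format's own compaction (Delta's `OPTIMIZE`, Iceberg's `rewrite_data_files` procedure) does the rewrite, and this only illustrates the planning logic.

```python
def plan_compaction(file_sizes_mb, target_mb=512):
    """Group small files into rewrite batches of at most target_mb each,
    using first-fit-decreasing bin packing."""
    batches = []  # each batch: [total_mb, [file sizes]]
    for size in sorted(file_sizes_mb, reverse=True):
        for batch in batches:
            if batch[0] + size <= target_mb:
                batch[0] += size
                batch[1].append(size)
                break
        else:  # no batch had room: start a new one
            batches.append([size, [size]])
    return batches

batches = plan_compaction([0.5] * 2000)  # 2,000 half-megabyte files
```

Two thousand tiny files collapse into two outputs of about 512MB and 488MB, cutting per-file metadata and open costs by three orders of magnitude.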
Natural Language Commands
Say any of these to activate specific workflows:
- “Design a data pipeline for [source] to [target]” → Full pipeline template with extraction strategy, transforms, load pattern, quality checks
- “Model [entity/domain] for analytics” → Dimensional model with fact/dimension tables, grain, measures, SCD types
- “Optimize this query/pipeline” → Performance analysis with specific recommendations
- “Set up data quality for [table/pipeline]” → Quality framework with checks, contracts, monitoring
- “Audit our data infrastructure” → Full assessment using scoring rubric
- “Help with [Spark/Airflow/dbt/Kafka] issue” → Troubleshooting with technology-specific guidance
- “Design a data catalog for our org” → Catalog template with governance, classification, lineage
- “Plan a data migration from [old] to [new]” → Migration plan with validation, rollback, parallel-run
- “Set up monitoring for our pipelines” → Dashboard template with alerts, logging standards, runbooks
- “Review our data costs” → Cost analysis with optimization strategies and ROI framework
- “Handle schema change in [source]” → Change management protocol with impact assessment
- “Backfill [table] for [date range]” → Backfill protocol with validation and communication plan