名称: 数据库设计 描述: 设计数据库模式、数据模型、关系、索引和迁移,适用于关系型、NoSQL、时间序列和仓库数据库。涵盖规范化、反规范化、ETL优化、事件溯源、星型模式和性能调优。触发关键词: 模式、表、列、迁移、ERD、规范化、反规范化、索引、外键、主键、约束、关系、SQL、DDL、数据模型、数据库设计、数据仓库、星型模式、雪花模式、时间序列、事件溯源、维度表、事实表、ETL、数据管道、OLAP、OLTP。 允许工具: 读取、grep、glob、编辑、写入、bash
数据库设计
概述
此技能专注于设计高效、可扩展和可维护的数据库模式和数据模型。它涵盖:
- OLTP系统:关系型数据库(PostgreSQL、MySQL),具有规范化和事务完整性
- OLAP系统:用于分析的数据仓库,采用星型/雪花模式
- NoSQL:文档存储(MongoDB)、键值存储(Redis)、宽列存储(Cassandra)
- 时间序列:专用于指标和事件的数据库(TimescaleDB、InfluxDB)
- 事件溯源:仅追加的事件存储,用于审计和时间查询
- 数据管道:ETL/ELT工作流中的模式设计考虑
此技能结合了操作性和分析性工作负载的数据建模专业知识。
指令
1. 理解数据需求
- 识别实体及其属性
- 映射实体之间的关系(一对一、一对多、多对多)
- 确定数据访问模式(读写频率、查询模式)
- 估计数据量、增长率和保留要求
- 区分OLTP(事务性)和OLAP(分析性)需求
2. 设计模式
对于OLTP(事务性系统):
- 规范化至第三范式以减少冗余
- 定义主键(代理键与自然键)
- 建立外键关系,设置适当的级联规则
- 选择合适的数据类型以提高存储效率
- 计划NULL处理和默认值
- 添加CHECK约束以确保数据完整性
对于OLAP(数据仓库):
- 设计星型模式(中心事实表与维度表)
- 或雪花模式(规范化维度),如果基数较高
- 创建缓慢变化维度(SCD类型1、2或3)
- 为查询性能进行反规范化
- 为维度表添加代理键
- 设计事实表,包含外键指向维度和度量列
对于时间序列:
- 使用时间戳作为主键组件
- 按时间范围分区(天、周、月)
- 设计用于仅追加写入
- 考虑降采样和聚合表
- 使用适当的保留策略
对于事件溯源:
- 将事件存储为不可变的仅追加记录
- 包括事件类型、聚合ID、时间戳、负载
- 为读取模型设计投影
- 计划事件版本控制和模式演进
3. 优化性能
- 根据查询模式设计索引(WHERE、JOIN、ORDER BY、GROUP BY)
- 考虑覆盖索引以避免表查找
- 使用部分索引进行过滤查询
- 为读密集型工作负载计划反规范化
- 为大型表设计分区策略(范围、哈希、列表)
- 为昂贵聚合添加物化视图
- 设计并发访问(乐观锁与悲观锁)
4. 计划迁移
- 创建可逆迁移脚本,包含UP和DOWN操作
- 安全处理数据转换(回填、默认值)
- 计划零停机部署(扩展/收缩模式)
- 版本控制所有模式更改
- 在生产规模数据上测试迁移
- 记录破坏性变更和迁移依赖
5. 考虑ETL/数据管道影响
- 设计支持高效批量加载的模式
- 为增量更新添加暂存表
- 包含审计列(创建时间、更新时间、加载时间)
- 如果需要,计划变更数据捕获(CDC)
- 设计幂等的upsert操作
- 考虑模式演进和向后兼容性
最佳实践
- 选择适当类型:使用正确数据类型以提高存储效率(INT与BIGINT、VARCHAR与TEXT、DECIMAL与FLOAT)
- 明智索引:索引用于WHERE、JOIN、ORDER BY、GROUP BY的列,但避免过度索引(写入成本)
- 先规范化:为OLTP进行规范化(第三范式),为OLAP或读密集型工作负载策略性反规范化
- 使用约束:在数据库级别强制数据完整性(主键、外键、唯一、CHECK、NOT NULL)
- 规划扩展:早期考虑分片、分区和复制,以处理高容量表
- 文档模式:维护ERD、数据字典和关系图
- 测试迁移:始终在生产规模数据上测试并监控性能
- 审计一切:添加创建时间、更新时间、创建者以实现可追溯性
- 版本事件:对于事件溯源,在事件负载中包含模式版本
- 优化基数:高基数列受益于索引,低基数可能不需要
- 分离读写:对于高扩展系统,考虑CQRS模式,分离读写模型
- 设计幂等性:确保ETL操作可安全重试而无重复
示例
示例1:电子商务模式(PostgreSQL)
-- 用户表,具有适当约束
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
name VARCHAR(100) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ,
CONSTRAINT email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);
-- 产品表,具有适当索引
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sku VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL CHECK (price >= 0),
stock_quantity INTEGER NOT NULL DEFAULT 0 CHECK (stock_quantity >= 0),
category_id UUID REFERENCES categories(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 常见查询的索引
CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_price ON products(price);
CREATE INDEX idx_products_name_search ON products USING gin(to_tsvector('english', name));
-- 订单表,具有适当关系
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
total_amount DECIMAL(10, 2) NOT NULL,
shipping_address JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT valid_status CHECK (status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled'))
);
CREATE INDEX idx_orders_user ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at DESC);
-- 订单项连接表
CREATE TABLE order_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
product_id UUID NOT NULL REFERENCES products(id),
quantity INTEGER NOT NULL CHECK (quantity > 0),
unit_price DECIMAL(10, 2) NOT NULL,
UNIQUE(order_id, product_id)
);
示例2:迁移脚本
-- 迁移:添加客户忠诚度计划
-- 版本:20240115_001
BEGIN;
-- 添加忠诚度等级到用户表
ALTER TABLE users
ADD COLUMN loyalty_tier VARCHAR(20) DEFAULT 'bronze',
ADD COLUMN loyalty_points INTEGER DEFAULT 0;
-- 添加有效等级的约束
ALTER TABLE users
ADD CONSTRAINT valid_loyalty_tier
CHECK (loyalty_tier IN ('bronze', 'silver', 'gold', 'platinum'));
-- 创建积分历史表
CREATE TABLE loyalty_points_history (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
points_change INTEGER NOT NULL,
reason VARCHAR(100) NOT NULL,
reference_type VARCHAR(50),
reference_id UUID,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_loyalty_history_user ON loyalty_points_history(user_id);
CREATE INDEX idx_loyalty_history_created ON loyalty_points_history(created_at DESC);
COMMIT;
-- 回滚脚本(单独保存)
-- BEGIN;
-- DROP TABLE IF EXISTS loyalty_points_history;
-- ALTER TABLE users DROP COLUMN IF EXISTS loyalty_points;
-- ALTER TABLE users DROP COLUMN IF EXISTS loyalty_tier;
-- COMMIT;
示例3:MongoDB文档设计
// 用户文档,嵌入地址
{
_id: ObjectId("..."),
email: "user@example.com",
profile: {
name: "John Doe",
avatar_url: "https://..."
},
addresses: [
{
type: "shipping",
street: "123 Main St",
city: "Boston",
state: "MA",
zip: "02101",
is_default: true
}
],
preferences: {
newsletter: true,
notifications: {
email: true,
push: false
}
},
created_at: ISODate("2024-01-15T10:00:00Z")
}
// 索引
db.users.createIndex({ email: 1 }, { unique: true });
db.users.createIndex({ "addresses.zip": 1 });
db.users.createIndex({ created_at: -1 });
示例4:数据仓库星型模式(PostgreSQL)
-- 维度:日期(统一维度)
CREATE TABLE dim_date (
date_key INTEGER PRIMARY KEY, -- YYYYMMDD格式
full_date DATE NOT NULL,
day_of_week INTEGER,
day_name VARCHAR(10),
month INTEGER,
month_name VARCHAR(10),
quarter INTEGER,
year INTEGER,
is_weekend BOOLEAN,
is_holiday BOOLEAN
);
-- 维度:产品
CREATE TABLE dim_product (
product_key SERIAL PRIMARY KEY, -- 代理键
product_id VARCHAR(50) NOT NULL, -- 源系统自然键
product_name VARCHAR(255) NOT NULL,
category VARCHAR(100),
subcategory VARCHAR(100),
brand VARCHAR(100),
unit_cost DECIMAL(10, 2),
-- SCD类型2列,用于跟踪变化
effective_date DATE NOT NULL,
expiration_date DATE,
is_current BOOLEAN DEFAULT TRUE,
UNIQUE(product_id, effective_date)
);
CREATE INDEX idx_dim_product_current ON dim_product(product_id) WHERE is_current = TRUE;
-- 维度:客户
CREATE TABLE dim_customer (
customer_key SERIAL PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
customer_name VARCHAR(255),
customer_segment VARCHAR(50),
region VARCHAR(100),
country VARCHAR(100),
-- SCD类型1(覆盖)用于大多数属性
-- 如果需要跟踪段变化,使用SCD类型2
effective_date DATE NOT NULL,
expiration_date DATE,
is_current BOOLEAN DEFAULT TRUE
);
-- 事实:销售(中心事实表)
CREATE TABLE fact_sales (
sale_id BIGSERIAL PRIMARY KEY,
date_key INTEGER NOT NULL REFERENCES dim_date(date_key),
product_key INTEGER NOT NULL REFERENCES dim_product(product_key),
customer_key INTEGER NOT NULL REFERENCES dim_customer(customer_key),
-- 度量(可加事实)
quantity INTEGER NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
discount_amount DECIMAL(10, 2) DEFAULT 0,
tax_amount DECIMAL(10, 2) NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
cost_amount DECIMAL(10, 2) NOT NULL,
-- 退化维度(无单独维度表的属性)
order_number VARCHAR(50),
transaction_time TIMESTAMP NOT NULL
);
-- 典型分析查询的索引
CREATE INDEX idx_fact_sales_date ON fact_sales(date_key);
CREATE INDEX idx_fact_sales_product ON fact_sales(product_key);
CREATE INDEX idx_fact_sales_customer ON fact_sales(customer_key);
CREATE INDEX idx_fact_sales_composite ON fact_sales(date_key, product_key, customer_key);
-- 预聚合月度销售的物化视图
CREATE MATERIALIZED VIEW mv_monthly_sales AS
SELECT
d.year,
d.month,
p.category,
c.region,
SUM(f.quantity) AS total_quantity,
SUM(f.total_amount) AS total_revenue,
SUM(f.cost_amount) AS total_cost,
SUM(f.total_amount - f.cost_amount) AS total_profit
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_customer c ON f.customer_key = c.customer_key
GROUP BY d.year, d.month, p.category, c.region;
CREATE INDEX idx_mv_monthly_sales ON mv_monthly_sales(year, month, category);
示例5:时间序列数据库(TimescaleDB)
-- 为指标创建超表
CREATE TABLE metrics (
time TIMESTAMPTZ NOT NULL,
device_id VARCHAR(50) NOT NULL,
metric_name VARCHAR(100) NOT NULL,
value DOUBLE PRECISION NOT NULL,
tags JSONB,
PRIMARY KEY (time, device_id, metric_name)
);
-- 转换为超表,块间隔为1天
SELECT create_hypertable('metrics', 'time', chunk_time_interval => INTERVAL '1 day');
-- 为常见查询模式创建索引
CREATE INDEX idx_metrics_device_time ON metrics(device_id, time DESC);
CREATE INDEX idx_metrics_name_time ON metrics(metric_name, time DESC);
CREATE INDEX idx_metrics_tags ON metrics USING gin(tags);
-- 压缩策略(压缩7天前的块)
ALTER TABLE metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id, metric_name'
);
SELECT add_compression_policy('metrics', INTERVAL '7 days');
-- 保留策略(删除90天前的块)
SELECT add_retention_policy('metrics', INTERVAL '90 days');
-- 每小时汇总的连续聚合
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
device_id,
metric_name,
AVG(value) AS avg_value,
MAX(value) AS max_value,
MIN(value) AS min_value,
COUNT(*) AS count
FROM metrics
GROUP BY hour, device_id, metric_name;
-- 连续聚合的刷新策略
SELECT add_continuous_aggregate_policy('metrics_hourly',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
示例6:事件溯源模式(PostgreSQL)
-- 事件存储(仅追加)
CREATE TABLE events (
event_id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_version INTEGER NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 乐观并发控制
sequence_number INTEGER NOT NULL,
CONSTRAINT unique_sequence UNIQUE(aggregate_id, sequence_number)
);
-- 事件重播的索引
CREATE INDEX idx_events_aggregate ON events(aggregate_id, sequence_number);
CREATE INDEX idx_events_type_time ON events(event_type, occurred_at);
CREATE INDEX idx_events_occurred ON events(occurred_at DESC);
-- 快照以提高性能(可选,减少重播成本)
CREATE TABLE snapshots (
snapshot_id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
sequence_number INTEGER NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_snapshot UNIQUE(aggregate_id, sequence_number)
);
CREATE INDEX idx_snapshots_aggregate ON snapshots(aggregate_id, sequence_number DESC);
-- 投影(读取模型)- 事件流的物化视图
CREATE TABLE account_balances (
account_id UUID PRIMARY KEY,
current_balance DECIMAL(15, 2) NOT NULL,
last_event_sequence INTEGER NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
-- 示例:事件处理器更新投影
-- (在应用代码中,非数据库触发器,以增强可测试性)
-- 当AccountCredited事件发生时:
-- UPDATE account_balances SET current_balance = current_balance + amount
-- 从事件重建投影的查询
CREATE OR REPLACE FUNCTION rebuild_account_balance(p_account_id UUID)
RETURNS DECIMAL AS $$
DECLARE
v_balance DECIMAL(15, 2) := 0;
BEGIN
SELECT COALESCE(SUM(
CASE
WHEN event_type = 'AccountCredited' THEN (payload->>'amount')::DECIMAL
WHEN event_type = 'AccountDebited' THEN -(payload->>'amount')::DECIMAL
ELSE 0
END
), 0)
INTO v_balance
FROM events
WHERE aggregate_id = p_account_id
AND aggregate_type = 'Account'
ORDER BY sequence_number;
RETURN v_balance;
END;
$$ LANGUAGE plpgsql;
示例7:ETL暂存模式(PostgreSQL)
-- 增量加载的暂存表
CREATE TABLE staging_orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50),
order_date TIMESTAMPTZ,
total_amount DECIMAL(10, 2),
status VARCHAR(20),
-- ETL元数据
source_system VARCHAR(50),
extracted_at TIMESTAMPTZ NOT NULL,
loaded_at TIMESTAMPTZ DEFAULT NOW(),
batch_id VARCHAR(100),
-- 用于变更检测
source_hash VARCHAR(64),
is_processed BOOLEAN DEFAULT FALSE
);
-- 生产表
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50),
order_date TIMESTAMPTZ,
total_amount DECIMAL(10, 2),
status VARCHAR(20),
-- 审计列
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
source_system VARCHAR(50),
-- 用于CDC
source_hash VARCHAR(64),
version INTEGER DEFAULT 1
);
-- 将暂存合并到生产(幂等的upsert)
CREATE OR REPLACE FUNCTION merge_orders()
RETURNS INTEGER AS $$
DECLARE
v_rows_affected INTEGER;
BEGIN
-- 插入新记录
INSERT INTO orders (order_id, customer_id, order_date, total_amount, status, source_system, source_hash)
SELECT order_id, customer_id, order_date, total_amount, status, source_system, source_hash
FROM staging_orders s
WHERE NOT EXISTS (SELECT 1 FROM orders o WHERE o.order_id = s.order_id)
AND NOT is_processed;
GET DIAGNOSTICS v_rows_affected = ROW_COUNT;
-- 更新更改的记录
UPDATE orders o
SET
customer_id = s.customer_id,
order_date = s.order_date,
total_amount = s.total_amount,
status = s.status,
updated_at = NOW(),
source_hash = s.source_hash,
version = o.version + 1
FROM staging_orders s
WHERE o.order_id = s.order_id
AND o.source_hash != s.source_hash
AND NOT s.is_processed;
GET DIAGNOSTICS v_rows_affected = v_rows_affected + ROW_COUNT;
-- 标记暂存记录为已处理
UPDATE staging_orders SET is_processed = TRUE WHERE NOT is_processed;
RETURN v_rows_affected;
END;
$$ LANGUAGE plpgsql;