name: query-builder description: 交互式数据库查询构建器,用于生成优化的SQL和NoSQL查询。
查询构建器技能
交互式数据库查询构建器,用于生成优化的SQL和NoSQL查询。
说明
您是一个数据库查询专家。当调用时:
-
理解需求:
- 分析请求的数据操作
- 识别表/集合和关系
- 确定所需的过滤器、连接和聚合
- 考虑性能影响
-
检测数据库类型:
- PostgreSQL, MySQL, SQLite (SQL数据库)
- MongoDB, DynamoDB (NoSQL数据库)
- 检查ORM使用 (Prisma, TypeORM, SQLAlchemy, Mongoose)
-
生成查询:
- 编写优化、可读的查询
- 使用适当的索引和查询模式
- 包括参数化查询以防止SQL注入
- 在适用时提供原始SQL和ORM版本
-
解释查询:
- 分解查询执行流程
- 突出性能考虑
- 如果需要,建议索引
- 提供相关替代方法
支持的数据库
- SQL: PostgreSQL, MySQL, MariaDB, SQLite, SQL Server
- NoSQL: MongoDB, DynamoDB, Redis, Cassandra
- ORMs: Prisma, TypeORM, Sequelize, SQLAlchemy, Django ORM, Mongoose
使用示例
@query-builder 获取所有用户及其订单
@query-builder 查找收入前十的产品
@query-builder --optimize SELECT * FROM users WHERE email LIKE '%@gmail.com'
@query-builder --explain-plan
SQL查询模式
带过滤器的基础SELECT
-- PostgreSQL/MySQL
SELECT
id,
username,
email,
created_at
FROM users
WHERE
active = true
AND created_at >= NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 100;
-- 带参数 (防止SQL注入)
SELECT * FROM users
WHERE email = $1 AND active = $2;
JOIN操作
-- INNER JOIN - 获取用户及其订单
SELECT
u.id,
u.username,
u.email,
o.id as order_id,
o.total_amount,
o.created_at as order_date
FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE o.status = 'completed'
ORDER BY o.created_at DESC;
-- LEFT JOIN - 包括没有订单的用户
SELECT
u.id,
u.username,
COUNT(o.id) as order_count,
COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.username
HAVING COUNT(o.id) > 0
ORDER BY total_spent DESC;
-- 多重JOIN
SELECT
o.id as order_id,
u.username,
p.name as product_name,
oi.quantity,
oi.price
FROM orders o
INNER JOIN users u ON o.user_id = u.id
INNER JOIN order_items oi ON o.id = oi.order_id
INNER JOIN products p ON oi.product_id = p.id
WHERE o.created_at >= '2024-01-01';
聚合
-- 分组聚合
SELECT
DATE_TRUNC('day', created_at) as date,
COUNT(*) as order_count,
SUM(total_amount) as daily_revenue,
AVG(total_amount) as avg_order_value,
MAX(total_amount) as largest_order
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE_TRUNC('day', created_at)
ORDER BY date DESC;
-- 窗口函数
SELECT
id,
user_id,
total_amount,
created_at,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC) as order_rank,
AVG(total_amount) OVER (PARTITION BY user_id) as user_avg_order
FROM orders;
子查询
-- WHERE子句中的子查询
SELECT * FROM users
WHERE id IN (
SELECT DISTINCT user_id
FROM orders
WHERE total_amount > 1000
);
-- SELECT中的子查询 (标量子查询)
SELECT
id,
username,
(SELECT COUNT(*) FROM orders WHERE user_id = users.id) as order_count,
(SELECT MAX(total_amount) FROM orders WHERE user_id = users.id) as max_order
FROM users;
-- 公共表表达式 (CTE)
WITH recent_orders AS (
SELECT
user_id,
COUNT(*) as order_count,
SUM(total_amount) as total_spent
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY user_id
)
SELECT
u.id,
u.username,
u.email,
COALESCE(ro.order_count, 0) as recent_orders,
COALESCE(ro.total_spent, 0) as recent_spending
FROM users u
LEFT JOIN recent_orders ro ON u.id = ro.user_id
WHERE u.active = true;
复杂查询
-- 递归CTE用于分层数据
WITH RECURSIVE category_tree AS (
-- 基础情况: 根类别
SELECT id, name, parent_id, 0 as level
FROM categories
WHERE parent_id IS NULL
UNION ALL
-- 递归情况: 子类别
SELECT c.id, c.name, c.parent_id, ct.level + 1
FROM categories c
INNER JOIN category_tree ct ON c.parent_id = ct.id
)
SELECT * FROM category_tree
ORDER BY level, name;
-- 查找每组前N个
WITH ranked_products AS (
SELECT
p.*,
c.name as category_name,
ROW_NUMBER() OVER (PARTITION BY p.category_id ORDER BY p.sales DESC) as rank
FROM products p
INNER JOIN categories c ON p.category_id = c.id
)
SELECT * FROM ranked_products
WHERE rank <= 3;
UPSERT (插入或更新)
-- PostgreSQL - ON CONFLICT
INSERT INTO users (id, username, email, updated_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (id)
DO UPDATE SET
username = EXCLUDED.username,
email = EXCLUDED.email,
updated_at = NOW();
-- MySQL - ON DUPLICATE KEY UPDATE
INSERT INTO users (id, username, email, updated_at)
VALUES (?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
username = VALUES(username),
email = VALUES(email),
updated_at = NOW();
ORM查询示例
Prisma (TypeScript)
// 基础查询
const users = await prisma.user.findMany({
where: {
active: true,
createdAt: {
gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
}
},
orderBy: { createdAt: 'desc' },
take: 100
});
// 关系
const userWithOrders = await prisma.user.findUnique({
where: { id: userId },
include: {
orders: {
where: { status: 'completed' },
include: {
items: {
include: { product: true }
}
}
}
}
});
// 聚合
const stats = await prisma.order.groupBy({
by: ['userId'],
where: {
createdAt: {
gte: new Date('2024-01-01')
}
},
_count: { id: true },
_sum: { totalAmount: true },
_avg: { totalAmount: true }
});
// 需要时使用原始SQL
const result = await prisma.$queryRaw`
SELECT * FROM users
WHERE email = ${email}
AND active = true
`;
TypeORM (TypeScript)
// 查询构建器
const users = await dataSource
.getRepository(User)
.createQueryBuilder('user')
.where('user.active = :active', { active: true })
.andWhere('user.createdAt >= :date', {
date: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
})
.orderBy('user.createdAt', 'DESC')
.take(100)
.getMany();
// 关系
const userWithOrders = await dataSource
.getRepository(User)
.createQueryBuilder('user')
.leftJoinAndSelect('user.orders', 'order')
.leftJoinAndSelect('order.items', 'item')
.leftJoinAndSelect('item.product', 'product')
.where('user.id = :id', { id: userId })
.andWhere('order.status = :status', { status: 'completed' })
.getOne();
// 聚合
const stats = await dataSource
.getRepository(Order)
.createQueryBuilder('order')
.select('order.userId', 'userId')
.addSelect('COUNT(order.id)', 'orderCount')
.addSelect('SUM(order.totalAmount)', 'totalSpent')
.addSelect('AVG(order.totalAmount)', 'avgOrder')
.where('order.createdAt >= :date', { date: new Date('2024-01-01') })
.groupBy('order.userId')
.getRawMany();
SQLAlchemy (Python)
from sqlalchemy import select, func, and_, or_
from datetime import datetime, timedelta
// 基础查询
stmt = (
select(User)
.where(
and_(
User.active == True,
User.created_at >= datetime.now() - timedelta(days=30)
)
)
.order_by(User.created_at.desc())
.limit(100)
)
users = session.execute(stmt).scalars().all()
// 连接
stmt = (
select(User, Order)
.join(Order, User.id == Order.user_id)
.where(Order.status == 'completed')
.order_by(Order.created_at.desc())
)
results = session.execute(stmt).all()
// 聚合
stmt = (
select(
func.date_trunc('day', Order.created_at).label('date'),
func.count(Order.id).label('order_count'),
func.sum(Order.total_amount).label('revenue'),
func.avg(Order.total_amount).label('avg_order')
)
.where(Order.created_at >= datetime.now() - timedelta(days=7))
.group_by(func.date_trunc('day', Order.created_at))
.order_by('date desc')
)
stats = session.execute(stmt).all()
// 需要时使用原始SQL
result = session.execute(
text("SELECT * FROM users WHERE email = :email"),
{"email": email}
).fetchall()
NoSQL查询示例
MongoDB
// 基础查询
db.users.find({
active: true,
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
})
.sort({ createdAt: -1 })
.limit(100);
// 聚合管道
db.orders.aggregate([
{
$match: {
status: 'completed',
createdAt: { $gte: new Date('2024-01-01') }
}
},
{
$group: {
_id: '$userId',
orderCount: { $sum: 1 },
totalSpent: { $sum: '$totalAmount' },
avgOrder: { $avg: '$totalAmount' }
}
},
{
$sort: { totalSpent: -1 }
},
{
$limit: 10
}
]);
// 查找 (连接)
db.users.aggregate([
{
$lookup: {
from: 'orders',
localField: '_id',
foreignField: 'userId',
as: 'orders'
}
},
{
$match: { 'orders.0': { $exists: true } }
},
{
$project: {
username: 1,
email: 1,
orderCount: { $size: '$orders' }
}
}
]);
Mongoose (Node.js)
// 基础查询
const users = await User.find({
active: true,
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
})
.sort({ createdAt: -1 })
.limit(100);
// 填充 (连接)
const user = await User.findById(userId)
.populate({
path: 'orders',
match: { status: 'completed' },
populate: {
path: 'items.product'
}
});
// 聚合
const stats = await Order.aggregate([
{
$match: {
createdAt: { $gte: new Date('2024-01-01') }
}
},
{
$group: {
_id: {
$dateToString: { format: '%Y-%m-%d', date: '$createdAt' }
},
orderCount: { $sum: 1 },
revenue: { $sum: '$totalAmount' },
avgOrder: { $avg: '$totalAmount' }
}
},
{ $sort: { _id: -1 } }
]);
性能优化
使用索引
-- 为频繁查询的列创建索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);
-- 复合索引用于多列
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- 部分索引 (PostgreSQL)
CREATE INDEX idx_active_users ON users(email) WHERE active = true;
-- 全文搜索索引 (PostgreSQL)
CREATE INDEX idx_products_search ON products
USING GIN(to_tsvector('english', name || ' ' || description));
查询优化技巧
-- ❌ 不好 - SELECT *
SELECT * FROM users WHERE id = 1;
-- ✓ 好 - 只选择需要的列
SELECT id, username, email FROM users WHERE id = 1;
-- ❌ 不好 - 在索引列上使用函数
SELECT * FROM users WHERE LOWER(email) = 'user@example.com';
-- ✓ 好 - 存储小写邮箱或使用函数索引
SELECT * FROM users WHERE email = 'user@example.com';
-- ❌ 不好 - OR条件不能高效使用索引
SELECT * FROM orders WHERE user_id = 1 OR customer_email = 'user@example.com';
-- ✓ 好 - 适当时使用UNION
SELECT * FROM orders WHERE user_id = 1
UNION
SELECT * FROM orders WHERE customer_email = 'user@example.com';
-- ❌ 不好 - 带子查询的NOT IN
SELECT * FROM users WHERE id NOT IN (SELECT user_id FROM banned_users);
-- ✓ 好 - 带NULL检查的LEFT JOIN
SELECT u.* FROM users u
LEFT JOIN banned_users bu ON u.id = bu.user_id
WHERE bu.user_id IS NULL;
分页
-- ❌ 不好 - OFFSET在大量偏移时变慢
SELECT * FROM users
ORDER BY created_at DESC
LIMIT 20 OFFSET 10000;
-- ✓ 好 - 基于游标的分页
SELECT * FROM users
WHERE created_at < '2024-01-01 12:00:00'
ORDER BY created_at DESC
LIMIT 20;
-- ✓ 更好 - 键集分页
SELECT * FROM users
WHERE (created_at, id) < ('2024-01-01 12:00:00', 12345)
ORDER BY created_at DESC, id DESC
LIMIT 20;
常见模式
软删除
-- 添加deleted_at列
ALTER TABLE users ADD COLUMN deleted_at TIMESTAMP NULL;
-- 通过设置时间戳“删除”
UPDATE users SET deleted_at = NOW() WHERE id = 1;
-- 查询活动记录
SELECT * FROM users WHERE deleted_at IS NULL;
-- 为更好性能创建索引
CREATE INDEX idx_users_deleted_at ON users(deleted_at)
WHERE deleted_at IS NULL;
审计追踪
-- 审计表
CREATE TABLE audit_log (
id SERIAL PRIMARY KEY,
table_name VARCHAR(50),
record_id INTEGER,
action VARCHAR(10),
old_values JSONB,
new_values JSONB,
changed_by INTEGER,
changed_at TIMESTAMP DEFAULT NOW()
);
-- 自动审计触发器
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (table_name, record_id, action, old_values, new_values, changed_by)
VALUES (
TG_TABLE_NAME,
NEW.id,
TG_OP,
row_to_json(OLD),
row_to_json(NEW),
current_user_id()
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
运行总计
-- 窗口函数方法
SELECT
date,
daily_revenue,
SUM(daily_revenue) OVER (ORDER BY date) as running_total
FROM daily_stats
ORDER BY date;
应避免的反模式
N+1查询问题
// ❌ 不好 - N+1查询
const users = await User.findAll();
for (const user of users) {
const orders = await Order.findAll({ where: { userId: user.id } });
// 处理订单...
}
// ✓ 好 - 带连接的单一查询
const users = await User.findAll({
include: [{ model: Order }]
});
缺少索引
// ❌ 不好 - 外键无索引
SELECT * FROM orders WHERE user_id = 123; // 慢!
// ✓ 好 - 外键索引
CREATE INDEX idx_orders_user_id ON orders(user_id);
检索过多数据
// ❌ 不好 - 获取所有行
SELECT * FROM orders; // 可能有数百万行!
// ✓ 好 - 使用分页
SELECT * FROM orders
ORDER BY created_at DESC
LIMIT 100;
最佳实践
- 始终使用参数化查询以防止SQL注入
- 索引外键和频繁查询的列
- 使用EXPLAIN ANALYZE理解查询性能
- **避免SELECT *** - 只获取需要的列
- 使用事务保证数据一致性
- 实现分页用于大数据集
- 缓存频繁访问的数据 (Redis, Memcached)
- 监控慢查询并优化它们
- 使用连接池管理数据库连接
- 在PostgreSQL上定期VACUUM和ANALYZE
注意
- 用真实数据量测试查询
- 在生产中监控查询执行时间
- 对读重负载使用读副本
- 考虑数据库特定功能 (PostgreSQL扩展, MySQL存储引擎)
- 用注释记录复杂查询
- 保持ORMs更新但了解复杂操作的原始SQL