查询构建器Skill query-builder

该技能是一个交互式数据库查询构建工具,用于自动生成优化的SQL和NoSQL查询语句,支持多种数据库和ORM,提高开发效率和查询性能。关键词包括数据库查询、SQL优化、NoSQL、ORM、性能调优、数据工程、后端开发。

数据工程 0 次安装 0 次浏览 更新于 3/11/2026

name: query-builder description: 交互式数据库查询构建器,用于生成优化的SQL和NoSQL查询。

查询构建器技能

交互式数据库查询构建器,用于生成优化的SQL和NoSQL查询。

说明

您是一个数据库查询专家。当调用时:

  1. 理解需求:

    • 分析请求的数据操作
    • 识别表/集合和关系
    • 确定所需的过滤器、连接和聚合
    • 考虑性能影响
  2. 检测数据库类型:

    • PostgreSQL, MySQL, SQLite (SQL数据库)
    • MongoDB, DynamoDB (NoSQL数据库)
    • 检查ORM使用 (Prisma, TypeORM, SQLAlchemy, Mongoose)
  3. 生成查询:

    • 编写优化、可读的查询
    • 使用适当的索引和查询模式
    • 包括参数化查询以防止SQL注入
    • 在适用时提供原始SQL和ORM版本
  4. 解释查询:

    • 分解查询执行流程
    • 突出性能考虑
    • 如果需要,建议索引
    • 提供相关替代方法

支持的数据库

  • SQL: PostgreSQL, MySQL, MariaDB, SQLite, SQL Server
  • NoSQL: MongoDB, DynamoDB, Redis, Cassandra
  • ORMs: Prisma, TypeORM, Sequelize, SQLAlchemy, Django ORM, Mongoose

使用示例

@query-builder 获取所有用户及其订单
@query-builder 查找收入前十的产品
@query-builder --optimize SELECT * FROM users WHERE email LIKE '%@gmail.com'
@query-builder --explain-plan

SQL查询模式

带过滤器的基础SELECT

-- PostgreSQL/MySQL
SELECT
  id,
  username,
  email,
  created_at
FROM users
WHERE
  active = true
  AND created_at >= NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 100;

-- 带参数 (防止SQL注入)
SELECT * FROM users
WHERE email = $1 AND active = $2;

JOIN操作

-- INNER JOIN - 获取用户及其订单
SELECT
  u.id,
  u.username,
  u.email,
  o.id as order_id,
  o.total_amount,
  o.created_at as order_date
FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE o.status = 'completed'
ORDER BY o.created_at DESC;

-- LEFT JOIN - 包括没有订单的用户
SELECT
  u.id,
  u.username,
  COUNT(o.id) as order_count,
  COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.username
HAVING COUNT(o.id) > 0
ORDER BY total_spent DESC;

-- 多重JOIN
SELECT
  o.id as order_id,
  u.username,
  p.name as product_name,
  oi.quantity,
  oi.price
FROM orders o
INNER JOIN users u ON o.user_id = u.id
INNER JOIN order_items oi ON o.id = oi.order_id
INNER JOIN products p ON oi.product_id = p.id
WHERE o.created_at >= '2024-01-01';

聚合

-- 分组聚合
SELECT
  DATE_TRUNC('day', created_at) as date,
  COUNT(*) as order_count,
  SUM(total_amount) as daily_revenue,
  AVG(total_amount) as avg_order_value,
  MAX(total_amount) as largest_order
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE_TRUNC('day', created_at)
ORDER BY date DESC;

-- 窗口函数
SELECT
  id,
  user_id,
  total_amount,
  created_at,
  ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC) as order_rank,
  AVG(total_amount) OVER (PARTITION BY user_id) as user_avg_order
FROM orders;

子查询

-- WHERE子句中的子查询
SELECT * FROM users
WHERE id IN (
  SELECT DISTINCT user_id
  FROM orders
  WHERE total_amount > 1000
);

-- SELECT中的子查询 (标量子查询)
SELECT
  id,
  username,
  (SELECT COUNT(*) FROM orders WHERE user_id = users.id) as order_count,
  (SELECT MAX(total_amount) FROM orders WHERE user_id = users.id) as max_order
FROM users;

-- 公共表表达式 (CTE)
WITH recent_orders AS (
  SELECT
    user_id,
    COUNT(*) as order_count,
    SUM(total_amount) as total_spent
  FROM orders
  WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
  GROUP BY user_id
)
SELECT
  u.id,
  u.username,
  u.email,
  COALESCE(ro.order_count, 0) as recent_orders,
  COALESCE(ro.total_spent, 0) as recent_spending
FROM users u
LEFT JOIN recent_orders ro ON u.id = ro.user_id
WHERE u.active = true;

复杂查询

-- 递归CTE用于分层数据
WITH RECURSIVE category_tree AS (
  -- 基础情况: 根类别
  SELECT id, name, parent_id, 0 as level
  FROM categories
  WHERE parent_id IS NULL

  UNION ALL

  -- 递归情况: 子类别
  SELECT c.id, c.name, c.parent_id, ct.level + 1
  FROM categories c
  INNER JOIN category_tree ct ON c.parent_id = ct.id
)
SELECT * FROM category_tree
ORDER BY level, name;

-- 查找每组前N个
WITH ranked_products AS (
  SELECT
    p.*,
    c.name as category_name,
    ROW_NUMBER() OVER (PARTITION BY p.category_id ORDER BY p.sales DESC) as rank
  FROM products p
  INNER JOIN categories c ON p.category_id = c.id
)
SELECT * FROM ranked_products
WHERE rank <= 3;

UPSERT (插入或更新)

-- PostgreSQL - ON CONFLICT
INSERT INTO users (id, username, email, updated_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (id)
DO UPDATE SET
  username = EXCLUDED.username,
  email = EXCLUDED.email,
  updated_at = NOW();

-- MySQL - ON DUPLICATE KEY UPDATE
INSERT INTO users (id, username, email, updated_at)
VALUES (?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
  username = VALUES(username),
  email = VALUES(email),
  updated_at = NOW();

ORM查询示例

Prisma (TypeScript)

// 基础查询
const users = await prisma.user.findMany({
  where: {
    active: true,
    createdAt: {
      gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
    }
  },
  orderBy: { createdAt: 'desc' },
  take: 100
});

// 关系
const userWithOrders = await prisma.user.findUnique({
  where: { id: userId },
  include: {
    orders: {
      where: { status: 'completed' },
      include: {
        items: {
          include: { product: true }
        }
      }
    }
  }
});

// 聚合
const stats = await prisma.order.groupBy({
  by: ['userId'],
  where: {
    createdAt: {
      gte: new Date('2024-01-01')
    }
  },
  _count: { id: true },
  _sum: { totalAmount: true },
  _avg: { totalAmount: true }
});

// 需要时使用原始SQL
const result = await prisma.$queryRaw`
  SELECT * FROM users
  WHERE email = ${email}
  AND active = true
`;

TypeORM (TypeScript)

// 查询构建器
const users = await dataSource
  .getRepository(User)
  .createQueryBuilder('user')
  .where('user.active = :active', { active: true })
  .andWhere('user.createdAt >= :date', {
    date: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
  })
  .orderBy('user.createdAt', 'DESC')
  .take(100)
  .getMany();

// 关系
const userWithOrders = await dataSource
  .getRepository(User)
  .createQueryBuilder('user')
  .leftJoinAndSelect('user.orders', 'order')
  .leftJoinAndSelect('order.items', 'item')
  .leftJoinAndSelect('item.product', 'product')
  .where('user.id = :id', { id: userId })
  .andWhere('order.status = :status', { status: 'completed' })
  .getOne();

// 聚合
const stats = await dataSource
  .getRepository(Order)
  .createQueryBuilder('order')
  .select('order.userId', 'userId')
  .addSelect('COUNT(order.id)', 'orderCount')
  .addSelect('SUM(order.totalAmount)', 'totalSpent')
  .addSelect('AVG(order.totalAmount)', 'avgOrder')
  .where('order.createdAt >= :date', { date: new Date('2024-01-01') })
  .groupBy('order.userId')
  .getRawMany();

SQLAlchemy (Python)

from sqlalchemy import select, func, and_, or_
from datetime import datetime, timedelta

// 基础查询
stmt = (
    select(User)
    .where(
        and_(
            User.active == True,
            User.created_at >= datetime.now() - timedelta(days=30)
        )
    )
    .order_by(User.created_at.desc())
    .limit(100)
)
users = session.execute(stmt).scalars().all()

// 连接
stmt = (
    select(User, Order)
    .join(Order, User.id == Order.user_id)
    .where(Order.status == 'completed')
    .order_by(Order.created_at.desc())
)
results = session.execute(stmt).all()

// 聚合
stmt = (
    select(
        func.date_trunc('day', Order.created_at).label('date'),
        func.count(Order.id).label('order_count'),
        func.sum(Order.total_amount).label('revenue'),
        func.avg(Order.total_amount).label('avg_order')
    )
    .where(Order.created_at >= datetime.now() - timedelta(days=7))
    .group_by(func.date_trunc('day', Order.created_at))
    .order_by('date desc')
)
stats = session.execute(stmt).all()

// 需要时使用原始SQL
result = session.execute(
    text("SELECT * FROM users WHERE email = :email"),
    {"email": email}
).fetchall()

NoSQL查询示例

MongoDB

// 基础查询
db.users.find({
  active: true,
  createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
})
.sort({ createdAt: -1 })
.limit(100);

// 聚合管道
db.orders.aggregate([
  {
    $match: {
      status: 'completed',
      createdAt: { $gte: new Date('2024-01-01') }
    }
  },
  {
    $group: {
      _id: '$userId',
      orderCount: { $sum: 1 },
      totalSpent: { $sum: '$totalAmount' },
      avgOrder: { $avg: '$totalAmount' }
    }
  },
  {
    $sort: { totalSpent: -1 }
  },
  {
    $limit: 10
  }
]);

// 查找 (连接)
db.users.aggregate([
  {
    $lookup: {
      from: 'orders',
      localField: '_id',
      foreignField: 'userId',
      as: 'orders'
    }
  },
  {
    $match: { 'orders.0': { $exists: true } }
  },
  {
    $project: {
      username: 1,
      email: 1,
      orderCount: { $size: '$orders' }
    }
  }
]);

Mongoose (Node.js)

// 基础查询
const users = await User.find({
  active: true,
  createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
})
.sort({ createdAt: -1 })
.limit(100);

// 填充 (连接)
const user = await User.findById(userId)
  .populate({
    path: 'orders',
    match: { status: 'completed' },
    populate: {
      path: 'items.product'
    }
  });

// 聚合
const stats = await Order.aggregate([
  {
    $match: {
      createdAt: { $gte: new Date('2024-01-01') }
    }
  },
  {
    $group: {
      _id: {
        $dateToString: { format: '%Y-%m-%d', date: '$createdAt' }
      },
      orderCount: { $sum: 1 },
      revenue: { $sum: '$totalAmount' },
      avgOrder: { $avg: '$totalAmount' }
    }
  },
  { $sort: { _id: -1 } }
]);

性能优化

使用索引

-- 为频繁查询的列创建索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);

-- 复合索引用于多列
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- 部分索引 (PostgreSQL)
CREATE INDEX idx_active_users ON users(email) WHERE active = true;

-- 全文搜索索引 (PostgreSQL)
CREATE INDEX idx_products_search ON products
USING GIN(to_tsvector('english', name || ' ' || description));

查询优化技巧

-- ❌ 不好 - SELECT *
SELECT * FROM users WHERE id = 1;

-- ✓ 好 - 只选择需要的列
SELECT id, username, email FROM users WHERE id = 1;

-- ❌ 不好 - 在索引列上使用函数
SELECT * FROM users WHERE LOWER(email) = 'user@example.com';

-- ✓ 好 - 存储小写邮箱或使用函数索引
SELECT * FROM users WHERE email = 'user@example.com';

-- ❌ 不好 - OR条件不能高效使用索引
SELECT * FROM orders WHERE user_id = 1 OR customer_email = 'user@example.com';

-- ✓ 好 - 适当时使用UNION
SELECT * FROM orders WHERE user_id = 1
UNION
SELECT * FROM orders WHERE customer_email = 'user@example.com';

-- ❌ 不好 - 带子查询的NOT IN
SELECT * FROM users WHERE id NOT IN (SELECT user_id FROM banned_users);

-- ✓ 好 - 带NULL检查的LEFT JOIN
SELECT u.* FROM users u
LEFT JOIN banned_users bu ON u.id = bu.user_id
WHERE bu.user_id IS NULL;

分页

-- ❌ 不好 - OFFSET在大量偏移时变慢
SELECT * FROM users
ORDER BY created_at DESC
LIMIT 20 OFFSET 10000;

-- ✓ 好 - 基于游标的分页
SELECT * FROM users
WHERE created_at < '2024-01-01 12:00:00'
ORDER BY created_at DESC
LIMIT 20;

-- ✓ 更好 - 键集分页
SELECT * FROM users
WHERE (created_at, id) < ('2024-01-01 12:00:00', 12345)
ORDER BY created_at DESC, id DESC
LIMIT 20;

常见模式

软删除

-- 添加deleted_at列
ALTER TABLE users ADD COLUMN deleted_at TIMESTAMP NULL;

-- 通过设置时间戳“删除”
UPDATE users SET deleted_at = NOW() WHERE id = 1;

-- 查询活动记录
SELECT * FROM users WHERE deleted_at IS NULL;

-- 为更好性能创建索引
CREATE INDEX idx_users_deleted_at ON users(deleted_at)
WHERE deleted_at IS NULL;

审计追踪

-- 审计表
CREATE TABLE audit_log (
  id SERIAL PRIMARY KEY,
  table_name VARCHAR(50),
  record_id INTEGER,
  action VARCHAR(10),
  old_values JSONB,
  new_values JSONB,
  changed_by INTEGER,
  changed_at TIMESTAMP DEFAULT NOW()
);

-- 自动审计触发器
CREATE OR REPLACE FUNCTION audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO audit_log (table_name, record_id, action, old_values, new_values, changed_by)
  VALUES (
    TG_TABLE_NAME,
    NEW.id,
    TG_OP,
    row_to_json(OLD),
    row_to_json(NEW),
    current_user_id()
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

运行总计

-- 窗口函数方法
SELECT
  date,
  daily_revenue,
  SUM(daily_revenue) OVER (ORDER BY date) as running_total
FROM daily_stats
ORDER BY date;

应避免的反模式

N+1查询问题

// ❌ 不好 - N+1查询
const users = await User.findAll();
for (const user of users) {
  const orders = await Order.findAll({ where: { userId: user.id } });
  // 处理订单...
}

// ✓ 好 - 带连接的单一查询
const users = await User.findAll({
  include: [{ model: Order }]
});

缺少索引

// ❌ 不好 - 外键无索引
SELECT * FROM orders WHERE user_id = 123; // 慢!

// ✓ 好 - 外键索引
CREATE INDEX idx_orders_user_id ON orders(user_id);

检索过多数据

// ❌ 不好 - 获取所有行
SELECT * FROM orders; // 可能有数百万行!

// ✓ 好 - 使用分页
SELECT * FROM orders
ORDER BY created_at DESC
LIMIT 100;

最佳实践

  1. 始终使用参数化查询以防止SQL注入
  2. 索引外键和频繁查询的列
  3. 使用EXPLAIN ANALYZE理解查询性能
  4. **避免SELECT *** - 只获取需要的列
  5. 使用事务保证数据一致性
  6. 实现分页用于大数据集
  7. 缓存频繁访问的数据 (Redis, Memcached)
  8. 监控慢查询并优化它们
  9. 使用连接池管理数据库连接
  10. 在PostgreSQL上定期VACUUM和ANALYZE

注意

  • 用真实数据量测试查询
  • 在生产中监控查询执行时间
  • 对读重负载使用读副本
  • 考虑数据库特定功能 (PostgreSQL扩展, MySQL存储引擎)
  • 用注释记录复杂查询
  • 保持ORMs更新但了解复杂操作的原始SQL