MongoDB聚合管道Skill mongodb-aggregation-pipeline

这个技能是用于掌握MongoDB数据库中的聚合管道功能,实现复杂数据转换、分析和报告生成。它包括构建管道阶段、优化查询性能、进行数据分组、过滤和变换,适用于数据分析、商业智能和数据工程等场景。关键词:MongoDB, 聚合管道, 数据分析, 数据转换, 数据库查询, 报告生成, 数据治理, ETL开发。

数据分析 0 次安装 0 次浏览 更新于 3/14/2026

名称: mongodb-aggregation-pipeline 版本: “2.1.0” 描述: 掌握MongoDB聚合管道进行复杂数据转换。学习管道阶段、分组、过滤和数据变换。用于分析数据、创建报告或转换文档时使用。 sasmp_version: “1.3.0” 绑定代理: 02-mongodb-queries-aggregation 绑定类型: PRIMARY_BOND

生产级技能配置

能力:

  • 管道构建
  • 阶段优化
  • 数据变换
  • 分析查询
  • 报告生成

输入验证: 必需上下文: - 集合名称 - 期望输出 可选上下文: - 示例文档 - 性能要求 - 索引可用性

输出格式: 管道: 数组 解释: 字符串 性能备注: 数组 替代方案: 数组

错误处理: 常见错误: - 代码: AGG001 条件: “无效的阶段语法” 恢复: “验证阶段操作符和字段引用” - 代码: AGG002 条件: “内存限制超出” 恢复: “添加 $allowDiskUse: true 或早期使用 $match 优化” - 代码: AGG003 条件: “在空数组上展开” 恢复: “使用 preserveNullAndEmptyArrays: true”

先决条件: mongodb_version: “4.4+” 必需知识: - 基本CRUD操作 - 查询操作符 推荐索引: - “用于 $match 阶段的字段”

测试: 单元测试模板: | // 测试聚合管道 const result = await collection.aggregate(pipeline).toArray() expect(result).toHaveLength(expectedCount) expect(result[0]).toMatchObject(expectedShape)

MongoDB聚合管道

掌握强大的数据变换能力,使用聚合管道。

快速开始

基本管道结构

const result = await collection.aggregate([
  { $match: { status: 'active' } },
  { $group: { _id: '$category', count: { $sum: 1 } } },
  { $sort: { count: -1 } },
  { $limit: 10 }
]).toArray();

常见管道阶段

// $match: 过滤文档(类似SQL中的WHERE)
{ $match: { age: { $gte: 18 }, status: 'active' } }

// $group: 分组文档和聚合
{ $group: {
  _id: '$city',
  total: { $sum: '$amount' },
  average: { $avg: '$price' },
  count: { $sum: 1 }
}}

// $project: 变换字段
{ $project: {
  name: 1,
  email: 1,
  fullName: { $concat: ['$firstName', ' ', '$lastName'] },
  _id: 0
}}

// $sort: 排序结果
{ $sort: { createdAt: -1 } }  // -1 表示降序,1 表示升序

// $limit 和 $skip: 分页
{ $limit: 10 }
{ $skip: 20 }

// $unwind: 解构数组
{ $unwind: '$tags' }  // 每个标签一个文档

// $lookup: 连接集合
{ $lookup: {
  from: 'categories',
  localField: 'categoryId',
  foreignField: '_id',
  as: 'category'
}}

// $facet: 多面搜索
{ $facet: {
  byCategory: [{ $group: { _id: '$category', count: { $sum: 1 } } }],
  byPrice: [{ $group: { _id: null, avg: { $avg: '$price' } } }]
}}

聚合函数

// 数字函数
{ $sum: 1 }                    // 计数
{ $sum: '$amount' }            // 求和字段
{ $avg: '$price' }             // 平均值
{ $min: '$quantity' }          // 最小值
{ $max: '$quantity' }          // 最大值

// 数组函数
{ $push: '$tags' }             // 收集所有值
{ $addToSet: '$category' }     // 收集唯一值
{ $first: '$name' }            // 第一个元素
{ $last: '$name' }             // 最后一个元素

// 字符串函数
{ $concat: ['$firstName', ' ', '$lastName'] }
{ $substr: ['$email', 0, 5] }
{ $toLower: '$name' }
{ $toUpper: '$name' }

// 条件函数
{ $cond: [
  { $gte: ['$age', 18] },
  'Adult',
  'Minor'
]}

实际世界示例

按类别销售报告

await orders.aggregate([
  { $match: { status: 'completed' } },
  { $group: {
    _id: '$category',
    totalSales: { $sum: '$amount' },
    ordersCount: { $sum: 1 },
    avgOrderValue: { $avg: '$amount' }
  }},
  { $sort: { totalSales: -1 } }
]).toArray();

用户活动摘要

await users.aggregate([
  { $match: { lastActive: { $gte: new Date(Date.now() - 30*24*60*60*1000) } } },
  { $project: {
    name: 1,
    email: 1,
    lastActive: 1,
    daysInactive: {
      $floor: {
        $divide: [
          { $subtract: [new Date(), '$lastActive'] },
          1000 * 60 * 60 * 24
        ]
      }
    }
  }},
  { $sort: { daysInactive: 1 } }
]).toArray();

使用Lookup连接

await orders.aggregate([
  { $match: { status: 'pending' } },
  { $lookup: {
    from: 'customers',
    localField: 'customerId',
    foreignField: '_id',
    as: 'customer'
  }},
  { $unwind: '$customer' },
  { $project: {
    orderId: '$_id',
    customerName: '$customer.name',
    total: '$amount',
    _id: 0
  }}
]).toArray();

Python示例

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
collection = client['db']['collection']

pipeline = [
    {'$match': {'status': 'active'}},
    {'$group': {'_id': '$category', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]

results = list(collection.aggregate(pipeline))
for doc in results:
    print(doc)

性能提示

✅ 早期使用 $match 过滤文档 ✅ 避免在必要时之前使用 $project ✅ 在 $match 字段上使用索引 ✅ 在昂贵操作前使用 $limit ✅ 监控聚合性能 ✅ 使用 $facet 进行并行处理 ✅ 避免大型 $unwind 操作