名称: mongodb-aggregation-pipeline 版本: “2.1.0” 描述: 掌握MongoDB聚合管道进行复杂数据转换。学习管道阶段、分组、过滤和数据变换。用于分析数据、创建报告或转换文档时使用。 sasmp_version: “1.3.0” 绑定代理: 02-mongodb-queries-aggregation 绑定类型: PRIMARY_BOND
生产级技能配置
能力:
- 管道构建
- 阶段优化
- 数据变换
- 分析查询
- 报告生成
输入验证: 必需上下文: - 集合名称 - 期望输出 可选上下文: - 示例文档 - 性能要求 - 索引可用性
输出格式: 管道: 数组 解释: 字符串 性能备注: 数组 替代方案: 数组
错误处理: 常见错误: - 代码: AGG001 条件: “无效的阶段语法” 恢复: “验证阶段操作符和字段引用” - 代码: AGG002 条件: “内存限制超出” 恢复: “添加 $allowDiskUse: true 或早期使用 $match 优化” - 代码: AGG003 条件: “在空数组上展开” 恢复: “使用 preserveNullAndEmptyArrays: true”
先决条件: mongodb_version: “4.4+” 必需知识: - 基本CRUD操作 - 查询操作符 推荐索引: - “用于 $match 阶段的字段”
测试: 单元测试模板: | // 测试聚合管道 const result = await collection.aggregate(pipeline).toArray() expect(result).toHaveLength(expectedCount) expect(result[0]).toMatchObject(expectedShape)
MongoDB聚合管道
掌握强大的数据变换能力,使用聚合管道。
快速开始
基本管道结构
const result = await collection.aggregate([
{ $match: { status: 'active' } },
{ $group: { _id: '$category', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 10 }
]).toArray();
常见管道阶段
// $match: 过滤文档(类似SQL中的WHERE)
{ $match: { age: { $gte: 18 }, status: 'active' } }
// $group: 分组文档和聚合
{ $group: {
_id: '$city',
total: { $sum: '$amount' },
average: { $avg: '$price' },
count: { $sum: 1 }
}}
// $project: 变换字段
{ $project: {
name: 1,
email: 1,
fullName: { $concat: ['$firstName', ' ', '$lastName'] },
_id: 0
}}
// $sort: 排序结果
{ $sort: { createdAt: -1 } } // -1 表示降序,1 表示升序
// $limit 和 $skip: 分页
{ $limit: 10 }
{ $skip: 20 }
// $unwind: 解构数组
{ $unwind: '$tags' } // 每个标签一个文档
// $lookup: 连接集合
{ $lookup: {
from: 'categories',
localField: 'categoryId',
foreignField: '_id',
as: 'category'
}}
// $facet: 多面搜索
{ $facet: {
byCategory: [{ $group: { _id: '$category', count: { $sum: 1 } } }],
byPrice: [{ $group: { _id: null, avg: { $avg: '$price' } } }]
}}
聚合函数
// 数字函数
{ $sum: 1 } // 计数
{ $sum: '$amount' } // 求和字段
{ $avg: '$price' } // 平均值
{ $min: '$quantity' } // 最小值
{ $max: '$quantity' } // 最大值
// 数组函数
{ $push: '$tags' } // 收集所有值
{ $addToSet: '$category' } // 收集唯一值
{ $first: '$name' } // 第一个元素
{ $last: '$name' } // 最后一个元素
// 字符串函数
{ $concat: ['$firstName', ' ', '$lastName'] }
{ $substr: ['$email', 0, 5] }
{ $toLower: '$name' }
{ $toUpper: '$name' }
// 条件函数
{ $cond: [
{ $gte: ['$age', 18] },
'Adult',
'Minor'
]}
实际世界示例
按类别销售报告
await orders.aggregate([
{ $match: { status: 'completed' } },
{ $group: {
_id: '$category',
totalSales: { $sum: '$amount' },
ordersCount: { $sum: 1 },
avgOrderValue: { $avg: '$amount' }
}},
{ $sort: { totalSales: -1 } }
]).toArray();
用户活动摘要
await users.aggregate([
{ $match: { lastActive: { $gte: new Date(Date.now() - 30*24*60*60*1000) } } },
{ $project: {
name: 1,
email: 1,
lastActive: 1,
daysInactive: {
$floor: {
$divide: [
{ $subtract: [new Date(), '$lastActive'] },
1000 * 60 * 60 * 24
]
}
}
}},
{ $sort: { daysInactive: 1 } }
]).toArray();
使用Lookup连接
await orders.aggregate([
{ $match: { status: 'pending' } },
{ $lookup: {
from: 'customers',
localField: 'customerId',
foreignField: '_id',
as: 'customer'
}},
{ $unwind: '$customer' },
{ $project: {
orderId: '$_id',
customerName: '$customer.name',
total: '$amount',
_id: 0
}}
]).toArray();
Python示例
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017')
collection = client['db']['collection']
pipeline = [
{'$match': {'status': 'active'}},
{'$group': {'_id': '$category', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
results = list(collection.aggregate(pipeline))
for doc in results:
print(doc)
性能提示
✅ 早期使用 $match 过滤文档 ✅ 避免在必要时之前使用 $project ✅ 在 $match 字段上使用索引 ✅ 在昂贵操作前使用 $limit ✅ 监控聚合性能 ✅ 使用 $facet 进行并行处理 ✅ 避免大型 $unwind 操作