想象一下,你需要从海量的用户行为数据中分析出最有价值的商业洞察。传统的查询只能获取原始数据,而聚合管道就像是一个强大的数据分析工厂,能够将原始数据经过多道工序处理,最终产出精炼的商业报告。
MongoDB 的聚合管道(Aggregation Pipeline)是数据分析的核心工具,它通过一系列有序的阶段(Stage)对数据进行转换、筛选、分组、统计,最终生成复杂的分析结果。从简单的数据统计到复杂的商业智能分析,聚合管道都能胜任。
今天,我们将深入探索 MongoDB 聚合管道的完整体系,通过丰富的实例和实际业务场景,让你掌握这个强大的数据分析工具,成为数据洞察的专家。
目录
- 为什么聚合管道如此重要?
- 聚合管道基础架构
- 核心聚合阶段详解
- 数据筛选:$match 阶段
- 字段投影:$project 阶段
- 数据分组:$group 阶段
- 数据排序:$sort 阶段
- 数据限制:$limit 和 $skip 阶段
- 高级聚合阶段
- 数据关联:$lookup 阶段
- 数组处理:$unwind 阶段
- 数据分面:$facet 阶段
- 数据合并:$merge 和 $out 阶段
- 聚合操作符详解
- 数学操作符:$sum, $avg, $min, $max
- 数组操作符:$push, $addToSet, $first, $last
- 字符串操作符:$concat, $substr, $toUpper
- 日期操作符:$year, $month, $dayOfMonth
- 实际应用场景
- 性能优化策略
- 企业级开发注意事项
- 常见问题与解决方案
- 总结:掌握聚合分析
为什么聚合管道如此重要?
聚合是数据分析的核心
在数据驱动的时代,原始数据本身价值有限,真正的价值在于从数据中提取洞察。MongoDB 的聚合管道提供了:
- 数据转换:将原始数据转换为有意义的格式
- 统计分析:计算各种统计指标和趋势
- 数据关联:连接多个集合的数据进行综合分析
- 实时计算:支持实时数据分析和报表生成
聚合管道 vs 传统查询对比
让我们看看聚合管道相比传统查询的优势:
功能需求 | 传统查询 | 聚合管道 | 优势说明 |
---|---|---|---|
数据统计 | 需要应用层计算 | 数据库层直接计算 | 性能更好,减少网络传输 |
多表关联 | 多次查询后合并 | 单次查询完成 | 减少查询次数,提升效率 |
复杂计算 | 应用层处理 | 数据库层处理 | 利用数据库优化能力 |
数据转换 | 应用层转换 | 管道阶段转换 | 更灵活的数据处理 |
实时分析 | 难以实现 | 原生支持 | 支持实时数据流处理 |
聚合管道的核心优势:
- 性能优化:在数据库层完成计算,减少数据传输
- 功能强大:支持复杂的数据分析和转换
- 灵活组合:阶段可以灵活组合,适应各种需求
- 实时处理:支持流式数据处理和实时分析
聚合管道基础架构
聚合管道的基本结构
// MongoDB Shell / Node.js
db.collection.aggregate([
{
$stage1: {
/* 阶段1的配置 */
},
},
{
$stage2: {
/* 阶段2的配置 */
},
},
{
$stage3: {
/* 阶段3的配置 */
},
},
// ... 更多阶段
]);
管道的特点:
- 有序执行:阶段按顺序执行,前一个阶段的输出是下一个阶段的输入
- 流式处理:数据流经管道,每个阶段处理一部分数据
- 灵活组合:可以根据需求组合不同的阶段
基础聚合示例
// MongoDB Shell / Node.js
// 1. 简单的数据统计
db.orders.aggregate([
{ $match: { status: "completed" } }, // 筛选已完成的订单
{
$group: {
_id: "$category",
totalAmount: { $sum: "$amount" },
count: { $sum: 1 },
},
}, // 按分类分组统计
{ $sort: { totalAmount: -1 } }, // 按总金额降序排列
]);
// 2. 用户行为分析
db.user_activities.aggregate([
{
$match: {
timestamp: { $gte: new Date("2024-01-01") },
},
}, // 筛选2024年的数据
{
$group: {
_id: {
userId: "$userId",
action: "$action",
},
count: { $sum: 1 },
lastActivity: { $max: "$timestamp" },
},
}, // 按用户和行为分组
{ $sort: { count: -1 } }, // 按活动次数排序
{ $limit: 10 }, // 只返回前10条
]);
核心聚合阶段详解
数据筛选:$match 阶段
$match 阶段:在管道开始处筛选数据,类似于 SQL 的 WHERE 子句
// MongoDB Shell / Node.js
// 1. 基础筛选
db.products.aggregate([
{
$match: {
category: "电子产品",
price: { $gte: 1000 },
status: "active",
},
},
]);
// 2. 复杂条件筛选
db.orders.aggregate([
{
$match: {
$and: [
{ status: { $in: ["pending", "processing"] } },
{ createdAt: { $gte: new Date("2024-01-01") } },
{ totalAmount: { $gte: 500 } },
],
},
},
]);
// 3. 正则表达式筛选
db.articles.aggregate([
{
$match: {
title: { $regex: /MongoDB|数据库/, $options: "i" },
publishDate: { $gte: new Date("2024-01-01") },
},
},
]);
$match 阶段的最佳实践:
- 早期筛选:尽量在管道开始处使用 $match 减少数据量
- 索引利用:确保 $match 条件能够使用索引
- 条件优化:将选择性高的条件放在前面
字段投影:$project 阶段
$project 阶段:控制输出字段,类似于 SQL 的 SELECT 子句
// MongoDB Shell / Node.js
// 1. 基础字段投影
db.users.aggregate([
{
$project: {
username: 1,
email: 1,
createdAt: 1,
_id: 0, // 排除 _id 字段
},
},
]);
// 2. 计算字段
db.products.aggregate([
{
$project: {
name: 1,
price: 1,
category: 1,
// 计算折扣价格
discountedPrice: {
$multiply: ["$price", { $subtract: [1, "$discount"] }],
},
// 计算利润率
profitMargin: {
$multiply: [
{ $divide: [{ $subtract: ["$price", "$cost"] }, "$price"] },
100,
],
},
},
},
]);
// 3. 条件投影
db.orders.aggregate([
{
$project: {
orderId: 1,
customerName: 1,
totalAmount: 1,
// 根据金额设置订单等级
orderLevel: {
$switch: {
branches: [
{ case: { $gte: ["$totalAmount", 10000] }, then: "VIP" },
{ case: { $gte: ["$totalAmount", 5000] }, then: "高级" },
{ case: { $gte: ["$totalAmount", 1000] }, then: "普通" },
],
default: "小额",
},
},
},
},
]);
数据分组:$group 阶段
$group 阶段:按指定字段分组并计算聚合值,类似于 SQL 的 GROUP BY
// MongoDB Shell / Node.js
// 1. 基础分组统计
db.orders.aggregate([
{
$group: {
_id: "$category", // 按分类分组
totalAmount: { $sum: "$amount" }, // 总金额
averageAmount: { $avg: "$amount" }, // 平均金额
orderCount: { $sum: 1 }, // 订单数量
maxAmount: { $max: "$amount" }, // 最大金额
minAmount: { $min: "$amount" }, // 最小金额
},
},
]);
// 2. 多字段分组
db.sales.aggregate([
{
$group: {
_id: {
year: { $year: "$date" },
month: { $month: "$date" },
region: "$region",
},
totalSales: { $sum: "$amount" },
averageSales: { $avg: "$amount" },
salesCount: { $sum: 1 },
},
},
]);
// 3. 复杂分组统计
db.user_activities.aggregate([
{ $match: { action: "purchase" } },
{
$group: {
_id: "$userId",
totalPurchases: { $sum: 1 },
totalSpent: { $sum: "$amount" },
firstPurchase: { $min: "$timestamp" },
lastPurchase: { $max: "$timestamp" },
averagePurchase: { $avg: "$amount" },
// 计算购买频率(天)
purchaseFrequency: {
$divide: [
{ $subtract: [{ $max: "$timestamp" }, { $min: "$timestamp" }] },
86400000, // 毫秒转天数
],
},
},
},
]);
数据排序:$sort 阶段
$sort 阶段:对数据进行排序,类似于 SQL 的 ORDER BY
// MongoDB Shell / Node.js
// 1. 单字段排序
db.products.aggregate([
{ $sort: { price: -1 } }, // 按价格降序
]);
// 2. 多字段排序
db.orders.aggregate([
{
$sort: {
status: 1, // 先按状态升序
createdAt: -1, // 再按创建时间降序
},
},
]);
// 3. 结合其他阶段
db.sales.aggregate([
{
$group: {
_id: "$productId",
totalSales: { $sum: "$amount" },
salesCount: { $sum: 1 },
},
},
{ $sort: { totalSales: -1 } },
{ $limit: 10 }, // 只返回前10名
]);
数据限制:$limit 和 $skip 阶段
$limit 阶段:限制返回的文档数量
$skip 阶段:跳过指定数量的文档
// MongoDB Shell / Node.js
// 1. 分页查询
db.products.aggregate([
{ $match: { category: "电子产品" } },
{ $sort: { price: 1 } },
{ $skip: 20 }, // 跳过前20条
{ $limit: 10 }, // 返回10条
]);
// 2. Top N 查询
db.orders.aggregate([
{
$group: {
_id: "$customerId",
totalAmount: { $sum: "$amount" },
},
},
{ $sort: { totalAmount: -1 } },
{ $limit: 5 }, // 返回前5名客户
]);
// 3. 数据采样
db.user_activities.aggregate([
{ $match: { action: "login" } },
{ $sample: { size: 1000 } }, // 随机采样1000条
{
$group: {
_id: { $hour: "$timestamp" },
loginCount: { $sum: 1 },
},
},
]);
高级聚合阶段
数据关联:$lookup 阶段
$lookup 阶段:关联其他集合的数据,类似于 SQL 的 JOIN
// MongoDB Shell / Node.js
// 1. 基础关联
db.orders.aggregate([
{
$lookup: {
from: "customers", // 关联的集合
localField: "customerId", // 本地字段
foreignField: "_id", // 外部字段
as: "customer", // 输出字段名
},
},
{
$project: {
orderId: 1,
totalAmount: 1,
"customer.name": 1,
"customer.email": 1,
},
},
]);
// 2. 条件关联
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { orderItems: "$items" },
pipeline: [
{
$match: {
$expr: { $in: ["$_id", "$$orderItems.productId"] },
},
},
],
as: "productDetails",
},
},
]);
// 3. 多表关联
db.orders.aggregate([
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer",
},
},
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "products",
},
},
{ $unwind: "$customer" }, // 展开客户信息
{
$project: {
orderId: 1,
customerName: "$customer.name",
totalAmount: 1,
items: 1,
products: 1,
},
},
]);
数组处理:$unwind 阶段
$unwind 阶段:展开数组字段,将数组中的每个元素作为单独的文档
// MongoDB Shell / Node.js
// 1. 基础数组展开
db.orders.aggregate([
{ $unwind: "$items" }, // 展开订单项数组
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
{
$project: {
orderId: 1,
productName: "$product.name",
quantity: "$items.quantity",
price: "$items.price",
},
},
]);
// 2. 保留空数组
db.articles.aggregate([
{
$unwind: {
path: "$tags",
preserveNullAndEmptyArrays: true, // 保留没有标签的文章
},
},
{
$group: {
_id: "$tags",
count: { $sum: 1 },
},
},
]);
// 3. 数组索引展开
db.users.aggregate([
{
$unwind: {
path: "$skills",
includeArrayIndex: "skillIndex", // 包含数组索引
},
},
{
$project: {
username: 1,
skill: "$skills",
skillIndex: 1,
},
},
]);
数据分面:$facet 阶段
$facet 阶段:在同一组输入文档上执行多个聚合管道
// MongoDB Shell / Node.js
// 1. 多维度分析
db.products.aggregate([
{
$facet: {
// 按分类统计
byCategory: [
{
$group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
},
},
],
// 按价格区间统计
byPriceRange: [
{
$bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, 5000],
default: "5000+",
output: {
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
},
},
},
],
// 按品牌统计
byBrand: [
{
$group: {
_id: "$brand",
count: { $sum: 1 },
},
},
{ $sort: { count: -1 } },
{ $limit: 10 },
],
},
},
]);
// 2. 电商数据分析
db.orders.aggregate([
{ $match: { status: "completed" } },
{
$facet: {
// 销售趋势
salesTrend: [
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
},
totalSales: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1 } },
],
// 客户分析
customerAnalysis: [
{
$group: {
_id: "$customerId",
totalSpent: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{
$bucket: {
groupBy: "$totalSpent",
boundaries: [0, 1000, 5000, 10000, 50000],
default: "50000+",
output: {
customerCount: { $sum: 1 },
avgSpent: { $avg: "$totalSpent" },
},
},
},
],
// 产品分析
productAnalysis: [
{ $unwind: "$items" },
{
$group: {
_id: "$items.productId",
totalQuantity: { $sum: "$items.quantity" },
totalRevenue: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
},
},
{ $sort: { totalRevenue: -1 } },
{ $limit: 20 },
],
},
},
]);
数据合并:$merge 和 $out 阶段
$out 阶段:将聚合结果写入新集合
$merge 阶段:将聚合结果合并到现有集合
// MongoDB Shell / Node.js
// 1. 使用 $out 创建报表
db.orders.aggregate([
{ $match: {
createdAt: { $gte: new Date("2024-01-01") }
}},
{ $group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
category: "$category"
},
totalSales: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$totalAmount" }
}},
{ $sort: { "_id.year": 1, "_id.month": 1 } },
{ $out: "monthly_sales_report" } // 输出到新集合
]);
// 2. 使用 $merge 更新现有数据
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$totalAmount" },
orderC
{ KaTeX parse error: Expected '}', got 'EOF' at end of input: …p: { _id: "customerId",
totalSpent: { sum:"sum: "sum:"totalAmount" },
orderCount: { $sum: 1 },
avgOrderValue: { avg:"avg: "avg:"totalAmount" }
}},
{ $merge: {
into: “customer_summary”,
whenMatched: “replace”,
whenNotMatched: “insert”
}}
]);
// 3. 增量更新报表
db.orders.aggregate([
{ $match: {
createdAt: { $gte: new Date(“2024-01-01”) },
updatedAt: { $gte: new Date(“2024-01-01”) }
}},
{ $group: {
_id: {
year: { year:"year: "year:"createdAt" },
month: { month:"month: "month:"createdAt" }
},
totalSales: { sum:"sum: "sum:"totalAmount" },
orderCount: { $sum: 1 }
}},
{ $merge: {
into: “monthly_sales”,
on: [“_id.year”, “_id.month”],
whenMatched: “merge”,
whenNotMatched: “insert”
}}
]);
## 聚合操作符详解
### 数学操作符:$sum, $avg, $min, $max
**基础数学操作符**:用于数值计算和统计
```javascript
// MongoDB Shell / Node.js
// 1. 基础统计
db.orders.aggregate([{ $group: {_id: "$category",totalAmount: { $sum: "$amount" }, // 求和averageAmount: { $avg: "$amount" }, // 平均值maxAmount: { $max: "$amount" }, // 最大值minAmount: { $min: "$amount" }, // 最小值count: { $sum: 1 } // 计数}}
]);
// 2. 复杂计算
db.products.aggregate([{ $project: {name: 1,price: 1,cost: 1,// 计算利润率profitMargin: {$multiply: [{ $divide: [{ $subtract: ["$price", "$cost"] },"$price"]},100]},// 计算折扣价格discountedPrice: {$multiply: ["$price",{ $subtract: [1, { $ifNull: ["$discount", 0] }] }]}}}
]);
// 3. 条件统计
db.sales.aggregate([{ $group: {_id: "$region",totalSales: { $sum: "$amount" },// 只统计大于1000的销售highValueSales: {$sum: {$cond: [{ $gte: ["$amount", 1000] },"$amount",0]}},// 计算高价值销售占比highValueRatio: {$multiply: [{ $divide: [{ $sum: {$cond: [{ $gte: ["$amount", 1000] },1,0]}},{ $sum: 1 }]},100]}}}
]);
数组操作符:$push, $addToSet, $first, $last
数组操作符:用于处理数组数据的聚合
// MongoDB Shell / Node.js
// 1. 数组聚合
db.user_activities.aggregate([
{
$group: {
_id: "$userId",
// 收集所有活动类型
allActivities: { $push: "$action" },
// 收集唯一活动类型
uniqueActivities: { $addToSet: "$action" },
// 第一个和最后一个活动
firstActivity: { $first: "$action" },
lastActivity: { $last: "$action" },
// 活动次数
activityCount: { $sum: 1 },
},
},
]);
// 2. 复杂数组操作
db.orders.aggregate([
{ $unwind: "$items" },
{
$group: {
_id: "$customerId",
// 收集所有购买的产品
purchasedProducts: { $push: "$items.productId" },
// 收集唯一产品
uniqueProducts: { $addToSet: "$items.productId" },
// 收集产品详情
productDetails: {
$push: {
productId: "$items.productId",
quantity: "$items.quantity",
price: "$items.price",
},
},
// 总消费
totalSpent: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
},
},
]);
// 3. 数组去重和排序
db.articles.aggregate([
{ $unwind: "$tags" },
{
$group: {
_id: "$tags",
articleCount: { $sum: 1 },
// 收集文章标题
articles: { $push: "$title" },
// 收集作者
authors: { $addToSet: "$author" },
},
},
{ $sort: { articleCount: -1 } },
{ $limit: 10 },
]);
字符串操作符:$concat, $substr, $toUpper
字符串操作符:用于字符串处理和格式化
// MongoDB Shell / Node.js
// 1. 字符串连接和格式化
db.users.aggregate([
{
$project: {
// 连接姓名
fullName: {
$concat: [
{ $ifNull: ["$firstName", ""] },
" ",
{ $ifNull: ["$lastName", ""] },
],
},
// 格式化邮箱
emailDomain: {
$substr: [
"$email",
{ $add: [{ $indexOfBytes: ["$email", "@"] }, 1] },
-1,
],
},
// 用户名大写
usernameUpper: { $toUpper: "$username" },
},
},
]);
// 2. 地址格式化
db.customers.aggregate([
{
$project: {
customerId: 1,
// 格式化完整地址
fullAddress: {
$concat: [
{ $ifNull: ["$address.street", ""] },
", ",
{ $ifNull: ["$address.city", ""] },
", ",
{ $ifNull: ["$address.province", ""] },
" ",
{ $ifNull: ["$address.postalCode", ""] },
],
},
// 提取省份代码
provinceCode: {
$substr: [{ $ifNull: ["$address.province", ""] }, 0, 2],
},
},
},
]);
// 3. 文本分析
db.articles.aggregate([
{
$project: {
title: 1,
// 计算标题长度
titleLength: { $strLenBytes: "$title" },
// 提取前50个字符作为摘要
summary: {
$substr: ["$title", 0, { $min: [50, { $strLenBytes: "$title" }] }],
},
// 标题关键词(假设用空格分割)
keywords: {
$split: [{ $toLower: "$title" }, " "],
},
},
},
]);
日期操作符:$year, $month, $dayOfMonth
日期操作符:用于日期时间数据的处理和分析
// MongoDB Shell / Node.js
// 1. 日期分组统计
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" },
},
totalAmount: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
]);
// 2. 时间范围分析
db.user_activities.aggregate([
{
$project: {
userId: 1,
action: 1,
timestamp: 1,
// 提取时间组件
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
dayOfWeek: { $dayOfWeek: "$timestamp" },
hour: { $hour: "$timestamp" },
// 计算年龄(天)
daysSinceActivity: {
$divide: [{ $subtract: [new Date(), "$timestamp"] }, 86400000],
},
},
},
]);
// 3. 季节性分析
db.sales.aggregate([
{
$project: {
amount: 1,
date: 1,
// 季节判断
season: {
$switch: {
branches: [
{ case: { $in: [{ $month: "$date" }, [12, 1, 2]] }, then: "冬季" },
{ case: { $in: [{ $month: "$date" }, [3, 4, 5]] }, then: "春季" },
{ case: { $in: [{ $month: "$date" }, [6, 7, 8]] }, then: "夏季" },
{ case: { $in: [{ $month: "$date" }, [9, 10, 11]] }, then: "秋季" },
],
default: "未知",
},
},
// 季度
quarter: {
$ceil: { $divide: [{ $month: "$date" }, 3] },
},
},
},
{
$group: {
_id: "$season",
totalSales: { $sum: "$amount" },
avgSales: { $avg: "$amount" },
salesCount: { $sum: 1 },
},
},
]);
实际应用场景
场景 1:电商销售分析系统
// Node.js 示例 - 电商销售分析API
async function getSalesAnalysis(startDate, endDate) {
const pipeline = [
// 1. 筛选时间范围
{
$match: {
createdAt: { $gte: new Date(startDate), $lte: new Date(endDate) },
status: "completed",
},
},
// 2. 展开订单项
{ $unwind: "$items" },
// 3. 关联产品信息
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
// 4. 关联客户信息
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer",
},
},
{ $unwind: "$customer" },
// 5. 多维度分析
{
$facet: {
// 销售趋势
salesTrend: [
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" },
},
totalSales: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
orderCount: { $sum: 1 },
uniqueCustomers: { $addToSet: "$customerId" },
},
},
{
$addFields: {
uniqueCustomerCount: { $size: "$uniqueCustomers" },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
],
// 产品分析
productAnalysis: [
{
$group: {
_id: "$product.category",
totalRevenue: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
totalQuantity: { $sum: "$items.quantity" },
avgPrice: { $avg: "$items.price" },
productCount: { $addToSet: "$items.productId" },
},
},
{
$addFields: {
uniqueProductCount: { $size: "$productCount" },
},
},
{ $sort: { totalRevenue: -1 } },
],
// 客户分析
customerAnalysis: [
{
$group: {
_id: "$customerId",
customerName: { $first: "$customer.name" },
totalSpent: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$totalAmount" },
lastOrderDate: { $max: "$createdAt" },
},
},
{ $sort: { totalSpent: -1 } },
{ $limit: 20 },
],
},
},
];
return await db.orders.aggregate(pipeline);
}
场景 2:用户行为分析系统
// Node.js 示例 - 用户行为分析
async function analyzeUserBehavior(userId, timeRange) {
const pipeline = [
// 1. 筛选用户和时间范围
{
$match: {
userId: new ObjectId(userId),
timestamp: {
$gte: new Date(timeRange.start),
$lte: new Date(timeRange.end),
},
},
},
// 2. 行为模式分析
{
$facet: {
// 活动频率分析
activityFrequency: [
{
$group: {
_id: {
hour: { $hour: "$timestamp" },
dayOfWeek: { $dayOfWeek: "$timestamp" },
},
activityCount: { $sum: 1 },
uniqueActions: { $addToSet: "$action" },
},
},
{
$addFields: {
uniqueActionCount: { $size: "$uniqueActions" },
},
},
{ $sort: { activityCount: -1 } },
],
// 行为路径分析
behaviorPath: [
{ $sort: { timestamp: 1 } },
{
$group: {
_id: "$userId",
actions: { $push: "$action" },
timestamps: { $push: "$timestamp" },
},
},
{
$project: {
behaviorSequence: {
$map: {
input: { $range: [0, { $size: "$actions" }] },
as: "index",
in: {
action: { $arrayElemAt: ["$actions", "$$index"] },
timestamp: { $arrayElemAt: ["$timestamps", "$$index"] },
},
},
},
},
},
],
// 功能使用统计
featureUsage: [
{
$group: {
_id: "$action",
usageCount: { $sum: 1 },
uniqueUsers: { $addToSet: "$userId" },
avgSessionDuration: { $avg: "$sessionDuration" },
},
},
{
$addFields: {
uniqueUserCount: { $size: "$uniqueUsers" },
},
},
{ $sort: { usageCount: -1 } },
],
},
},
];
return await db.user_activities.aggregate(pipeline);
}
场景 3:实时数据监控系统
// Node.js 示例 - 实时监控数据聚合
async function getRealTimeMetrics() {
const now = new Date();
const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
const pipeline = [
// 1. 筛选最近一小时的数据
{
$match: {
timestamp: { $gte: oneHourAgo, $lte: now },
},
},
// 2. 实时指标计算
{
$facet: {
// 系统性能指标
performanceMetrics: [
{
$group: {
_id: null,
avgResponseTime: { $avg: "$responseTime" },
maxResponseTime: { $max: "$responseTime" },
minResponseTime: { $min: "$responseTime" },
totalRequests: { $sum: 1 },
errorCount: {
$sum: {
$cond: [{ $gte: ["$statusCode", 400] }, 1, 0],
},
},
},
},
{
$addFields: {
errorRate: {
$multiply: [
{ $divide: ["$errorCount", "$totalRequests"] },
100,
],
},
},
},
],
// 用户活跃度
userActivity: [
{
$group: {
_id: {
hour: { $hour: "$timestamp" },
minute: { $minute: "$timestamp" },
},
activeUsers: { $addToSet: "$userId" },
requestCount: { $sum: 1 },
},
},
{
$addFields: {
uniqueUserCount: { $size: "$activeUsers" },
},
},
{ $sort: { "_id.hour": 1, "_id.minute": 1 } },
],
// 错误分析
errorAnalysis: [
{ $match: { statusCode: { $gte: 400 } } },
{
$group: {
_id: "$statusCode",
count: { $sum: 1 },
avgResponseTime: { $avg: "$responseTime" },
endpoints: { $addToSet: "$endpoint" },
},
},
{ $sort: { count: -1 } },
],
},
},
];
return await db.system_logs.aggregate(pipeline);
}
性能优化策略
索引优化
为聚合管道创建合适的索引:
// MongoDB Shell / Node.js
// 1. 单字段索引
db.orders.createIndex({ createdAt: 1 });
db.orders.createIndex({ status: 1 });
db.orders.createIndex({ customerId: 1 });
// 2. 复合索引
db.orders.createIndex({ status: 1, createdAt: -1 });
db.orders.createIndex({ customerId: 1, status: 1 });
// 3. 文本索引
db.products.createIndex({
name: "text",
description: "text",
});
// 4. 分析索引使用情况
db.orders
.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$customerId", total: { $sum: "$amount" } } },
])
.explain("executionStats");
管道优化
优化聚合管道的执行效率:
// MongoDB Shell / Node.js
// 1. 早期筛选 - 在管道开始处使用 $match
db.orders.aggregate([
{ $match: { status: "completed" } }, // 尽早筛选
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 2. 投影优化 - 只选择需要的字段
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $project: { category: 1, amount: 1 } }, // 只选择必要字段
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 3. 排序优化 - 利用索引进行排序
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } }, // 利用索引排序
{ $limit: 100 },
]);
// 4. 分页优化 - 使用游标而不是 skip
db.orders.aggregate([
{
$match: {
status: "completed",
_id: { $gt: lastProcessedId }, // 使用游标分页
},
},
{ $sort: { _id: 1 } },
{ $limit: 100 },
]);
内存管理
控制聚合管道的内存使用:
// MongoDB Shell / Node.js
// 1. 使用 allowDiskUse 选项
db.largeCollection.aggregate(
[{ $group: { _id: "$category", count: { $sum: 1 } } }],
{ allowDiskUse: true }
);
// 2. 分批处理大数据集
async function processLargeDataset() {
const batchSize = 10000;
let lastId = null;
while (true) {
const pipeline = [
{ $match: lastId ? { _id: { $gt: lastId } } : {} },
{ $sort: { _id: 1 } },
{ $limit: batchSize },
{ $group: { _id: "$category", count: { $sum: 1 } } },
];
const results = await db.largeCollection.aggregate(pipeline);
if (results.length === 0) break;
// 处理结果
console.log(`处理了 ${results.length} 条记录`);
lastId = results[results.length - 1]._id;
}
}
// 3. 使用 $sample 进行数据采样
db.largeCollection.aggregate([
{ $sample: { size: 1000 } }, // 随机采样
{ $group: { _id: "$category", count: { $sum: 1 } } },
]);
企业级开发注意事项
聚合管道性能监控
监控聚合管道的执行性能:
// Node.js 示例 - 聚合性能监控
async function monitorAggregationPerformance(pipeline, collectionName) {
const startTime = Date.now();
try {
const result = await db.collection(collectionName).aggregate(pipeline);
const executionTime = Date.now() - startTime;
console.log(`聚合执行时间: ${executionTime}ms`);
console.log(`返回文档数: ${result.length}`);
// 记录性能指标
await db.performance_metrics.insertOne({
operation: "aggregation",
collection: collectionName,
executionTime: executionTime,
resultCount: result.length,
timestamp: new Date(),
});
return result;
} catch (error) {
console.error("聚合执行失败:", error);
throw error;
}
}
// 2. 聚合管道缓存策略
const aggregationCache = new Map();
async function getCachedAggregation(
cacheKey,
pipeline,
collectionName,
ttl = 300000
) {
const cached = aggregationCache.get(cacheKey);
if (cached && Date.now() - cached.timestamp < ttl) {
console.log("使用缓存结果");
return cached.data;
}
const result = await db.collection(collectionName).aggregate(pipeline);
aggregationCache.set(cacheKey, {
data: result,
timestamp: Date.now(),
});
return result;
}
// 3. 聚合管道错误处理
async function safeAggregation(pipeline, collectionName, options = {}) {
try {
const result = await db.collection(collectionName).aggregate(pipeline, {
allowDiskUse: true,
maxTimeMS: 30000, // 30秒超时
...options,
});
return {
success: true,
data: result,
count: result.length,
};
} catch (error) {
console.error("聚合执行错误:", error);
return {
success: false,
error: error.message,
data: [],
};
}
}
数据一致性保障
确保聚合结果的准确性:
// Node.js 示例 - 数据一致性检查
async function validateAggregationResult(
pipeline,
collectionName,
expectedCount
) {
// 1. 执行聚合管道
const aggregationResult = await db
.collection(collectionName)
.aggregate(pipeline);
// 2. 验证结果数量
if (aggregationResult.length !== expectedCount) {
console.warn(
`聚合结果数量不匹配: 期望 ${expectedCount}, 实际 ${aggregationResult.length}`
);
}
// 3. 数据完整性检查
const validationResult = await db.collection(collectionName).aggregate([
...pipeline,
{ $match: { $expr: { $ne: ["$totalAmount", null] } } }, // 检查关键字段
]);
if (validationResult.length !== aggregationResult.length) {
throw new Error("聚合结果包含空值,数据可能不完整");
}
return aggregationResult;
}
// 4. 聚合管道版本控制
const AGGREGATION_VERSIONS = {
v1: [
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
],
v2: [
{
$match: {
status: "completed",
createdAt: { $gte: new Date("2024-01-01") },
},
},
{
$group: {
_id: "$category",
total: { $sum: "$amount" },
count: { $sum: 1 },
},
},
],
};
async function executeAggregationVersion(version, collectionName) {
const pipeline = AGGREGATION_VERSIONS[version];
if (!pipeline) {
throw new Error(`未知的聚合版本: ${version}`);
}
return await db.collection(collectionName).aggregate(pipeline);
}
安全查询实践
防止聚合管道注入攻击:
// Node.js 示例 - 安全的聚合管道构建
class SafeAggregationBuilder {
constructor() {
this.allowedStages = [
"$match",
"$project",
"$group",
"$sort",
"$limit",
"$skip",
"$lookup",
"$unwind",
"$facet",
"$bucket",
"$sample",
];
this.allowedOperators = [
"$sum",
"$avg",
"$min",
"$max",
"$count",
"$add",
"$subtract",
"$multiply",
"$divide",
"$concat",
"$substr",
"$toUpper",
"$toLower",
];
}
// 验证聚合管道
validatePipeline(pipeline) {
if (!Array.isArray(pipeline)) {
throw new Error("聚合管道必须是数组");
}
for (const stage of pipeline) {
if (typeof stage !== "object" || stage === null) {
throw new Error("聚合阶段必须是对象");
}
const stageName = Object.keys(stage)[0];
if (!this.allowedStages.includes(stageName)) {
throw new Error(`不允许的聚合阶段: ${stageName}`);
}
}
return true;
}
// 构建安全的聚合管道
buildSafePipeline(filters) {
const pipeline = [];
// 添加 $match 阶段
if (filters.match) {
pipeline.push({ $match: this.sanitizeMatch(filters.match) });
}
// 添加 $group 阶段
if (filters.group) {
pipeline.push({ $group: this.sanitizeGroup(filters.group) });
}
// 添加 $sort 阶段
if (filters.sort) {
pipeline.push({ $sort: this.sanitizeSort(filters.sort) });
}
// 添加 $limit 阶段
if (filters.limit && filters.limit <= 1000) {
// 限制最大数量
pipeline.push({ $limit: filters.limit });
}
return pipeline;
}
// 清理 $match 条件
sanitizeMatch(match) {
const allowedFields = ["status", "category", "createdAt", "amount"];
const sanitized = {};
for (const [field, value] of Object.entries(match)) {
if (allowedFields.includes(field)) {
sanitized[field] = value;
}
}
return sanitized;
}
// 清理 $group 条件
sanitizeGroup(group) {
const sanitized = { _id: group._id };
if (group.total) {
sanitized.total = { $sum: "$amount" };
}
if (group.count) {
sanitized.count = { $sum: 1 };
}
return sanitized;
}
// 清理 $sort 条件
sanitizeSort(sort) {
const allowedFields = ["createdAt", "amount", "status"];
const sanitized = {};
for (const [field, direction] of Object.entries(sort)) {
if (
allowedFields.includes(field) &&
(direction === 1 || direction === -1)
) {
sanitized[field] = direction;
}
}
return sanitized;
}
}
// 使用示例
const builder = new SafeAggregationBuilder();
const pipeline = builder.buildSafePipeline({
match: { status: "completed" },
group: { _id: "$category", total: true, count: true },
sort: { total: -1 },
limit: 10,
});
监控和调试
聚合管道的监控和调试技巧:
// Node.js 示例 - 聚合管道调试工具
class AggregationDebugger {
constructor(collection) {
this.collection = collection;
}
// 分析聚合管道性能
async analyzePerformance(pipeline) {
const explainResult = await this.collection.aggregate(pipeline).explain("executionStats");
return {
executionTime: explainResult.executionStats.executionTimeMillis,
totalDocsExamined: explainResult.executionStats.totalDocsExamined,
totalDocsReturned: explainResult.executionStats.totalDocsReturned,
stages: explainResult.stages.map(stage => ({
stage: stage.stage,
executionTime: stage.executionTimeMillis,
docsExamined: stage.totalDocsExamined,
docsReturned: stage.totalDocsReturned
}))
};
}
// 逐步执行聚合管道
async stepByStepExecution(pipeline) {
const results = [];
for (let i = 0; i < pipeline.length; i++) {
const partialPipeline = pipeline.slice(0, i + 1);
const result = await this.collection.aggregate(partialPipeline);
results.push({
stage: i + 1,
pipeline: partialPipeline,
resultCount: result.length,
sampleResult: result.slice(0, 3) // 显示前3个结果
});
}
return results;
}
// 检测性能瓶颈
async detectBottlenecks(pipeline) {
const analysis = await this.analyzePerformance(pipeline);
const bottlenecks = [];
for (const stage of analysis.stages) {
if (stage.executionTime > 1000) { // 超过1秒
bottlenecks.push({
stage: stage.stage,
executionTime: stage.executionTime,
suggestion: this.getOptimizationSuggestion(stage.stage)
});
}
}
return {
analysis,
bottlenecks,
recommendations: this.getRecommendations(analysis)
};
}
getOptimizationSuggestion(stage) {
const suggestions = {
'$match': '考虑在 $match 阶段使用索引',
'$group': '考虑使用 $facet 并行处理或添加 $limit',
'$lookup': '考虑使用 $lookup 的 pipeline 参数优化关联',
'$sort': '确保排序字段有索引',
'$unwind': '考虑使用 preserveNullAndEmptyArrays 选项'
};
return suggestions[stage] || '考虑优化此阶段';
}
getRecommendations(analysis) {
const recommendations = [];
if (analysis.totalDocsExamined > analysis.totalDocsReturned * 10) {
recommendations.push('考虑添加更多筛选条件减少扫描的文档数');
}
if (analysis.executionTime > 5000) {
recommendations.push('考虑使用 allowDiskUse 选项或分批处理');
}
if (analysis.totalDocsExamined > 100000) {
recommendations.push('考虑使用 $sample 进行数据采样');
}
return recommendations;
}
}
// 使用示例
const debugger = new AggregationDebugger(db.orders);
const pipeline = [
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
];
const analysis = await debugger.analyzePerformance(pipeline);
console.log("性能分析:", analysis);
常见问题与解决方案
问题 1:聚合管道执行缓慢
原因分析:
- 缺少合适的索引
- 数据量过大
- 管道阶段顺序不当
解决方案:
// 1. 添加索引
db.orders.createIndex({ status: 1, createdAt: -1 });
db.orders.createIndex({ category: 1 });
// 2. 优化管道顺序
// ❌ 错误:先分组再筛选
db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gte: 1000 } } },
]);
// ✅ 正确:先筛选再分组
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gte: 1000 } } },
]);
// 3. 使用 allowDiskUse
db.orders.aggregate(pipeline, { allowDiskUse: true });
问题 2:内存不足错误
原因分析:
- 分组字段基数过高
- 数组展开导致数据量爆炸
- 没有使用磁盘存储
解决方案:
// 1. 使用 $bucket 减少分组基数
db.orders.aggregate([
{
$bucket: {
groupBy: "$amount",
boundaries: [0, 100, 500, 1000, 5000],
default: "5000+",
output: {
count: { $sum: 1 },
total: { $sum: "$amount" },
},
},
},
]);
// 2. 分批处理大数据集
async function processInBatches(pipeline, batchSize = 10000) {
let skip = 0;
const results = [];
while (true) {
const batchPipeline = [...pipeline, { $skip: skip }, { $limit: batchSize }];
const batch = await db.orders.aggregate(batchPipeline);
if (batch.length === 0) break;
results.push(...batch);
skip += batchSize;
}
return results;
}
// 3. 使用 $sample 采样
db.orders.aggregate([
{ $sample: { size: 10000 } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
问题 3:聚合结果不准确
原因分析:
- 数据类型不一致
- 空值处理不当
- 分组逻辑错误
解决方案:
// 1. 数据类型标准化
db.orders.aggregate([
{
$addFields: {
amount: { $toDouble: "$amount" }, // 确保数值类型
createdAt: { $toDate: "$createdAt" }, // 确保日期类型
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 2. 空值处理
db.orders.aggregate([
{
$addFields: {
amount: { $ifNull: ["$amount", 0] }, // 空值默认为0
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 3. 验证聚合结果
async function validateAggregation(pipeline, expectedCount) {
const result = await db.orders.aggregate(pipeline);
if (result.length !== expectedCount) {
console.warn(
`聚合结果数量不匹配: 期望 ${expectedCount}, 实际 ${result.length}`
);
}
// 检查数据完整性
const nullValues = result.filter(
(item) => item.total === null || item.total === undefined
);
if (nullValues.length > 0) {
console.warn("聚合结果包含空值");
}
return result;
}
问题 4:$lookup 性能问题
原因分析:
- 关联字段没有索引
- 关联数据量过大
- 使用了不合适的关联方式
解决方案:
// 1. 为关联字段创建索引
db.products.createIndex({ category: 1 });
db.orders.createIndex({ "items.productId": 1 });
// 2. 使用 pipeline 参数优化 $lookup
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { productIds: "$items.productId" },
pipeline: [
{ $match: { $expr: { $in: ["$_id", "$$productIds"] } } },
{ $project: { name: 1, category: 1 } }, // 只选择需要的字段
],
as: "products",
},
},
]);
// 3. 使用 $unwind 和 $lookup 组合
db.orders.aggregate([
{ $unwind: "$items" },
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
{
$group: {
_id: "$product.category",
total: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
},
},
]);
企业级开发注意事项
聚合管道性能监控
监控聚合管道的执行性能:
// Node.js 示例 - 聚合性能监控
async function monitorAggregationPerformance(pipeline, collectionName) {
const startTime = Date.now();
try {
// 执行聚合管道
const result = await db.collection(collectionName).aggregate(pipeline);
const executionTime = Date.now() - startTime;
// 记录性能指标
console.log(`聚合执行时间: ${executionTime}ms`);
console.log(`返回文档数: ${result.length}`);
// 性能告警
if (executionTime > 5000) {
console.warn("聚合执行时间过长,建议优化");
}
return result;
} catch (error) {
console.error("聚合执行失败:", error.message);
throw error;
}
}
// 使用 explain 分析执行计划
async function analyzeAggregationPlan(pipeline, collectionName) {
const explainResult = await db
.collection(collectionName)
.aggregate(pipeline)
.explain("executionStats");
console.log("执行统计:", explainResult.executionStats);
console.log("使用的索引:", explainResult.executionStats.executionStages);
return explainResult;
}
聚合管道安全实践
防止聚合管道注入攻击:
// Node.js 示例 - 安全的聚合管道构建
function buildSafeAggregationPipeline(userInput, filters) {
// 1. 输入验证
const allowedFields = ["category", "status", "date"];
const allowedOperators = ["$eq", "$gt", "$lt", "$gte", "$lte", "$in"];
// 2. 构建安全的查询条件
const matchStage = {};
allowedFields.forEach((field) => {
if (filters[field] && typeof filters[field] === "string") {
matchStage[field] = filters[field];
}
});
// 3. 构建安全的聚合管道
const pipeline = [
{ $match: matchStage },
{
$group: {
_id: "$category",
count: { $sum: 1 },
total: { $sum: "$amount" },
},
},
{ $sort: { total: -1 } },
{ $limit: 100 },
];
return pipeline;
}
// 使用参数化聚合
async function executeSafeAggregation(collectionName, filters) {
const pipeline = buildSafeAggregationPipeline(null, filters);
// 设置执行超时
const result = await db
.collection(collectionName)
.aggregate(pipeline)
.maxTimeMS(30000) // 30秒超时
.toArray();
return result;
}
聚合管道资源管理
控制聚合管道的资源使用:
// Node.js 示例 - 资源管理
async function executeAggregationWithResourceControl(pipeline, options = {}) {
const {
maxMemoryUsage = 100 * 1024 * 1024, // 100MB
maxExecutionTime = 30000, // 30秒
allowDiskUse = true,
} = options;
try {
const result = await db.collection
.aggregate(pipeline, {
allowDiskUse,
maxTimeMS: maxExecutionTime,
cursor: { batchSize: 1000 },
})
.toArray();
// 检查内存使用情况
const memoryUsage = process.memoryUsage();
if (memoryUsage.heapUsed > maxMemoryUsage) {
console.warn("聚合执行内存使用过高");
}
return result;
} catch (error) {
if (error.code === 16945) {
// 内存限制错误
console.error("聚合执行超出内存限制,建议优化管道");
}
throw error;
}
}
聚合管道缓存策略
实现聚合结果缓存:
// Node.js 示例 - 聚合结果缓存
const Redis = require("redis");
const redis = Redis.createClient();
async function getCachedAggregationResult(
cacheKey,
pipeline,
collectionName,
ttl = 3600
) {
try {
// 尝试从缓存获取
const cachedResult = await redis.get(cacheKey);
if (cachedResult) {
console.log("从缓存获取聚合结果");
return JSON.parse(cachedResult);
}
// 执行聚合管道
const result = await db
.collection(collectionName)
.aggregate(pipeline)
.toArray();
// 缓存结果
await redis.setex(cacheKey, ttl, JSON.stringify(result));
console.log("聚合结果已缓存");
return result;
} catch (error) {
console.error("缓存操作失败:", error);
// 降级到直接执行聚合
return await db.collection(collectionName).aggregate(pipeline).toArray();
}
}
// 使用示例
async function getSalesReport(dateRange) {
const cacheKey = `sales_report_${dateRange.start}_${dateRange.end}`;
const pipeline = [
{
$match: {
createdAt: {
$gte: new Date(dateRange.start),
$lte: new Date(dateRange.end),
},
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
];
return await getCachedAggregationResult(cacheKey, pipeline, "orders", 1800); // 30分钟缓存
}
聚合管道监控和告警
设置聚合管道监控:
// Node.js 示例 - 聚合监控系统
class AggregationMonitor {
constructor() {
this.metrics = {
totalExecutions: 0,
averageExecutionTime: 0,
errorCount: 0,
slowQueries: [],
};
}
async executeWithMonitoring(pipeline, collectionName, options = {}) {
const startTime = Date.now();
const executionId = this.generateExecutionId();
try {
// 记录执行开始
this.recordExecutionStart(executionId, pipeline, collectionName);
// 执行聚合
const result = await db
.collection(collectionName)
.aggregate(pipeline, options)
.toArray();
// 记录执行成功
const executionTime = Date.now() - startTime;
this.recordExecutionSuccess(executionId, executionTime, result.length);
// 检查性能告警
this.checkPerformanceAlerts(executionTime, pipeline);
return result;
} catch (error) {
// 记录执行失败
this.recordExecutionError(executionId, error);
throw error;
}
}
recordExecutionStart(executionId, pipeline, collectionName) {
console.log(`聚合执行开始 [${executionId}]: ${collectionName}`);
}
recordExecutionSuccess(executionId, executionTime, resultCount) {
this.metrics.totalExecutions++;
this.metrics.averageExecutionTime =
(this.metrics.averageExecutionTime + executionTime) / 2;
console.log(
`聚合执行成功 [${executionId}]: ${executionTime}ms, ${resultCount}条结果`
);
}
recordExecutionError(executionId, error) {
this.metrics.errorCount++;
console.error(`聚合执行失败 [${executionId}]:`, error.message);
}
checkPerformanceAlerts(executionTime, pipeline) {
if (executionTime > 5000) {
this.metrics.slowQueries.push({
executionTime,
pipeline: JSON.stringify(pipeline),
timestamp: new Date(),
});
console.warn(`慢查询告警: 执行时间 ${executionTime}ms`);
}
}
generateExecutionId() {
return `agg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
getMetrics() {
return {
...this.metrics,
errorRate: this.metrics.errorCount / this.metrics.totalExecutions,
slowQueryCount: this.metrics.slowQueries.length,
};
}
}
// 使用监控系统
const monitor = new AggregationMonitor();
async function executeMonitoredAggregation(pipeline, collectionName) {
return await monitor.executeWithMonitoring(pipeline, collectionName);
}
下一篇文章,我们将深入探讨 MongoDB 的视图与物化视图,包括虚拟集合、数据视图、性能优化等内容。敬请期待!