MongoDB 聚合管道完全指南:数据分析的强大设备

news/2025/10/21 9:17:35/文章来源:https://www.cnblogs.com/wzzkaifa/p/19154104

想象一下,你需要从海量的用户行为数据中分析出最有价值的商业洞察。传统的查询只能获取原始数据,而聚合管道就像是一个强大的数据分析工厂,能够将原始数据经过多道工序处理,最终产出精炼的商业报告。

MongoDB 的聚合管道(Aggregation Pipeline)是数据分析的核心工具,它通过一系列有序的阶段(Stage)对数据进行转换、筛选、分组、统计,最终生成复杂的分析结果。从简单的数据统计到复杂的商业智能分析,聚合管道都能胜任。

今天,我们将深入探索 MongoDB 聚合管道的完整体系,通过丰富的实例和实际业务场景,让你掌握这个强大的数据分析工具,成为数据洞察的专家。

目录

  1. 为什么聚合管道如此重要?
  2. 聚合管道基础架构
  3. 核心聚合阶段详解
    • 数据筛选:$match 阶段
    • 字段投影:$project 阶段
    • 数据分组:$group 阶段
    • 数据排序:$sort 阶段
    • 数据限制:$limit 和 $skip 阶段
  4. 高级聚合阶段
    • 数据关联:$lookup 阶段
    • 数组处理:$unwind 阶段
    • 数据分面:$facet 阶段
    • 数据合并:$merge 和 $out 阶段
  5. 聚合操作符详解
    • 数学操作符:$sum, $avg, $min, $max
    • 数组操作符:$push, $addToSet, $first, $last
    • 字符串操作符:$concat, $substr, $toUpper
    • 日期操作符:$year, $month, $dayOfMonth
  6. 实际应用场景
  7. 性能优化策略
  8. 企业级开发注意事项
  9. 常见问题与解决方案
  10. 总结:掌握聚合分析

为什么聚合管道如此重要?

聚合是数据分析的核心

在数据驱动的时代,原始数据本身价值有限,真正的价值在于从数据中提取洞察。MongoDB 的聚合管道提供了:

聚合管道 vs 传统查询对比

让我们看看聚合管道相比传统查询的优势:

功能需求传统查询聚合管道优势说明
数据统计需要应用层计算数据库层直接计算性能更好,减少网络传输
多表关联多次查询后合并单次查询完成减少查询次数,提升效率
复杂计算应用层处理数据库层处理利用数据库优化能力
数据转换应用层转换管道阶段转换更灵活的数据处理
实时分析难以实现原生支持支持实时数据流处理

聚合管道的核心优势:

  • 性能优化:在数据库层完成计算,减少数据传输
  • 功能强大:支持复杂的数据分析和转换
  • 灵活组合:阶段可以灵活组合,适应各种需求
  • 实时处理:支持流式数据处理和实时分析

聚合管道基础架构

聚合管道的基本结构

// MongoDB Shell / Node.js
db.collection.aggregate([
{
$stage1: {
/* 阶段1的配置 */
},
},
{
$stage2: {
/* 阶段2的配置 */
},
},
{
$stage3: {
/* 阶段3的配置 */
},
},
// ... 更多阶段
]);

管道的特点:

  • 有序执行:阶段按顺序执行,前一个阶段的输出是下一个阶段的输入
  • 流式处理:数据流经管道,每个阶段处理一部分数据
  • 灵活组合:可以根据需求组合不同的阶段

基础聚合示例

// MongoDB Shell / Node.js
// 1. 简单的数据统计
db.orders.aggregate([
{ $match: { status: "completed" } }, // 筛选已完成的订单
{
$group: {
_id: "$category",
totalAmount: { $sum: "$amount" },
count: { $sum: 1 },
},
}, // 按分类分组统计
{ $sort: { totalAmount: -1 } }, // 按总金额降序排列
]);
// 2. 用户行为分析
db.user_activities.aggregate([
{
$match: {
timestamp: { $gte: new Date("2024-01-01") },
},
}, // 筛选2024年的数据
{
$group: {
_id: {
userId: "$userId",
action: "$action",
},
count: { $sum: 1 },
lastActivity: { $max: "$timestamp" },
},
}, // 按用户和行为分组
{ $sort: { count: -1 } }, // 按活动次数排序
{ $limit: 10 }, // 只返回前10条
]);

核心聚合阶段详解

数据筛选:$match 阶段

$match 阶段:在管道开始处筛选数据,类似于 SQL 的 WHERE 子句

// MongoDB Shell / Node.js
// 1. 基础筛选
db.products.aggregate([
{
$match: {
category: "电子产品",
price: { $gte: 1000 },
status: "active",
},
},
]);
// 2. 复杂条件筛选
db.orders.aggregate([
{
$match: {
$and: [
{ status: { $in: ["pending", "processing"] } },
{ createdAt: { $gte: new Date("2024-01-01") } },
{ totalAmount: { $gte: 500 } },
],
},
},
]);
// 3. 正则表达式筛选
db.articles.aggregate([
{
$match: {
title: { $regex: /MongoDB|数据库/, $options: "i" },
publishDate: { $gte: new Date("2024-01-01") },
},
},
]);

$match 阶段的最佳实践:

  • 早期筛选:尽量在管道开始处使用 $match 减少数据量
  • 索引利用:确保 $match 条件能够使用索引
  • 条件优化:将选择性高的条件放在前面

字段投影:$project 阶段

$project 阶段:控制输出字段,类似于 SQL 的 SELECT 子句

// MongoDB Shell / Node.js
// 1. 基础字段投影
db.users.aggregate([
{
$project: {
username: 1,
email: 1,
createdAt: 1,
_id: 0, // 排除 _id 字段
},
},
]);
// 2. 计算字段
db.products.aggregate([
{
$project: {
name: 1,
price: 1,
category: 1,
// 计算折扣价格
discountedPrice: {
$multiply: ["$price", { $subtract: [1, "$discount"] }],
},
// 计算利润率
profitMargin: {
$multiply: [
{ $divide: [{ $subtract: ["$price", "$cost"] }, "$price"] },
100,
],
},
},
},
]);
// 3. 条件投影
db.orders.aggregate([
{
$project: {
orderId: 1,
customerName: 1,
totalAmount: 1,
// 根据金额设置订单等级
orderLevel: {
$switch: {
branches: [
{ case: { $gte: ["$totalAmount", 10000] }, then: "VIP" },
{ case: { $gte: ["$totalAmount", 5000] }, then: "高级" },
{ case: { $gte: ["$totalAmount", 1000] }, then: "普通" },
],
default: "小额",
},
},
},
},
]);

数据分组:$group 阶段

$group 阶段:按指定字段分组并计算聚合值,类似于 SQL 的 GROUP BY

// MongoDB Shell / Node.js
// 1. 基础分组统计
db.orders.aggregate([
{
$group: {
_id: "$category", // 按分类分组
totalAmount: { $sum: "$amount" }, // 总金额
averageAmount: { $avg: "$amount" }, // 平均金额
orderCount: { $sum: 1 }, // 订单数量
maxAmount: { $max: "$amount" }, // 最大金额
minAmount: { $min: "$amount" }, // 最小金额
},
},
]);
// 2. 多字段分组
db.sales.aggregate([
{
$group: {
_id: {
year: { $year: "$date" },
month: { $month: "$date" },
region: "$region",
},
totalSales: { $sum: "$amount" },
averageSales: { $avg: "$amount" },
salesCount: { $sum: 1 },
},
},
]);
// 3. 复杂分组统计
db.user_activities.aggregate([
{ $match: { action: "purchase" } },
{
$group: {
_id: "$userId",
totalPurchases: { $sum: 1 },
totalSpent: { $sum: "$amount" },
firstPurchase: { $min: "$timestamp" },
lastPurchase: { $max: "$timestamp" },
averagePurchase: { $avg: "$amount" },
// 计算购买频率(天)
purchaseFrequency: {
$divide: [
{ $subtract: [{ $max: "$timestamp" }, { $min: "$timestamp" }] },
86400000, // 毫秒转天数
],
},
},
},
]);

数据排序:$sort 阶段

$sort 阶段:对数据进行排序,类似于 SQL 的 ORDER BY

// MongoDB Shell / Node.js
// 1. 单字段排序
db.products.aggregate([
{ $sort: { price: -1 } }, // 按价格降序
]);
// 2. 多字段排序
db.orders.aggregate([
{
$sort: {
status: 1, // 先按状态升序
createdAt: -1, // 再按创建时间降序
},
},
]);
// 3. 结合其他阶段
db.sales.aggregate([
{
$group: {
_id: "$productId",
totalSales: { $sum: "$amount" },
salesCount: { $sum: 1 },
},
},
{ $sort: { totalSales: -1 } },
{ $limit: 10 }, // 只返回前10名
]);

数据限制:$limit 和 $skip 阶段

$limit 阶段:限制返回的文档数量

$skip 阶段:跳过指定数量的文档

// MongoDB Shell / Node.js
// 1. 分页查询
db.products.aggregate([
{ $match: { category: "电子产品" } },
{ $sort: { price: 1 } },
{ $skip: 20 }, // 跳过前20条
{ $limit: 10 }, // 返回10条
]);
// 2. Top N 查询
db.orders.aggregate([
{
$group: {
_id: "$customerId",
totalAmount: { $sum: "$amount" },
},
},
{ $sort: { totalAmount: -1 } },
{ $limit: 5 }, // 返回前5名客户
]);
// 3. 数据采样
db.user_activities.aggregate([
{ $match: { action: "login" } },
{ $sample: { size: 1000 } }, // 随机采样1000条
{
$group: {
_id: { $hour: "$timestamp" },
loginCount: { $sum: 1 },
},
},
]);

高级聚合阶段

数据关联:$lookup 阶段

$lookup 阶段:关联其他集合的数据,类似于 SQL 的 JOIN

// MongoDB Shell / Node.js
// 1. 基础关联
db.orders.aggregate([
{
$lookup: {
from: "customers", // 关联的集合
localField: "customerId", // 本地字段
foreignField: "_id", // 外部字段
as: "customer", // 输出字段名
},
},
{
$project: {
orderId: 1,
totalAmount: 1,
"customer.name": 1,
"customer.email": 1,
},
},
]);
// 2. 条件关联
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { orderItems: "$items" },
pipeline: [
{
$match: {
$expr: { $in: ["$_id", "$$orderItems.productId"] },
},
},
],
as: "productDetails",
},
},
]);
// 3. 多表关联
db.orders.aggregate([
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer",
},
},
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "products",
},
},
{ $unwind: "$customer" }, // 展开客户信息
{
$project: {
orderId: 1,
customerName: "$customer.name",
totalAmount: 1,
items: 1,
products: 1,
},
},
]);

数组处理:$unwind 阶段

$unwind 阶段:展开数组字段,将数组中的每个元素作为单独的文档

// MongoDB Shell / Node.js
// 1. 基础数组展开
db.orders.aggregate([
{ $unwind: "$items" }, // 展开订单项数组
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
{
$project: {
orderId: 1,
productName: "$product.name",
quantity: "$items.quantity",
price: "$items.price",
},
},
]);
// 2. 保留空数组
db.articles.aggregate([
{
$unwind: {
path: "$tags",
preserveNullAndEmptyArrays: true, // 保留没有标签的文章
},
},
{
$group: {
_id: "$tags",
count: { $sum: 1 },
},
},
]);
// 3. 数组索引展开
db.users.aggregate([
{
$unwind: {
path: "$skills",
includeArrayIndex: "skillIndex", // 包含数组索引
},
},
{
$project: {
username: 1,
skill: "$skills",
skillIndex: 1,
},
},
]);

数据分面:$facet 阶段

$facet 阶段:在同一组输入文档上执行多个聚合管道

// MongoDB Shell / Node.js
// 1. 多维度分析
db.products.aggregate([
{
$facet: {
// 按分类统计
byCategory: [
{
$group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
},
},
],
// 按价格区间统计
byPriceRange: [
{
$bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, 5000],
default: "5000+",
output: {
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
},
},
},
],
// 按品牌统计
byBrand: [
{
$group: {
_id: "$brand",
count: { $sum: 1 },
},
},
{ $sort: { count: -1 } },
{ $limit: 10 },
],
},
},
]);
// 2. 电商数据分析
db.orders.aggregate([
{ $match: { status: "completed" } },
{
$facet: {
// 销售趋势
salesTrend: [
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
},
totalSales: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1 } },
],
// 客户分析
customerAnalysis: [
{
$group: {
_id: "$customerId",
totalSpent: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{
$bucket: {
groupBy: "$totalSpent",
boundaries: [0, 1000, 5000, 10000, 50000],
default: "50000+",
output: {
customerCount: { $sum: 1 },
avgSpent: { $avg: "$totalSpent" },
},
},
},
],
// 产品分析
productAnalysis: [
{ $unwind: "$items" },
{
$group: {
_id: "$items.productId",
totalQuantity: { $sum: "$items.quantity" },
totalRevenue: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
},
},
{ $sort: { totalRevenue: -1 } },
{ $limit: 20 },
],
},
},
]);

数据合并:$merge 和 $out 阶段

$out 阶段:将聚合结果写入新集合

$merge 阶段:将聚合结果合并到现有集合

// MongoDB Shell / Node.js
// 1. 使用 $out 创建报表
db.orders.aggregate([
{ $match: {
createdAt: { $gte: new Date("2024-01-01") }
}},
{ $group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
category: "$category"
},
totalSales: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$totalAmount" }
}},
{ $sort: { "_id.year": 1, "_id.month": 1 } },
{ $out: "monthly_sales_report" } // 输出到新集合
]);
// 2. 使用 $merge 更新现有数据
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$totalAmount" },
orderC

{ KaTeX parse error: Expected '}', got 'EOF' at end of input: …p: { _id: "customerId",
totalSpent: { sum:"sum: "sum:"totalAmount" },
orderCount: { $sum: 1 },
avgOrderValue: { avg:"avg: "avg:"totalAmount" }
}},
{ $merge: {
into: “customer_summary”,
whenMatched: “replace”,
whenNotMatched: “insert”
}}
]);

// 3. 增量更新报表
db.orders.aggregate([
{ $match: {
createdAt: { $gte: new Date(“2024-01-01”) },
updatedAt: { $gte: new Date(“2024-01-01”) }
}},
{ $group: {
_id: {
year: { year:"year: "year:"createdAt" },
month: { month:"month: "month:"createdAt" }
},
totalSales: { sum:"sum: "sum:"totalAmount" },
orderCount: { $sum: 1 }
}},
{ $merge: {
into: “monthly_sales”,
on: [“_id.year”, “_id.month”],
whenMatched: “merge”,
whenNotMatched: “insert”
}}
]);

## 聚合操作符详解
### 数学操作符:$sum, $avg, $min, $max
**基础数学操作符**:用于数值计算和统计
```javascript
// MongoDB Shell / Node.js
// 1. 基础统计
db.orders.aggregate([{ $group: {_id: "$category",totalAmount: { $sum: "$amount" }, // 求和averageAmount: { $avg: "$amount" }, // 平均值maxAmount: { $max: "$amount" }, // 最大值minAmount: { $min: "$amount" }, // 最小值count: { $sum: 1 } // 计数}}
]);
// 2. 复杂计算
db.products.aggregate([{ $project: {name: 1,price: 1,cost: 1,// 计算利润率profitMargin: {$multiply: [{ $divide: [{ $subtract: ["$price", "$cost"] },"$price"]},100]},// 计算折扣价格discountedPrice: {$multiply: ["$price",{ $subtract: [1, { $ifNull: ["$discount", 0] }] }]}}}
]);
// 3. 条件统计
db.sales.aggregate([{ $group: {_id: "$region",totalSales: { $sum: "$amount" },// 只统计大于1000的销售highValueSales: {$sum: {$cond: [{ $gte: ["$amount", 1000] },"$amount",0]}},// 计算高价值销售占比highValueRatio: {$multiply: [{ $divide: [{ $sum: {$cond: [{ $gte: ["$amount", 1000] },1,0]}},{ $sum: 1 }]},100]}}}
]);

数组操作符:$push, $addToSet, $first, $last

数组操作符:用于处理数组数据的聚合

// MongoDB Shell / Node.js
// 1. 数组聚合
db.user_activities.aggregate([
{
$group: {
_id: "$userId",
// 收集所有活动类型
allActivities: { $push: "$action" },
// 收集唯一活动类型
uniqueActivities: { $addToSet: "$action" },
// 第一个和最后一个活动
firstActivity: { $first: "$action" },
lastActivity: { $last: "$action" },
// 活动次数
activityCount: { $sum: 1 },
},
},
]);
// 2. 复杂数组操作
db.orders.aggregate([
{ $unwind: "$items" },
{
$group: {
_id: "$customerId",
// 收集所有购买的产品
purchasedProducts: { $push: "$items.productId" },
// 收集唯一产品
uniqueProducts: { $addToSet: "$items.productId" },
// 收集产品详情
productDetails: {
$push: {
productId: "$items.productId",
quantity: "$items.quantity",
price: "$items.price",
},
},
// 总消费
totalSpent: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
},
},
]);
// 3. 数组去重和排序
db.articles.aggregate([
{ $unwind: "$tags" },
{
$group: {
_id: "$tags",
articleCount: { $sum: 1 },
// 收集文章标题
articles: { $push: "$title" },
// 收集作者
authors: { $addToSet: "$author" },
},
},
{ $sort: { articleCount: -1 } },
{ $limit: 10 },
]);

字符串操作符:$concat, $substr, $toUpper

字符串操作符:用于字符串处理和格式化

// MongoDB Shell / Node.js
// 1. 字符串连接和格式化
db.users.aggregate([
{
$project: {
// 连接姓名
fullName: {
$concat: [
{ $ifNull: ["$firstName", ""] },
" ",
{ $ifNull: ["$lastName", ""] },
],
},
// 格式化邮箱
emailDomain: {
$substr: [
"$email",
{ $add: [{ $indexOfBytes: ["$email", "@"] }, 1] },
-1,
],
},
// 用户名大写
usernameUpper: { $toUpper: "$username" },
},
},
]);
// 2. 地址格式化
db.customers.aggregate([
{
$project: {
customerId: 1,
// 格式化完整地址
fullAddress: {
$concat: [
{ $ifNull: ["$address.street", ""] },
", ",
{ $ifNull: ["$address.city", ""] },
", ",
{ $ifNull: ["$address.province", ""] },
" ",
{ $ifNull: ["$address.postalCode", ""] },
],
},
// 提取省份代码
provinceCode: {
$substr: [{ $ifNull: ["$address.province", ""] }, 0, 2],
},
},
},
]);
// 3. 文本分析
db.articles.aggregate([
{
$project: {
title: 1,
// 计算标题长度
titleLength: { $strLenBytes: "$title" },
// 提取前50个字符作为摘要
summary: {
$substr: ["$title", 0, { $min: [50, { $strLenBytes: "$title" }] }],
},
// 标题关键词(假设用空格分割)
keywords: {
$split: [{ $toLower: "$title" }, " "],
},
},
},
]);

日期操作符:$year, $month, $dayOfMonth

日期操作符:用于日期时间数据的处理和分析

// MongoDB Shell / Node.js
// 1. 日期分组统计
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" },
},
totalAmount: { $sum: "$totalAmount" },
orderCount: { $sum: 1 },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
]);
// 2. 时间范围分析
db.user_activities.aggregate([
{
$project: {
userId: 1,
action: 1,
timestamp: 1,
// 提取时间组件
year: { $year: "$timestamp" },
month: { $month: "$timestamp" },
dayOfWeek: { $dayOfWeek: "$timestamp" },
hour: { $hour: "$timestamp" },
// 计算年龄(天)
daysSinceActivity: {
$divide: [{ $subtract: [new Date(), "$timestamp"] }, 86400000],
},
},
},
]);
// 3. 季节性分析
db.sales.aggregate([
{
$project: {
amount: 1,
date: 1,
// 季节判断
season: {
$switch: {
branches: [
{ case: { $in: [{ $month: "$date" }, [12, 1, 2]] }, then: "冬季" },
{ case: { $in: [{ $month: "$date" }, [3, 4, 5]] }, then: "春季" },
{ case: { $in: [{ $month: "$date" }, [6, 7, 8]] }, then: "夏季" },
{ case: { $in: [{ $month: "$date" }, [9, 10, 11]] }, then: "秋季" },
],
default: "未知",
},
},
// 季度
quarter: {
$ceil: { $divide: [{ $month: "$date" }, 3] },
},
},
},
{
$group: {
_id: "$season",
totalSales: { $sum: "$amount" },
avgSales: { $avg: "$amount" },
salesCount: { $sum: 1 },
},
},
]);

实际应用场景

场景 1:电商销售分析系统

// Node.js 示例 - 电商销售分析API
async function getSalesAnalysis(startDate, endDate) {
const pipeline = [
// 1. 筛选时间范围
{
$match: {
createdAt: { $gte: new Date(startDate), $lte: new Date(endDate) },
status: "completed",
},
},
// 2. 展开订单项
{ $unwind: "$items" },
// 3. 关联产品信息
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
// 4. 关联客户信息
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer",
},
},
{ $unwind: "$customer" },
// 5. 多维度分析
{
$facet: {
// 销售趋势
salesTrend: [
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" },
},
totalSales: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
orderCount: { $sum: 1 },
uniqueCustomers: { $addToSet: "$customerId" },
},
},
{
$addFields: {
uniqueCustomerCount: { $size: "$uniqueCustomers" },
},
},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
],
// 产品分析
productAnalysis: [
{
$group: {
_id: "$product.category",
totalRevenue: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
totalQuantity: { $sum: "$items.quantity" },
avgPrice: { $avg: "$items.price" },
productCount: { $addToSet: "$items.productId" },
},
},
{
$addFields: {
uniqueProductCount: { $size: "$productCount" },
},
},
{ $sort: { totalRevenue: -1 } },
],
// 客户分析
customerAnalysis: [
{
$group: {
_id: "$customerId",
customerName: { $first: "$customer.name" },
totalSpent: {
$sum: { $multiply: ["$items.quantity", "$items.price"] },
},
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$totalAmount" },
lastOrderDate: { $max: "$createdAt" },
},
},
{ $sort: { totalSpent: -1 } },
{ $limit: 20 },
],
},
},
];
return await db.orders.aggregate(pipeline);
}

场景 2:用户行为分析系统

// Node.js 示例 - 用户行为分析
async function analyzeUserBehavior(userId, timeRange) {
const pipeline = [
// 1. 筛选用户和时间范围
{
$match: {
userId: new ObjectId(userId),
timestamp: {
$gte: new Date(timeRange.start),
$lte: new Date(timeRange.end),
},
},
},
// 2. 行为模式分析
{
$facet: {
// 活动频率分析
activityFrequency: [
{
$group: {
_id: {
hour: { $hour: "$timestamp" },
dayOfWeek: { $dayOfWeek: "$timestamp" },
},
activityCount: { $sum: 1 },
uniqueActions: { $addToSet: "$action" },
},
},
{
$addFields: {
uniqueActionCount: { $size: "$uniqueActions" },
},
},
{ $sort: { activityCount: -1 } },
],
// 行为路径分析
behaviorPath: [
{ $sort: { timestamp: 1 } },
{
$group: {
_id: "$userId",
actions: { $push: "$action" },
timestamps: { $push: "$timestamp" },
},
},
{
$project: {
behaviorSequence: {
$map: {
input: { $range: [0, { $size: "$actions" }] },
as: "index",
in: {
action: { $arrayElemAt: ["$actions", "$$index"] },
timestamp: { $arrayElemAt: ["$timestamps", "$$index"] },
},
},
},
},
},
],
// 功能使用统计
featureUsage: [
{
$group: {
_id: "$action",
usageCount: { $sum: 1 },
uniqueUsers: { $addToSet: "$userId" },
avgSessionDuration: { $avg: "$sessionDuration" },
},
},
{
$addFields: {
uniqueUserCount: { $size: "$uniqueUsers" },
},
},
{ $sort: { usageCount: -1 } },
],
},
},
];
return await db.user_activities.aggregate(pipeline);
}

场景 3:实时数据监控系统

// Node.js 示例 - 实时监控数据聚合
async function getRealTimeMetrics() {
const now = new Date();
const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
const pipeline = [
// 1. 筛选最近一小时的数据
{
$match: {
timestamp: { $gte: oneHourAgo, $lte: now },
},
},
// 2. 实时指标计算
{
$facet: {
// 系统性能指标
performanceMetrics: [
{
$group: {
_id: null,
avgResponseTime: { $avg: "$responseTime" },
maxResponseTime: { $max: "$responseTime" },
minResponseTime: { $min: "$responseTime" },
totalRequests: { $sum: 1 },
errorCount: {
$sum: {
$cond: [{ $gte: ["$statusCode", 400] }, 1, 0],
},
},
},
},
{
$addFields: {
errorRate: {
$multiply: [
{ $divide: ["$errorCount", "$totalRequests"] },
100,
],
},
},
},
],
// 用户活跃度
userActivity: [
{
$group: {
_id: {
hour: { $hour: "$timestamp" },
minute: { $minute: "$timestamp" },
},
activeUsers: { $addToSet: "$userId" },
requestCount: { $sum: 1 },
},
},
{
$addFields: {
uniqueUserCount: { $size: "$activeUsers" },
},
},
{ $sort: { "_id.hour": 1, "_id.minute": 1 } },
],
// 错误分析
errorAnalysis: [
{ $match: { statusCode: { $gte: 400 } } },
{
$group: {
_id: "$statusCode",
count: { $sum: 1 },
avgResponseTime: { $avg: "$responseTime" },
endpoints: { $addToSet: "$endpoint" },
},
},
{ $sort: { count: -1 } },
],
},
},
];
return await db.system_logs.aggregate(pipeline);
}

性能优化策略

索引优化

为聚合管道创建合适的索引

// MongoDB Shell / Node.js
// 1. 单字段索引
db.orders.createIndex({ createdAt: 1 });
db.orders.createIndex({ status: 1 });
db.orders.createIndex({ customerId: 1 });
// 2. 复合索引
db.orders.createIndex({ status: 1, createdAt: -1 });
db.orders.createIndex({ customerId: 1, status: 1 });
// 3. 文本索引
db.products.createIndex({
name: "text",
description: "text",
});
// 4. 分析索引使用情况
db.orders
.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$customerId", total: { $sum: "$amount" } } },
])
.explain("executionStats");

管道优化

优化聚合管道的执行效率

// MongoDB Shell / Node.js
// 1. 早期筛选 - 在管道开始处使用 $match
db.orders.aggregate([
{ $match: { status: "completed" } }, // 尽早筛选
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 2. 投影优化 - 只选择需要的字段
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $project: { category: 1, amount: 1 } }, // 只选择必要字段
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 3. 排序优化 - 利用索引进行排序
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } }, // 利用索引排序
{ $limit: 100 },
]);
// 4. 分页优化 - 使用游标而不是 skip
db.orders.aggregate([
{
$match: {
status: "completed",
_id: { $gt: lastProcessedId }, // 使用游标分页
},
},
{ $sort: { _id: 1 } },
{ $limit: 100 },
]);

内存管理

控制聚合管道的内存使用

// MongoDB Shell / Node.js
// 1. 使用 allowDiskUse 选项
db.largeCollection.aggregate(
[{ $group: { _id: "$category", count: { $sum: 1 } } }],
{ allowDiskUse: true }
);
// 2. 分批处理大数据集
async function processLargeDataset() {
const batchSize = 10000;
let lastId = null;
while (true) {
const pipeline = [
{ $match: lastId ? { _id: { $gt: lastId } } : {} },
{ $sort: { _id: 1 } },
{ $limit: batchSize },
{ $group: { _id: "$category", count: { $sum: 1 } } },
];
const results = await db.largeCollection.aggregate(pipeline);
if (results.length === 0) break;
// 处理结果
console.log(`处理了 ${results.length} 条记录`);
lastId = results[results.length - 1]._id;
}
}
// 3. 使用 $sample 进行数据采样
db.largeCollection.aggregate([
{ $sample: { size: 1000 } }, // 随机采样
{ $group: { _id: "$category", count: { $sum: 1 } } },
]);

企业级开发注意事项

聚合管道性能监控

监控聚合管道的执行性能

// Node.js 示例 - 聚合性能监控
async function monitorAggregationPerformance(pipeline, collectionName) {
const startTime = Date.now();
try {
const result = await db.collection(collectionName).aggregate(pipeline);
const executionTime = Date.now() - startTime;
console.log(`聚合执行时间: ${executionTime}ms`);
console.log(`返回文档数: ${result.length}`);
// 记录性能指标
await db.performance_metrics.insertOne({
operation: "aggregation",
collection: collectionName,
executionTime: executionTime,
resultCount: result.length,
timestamp: new Date(),
});
return result;
} catch (error) {
console.error("聚合执行失败:", error);
throw error;
}
}
// 2. 聚合管道缓存策略
const aggregationCache = new Map();
async function getCachedAggregation(
cacheKey,
pipeline,
collectionName,
ttl = 300000
) {
const cached = aggregationCache.get(cacheKey);
if (cached && Date.now() - cached.timestamp < ttl) {
console.log("使用缓存结果");
return cached.data;
}
const result = await db.collection(collectionName).aggregate(pipeline);
aggregationCache.set(cacheKey, {
data: result,
timestamp: Date.now(),
});
return result;
}
// 3. 聚合管道错误处理
async function safeAggregation(pipeline, collectionName, options = {}) {
try {
const result = await db.collection(collectionName).aggregate(pipeline, {
allowDiskUse: true,
maxTimeMS: 30000, // 30秒超时
...options,
});
return {
success: true,
data: result,
count: result.length,
};
} catch (error) {
console.error("聚合执行错误:", error);
return {
success: false,
error: error.message,
data: [],
};
}
}

数据一致性保障

确保聚合结果的准确性

// Node.js 示例 - 数据一致性检查
async function validateAggregationResult(
pipeline,
collectionName,
expectedCount
) {
// 1. 执行聚合管道
const aggregationResult = await db
.collection(collectionName)
.aggregate(pipeline);
// 2. 验证结果数量
if (aggregationResult.length !== expectedCount) {
console.warn(
`聚合结果数量不匹配: 期望 ${expectedCount}, 实际 ${aggregationResult.length}`
);
}
// 3. 数据完整性检查
const validationResult = await db.collection(collectionName).aggregate([
...pipeline,
{ $match: { $expr: { $ne: ["$totalAmount", null] } } }, // 检查关键字段
]);
if (validationResult.length !== aggregationResult.length) {
throw new Error("聚合结果包含空值,数据可能不完整");
}
return aggregationResult;
}
// 4. 聚合管道版本控制
const AGGREGATION_VERSIONS = {
v1: [
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
],
v2: [
{
$match: {
status: "completed",
createdAt: { $gte: new Date("2024-01-01") },
},
},
{
$group: {
_id: "$category",
total: { $sum: "$amount" },
count: { $sum: 1 },
},
},
],
};
async function executeAggregationVersion(version, collectionName) {
const pipeline = AGGREGATION_VERSIONS[version];
if (!pipeline) {
throw new Error(`未知的聚合版本: ${version}`);
}
return await db.collection(collectionName).aggregate(pipeline);
}

安全查询实践

防止聚合管道注入攻击

// Node.js 示例 - 安全的聚合管道构建
class SafeAggregationBuilder {
constructor() {
this.allowedStages = [
"$match",
"$project",
"$group",
"$sort",
"$limit",
"$skip",
"$lookup",
"$unwind",
"$facet",
"$bucket",
"$sample",
];
this.allowedOperators = [
"$sum",
"$avg",
"$min",
"$max",
"$count",
"$add",
"$subtract",
"$multiply",
"$divide",
"$concat",
"$substr",
"$toUpper",
"$toLower",
];
}
// 验证聚合管道
validatePipeline(pipeline) {
if (!Array.isArray(pipeline)) {
throw new Error("聚合管道必须是数组");
}
for (const stage of pipeline) {
if (typeof stage !== "object" || stage === null) {
throw new Error("聚合阶段必须是对象");
}
const stageName = Object.keys(stage)[0];
if (!this.allowedStages.includes(stageName)) {
throw new Error(`不允许的聚合阶段: ${stageName}`);
}
}
return true;
}
// 构建安全的聚合管道
buildSafePipeline(filters) {
const pipeline = [];
// 添加 $match 阶段
if (filters.match) {
pipeline.push({ $match: this.sanitizeMatch(filters.match) });
}
// 添加 $group 阶段
if (filters.group) {
pipeline.push({ $group: this.sanitizeGroup(filters.group) });
}
// 添加 $sort 阶段
if (filters.sort) {
pipeline.push({ $sort: this.sanitizeSort(filters.sort) });
}
// 添加 $limit 阶段
if (filters.limit && filters.limit <= 1000) {
// 限制最大数量
pipeline.push({ $limit: filters.limit });
}
return pipeline;
}
// 清理 $match 条件
sanitizeMatch(match) {
const allowedFields = ["status", "category", "createdAt", "amount"];
const sanitized = {};
for (const [field, value] of Object.entries(match)) {
if (allowedFields.includes(field)) {
sanitized[field] = value;
}
}
return sanitized;
}
// 清理 $group 条件
sanitizeGroup(group) {
const sanitized = { _id: group._id };
if (group.total) {
sanitized.total = { $sum: "$amount" };
}
if (group.count) {
sanitized.count = { $sum: 1 };
}
return sanitized;
}
// 清理 $sort 条件
sanitizeSort(sort) {
const allowedFields = ["createdAt", "amount", "status"];
const sanitized = {};
for (const [field, direction] of Object.entries(sort)) {
if (
allowedFields.includes(field) &&
(direction === 1 || direction === -1)
) {
sanitized[field] = direction;
}
}
return sanitized;
}
}
// 使用示例
const builder = new SafeAggregationBuilder();
const pipeline = builder.buildSafePipeline({
match: { status: "completed" },
group: { _id: "$category", total: true, count: true },
sort: { total: -1 },
limit: 10,
});

监控和调试

聚合管道的监控和调试技巧

// Node.js 示例 - 聚合管道调试工具
class AggregationDebugger {
constructor(collection) {
this.collection = collection;
}
// 分析聚合管道性能
async analyzePerformance(pipeline) {
const explainResult = await this.collection.aggregate(pipeline).explain("executionStats");
return {
executionTime: explainResult.executionStats.executionTimeMillis,
totalDocsExamined: explainResult.executionStats.totalDocsExamined,
totalDocsReturned: explainResult.executionStats.totalDocsReturned,
stages: explainResult.stages.map(stage => ({
stage: stage.stage,
executionTime: stage.executionTimeMillis,
docsExamined: stage.totalDocsExamined,
docsReturned: stage.totalDocsReturned
}))
};
}
// 逐步执行聚合管道
async stepByStepExecution(pipeline) {
const results = [];
for (let i = 0; i < pipeline.length; i++) {
const partialPipeline = pipeline.slice(0, i + 1);
const result = await this.collection.aggregate(partialPipeline);
results.push({
stage: i + 1,
pipeline: partialPipeline,
resultCount: result.length,
sampleResult: result.slice(0, 3) // 显示前3个结果
});
}
return results;
}
// 检测性能瓶颈
async detectBottlenecks(pipeline) {
const analysis = await this.analyzePerformance(pipeline);
const bottlenecks = [];
for (const stage of analysis.stages) {
if (stage.executionTime > 1000) { // 超过1秒
bottlenecks.push({
stage: stage.stage,
executionTime: stage.executionTime,
suggestion: this.getOptimizationSuggestion(stage.stage)
});
}
}
return {
analysis,
bottlenecks,
recommendations: this.getRecommendations(analysis)
};
}
getOptimizationSuggestion(stage) {
const suggestions = {
'$match': '考虑在 $match 阶段使用索引',
'$group': '考虑使用 $facet 并行处理或添加 $limit',
'$lookup': '考虑使用 $lookup 的 pipeline 参数优化关联',
'$sort': '确保排序字段有索引',
'$unwind': '考虑使用 preserveNullAndEmptyArrays 选项'
};
return suggestions[stage] || '考虑优化此阶段';
}
getRecommendations(analysis) {
const recommendations = [];
if (analysis.totalDocsExamined > analysis.totalDocsReturned * 10) {
recommendations.push('考虑添加更多筛选条件减少扫描的文档数');
}
if (analysis.executionTime > 5000) {
recommendations.push('考虑使用 allowDiskUse 选项或分批处理');
}
if (analysis.totalDocsExamined > 100000) {
recommendations.push('考虑使用 $sample 进行数据采样');
}
return recommendations;
}
}
// 使用示例
const debugger = new AggregationDebugger(db.orders);
const pipeline = [
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
];
const analysis = await debugger.analyzePerformance(pipeline);
console.log("性能分析:", analysis);

常见问题与解决方案

问题 1:聚合管道执行缓慢

原因分析:

解决方案:

// 1. 添加索引
db.orders.createIndex({ status: 1, createdAt: -1 });
db.orders.createIndex({ category: 1 });
// 2. 优化管道顺序
// ❌ 错误:先分组再筛选
db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gte: 1000 } } },
]);
// ✅ 正确:先筛选再分组
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gte: 1000 } } },
]);
// 3. 使用 allowDiskUse
db.orders.aggregate(pipeline, { allowDiskUse: true });

问题 2:内存不足错误

原因分析:

  • 分组字段基数过高
  • 数组展开导致数据量爆炸
  • 没有使用磁盘存储

解决方案:

// 1. 使用 $bucket 减少分组基数
db.orders.aggregate([
{
$bucket: {
groupBy: "$amount",
boundaries: [0, 100, 500, 1000, 5000],
default: "5000+",
output: {
count: { $sum: 1 },
total: { $sum: "$amount" },
},
},
},
]);
// 2. 分批处理大数据集
async function processInBatches(pipeline, batchSize = 10000) {
let skip = 0;
const results = [];
while (true) {
const batchPipeline = [...pipeline, { $skip: skip }, { $limit: batchSize }];
const batch = await db.orders.aggregate(batchPipeline);
if (batch.length === 0) break;
results.push(...batch);
skip += batchSize;
}
return results;
}
// 3. 使用 $sample 采样
db.orders.aggregate([
{ $sample: { size: 10000 } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);

问题 3:聚合结果不准确

原因分析:

  • 数据类型不一致
  • 空值处理不当
  • 分组逻辑错误

解决方案:

// 1. 数据类型标准化
db.orders.aggregate([
{
$addFields: {
amount: { $toDouble: "$amount" }, // 确保数值类型
createdAt: { $toDate: "$createdAt" }, // 确保日期类型
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 2. 空值处理
db.orders.aggregate([
{
$addFields: {
amount: { $ifNull: ["$amount", 0] }, // 空值默认为0
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
]);
// 3. 验证聚合结果
async function validateAggregation(pipeline, expectedCount) {
const result = await db.orders.aggregate(pipeline);
if (result.length !== expectedCount) {
console.warn(
`聚合结果数量不匹配: 期望 ${expectedCount}, 实际 ${result.length}`
);
}
// 检查数据完整性
const nullValues = result.filter(
(item) => item.total === null || item.total === undefined
);
if (nullValues.length > 0) {
console.warn("聚合结果包含空值");
}
return result;
}

问题 4:$lookup 性能问题

原因分析:

  • 关联字段没有索引
  • 关联数据量过大
  • 使用了不合适的关联方式

解决方案:

// 1. 为关联字段创建索引
db.products.createIndex({ category: 1 });
db.orders.createIndex({ "items.productId": 1 });
// 2. 使用 pipeline 参数优化 $lookup
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { productIds: "$items.productId" },
pipeline: [
{ $match: { $expr: { $in: ["$_id", "$$productIds"] } } },
{ $project: { name: 1, category: 1 } }, // 只选择需要的字段
],
as: "products",
},
},
]);
// 3. 使用 $unwind 和 $lookup 组合
db.orders.aggregate([
{ $unwind: "$items" },
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product",
},
},
{ $unwind: "$product" },
{
$group: {
_id: "$product.category",
total: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
},
},
]);

企业级开发注意事项

聚合管道性能监控

监控聚合管道的执行性能

// Node.js 示例 - 聚合性能监控
async function monitorAggregationPerformance(pipeline, collectionName) {
const startTime = Date.now();
try {
// 执行聚合管道
const result = await db.collection(collectionName).aggregate(pipeline);
const executionTime = Date.now() - startTime;
// 记录性能指标
console.log(`聚合执行时间: ${executionTime}ms`);
console.log(`返回文档数: ${result.length}`);
// 性能告警
if (executionTime > 5000) {
console.warn("聚合执行时间过长,建议优化");
}
return result;
} catch (error) {
console.error("聚合执行失败:", error.message);
throw error;
}
}
// 使用 explain 分析执行计划
async function analyzeAggregationPlan(pipeline, collectionName) {
const explainResult = await db
.collection(collectionName)
.aggregate(pipeline)
.explain("executionStats");
console.log("执行统计:", explainResult.executionStats);
console.log("使用的索引:", explainResult.executionStats.executionStages);
return explainResult;
}

聚合管道安全实践

防止聚合管道注入攻击

// Node.js 示例 - 安全的聚合管道构建
function buildSafeAggregationPipeline(userInput, filters) {
// 1. 输入验证
const allowedFields = ["category", "status", "date"];
const allowedOperators = ["$eq", "$gt", "$lt", "$gte", "$lte", "$in"];
// 2. 构建安全的查询条件
const matchStage = {};
allowedFields.forEach((field) => {
if (filters[field] && typeof filters[field] === "string") {
matchStage[field] = filters[field];
}
});
// 3. 构建安全的聚合管道
const pipeline = [
{ $match: matchStage },
{
$group: {
_id: "$category",
count: { $sum: 1 },
total: { $sum: "$amount" },
},
},
{ $sort: { total: -1 } },
{ $limit: 100 },
];
return pipeline;
}
// 使用参数化聚合
async function executeSafeAggregation(collectionName, filters) {
const pipeline = buildSafeAggregationPipeline(null, filters);
// 设置执行超时
const result = await db
.collection(collectionName)
.aggregate(pipeline)
.maxTimeMS(30000) // 30秒超时
.toArray();
return result;
}

聚合管道资源管理

控制聚合管道的资源使用

// Node.js 示例 - 资源管理
async function executeAggregationWithResourceControl(pipeline, options = {}) {
const {
maxMemoryUsage = 100 * 1024 * 1024, // 100MB
maxExecutionTime = 30000, // 30秒
allowDiskUse = true,
} = options;
try {
const result = await db.collection
.aggregate(pipeline, {
allowDiskUse,
maxTimeMS: maxExecutionTime,
cursor: { batchSize: 1000 },
})
.toArray();
// 检查内存使用情况
const memoryUsage = process.memoryUsage();
if (memoryUsage.heapUsed > maxMemoryUsage) {
console.warn("聚合执行内存使用过高");
}
return result;
} catch (error) {
if (error.code === 16945) {
// 内存限制错误
console.error("聚合执行超出内存限制,建议优化管道");
}
throw error;
}
}

聚合管道缓存策略

实现聚合结果缓存

// Node.js 示例 - 聚合结果缓存
const Redis = require("redis");
const redis = Redis.createClient();
async function getCachedAggregationResult(
cacheKey,
pipeline,
collectionName,
ttl = 3600
) {
try {
// 尝试从缓存获取
const cachedResult = await redis.get(cacheKey);
if (cachedResult) {
console.log("从缓存获取聚合结果");
return JSON.parse(cachedResult);
}
// 执行聚合管道
const result = await db
.collection(collectionName)
.aggregate(pipeline)
.toArray();
// 缓存结果
await redis.setex(cacheKey, ttl, JSON.stringify(result));
console.log("聚合结果已缓存");
return result;
} catch (error) {
console.error("缓存操作失败:", error);
// 降级到直接执行聚合
return await db.collection(collectionName).aggregate(pipeline).toArray();
}
}
// 使用示例
async function getSalesReport(dateRange) {
const cacheKey = `sales_report_${dateRange.start}_${dateRange.end}`;
const pipeline = [
{
$match: {
createdAt: {
$gte: new Date(dateRange.start),
$lte: new Date(dateRange.end),
},
},
},
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
];
return await getCachedAggregationResult(cacheKey, pipeline, "orders", 1800); // 30分钟缓存
}

聚合管道监控和告警

设置聚合管道监控

// Node.js 示例 - 聚合监控系统
class AggregationMonitor {
constructor() {
this.metrics = {
totalExecutions: 0,
averageExecutionTime: 0,
errorCount: 0,
slowQueries: [],
};
}
async executeWithMonitoring(pipeline, collectionName, options = {}) {
const startTime = Date.now();
const executionId = this.generateExecutionId();
try {
// 记录执行开始
this.recordExecutionStart(executionId, pipeline, collectionName);
// 执行聚合
const result = await db
.collection(collectionName)
.aggregate(pipeline, options)
.toArray();
// 记录执行成功
const executionTime = Date.now() - startTime;
this.recordExecutionSuccess(executionId, executionTime, result.length);
// 检查性能告警
this.checkPerformanceAlerts(executionTime, pipeline);
return result;
} catch (error) {
// 记录执行失败
this.recordExecutionError(executionId, error);
throw error;
}
}
recordExecutionStart(executionId, pipeline, collectionName) {
console.log(`聚合执行开始 [${executionId}]: ${collectionName}`);
}
recordExecutionSuccess(executionId, executionTime, resultCount) {
this.metrics.totalExecutions++;
this.metrics.averageExecutionTime =
(this.metrics.averageExecutionTime + executionTime) / 2;
console.log(
`聚合执行成功 [${executionId}]: ${executionTime}ms, ${resultCount}条结果`
);
}
recordExecutionError(executionId, error) {
this.metrics.errorCount++;
console.error(`聚合执行失败 [${executionId}]:`, error.message);
}
checkPerformanceAlerts(executionTime, pipeline) {
if (executionTime > 5000) {
this.metrics.slowQueries.push({
executionTime,
pipeline: JSON.stringify(pipeline),
timestamp: new Date(),
});
console.warn(`慢查询告警: 执行时间 ${executionTime}ms`);
}
}
generateExecutionId() {
return `agg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
getMetrics() {
return {
...this.metrics,
errorRate: this.metrics.errorCount / this.metrics.totalExecutions,
slowQueryCount: this.metrics.slowQueries.length,
};
}
}
// 使用监控系统
const monitor = new AggregationMonitor();
async function executeMonitoredAggregation(pipeline, collectionName) {
return await monitor.executeWithMonitoring(pipeline, collectionName);
}

下一篇文章,我们将深入探讨 MongoDB 的视图与物化视图,包括虚拟集合、数据视图、性能优化等内容。敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/941941.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025 年最新防火涂料厂家排行榜:膨胀型 / 非膨胀型 / 厚型 / 薄型钢结构防火涂料优质企业最新推荐

引言 在建筑与工业安全领域,防火涂料是抵御火灾、守护生命财产安全的关键防线。当前市场上防火涂料品牌繁杂,部分产品存在防火性能不达标、环保性差、施工适配性不足等问题,加之行业技术迭代快,新品牌不断涌现,企…

2025 年南昌装修设计公司推荐:宿然设计,非营销型技术工作室,专注落地还原,提供全国纯设计与江西全案服务

当前南昌装修行业发展迅速,市场上装修公司数量众多,种类繁杂。不少消费者在选择装修公司时,常常面临难以抉择的困境,他们既希望找到能提供新颖设计方案的公司,又担忧设计无法顺利落地,难以实现理想中的居住空间效…

2025 年板材源头厂家最新推荐排行榜:聚焦 ENF 级环保与高品质,精选 6 家实力企业助您轻松选

引言 当前建材市场中,板材产品种类繁杂,质量与性能参差不齐,消费者在选购时常常面临诸多困扰:传统板材环保不达标,苯系物、甲醛等有害物质释放危害健康;部分产品易燃、施工复杂,不仅增加成本还延误工期;缺乏权…

优先队列运算符重载

方式 1 struct ab{int b,v,k;bool operator <(const ab &a)const{//代表前(v)与后(a.v)进行比较return v>a.v;} }; priority_queue<ab> q;方式 2 struct ab{int b,v,k;friend bool operator<(ab a,…

Mac INodeClient 异常连接 解决方案

起因 Mac 重启电脑 未关闭InodeClient 导致重启之后 连接卡死 反复重启卸载并没有启作用 解决方案 #!/bin/bash # 修复版清理脚本echo "=== 1. 强制终止所有iNode进程 ===" sudo pkill -9 iNode 2>/dev/n…

2025年GEO品牌推荐榜单:AI技术驱动的行业革新者

摘要 随着人工智能技术的快速发展,GEO行业正迎来前所未有的变革机遇。2025年,基于AI搜索优化和智能推荐算法的GEO服务已成为企业数字化转型的核心驱动力。本文通过对行业领先企业的深度分析,为寻求GEO服务的企业提供…

2025年GEO品牌推荐排行榜TOP10:AI技术驱动的行业新格局

摘要 随着人工智能技术在GEO领域的深度应用,2025年行业正迎来智能化转型的关键节点。本文基于技术实力、服务能力和市场口碑三大维度,对当前主流GEO服务商进行综合评估,为寻求加盟合作的企业提供权威参考。文末附行…

基于STM32F1x系列与JY901模块串口通信

一、硬件JY901引脚 STM32F103引脚 功能说明VCC 3.3V 电源供电GND GND 地线TX PA10 (USART1_RX) 接收数据RX PA9 (USART1_TX) 发送数据二、STM32串口配置代码(HAL库) // usart.c #include "stm32f1xx_hal.h"…

2025 年最新推荐防火涂料厂家排行榜:涵盖膨胀型、非膨胀型、室内外及超薄厚型钢结构防火涂料,助选优质产品

引言 在建筑行业持续发展的当下,防火安全是建筑安全的核心环节,防火涂料作为关键防护屏障,其质量直接关乎生命财产安全。当前市场中,部分防火涂料品牌为逐利降低成本,导致产品防火性能不达标、耐久性差,且品牌繁…

Hash与布隆过滤器

hash 函数 映射函数 Hash(key)=addr ;hash 函数可能会把两个或两个以上的不同 key 映射到同一地址,这种情况称之为冲突(或者 hash 碰撞)hash的优势计算速度快 强随机分布(等概率、均匀地分布在整个地址空间) m…

习题-归纳定义原理

习题1. 设\((b_1,b_2,\cdots)\)是实数的一个无穷序列。用归纳法定义它的和\(\sum_{k=1}^n b_k\)如下: \[\begin{align*}&\sum_{k=1}^n b_k = b_1\qquad\qquad\text{当}n=1,\\&\sum_{k=1}^n=(\sum_{k=1}^{n-1}…

对话式 AI 年度春晚:Convo AIRTE2025 全议程解锁

10 月 31 日 - 11 月 1 日北京悠唐皇冠假日酒店RTE2025 第十一届实时互联网大会两日全议程上线抢先预览,即刻收藏!阅读更多 Voice Agent 学习笔记:了解最懂 AI 语音的头脑都在思考什么

2025年安恒信息公司:深度解析AI与数据安全双轮驱动的技术护城河

引言:本文从“技术落地与标准制定”维度切入,拆解安恒信息如何在AI安全垂域大模型、隐私计算平台、国家级标准编制三条主线中形成可复用的技术护城河,为正在评估安全供应商的政企单位提供一份可落地的客观参考。 背…

C# Avalonia 16- Animation- SampleViewer - SimpleExample

C# Avalonia 16- Animation- SampleViewer - SimpleExampleSampleViewer.axaml代码<Window xmlns="https://github.com/avaloniaui"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xm…

2025年安恒信息深度解析:AI与数据安全双轮驱动的技术演进全景

引言 本文聚焦“技术演进”这一核心维度,对安恒信息技术股份有限公司(688023)进行拆解,为正在评估网络安全供应商、规划数据安全预算或研究AI安全落地路径的政企决策者提供一份可对照的技术路线图。 背景与概况 安…

清单测试

* { margin: 0; padding: 0; box-sizing: border-box; font-family: "Segoe UI", system-ui, sans-serif } body { min-height: 100vh; display: flex; justify-content: center; align-items: center; padd…

开源手写识别库zinnia

开源手写识别库zinnia1.识别率依赖于笔画的顺序和方向 2.汉字结构特征: 左右结构:明 好 上下结构:思 想 包围结构:国 围 独体字:人 水 3.局限性 对连笔字识别效果较差 无法处理行书,草书等自由书写 单字识别,缺乏上…

穿透式页面的参数注意事项

穿透式页面的参数注意事项从一个面板点击一个卡片穿透到另一个页面,需要带一些查询统计的参数过去,但是新的页面自带了一些默认的查询参数,怎么办?不能直接把默认的查询参数去掉,因为跳转的页面其他地方也需要用上…

2025年10月中国宝宝辅食品牌推荐榜:深海去刺鱼领衔对比

第一次给宝宝添辅食,家长往往一边兴奋一边忐忑:怕过敏、怕营养不够、怕重金属、怕质地太粗噎到孩子。母婴社群里“谁家米粉铁超标”“哪款鱼泥刺没剔干净”的吐槽,让新手爸妈把购物车改来改去。2025年农业农村部《婴…

contos 同步SVN 迁移SVN 安装SVN

contos 同步SVN 迁移SVN 安装SVN, subversion-1.7.14 数据迁移,centos8 安装启动svn,centos8 svn 数据迁移原始服务器svn版本是 subversion-1.7.14,已下载安装包subversion-1.7.14.tar.gz,先安装到新服务器,再启…