MongoDB大数据量的优化——mongoTemplate.stream()方法使用

传统查询

在传统的 MongoDB 查询中,我们通常使用find方法:

List<Document> results = mongoTemplate.find(query, Document.class, "collection");

这种方式会直接将查询结果全部加载到内存中,当数据量较大(如百万级文档)时,会导致严重的内存问题甚至 OOM。

所以会考虑利用skip考虑分页。

skip分页查询

使用skip和limit,来分页处理数据:

int pageSize = 1000;
for (int page = 0; ; page++) {Query query = new Query().skip(page * pageSize).limit(pageSize);List<Document> results = mongoTemplate.find(query, Document.class, "large_collection");if (results.isEmpty()) {break;}// 处理当前页数据processResults(results);
}

这种方法在数据量几万条左右可能工作良好,但当数据量达到百万级时,还是不行:
MongoDB 的skip操作是通过遍历并丢弃前面的文档来实现的。
例如:
skip(10000).limit(100) 需要先遍历 10000 条文档,然后只返回后面的 100 条
当数据量达到百万级时,skip(500000) 意味着要遍历 50 万条文档,即使使用索引也会非常缓慢。

实现基于 ID 的分页查询

id字段有索引,将其作为条件分批次查询,每次记录最后一个id:

 public void processAllDocuments(String collectionName, int batchSize) {String lastId = null;int totalProcessed = 0;while (true) {// 创建查询条件:ID > lastIdQuery query = new Query();if (lastId != null) {query.addCriteria(Criteria.where("_id").gt(lastId));}// 按ID升序排序,并限制批处理大小query.with(Sort.by(Sort.Direction.ASC, "_id"));query.limit(batchSize);// 执行查询List<Document> documents = mongoTemplate.find(query, Document.class, collectionName);// 处理当前批次的数据if (documents.isEmpty()) {break;} else {processBatch(documents);totalProcessed += documents.size();// 更新lastId为当前批次的最后一个文档IDDocument lastDocument = documents.get(documents.size() - 1);lastId = lastDocument.getObjectId("_id").toString();}}}

这种方式已经很不错了,但是需要我们精确控制lastId。如果在操作时动态更改了数据,可能会造成数据遗漏。

mongoTemplate的stream方法

使用stream方法,可以按需逐批从数据库获取数据,每次只在内存中处理少量数据,适用于大数据量的读取和处理场景:

import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.cursor.MongoCursor;// 假设已注入MongoTemplate
@Autowired
private MongoTemplate mongoTemplate;public void processLargeData() {// 定义查询条件Query query = new Query(Criteria.where("status").is("active")).sort(Sort.by(Sort.Direction.ASC, "createTime"));MongoCursor<Document> cursor = null;try {// 执行查询获取游标cursor = mongoTemplate.stream(query, Document.class, "collection");int count = 0;while (cursor.hasNext()) {Document doc = cursor.next();// 处理单个文档processDocument(doc);count++;if (count % 100 == 0) {System.out.println("已处理 " + count + " 条记录");}}System.out.println("总共处理 " + count + " 条记录");} catch (Exception e) {// 处理可能的异常log.error("数据处理失败", e);} finally {// 确保游标资源被关闭if (cursor != null) {cursor.close();}}
}

原理:MongoDB 中 Cursor 的批处理机制与性能优化

什么是 MongoDB Cursor 的批处理?

当你执行一个查询时,MongoDB 不会一次性返回所有匹配的文档,而是返回一个Cursor(游标)。Cursor 是一个指向查询结果集的指针,它采用 分批(Batch) 的方式从服务器获取数据。步骤如下:

  1. 客户端(应用程序)向 MongoDB 服务器发送查询请求
  2. 服务器返回一个 Cursor ID 和第一批数据(默认 101 条记录或 1MB,取较小值)
  3. 客户端通过Cursor 逐个处理这些数据(调用.next())
  4. 当客户端处理完这批数据后,再通过 Cursor 请求下一批数据
  5. 重复步骤 3-4,直到处理完所有数据或关闭 Cursor

这种机制的核心优势是避免一次性传输大量数据,从而减少内存占用和网络开销。

在 MongoDB 中,默认的批处理大小是由客户端驱动决定的。对于 Java 驱动(Spring Data MongoDB 基于此),默认批大小规则如下:

  • 第一批数据:默认返回 101 条记录(或直到达到 1MB 大小限制,以较小者为准)
  • 后续批次:默认返回 4MB 大小的数据(或该批次的所有数据,以较小者为准)

这个默认值(101)是一个经过权衡的选择:

  • 足够小,避免一次性加载过多数据
  • 足够大,减少客户端与服务器之间的往返次数
  • 对于大多数应用场景,101 条记录是一个合理的初始批次大小

可以通过Query对象调整批大小:

Query query = new Query().batchSize(500);  // 自定义批大小为500
try (MongoCursor<Document> cursor = mongoTemplate.stream(query, Document.class, "collection")) {// 处理数据
}

为什么少量分批反而更快?

这个问题的核心在于理解数据库查询的性能瓶颈。对于大数据集,性能瓶颈通常不是单次查询的速度,而是:

  • 网络传输开销:一次性传输大量数据会占用更多网络带宽,导致延迟增加
  • 内存占用:一次性加载大量数据到内存会导致频繁 GC,甚至 OOM
  • 服务器负载:数据库服务器需要一次性准备和传输大量数据,增加 CPU 和内存压力

调用.next () 方法时发生了什么?

当调用cursor.next()时,实际发生的流程如下:

  1. 检查当前批次:查看 Cursor 中是否还有未处理的文档
  2. 如果有:直接返回下一个文档,内存和网络无额外开销
  3. 如果没有:
    • 自动向服务器发送请求,获取下一批数据
    • 服务器返回下一批数据(默认 4MB 或剩余全部数据,取较小值)
    • 将新批次数据加载到客户端内存
    • 返回新批次的第一个文档

这个过程对开发者是透明的,我们只需要调用.next(),Cursor 会自动管理批处理和数据加载

总结

综上所述,当数据量很大时,可以考虑使用id分页或者stream流式处理。

特性基于 ID 分页查询流式处理(MongoTemplate.stream)
核心原理按 ID 范围分批查询,每次查询ID > lastId使用数据库游标(Cursor)逐批获取数据
内存占用每批数据加载到内存,处理后释放每次仅加载当前批次数据,内存占用更低
性能表现深层分页性能稳定,不受页码影响全程性能稳定,略优于 ID 分页(减少查询次数)
实现复杂度需要手动管理 lastId 和分页逻辑简单,自动管理游标生命周期
数据一致性适合静态数据集,动态插入可能导致漏读适合实时数据集,一次性遍历不中断
适用场景分页展示、分批处理、断点续传任务连续流式处理、大数据分析、实时数据处理
网络 IO 模式主动请求下一页数据自动请求下一批数据(管道化执行)
批大小控制通过limit()手动设置通过batchSize()设置(默认 101/4MB)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/80934.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDK8中的 Stream流式编程用法优化(工具类在文章最后)

Java从JDK8起提供了Stream流这个功能&#xff0c;于是项目里出现了大量基于Stream流的写法。随着项目的进行&#xff0c;慢慢的代码中铺天盖地的都是下面的写法&#xff1a; List<User> userList null;if (condition) {userList new ArrayList<>();userList.add(…

Spring Cloud生态与技术选型指南:如何构建高可用的微服务系统?

引言&#xff1a;为什么选择Spring Cloud&#xff1f; 作为全球开发者首选的微服务框架&#xff0c;Spring Cloud凭借其开箱即用的组件、与Spring Boot的无缝集成&#xff0c;以及活跃的社区生态&#xff0c;成为企业级微服务架构的基石。但在实际项目中&#xff0c;如何从众多…

Android清单文件

清单文件AndroidManifest.xml AndroidManifest.xml 配置清单文件是 每个 Android 应用的配置中心&#xff0c;系统在安装和运行应用时&#xff0c;首先会读取它。 它是 Android 应用的 “说明书”&#xff0c;主要作用是&#xff1a; 功能说明声明应用组件比如 Activity、Se…

大语言模型与人工智能:技术演进、生态重构与未来挑战

目录 技术演进:从专用AI到通用智能的跃迁核心能力:LLM如何重构AI技术栈应用场景:垂直领域的技术革命生态关系:LLM与AI技术矩阵的协同演进挑战局限:智能天花板与伦理困境未来趋势:从语言理解到世界模型1. 技术演进:从专用AI到通用智能的跃迁 1.1 三次技术浪潮的跨越 #me…

SC3000智能相机-自动存图

1、需求:SC3000智能相机开机自动存图。相机自带的相机存储空间有限,预留存图需要开启SCMVS、并手动点存图。如果工人忘了开启则不会存图,导致生产严重失误! 2、方法:利用相机提供的FTP协议,将图自动存到本地。 1、在本地建立FTP服务器。 (1)win10默认开启了FTP服务器…

Wan2.1 文生视频 支持批量生成、参数化配置和多语言提示词管理

Wan2.1 文生视频 支持批量生成、参数化配置和多语言提示词管理 flyfish 设计 一个基于 Wan2.1 文本到视频模型的自动化视频生成系统。 文件关系图 script.py ├── 读取 → config.json │ ├── 模型配置 → 加载AI模型 │ ├── 生成参数 → 控制生成质量 │ └…

Flannel后端为UDP模式下,分析数据包的发送方式——tun设备(三)

在分析 Kubernetes 环境中 Flannel UDP 模式的数据包转发时&#xff0c;我们提到 flannel.1 是一个 TUN 设备&#xff0c;它在数据包处理中起到了关键作用。 什么是 TUN 设备&#xff1f; TUN 设备&#xff08;Tunnel 设备&#xff09;是 Linux 系统中一种虚拟网络接口&#x…

Java中创建线程的几种方式

目录 Java 创建线程的几种方式 一、继承 Thread 类 核心原理 实现步骤 代码示例 简化写法&#xff08;Lambda 表达式&#xff09; 优缺点 二、实现 Runnable 接口 核心原理 实现步骤 代码示例 简化写法&#xff08;Lambda 表达式&#xff09; 优缺点分析 三、实现…

[Git] 基本操作及用户配置

文章目录 现在所讲&#xff0c;全部是本地Git仓库&#xff0c;不是远程仓库&#xff01;Git是版本控制工具&#xff0c;而并非只能用远程仓库的版本控制工具&#xff01; 什么是“仓库”&#xff08;Repository&#xff09;&#xff1f;创建一个 Git 本地仓库&#xff1a;git i…

layui 介绍

layui&#xff08;谐音&#xff1a;类 UI) 是一套开源的 Web UI 解决方案&#xff0c;采用自身经典的模块化规范&#xff0c;并遵循原生 HTML/CSS/JS 的开发方式&#xff0c;极易上手&#xff0c;拿来即用。其风格简约轻盈&#xff0c;而组件优雅丰盈&#xff0c;从源代码到使用…

笔记:NAT

一、NAT 的基本概念 NAT&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09; 是一种在 IP 网络中重新映射 IP 地址的技术&#xff0c;主要用于解决 IPv4 地址短缺问题&#xff0c;同时提供一定的网络安全防护作用。 功能&#xff1a; 将内部网络&am…

cursor/vscode启动项目connect ETIMEDOUT 127.0.0.1:xx

现象&#xff1a; 上午正常使用cursor/vscode&#xff0c;因为需要写前端安装了nodejs16.20和vue2&#xff0c;结果下午启动前端服务无法访问&#xff0c;浏览器一直转圈。接着测试运行最简单的flask服务&#xff0c;vscode报错connect ETIMEDOUT 127.0.0.1:xx&#xff0c;要么…

EXO分布式部署deepseek r1

EXO 是一个支持分布式 AI 计算的框架&#xff0c;可以用于在多个设备&#xff08;包括 Mac Studio&#xff09;上运行大语言模型&#xff08;LLM&#xff09;。以下是联调 Mac Studio 512GB 的步骤&#xff1a; 安装 EXO • 从 EXO GitHub 仓库 下载源码或使用 git clone 获取…

python训练营打卡第30天

模块和库的导入 知识点回顾&#xff1a; 导入官方库的三种手段导入自定义库/模块的方式导入库/模块的核心逻辑&#xff1a;找到根目录&#xff08;python解释器的目录和终端的目录不一致&#xff09; 一、导入官方库 1.标准导入&#xff1a;导入整个库 import mathprint(&quo…

Unity 多时间源Timer定时器实战分享:健壮性、高效性、多线程安全与稳定性能全面解析

简介 Timer 是一个 Unity 环境下高效、灵活的定时任务调度系统&#xff0c;支持以下功能&#xff1a; •支持多种时间源&#xff08;游戏时间 / 非缩放时间 / 真实时间&#xff09; •支持一次性延迟执行和重复执行 •提供 ID、回调、目标对象等多种查询和销毁方式 •内建…

深入理解Docker和K8S

深入理解Docker和K8S Docker 是大型架构的必备技能&#xff0c;也是云原生核心。Docker 容器化作为一种轻量级的虚拟化技术&#xff0c;其核心思想&#xff1a;将应用程序及其所有依赖项打包在一起&#xff0c;形成一个可移植的单元。 容器的本质是进程&#xff1a; 容器是在…

docker中使用openresty

1.为什么要使用openresty 我这边是因为要使用1Panel&#xff0c;第一个最大的原因&#xff0c;就是图方便&#xff0c;比较可以一键安装。但以前一直都是直接安装nginx。所以需要一个过度。 2.如何查看openResty使用了nginx哪个版本 /usr/local/openresty/nginx/sbin/nginx …

CSS【详解】弹性布局 flex

适用场景 一维&#xff08;行或列&#xff09;布局 基本概念 包裹所有被布局元素的父元素为容器 所有被布局的元素为项目 项目的排列方向&#xff08;垂直/水平&#xff09;为主轴 与主轴垂直的方向交交叉轴 容器上启用 flex 布局 将容器的 display 样式设置为 flex 或 i…

全能视频处理工具介绍说明

软件介绍 本文介绍的软件是FFmpeg小白助手&#xff0c;它是一款视频处理工具。 使用便捷性 这款FFmpeg小白助手无需安装&#xff0c;解压出来就能够直接投入使用。 主要功能概述 该工具主要具备格式转换、文件裁剪、文件压缩、文件合并这四大功能。 格式转换能力 软件支持…

Linux中的DNS的安装与配置

DNS简介 DNS&#xff08;DomainNameSystem&#xff09;是互联网上的一项服务&#xff0c;它作为将域名和IP地址相互映射的一个分布式数据库&#xff0c;能够使人更方便的访问互联网。 DNS使用的是53端口 通常DNS是以UDP这个较快速的数据传输协议来查询的&#xff0c;但是没有查…