批量处理工具类 用于解决大批量数据操作时的数据库性能问题

news/2025/11/18 11:14:13/文章来源:https://www.cnblogs.com/huang2979127746/p/19236498
import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;/*** @description: 批量处理工具类 用于解决大批量数据操作时的数据库性能问题* 提供批量查询和批量操作功能,支持并行处理和分批延迟控制* @author: hmt* @createTime: 2025/11/18 上午10:04* @version: 2.0*/
@Slf4j
public class HandleBatchUtil {//=================================常量===================================/*** 延迟时间 5ms - 用于批次间的短暂延迟,避免数据库压力过大*/private static final int BATCH_DELAY_MS = 5;/*** 默认执行的批次大小(新增、删除、修改) - 建议值500*/private static final int DEFAULT_EXECUTE_BATCH_SIZE = 500;/*** 最大执行的批次大小(新增、删除、修改) - 安全上限1000*/private static final int MAX_EXECUTE_BATCH_SIZE = 1000;/*** 默认查询的批次大小 - 查询可以适当大一些*/private static final int DEFAULT_QUERY_BATCH_SIZE = 1000;/*** 最大查询的批次大小 - 查询批次上限*/private static final int MAX_QUERY_BATCH_SIZE = 5000;/*** 并行处理阈值 - 数据量超过此值才使用并行处理,避免小数据量的并行开销*/private static final int PARALLELISM_THRESHOLD = 10000;//====================================入口方法================================/*** 批量查询方法 - 使用默认参数(去重、过滤null、默认批次大小)* @param ids 需要查询的ID列表* @param queryFunction 查询函数,接收一批ID返回查询结果* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction) {return batchQuery(ids, queryFunction, DEFAULT_QUERY_BATCH_SIZE, true, true);}/*** 批量查询方法 - 指定批次大小,使用默认去重和过滤参数* @param ids 需要查询的ID列表* @param queryFunction 查询函数* @param batchSize 每批查询的ID数量* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {return batchQuery(ids, queryFunction, batchSize, true, true);}/*** 批量查询方法 - 完整参数版本* 将大批量的ID分割成小批次进行查询,避免SQL语句过长和数据库性能问题* @param ids 需要查询的ID列表* @param queryFunction 查询函数,接收一批ID返回查询结果* @param batchSize 每批查询的ID数量* @param distinct 是否对ID去重* @param filterNull 是否过滤null值* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize,boolean distinct,boolean filterNull) {// 参数校验if (isEmpty(ids)) {return Collections.emptyList();}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_QUERY_BATCH_SIZE, MAX_QUERY_BATCH_SIZE);// ID列表进行预处理(去重、过滤null等)List<Long> idList = preprocessIds(ids, filterNull, distinct);if (isEmpty(idList)) {return Collections.emptyList();}// 根据数据量选择查询策略:小批量直接查询,大批量分批处理if (idList.size() <= actualBatchSize) {return safeApply(queryFunction, idList, Collections.emptyList());} else {return batchQueryInternal(idList, queryFunction, actualBatchSize);}}/*** 支持并行处理的批量查询* 使用并行流提高大数据量下的查询效率,注意确保查询操作是线程安全的* @param ids 需要查询的ID列表* @param queryFunction 查询函数* @param batchSize 每批查询的ID数量* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQueryParallel(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {// 参数校验if (isEmpty(ids)) {return Collections.emptyList();}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_QUERY_BATCH_SIZE, MAX_QUERY_BATCH_SIZE);// ID预处理(默认去重和过滤null)List<Long> processedIds = preprocessIds(ids, true, true);if (isEmpty(processedIds)) {return Collections.emptyList();}// 小数据量不使用并行,避免并行开销if (processedIds.size() <= actualBatchSize || processedIds.size() < PARALLELISM_THRESHOLD) {return safeApply(queryFunction, processedIds, Collections.emptyList());}// 执行并行查询return executeParallelQuery(processedIds, queryFunction, actualBatchSize);}/*** 批量操作快捷方法 - 使用默认批次大小* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction) {return batchExecute(dataList, batchFunction, DEFAULT_EXECUTE_BATCH_SIZE);}/*** 批量操作方法* 将大批量的数据分割成小批次进行处理,避免SQL语句过长和数据库性能问题* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数,接收一批数据返回处理结果* @param batchSize 每批处理的数据数量* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {// 参数校验if (isEmpty(dataList)) {return true;}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_EXECUTE_BATCH_SIZE, MAX_EXECUTE_BATCH_SIZE);// 根据数据量选择处理策略:小批量直接处理,大批量分批处理if (dataList.size() <= actualBatchSize) {return safeApply(batchFunction, dataList, true);} else {return batchExecuteInternal(dataList, batchFunction, actualBatchSize);}}/*** 支持并行处理的批量操作方法* 使用并行流提高大数据量下的处理效率,注意确保操作是线程安全的* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 每批处理的数据数量* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecuteParallel(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {// 参数校验if (isEmpty(dataList)) {return true;}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_EXECUTE_BATCH_SIZE, MAX_EXECUTE_BATCH_SIZE);// 小数据量不使用并行,避免并行开销if (dataList.size() <= actualBatchSize || dataList.size() < PARALLELISM_THRESHOLD) {return safeApply(batchFunction, dataList, true);}// 执行并行操作处理return executeParallelExecute(dataList, batchFunction, actualBatchSize);}//==================================核心处理逻辑===============================/*** 内部批量查询方法 - 处理大批量ID的分批查询逻辑* @param idList 预处理后的ID列表* @param queryFunction 查询函数* @param batchSize 批次大小* @return 合并后的查询结果* @param <T> 返回结果类型*/private static <T> List<T> batchQueryInternal(List<Long> idList,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {List<T> result = new ArrayList<>();// 获取批次数量int batchCount = getBatchCount(idList, batchSize);// 遍历每个批次进行处理for (int batchIndex = 0; batchIndex < batchCount; batchIndex++) {// 使用安全批次应用方法执行查询List<T> batchResult = safeApplyBatch(queryFunction, idList, batchIndex, batchSize, batchCount, Collections.emptyList());// 将非空结果添加到总结果中if (CollectionUtil.isNotEmpty(batchResult)) {result.addAll(batchResult);}// 批处理间延迟,避免数据库压力过大
            addDelayIfNeeded(batchIndex, batchCount);}return result;}/*** 内部批量操作方法 - 处理大批量数据的分批操作逻辑* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 批次大小* @return 所有批次是否都成功处理* @param <T> 数据类型*/private static <T> boolean batchExecuteInternal(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {boolean allSuccess = true;// 获取批次数量int batchCount = getBatchCount(dataList, batchSize);// 遍历每个批次进行处理for (int batchIndex = 0; batchIndex < batchCount; batchIndex++) {// 使用安全批次应用方法执行操作boolean batchSuccess = safeApplyBatch(batchFunction, dataList, batchIndex, batchSize, batchCount, false);if (!batchSuccess) {log.warn("批次 {}/{} 处理失败", batchIndex + 1, batchCount);allSuccess = false;// 注意:这里选择继续执行后续批次,只记录失败不中断// 根据业务需求可以修改为快速失败模式
            }// 批处理间延迟,避免数据库压力过大
            addDelayIfNeeded(batchIndex, batchCount);}return allSuccess;}//====================================工具方法================================/*** 并行查询处理 - 使用并行流执行批量查询* @param processedIds 预处理后的ID列表* @param queryFunction 查询函数* @param batchSize 批次大小* @return 合并后的查询结果* @param <T> 返回结果类型*/private static <T> List<T> executeParallelQuery(List<Long> processedIds,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {// 获取批次数量int batchCount = getBatchCount(processedIds, batchSize);try {// 使用并行流处理所有批次return IntStream.range(0, batchCount).parallel()  // 启用并行处理.mapToObj(batchIndex ->// 对每个批次安全执行查询
                            safeApplyBatch(queryFunction, processedIds, batchIndex, batchSize, batchCount, Collections.emptyList())).flatMap(List::stream)  // 扁平化所有批次结果.collect(Collectors.toList());  // 收集为列表} catch (Exception e) {log.error("并行批量查询执行失败", e);return Collections.emptyList();}}/*** 并行操作处理 - 使用并行流执行批量操作* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 批次大小* @return 所有批次是否都成功处理* @param <T> 数据类型*/private static <T> boolean executeParallelExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {int batchCount = getBatchCount(dataList, batchSize);try {// 使用并行流处理所有批次,要求所有批次都成功return IntStream.range(0, batchCount).parallel()  // 启用并行处理.allMatch(batchIndex ->// 对每个批次安全执行操作,使用allMatch确保所有批次成功safeApplyBatch(batchFunction, dataList, batchIndex, batchSize, batchCount, false));} catch (Exception e) {log.error("并行批量处理执行失败", e);return false;}}/*** 安全执行函数应用 - 处理完整列表* 对函数执行进行异常捕获,避免单次失败影响整体流程* @param function 要执行的函数* @param data 输入数据列表* @param defaultValue 执行失败时的默认返回值* @return 函数执行结果或默认值* @param <T> 输入数据类型* @param <R> 返回结果类型*/private static <T, R> R safeApply(Function<List<T>, R> function, List<T> data, R defaultValue) {try {// 创建数据副本,避免原始列表被修改return function.apply(new ArrayList<>(data));} catch (Exception e) {log.warn("批量处理函数执行失败: {}", e.getMessage());return defaultValue;}}/*** 安全执行批次函数应用 - 处理单个批次* 封装批次切分逻辑和异常处理,提供统一的批次执行方式* @param function 要执行的函数* @param fullList 完整数据列表* @param batchIndex 当前批次索引(从0开始)* @param batchSize 批次大小* @param batchCount 总批次数* @param defaultValue 执行失败时的默认返回值* @return 批次执行结果或默认值* @param <T> 输入数据类型* @param <R> 返回结果类型*/private static <T, R> R safeApplyBatch(Function<List<T>, R> function,List<T> fullList,int batchIndex,int batchSize,int batchCount,R defaultValue) {try {// 计算当前批次的起始和结束位置int start = batchIndex * batchSize;int end = Math.min(start + batchSize, fullList.size());// 获取当前批次的数据(创建副本避免原始数据被修改)List<T> batchData = fullList.subList(start, end);return function.apply(new ArrayList<>(batchData));} catch (Exception e) {log.warn("批次 {}/{} 执行失败: {}", batchIndex + 1, batchCount, e.getMessage());return defaultValue;}}/*** 参数校验 - 验证并调整批次大小* 确保批次大小在合理范围内,避免过大或过小的批次影响性能* @param batchSize 输入的批次大小* @param defaultSize 默认批次大小* @param maxSize 最大批次大小* @return 调整后的有效批次大小*/private static Integer validateBatchSize(Integer batchSize, Integer defaultSize, Integer maxSize) {if (batchSize == null || batchSize <= 0) {log.warn("批次大小无效: {},使用默认值: {}", batchSize, defaultSize);return defaultSize;}if (batchSize > maxSize) {log.warn("批次大小过大: {},调整为最大值: {}", batchSize, maxSize);return maxSize;}return batchSize;}/*** ID预处理 - 对ID列表进行过滤和去重处理* @param ids 原始ID列表* @param filterNull 是否过滤null值* @param distinct 是否去重* @return 处理后的ID列表*/private static List<Long> preprocessIds(List<Long> ids, boolean filterNull, boolean distinct) {if (isEmpty(ids)) {return Collections.emptyList();}Stream<Long> stream = ids.stream();// 根据参数应用过滤逻辑if (filterNull) {stream = stream.filter(Objects::nonNull);}if (distinct) {stream = stream.distinct();}return stream.collect(Collectors.toList());}/*** 批处理间延迟 - 在批次间添加短暂延迟* 避免对数据库造成过大压力,提高系统稳定性* @param currentBatchIndex 当前批次索引* @param totalBatchCount 总批次数量*/private static void addDelayIfNeeded(int currentBatchIndex, int totalBatchCount) {// 如果不是最后一批,则添加延迟if (currentBatchIndex < totalBatchCount - 1) {try {Thread.sleep(BATCH_DELAY_MS);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.debug("批量处理延迟被中断");}}}/*** 空值检查 - 检查列表是否为空或null* @param list 要检查的列表* @return 列表是否为空*/private static boolean isEmpty(List<?> list) {return list == null || list.isEmpty();}/*** 获取批次数量 - 计算数据列表需要分成多少批* @param dataList 数据列表* @param batchSize 批次大小* @return 批次数量(向上取整)*/public static <T> int getBatchCount(List<T> dataList, Integer batchSize) {if (isEmpty(dataList) || batchSize <= 0) {return 0;}// 使用向上取整公式计算批次数量return (dataList.size() + batchSize - 1) / batchSize;}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/968744.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年11月领导品牌认证机构推荐榜:权威机构对比与选择指南

在选择领导品牌认证机构时,企业决策者往往面临诸多考量。这类需求通常出现在企业需要提升市场竞争力、加强品牌公信力或准备融资上市等关键发展阶段。根据行业调研数据显示,2025年中国市场对第三方认证服务的需求预计…

随机爬树题解

随机爬树题解 题目传送门 更好的阅读体验 \(n^2\) 暴力: 思路:求期望,即求所有点的权值乘上概率后的和,即:\[ans=\sum_{u \in V}{P_u a_u} \]求每个点的概率 \(P_u\) :由题,令走到父亲的概率为 \(P_f\),走到儿…

2025年11月人形机器人落地商推荐排行榜:基于多维度数据分析的权威榜单

随着人工智能与机器人技术的快速发展,人形机器人正逐步从实验室走向实际应用场景。许多企业管理者、技术采购人员以及行业投资者在推进人形机器人项目时,常常面临技术适配难、场景落地周期长、服务配套不完善等现实问…

2025年11月四川护栏厂家推荐榜:综合实力与用户口碑全面对比

随着城市化进程不断推进和交通安全意识日益增强,四川地区对各类护栏产品的需求持续增长。无论是市政建设工程、房地产项目还是个人家庭装修,选择合适的护栏厂家成为许多用户面临的重要决策。典型用户包括工程项目负责…

天气预报查询

天气预报(appKey认证).container { max-width: 500px; margin: 50px auto; padding: 20px; border: 1px solid rgba(221, 221, 221, 1); border-radius: 10px } .input-group { margin-bottom: 20px } input { paddi…

2025年11月人形机器人落地商排行推荐:中立评价与场景适配

随着人形机器人技术的快速发展,越来越多的企业和机构开始关注其在实际场景中的应用价值。无论是工业制造、物流仓储、公共服务还是教育领域,用户都希望找到能够将先进技术转化为实际解决方案的可靠伙伴。选择合适的人…

2025年11月四川护栏厂家推荐榜单:基于用户需求的权威对比与选择指南

作为城市基础设施与建筑安全的重要组成部分,护栏的选择直接关系到公共安全、工程质量和长期使用效益。许多正在规划市政工程、房地产开发或私人庭院建设的用户,常常面临如何从众多四川护栏厂家中挑选出可靠合作伙伴的…

DotMemory系列:5. 如何实现自动化抓取和应用自托管

一:背景 1. 讲故事 前面几篇我们都是手工安装 dotmemory 软件,然后在程序的合适时机抓取snapshot,这种方式在绝大多数场景下都没有问题,但在一些精细化的场景下,如果能够实现自动化抓取,那就比较🐂👃了,这篇…

2025年国内维保服务品牌综合实力排行榜:专业制冷技术企业深度解析

摘要 随着制冷行业技术升级和市场需求的不断扩大,维保服务品牌的专业性和可靠性成为用户选择的关键因素。2025年国内维保服务市场呈现多元化发展态势,制冷设备维护、螺杆机全生命周期管理及冷库建设等服务需求持续增…

2025年11月成都监理公司推荐榜:权威解析与多维度对比评价

随着成都市城市化进程加快和基础设施投资持续增长,建设工程监理服务需求日益旺盛。无论是商业地产开发、公共设施建设还是住宅项目,业主方往往面临工程质量管理复杂、施工安全风险高、进度控制难度大等现实挑战。选择…

SP3D 自动切图标注系统

三维智能出图平台AutoDraft是独立的系统,不依赖第三方软件开发,如AutoCAD,打通专有数据文件解析和出图底层核心算法,实现图纸的快速生成,助力管道的数字化交付。在石油化工工程数字化交付标准GB∕T 51296-2018中规…

2025年11月会计培训班推荐榜:权威机构课程对比与用户评价分析

在选择会计培训班时,许多用户面临着信息繁杂、教学质量参差不齐、学习效果难以保障等问题。无论是职场新人希望快速入门,还是资深从业者计划考取更高资质,都需要一个系统化、互动性强且成果可验证的学习平台。当前会…

2025年11月成都监理公司推荐榜:专业服务对比与选择指南

在成都地区工程建设持续升温的背景下,业主、开发商或项目负责人在选择监理服务时往往面临信息不对称、服务质量参差不齐等挑战。无论是住宅开发、商业综合体建设,还是城市更新、工业厂房项目,监理公司的专业水平、资…

2025 西安红木家具热门销售厂家排行,木灵生红木领衔高端定制赛道

西安作为西北地区红木家具核心集散地,中式家居文化复兴带动市场需求持续攀升,明式、古典、新中式红木家具成为高端家居消费的热门选择。为帮助企业与家庭精准选购高性价比红木家具,结合厂家口碑、工艺实力、产品品质…

2025年11月呼叫中心系统服务商推荐榜单:五大服务商综合对比分析

在数字化转型加速的今天,呼叫中心系统已成为企业客户服务与运营支持的核心环节。无论是大型企业还是中小型企业,选择一套高效稳定的呼叫中心系统服务商,往往关系到客户满意度、内部协作效率以及长期运营成本的控制。…

2025年11月临沂美容/美容培训加盟十大热门品牌:杜氏大唐专业减肥养生稳坐头把交椅

摘要 2025 年临沂美容加盟行业迎来 “本土品牌崛起 + 细分赛道爆发” 的双重机遇,市场规模预计突破 38 亿元。随着消费需求向 “健康化、个性化、轻量化” 转型,减肥养生、问题肌修护、科技美肤等垂直领域成为创业热…

2025年11月高温链条油脂公司推荐榜单及选择指南:五大品牌综合对比分析

在工业制造领域,高温链条油脂作为关键辅助材料,其性能直接影响设备运行效率与寿命。随着2025年制造业升级进程加速,企业对高温链条油脂的需求呈现专业化、精细化趋势。根据国家工业润滑剂行业协会统计,2024年我国高…

2025 最新伺服压机厂家权威推荐榜:轴承 / 全自动 / 精密 / 四柱 / C 型 / 小型 / 装配 / 压装伺服压机厂家优选

在电子产品、汽车、光伏线缆等制造业快速升级的背景下,伺服压机作为核心加工设备,其精度、稳定性与智能化水平直接决定生产效率与产品合格率。当前市场品牌繁杂,部分产品存在配件劣质、服务缺失等问题,给企业采购带…

国产化Excel开发组件Spire.XLS教程:使用Python将CSV文件转换为列表

在 Python 中将 CSV 转换为列表,能实现数据的无缝处理、分析及与其他工作流的集成。本文将通过实用代码示例介绍如何使用 Python 读取 CSV 并转化为列表,覆盖从基础到进阶的各类场景。CSV(逗号分隔值)是一种用于存…

2025年7-9级防弹窗供货厂家权威推荐榜单:4-6级防弹窗/防弹窗/抗爆防弹窗源头厂家精选

在安防需求日益提升的背景下,7-9级防弹窗作为高端防护装备,其供应商的技术实力与产品质量备受关注。 据行业数据显示,2025年中国安防产业规模预计突破15000亿元,其中高端防护装备市场年增长率保持在20% 以上。防弹…