import cn.hutool.core.collection.CollectionUtil; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream;/*** @description: 批量处理工具类 用于解决大批量数据操作时的数据库性能问题* 提供批量查询和批量操作功能,支持并行处理和分批延迟控制* @author: hmt* @createTime: 2025/11/18 上午10:04* @version: 2.0*/ @Slf4j public class HandleBatchUtil {//=================================常量===================================/*** 延迟时间 5ms - 用于批次间的短暂延迟,避免数据库压力过大*/private static final int BATCH_DELAY_MS = 5;/*** 默认执行的批次大小(新增、删除、修改) - 建议值500*/private static final int DEFAULT_EXECUTE_BATCH_SIZE = 500;/*** 最大执行的批次大小(新增、删除、修改) - 安全上限1000*/private static final int MAX_EXECUTE_BATCH_SIZE = 1000;/*** 默认查询的批次大小 - 查询可以适当大一些*/private static final int DEFAULT_QUERY_BATCH_SIZE = 1000;/*** 最大查询的批次大小 - 查询批次上限*/private static final int MAX_QUERY_BATCH_SIZE = 5000;/*** 并行处理阈值 - 数据量超过此值才使用并行处理,避免小数据量的并行开销*/private static final int PARALLELISM_THRESHOLD = 10000;//====================================入口方法================================/*** 批量查询方法 - 使用默认参数(去重、过滤null、默认批次大小)* @param ids 需要查询的ID列表* @param queryFunction 查询函数,接收一批ID返回查询结果* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction) {return batchQuery(ids, queryFunction, DEFAULT_QUERY_BATCH_SIZE, true, true);}/*** 批量查询方法 - 指定批次大小,使用默认去重和过滤参数* @param ids 需要查询的ID列表* @param queryFunction 查询函数* @param batchSize 每批查询的ID数量* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {return batchQuery(ids, queryFunction, batchSize, true, true);}/*** 批量查询方法 - 完整参数版本* 将大批量的ID分割成小批次进行查询,避免SQL语句过长和数据库性能问题* @param ids 需要查询的ID列表* @param queryFunction 查询函数,接收一批ID返回查询结果* @param batchSize 每批查询的ID数量* @param distinct 是否对ID去重* @param filterNull 是否过滤null值* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQuery(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize,boolean distinct,boolean filterNull) {// 参数校验if (isEmpty(ids)) {return Collections.emptyList();}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_QUERY_BATCH_SIZE, MAX_QUERY_BATCH_SIZE);// ID列表进行预处理(去重、过滤null等)List<Long> idList = preprocessIds(ids, filterNull, distinct);if (isEmpty(idList)) {return Collections.emptyList();}// 根据数据量选择查询策略:小批量直接查询,大批量分批处理if (idList.size() <= actualBatchSize) {return safeApply(queryFunction, idList, Collections.emptyList());} else {return batchQueryInternal(idList, queryFunction, actualBatchSize);}}/*** 支持并行处理的批量查询* 使用并行流提高大数据量下的查询效率,注意确保查询操作是线程安全的* @param ids 需要查询的ID列表* @param queryFunction 查询函数* @param batchSize 每批查询的ID数量* @return 合并后的查询结果列表* @param <T> 返回结果类型*/public static <T> List<T> batchQueryParallel(List<Long> ids,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {// 参数校验if (isEmpty(ids)) {return Collections.emptyList();}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_QUERY_BATCH_SIZE, MAX_QUERY_BATCH_SIZE);// ID预处理(默认去重和过滤null)List<Long> processedIds = preprocessIds(ids, true, true);if (isEmpty(processedIds)) {return Collections.emptyList();}// 小数据量不使用并行,避免并行开销if (processedIds.size() <= actualBatchSize || processedIds.size() < PARALLELISM_THRESHOLD) {return safeApply(queryFunction, processedIds, Collections.emptyList());}// 执行并行查询return executeParallelQuery(processedIds, queryFunction, actualBatchSize);}/*** 批量操作快捷方法 - 使用默认批次大小* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction) {return batchExecute(dataList, batchFunction, DEFAULT_EXECUTE_BATCH_SIZE);}/*** 批量操作方法* 将大批量的数据分割成小批次进行处理,避免SQL语句过长和数据库性能问题* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数,接收一批数据返回处理结果* @param batchSize 每批处理的数据数量* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {// 参数校验if (isEmpty(dataList)) {return true;}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_EXECUTE_BATCH_SIZE, MAX_EXECUTE_BATCH_SIZE);// 根据数据量选择处理策略:小批量直接处理,大批量分批处理if (dataList.size() <= actualBatchSize) {return safeApply(batchFunction, dataList, true);} else {return batchExecuteInternal(dataList, batchFunction, actualBatchSize);}}/*** 支持并行处理的批量操作方法* 使用并行流提高大数据量下的处理效率,注意确保操作是线程安全的* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 每批处理的数据数量* @return 操作是否全部成功* @param <T> 数据类型*/public static <T> boolean batchExecuteParallel(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {// 参数校验if (isEmpty(dataList)) {return true;}// 验证并调整批次大小int actualBatchSize = validateBatchSize(batchSize, DEFAULT_EXECUTE_BATCH_SIZE, MAX_EXECUTE_BATCH_SIZE);// 小数据量不使用并行,避免并行开销if (dataList.size() <= actualBatchSize || dataList.size() < PARALLELISM_THRESHOLD) {return safeApply(batchFunction, dataList, true);}// 执行并行操作处理return executeParallelExecute(dataList, batchFunction, actualBatchSize);}//==================================核心处理逻辑===============================/*** 内部批量查询方法 - 处理大批量ID的分批查询逻辑* @param idList 预处理后的ID列表* @param queryFunction 查询函数* @param batchSize 批次大小* @return 合并后的查询结果* @param <T> 返回结果类型*/private static <T> List<T> batchQueryInternal(List<Long> idList,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {List<T> result = new ArrayList<>();// 获取批次数量int batchCount = getBatchCount(idList, batchSize);// 遍历每个批次进行处理for (int batchIndex = 0; batchIndex < batchCount; batchIndex++) {// 使用安全批次应用方法执行查询List<T> batchResult = safeApplyBatch(queryFunction, idList, batchIndex, batchSize, batchCount, Collections.emptyList());// 将非空结果添加到总结果中if (CollectionUtil.isNotEmpty(batchResult)) {result.addAll(batchResult);}// 批处理间延迟,避免数据库压力过大 addDelayIfNeeded(batchIndex, batchCount);}return result;}/*** 内部批量操作方法 - 处理大批量数据的分批操作逻辑* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 批次大小* @return 所有批次是否都成功处理* @param <T> 数据类型*/private static <T> boolean batchExecuteInternal(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {boolean allSuccess = true;// 获取批次数量int batchCount = getBatchCount(dataList, batchSize);// 遍历每个批次进行处理for (int batchIndex = 0; batchIndex < batchCount; batchIndex++) {// 使用安全批次应用方法执行操作boolean batchSuccess = safeApplyBatch(batchFunction, dataList, batchIndex, batchSize, batchCount, false);if (!batchSuccess) {log.warn("批次 {}/{} 处理失败", batchIndex + 1, batchCount);allSuccess = false;// 注意:这里选择继续执行后续批次,只记录失败不中断// 根据业务需求可以修改为快速失败模式 }// 批处理间延迟,避免数据库压力过大 addDelayIfNeeded(batchIndex, batchCount);}return allSuccess;}//====================================工具方法================================/*** 并行查询处理 - 使用并行流执行批量查询* @param processedIds 预处理后的ID列表* @param queryFunction 查询函数* @param batchSize 批次大小* @return 合并后的查询结果* @param <T> 返回结果类型*/private static <T> List<T> executeParallelQuery(List<Long> processedIds,Function<List<Long>, List<T>> queryFunction,Integer batchSize) {// 获取批次数量int batchCount = getBatchCount(processedIds, batchSize);try {// 使用并行流处理所有批次return IntStream.range(0, batchCount).parallel() // 启用并行处理.mapToObj(batchIndex ->// 对每个批次安全执行查询 safeApplyBatch(queryFunction, processedIds, batchIndex, batchSize, batchCount, Collections.emptyList())).flatMap(List::stream) // 扁平化所有批次结果.collect(Collectors.toList()); // 收集为列表} catch (Exception e) {log.error("并行批量查询执行失败", e);return Collections.emptyList();}}/*** 并行操作处理 - 使用并行流执行批量操作* @param dataList 需要处理的数据列表* @param batchFunction 批量处理函数* @param batchSize 批次大小* @return 所有批次是否都成功处理* @param <T> 数据类型*/private static <T> boolean executeParallelExecute(List<T> dataList,Function<List<T>, Boolean> batchFunction,Integer batchSize) {int batchCount = getBatchCount(dataList, batchSize);try {// 使用并行流处理所有批次,要求所有批次都成功return IntStream.range(0, batchCount).parallel() // 启用并行处理.allMatch(batchIndex ->// 对每个批次安全执行操作,使用allMatch确保所有批次成功safeApplyBatch(batchFunction, dataList, batchIndex, batchSize, batchCount, false));} catch (Exception e) {log.error("并行批量处理执行失败", e);return false;}}/*** 安全执行函数应用 - 处理完整列表* 对函数执行进行异常捕获,避免单次失败影响整体流程* @param function 要执行的函数* @param data 输入数据列表* @param defaultValue 执行失败时的默认返回值* @return 函数执行结果或默认值* @param <T> 输入数据类型* @param <R> 返回结果类型*/private static <T, R> R safeApply(Function<List<T>, R> function, List<T> data, R defaultValue) {try {// 创建数据副本,避免原始列表被修改return function.apply(new ArrayList<>(data));} catch (Exception e) {log.warn("批量处理函数执行失败: {}", e.getMessage());return defaultValue;}}/*** 安全执行批次函数应用 - 处理单个批次* 封装批次切分逻辑和异常处理,提供统一的批次执行方式* @param function 要执行的函数* @param fullList 完整数据列表* @param batchIndex 当前批次索引(从0开始)* @param batchSize 批次大小* @param batchCount 总批次数* @param defaultValue 执行失败时的默认返回值* @return 批次执行结果或默认值* @param <T> 输入数据类型* @param <R> 返回结果类型*/private static <T, R> R safeApplyBatch(Function<List<T>, R> function,List<T> fullList,int batchIndex,int batchSize,int batchCount,R defaultValue) {try {// 计算当前批次的起始和结束位置int start = batchIndex * batchSize;int end = Math.min(start + batchSize, fullList.size());// 获取当前批次的数据(创建副本避免原始数据被修改)List<T> batchData = fullList.subList(start, end);return function.apply(new ArrayList<>(batchData));} catch (Exception e) {log.warn("批次 {}/{} 执行失败: {}", batchIndex + 1, batchCount, e.getMessage());return defaultValue;}}/*** 参数校验 - 验证并调整批次大小* 确保批次大小在合理范围内,避免过大或过小的批次影响性能* @param batchSize 输入的批次大小* @param defaultSize 默认批次大小* @param maxSize 最大批次大小* @return 调整后的有效批次大小*/private static Integer validateBatchSize(Integer batchSize, Integer defaultSize, Integer maxSize) {if (batchSize == null || batchSize <= 0) {log.warn("批次大小无效: {},使用默认值: {}", batchSize, defaultSize);return defaultSize;}if (batchSize > maxSize) {log.warn("批次大小过大: {},调整为最大值: {}", batchSize, maxSize);return maxSize;}return batchSize;}/*** ID预处理 - 对ID列表进行过滤和去重处理* @param ids 原始ID列表* @param filterNull 是否过滤null值* @param distinct 是否去重* @return 处理后的ID列表*/private static List<Long> preprocessIds(List<Long> ids, boolean filterNull, boolean distinct) {if (isEmpty(ids)) {return Collections.emptyList();}Stream<Long> stream = ids.stream();// 根据参数应用过滤逻辑if (filterNull) {stream = stream.filter(Objects::nonNull);}if (distinct) {stream = stream.distinct();}return stream.collect(Collectors.toList());}/*** 批处理间延迟 - 在批次间添加短暂延迟* 避免对数据库造成过大压力,提高系统稳定性* @param currentBatchIndex 当前批次索引* @param totalBatchCount 总批次数量*/private static void addDelayIfNeeded(int currentBatchIndex, int totalBatchCount) {// 如果不是最后一批,则添加延迟if (currentBatchIndex < totalBatchCount - 1) {try {Thread.sleep(BATCH_DELAY_MS);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.debug("批量处理延迟被中断");}}}/*** 空值检查 - 检查列表是否为空或null* @param list 要检查的列表* @return 列表是否为空*/private static boolean isEmpty(List<?> list) {return list == null || list.isEmpty();}/*** 获取批次数量 - 计算数据列表需要分成多少批* @param dataList 数据列表* @param batchSize 批次大小* @return 批次数量(向上取整)*/public static <T> int getBatchCount(List<T> dataList, Integer batchSize) {if (isEmpty(dataList) || batchSize <= 0) {return 0;}// 使用向上取整公式计算批次数量return (dataList.size() + batchSize - 1) / batchSize;} }